|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from typing import Callable |
|
|
|
|
|
|
|
|
# DNA nucleotide alphabet (one character per base).
NTs = "ACGT"

# One-letter codes of the 20 amino acids, in alphabetical order.
AAs = "ACDEFGHIKLMNPQRSTVWY"
|
|
|
|
|
# Maps each one-letter amino acid code (and '*' for stop) to the list of DNA
# codons that encode it. Codons are written in the DNA alphabet (A, C, G, T);
# the 21 lists together contain each of the 64 possible triplets exactly once.
AA_TO_CODON = {
    '*': ['TAA', 'TAG', 'TGA'],
    'A': ['GCT', 'GCC', 'GCA', 'GCG'],
    'C': ['TGT', 'TGC'],
    'D': ['GAT', 'GAC'],
    'E': ['GAA', 'GAG'],
    'F': ['TTT', 'TTC'],
    # The first glycine codon must be 'GGT': 'GGU' is the RNA spelling and
    # would never match a DNA sequence.
    'G': ['GGT', 'GGC', 'GGA', 'GGG'],
    'H': ['CAT', 'CAC'],
    'I': ['ATT', 'ATC', 'ATA'],
    'K': ['AAA', 'AAG'],
    'L': ['TTA', 'TTG', 'CTT', 'CTC', 'CTA', 'CTG'],
    'M': ['ATG'],
    'N': ['AAT', 'AAC'],
    'P': ['CCT', 'CCC', 'CCA', 'CCG'],
    'Q': ['CAA', 'CAG'],
    'R': ['CGT', 'CGC', 'CGA', 'CGG', 'AGA', 'AGG'],
    'S': ['TCT', 'TCC', 'TCA', 'TCG', 'AGT', 'AGC'],
    'T': ['ACT', 'ACC', 'ACA', 'ACG'],
    'V': ['GTT', 'GTC', 'GTA', 'GTG'],
    'W': ['TGG'],
    'Y': ['TAT', 'TAC'],
}
|
|
|
|
|
# Reverse lookup of AA_TO_CODON: every codon listed there maps back to the
# key ('*' or a one-letter amino acid code) whose codon list contains it.
CODON_TO_AA = dict(
    (triplet, residue)
    for residue, triplets in AA_TO_CODON.items()
    for triplet in triplets
)
|
|
|
|
|
# Three-letter amino acid abbreviation -> one-letter code.
AA_3_TO_1 = dict(
    Ala="A",
    Arg="R",
    Asn="N",
    Asp="D",
    Cys="C",
    Gln="Q",
    Glu="E",
    Gly="G",
    His="H",
    Ile="I",
    Leu="L",
    Lys="K",
    Met="M",
    Phe="F",
    Pro="P",
    Ser="S",
    Thr="T",
    Trp="W",
    Tyr="Y",
    Val="V",
)
|
|
|
|
|
|
|
|
def nucleotide_deep_mutational_scan(sequence: str, ignore_wt: bool = True):
    """
    Lazily enumerates single-position substitutions of `sequence`.

    For every position, each character of `NTs` is proposed as a replacement.
    Yields `(wt, mt, idx)` tuples, where `wt` is the character found in
    `sequence`, `mt` is the proposed character and `idx` is the 0-based
    position. When `ignore_wt` is true, proposals identical to the original
    character are skipped (the comparison is case-sensitive).
    """
    for position, wild_type in enumerate(sequence):
        yield from (
            (wild_type, mutant, position)
            for mutant in NTs
            if not (ignore_wt and wild_type == mutant)
        )
|
|
|
|
|
|
|
|
def parse_blast_output(output_path: str) -> pd.DataFrame:
    """
    Parses standard blast output with `-outfmt 6`.

    Lines starting with `#` and blank lines are skipped. Every remaining line
    is split on whitespace and its fields are assigned, in order, to the
    columns `qacc`, `sacc`, `pident`, `length`, `mismatch`, `gapopen`,
    `qstart`, `qend`, `sstart`, `send`, `evalue` and `bitscore`; fields beyond
    the twelfth are ignored.

    Returns a DataFrame that always carries these twelve columns, even when
    the file contains no hits. `evalue` and `bitscore` are converted to float;
    all other columns keep the raw string tokens. If the file was produced
    with a shorter custom column list, the trailing columns are filled with
    NaN.
    """
    blast_table_header = [
        'qacc', 'sacc', 'pident', 'length', 'mismatch', 'gapopen', 'qstart',
        'qend', 'sstart', 'send', 'evalue', 'bitscore',
    ]

    data = []
    with open(output_path, 'r') as f:
        for line in f:
            fields = line.split()
            # An empty field list means the line was blank or whitespace-only.
            if line.startswith('#') or not fields:
                continue
            data.append(dict(zip(blast_table_header, fields)))

    # Passing `columns` keeps the schema stable for empty or short input, so
    # callers can always index `df['evalue']`.
    df = pd.DataFrame(data, columns=blast_table_header)
    for column in ('evalue', 'bitscore'):
        df[column] = df[column].astype(float)

    return df
|
|
|
|
|
|
|
|
def parse_erpin_output(output_path: str, name: str) -> pd.DataFrame:
    """
    Parses ERPIN output. For an example, see `eval/data/example_rho_output.txt`.

    The file is scanned for lines that start with `>` followed by `name`
    (a prefix match, so longer names sharing that prefix also match). For each
    such line, the next line is split on whitespace into strand, index,
    interval (`start..end`), score and e-value, and the line after that is
    taken as the hit sequence.

    Returns one row per hit with the columns `id` (`{name}_{index}_{strand}`),
    `seq`, `start` and `end` (ints), `strand` (`+` when the reported strand is
    `FW`, otherwise `-`), `score` (left as the raw string token) and `evalue`
    (float).
    """
    field_names = ('strand', 'index', 'interval', 'score', 'evalue')
    column_names = ['id', 'seq', 'start', 'end', 'strand', 'score', 'evalue']
    header_prefix = f'>{name}'

    rows = []
    with open(output_path, 'r') as handle:
        lines = iter(handle)
        for header in lines:
            if not header.startswith(header_prefix):
                continue

            # The two lines following a matching header hold the hit's
            # metadata and its sequence; at end of file they read as ''.
            hit = dict(zip(field_names, next(lines, '').rstrip().split()))
            hit_sequence = next(lines, '').rstrip()
            first, last = hit['interval'].split('..')

            rows.append([
                f"{name}_{hit['index']}_{hit['strand']}",
                hit_sequence,
                int(first),
                int(last),
                '+' if hit['strand'] == 'FW' else '-',
                hit['score'],
                float(hit['evalue']),
            ])

    return pd.DataFrame(rows, columns=column_names)
|
|
|
|
|
|
|
|
def parse_hmmsearch_output(output_path: str) -> pd.DataFrame:
    """
    Parses standard hmmsearch output.

    NOTE(review): the 23 column names look like the per-domain table written
    by `hmmsearch --domtblout` -- confirm against the calling code.

    Lines starting with `#` and blank lines are skipped. Every remaining line
    is split on whitespace into at most 23 fields, so the final `desc` field
    keeps the complete free-text description even when it contains spaces.

    Returns a DataFrame that always carries the 23 columns listed below, even
    when the file contains no hits. All values are left as the raw string
    tokens; no numeric conversion is performed.
    """
    hmmsearch_table_header = [
        'target', 'target_acc', 'tlen', 'query', 'query_acc', 'qlen',
        'evalue', 'score', 'bias', 'num', 'of', 'cevalue', 'ievalue',
        'dscore', 'dbias', 'hmm_from', 'hmm_to', 'ali_from', 'ali_to',
        'env_from', 'env_to', 'acc', 'desc',
    ]
    # Stop splitting once every column before `desc` has its token; whatever
    # remains on the line is the description.
    max_splits = len(hmmsearch_table_header) - 1

    data = []
    with open(output_path, 'r') as f:
        for line in f:
            if line.startswith('#'):
                continue
            fields = line.strip().split(None, max_splits)
            # Blank lines would otherwise become all-NaN rows.
            if not fields:
                continue
            data.append(dict(zip(hmmsearch_table_header, fields)))

    return pd.DataFrame(data, columns=hmmsearch_table_header)
|
|
|
|
|
|
|
|
def permutation_test(
    score_func: Callable[[np.ndarray, np.ndarray], float],
    x1: np.ndarray,
    x2: np.ndarray,
    n_permutations: int = 100_000,
    seed=None,
) -> float:
    """
    Returns a permutation-based P value. Computes the null distribution by
    shuffling the provided data and recomputing the `score_func`.

    Only `x2` is shuffled; `x1` is passed to `score_func` unchanged on every
    draw. The test is one-sided: the result is the fraction of permuted
    scores that are greater than or equal to the observed score. The estimate
    can therefore be exactly 0 when no permuted score reaches the observed
    one.

    Args:
        score_func: Called as `score_func(x1, x2)`; larger values count as
            more extreme.
        x1: Array-like passed through unchanged (copied into a NumPy array).
        x2: Array-like that is permuted along its first axis for each draw.
        n_permutations: Number of random permutations to draw.
        seed: Optional seed (or `np.random.Generator`) passed to
            `np.random.default_rng` for reproducible results. When `None`,
            permutations are drawn from NumPy's global random state, exactly
            as before this parameter existed.

    Raises:
        ValueError: If `n_permutations` is less than 1.
    """
    if n_permutations < 1:
        raise ValueError('Number of permutations must be positive.')

    x1, x2 = np.array(x1), np.array(x2)

    # Keep the legacy global-state stream when no seed is given so callers
    # that rely on `np.random.seed(...)` still get identical results.
    if seed is None:
        permute = np.random.permutation
    else:
        permute = np.random.default_rng(seed).permutation

    observed_score = score_func(x1, x2)

    null_distribution = np.array([
        score_func(x1, permute(x2))
        for _ in range(n_permutations)
    ])

    return np.mean(null_distribution >= observed_score)
|
|
|