diff --git a/src/db_utils/frequency_scripts/config_freq.py b/src/db_utils/frequency_scripts/config_freq.py new file mode 100644 index 0000000000000000000000000000000000000000..4f70c3c4c8feb7663e9ee9b87e91210df02b1fdb --- /dev/null +++ b/src/db_utils/frequency_scripts/config_freq.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 + +# -*- coding: UTF-8 -*- + +""" +Description: Contain configuration variables used for computing \ +frequencies of exons, and gene +""" + +from typing import List, Dict +from ..config import Config +import pandas as pd +from pathlib import Path + + +def full_defined(sequence: str) -> bool: + """ + Says if all the nucleotide are well defined within the sequence + + :param sequence: (string) nucleotide sequence + :return: (boolean) True if all nucleotide are well defined, False else + """ + return "N" not in sequence + + +def coding_full_defined(sequence: str) -> bool: + """ + Says if sequence is an amino adic sequenc fully defined. + + :param sequence: An amino acid sequence + :return: True if all amino acid are well defined, False else + """ + for aa in sequence: + if aa in ['B', 'X', 'Z', 'J', 'U', 'O']: + return False + return True + + +def compute_nt_list() -> List[str]: + """ + compute the list of nucleotides, di-nucelotides and tri-nucleotides of \ + interest. + + :return: list of nucleotides, di-nucelotides and tri-nucleotides + """ + nt = ["A", "C", "G", "T", "S", "W", "R", "Y"] + nt += [x + y for x in nt[0:4] for y in nt[0:4]] + nt += [x + y + z for x in nt[0:4] for y in nt[0:4] for z in nt[0:4]] + return nt + + +def get_property(property_file: Path) -> Dict[str, Dict[str, float]]: + """ + + :param property_file: a file containing the scale + :return: link each amino_acid \ + to it's score for each property scale + """ + df = pd.read_csv(property_file, sep="\t", index_col=0) + if not df.empty: + dic = df.to_dict() + return dic + else: + raise ValueError(f"The file {property_file} is empty !") + + +class Config_freq: + """A class containing configurations variables used in this subfolder.""" + property_file = Config.data / 'chosen_scales.csv' + exon_orf_bed = Config.data / 'bed' / 'exon_orf.bed' + output = Config.results / 'frequency_table' + output_exon_file = output / 'exon_freq.txt' + output_gene_file = output / 'gene_freq.txt' + iupac_dic = {"S": ["G", "C"], "W": ["A", "T"], "K": ["G", "T"], + "M": ["A", "C"], "R": ["A", "G"], "Y": ["C", "T"]} + nt_list = compute_nt_list() + aa_list = list("RHKDESTNQCGPAVILMFYW") + codon_list = [nt for nt in nt_list if len(nt) == 3] + property_dic = get_property(property_file) + hg19 = Config.data / "Homo_sapiens.GRCh37.dna.primary_assembly.fa" diff --git a/src/db_utils/frequency_scripts/create_n_fill_exon_frequency_file.py b/src/db_utils/frequency_scripts/create_n_fill_exon_frequency_file.py new file mode 100644 index 0000000000000000000000000000000000000000..4b6c8d382acd1a39a29079ac6e607f7cab32800d --- /dev/null +++ b/src/db_utils/frequency_scripts/create_n_fill_exon_frequency_file.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 + +# -*- coding: UTF-8 -*- + +""" +Description: Create +""" + +from Bio import SeqIO +from Bio.Seq import Seq +import logging +import logging.config +from typing import Dict, List, Tuple +from .config_freq import Config_freq, Config +from pathlib import Path +import pandas as pd +from .frequency_function import concat_dic, compute_dic, get_ft_type +from .orf_frequency_functions import mytranslate, get_codon_freq, \ + get_aa_freq, get_mean_propensity_score, get_orf_ft_type +from ...logging_conf import logging_def +from ..populate_database import populate_df +from tqdm import tqdm + + +def get_freq_table(bed_file: Path, dic_seq: Dict[str, Seq]) -> pd.DataFrame: + """ + Create a table with new frequency files. + + :param bed_file: a bed file + :param dic_seq: Dictionary of sequences + """ + logging.info(f"Files {bed_file}") + id_col = f'id_{bed_file.name.replace(".bed", "")}' + dic = {id_col: []} + num_lines = sum(1 for _ in bed_file.open("r")) + with bed_file.open("r") as inbed: + pbar = tqdm(inbed, desc=f'Processing {bed_file.name}', total=num_lines) + for line in pbar: + if "#" not in line[0]: + line = line.replace("\n", "") + sline: List[str] = line.split("\t") + if int(sline[2]) - int(sline[1]) > 2: + coord = [sline[0], sline[1], sline[2], sline[5]] + dic_freq = compute_dic(dic_seq, coord, Config_freq.nt_list) + dic = concat_dic(dic, dic_freq, sline[3], id_col) + df = pd.DataFrame(dic) + df = df.melt(id_vars=[id_col], var_name="ft", value_name='frequency') + df['ft_type'] = df['ft'].apply(get_ft_type) + return df + + +def get_orf_freq_table(bed_orf: Path, dic_seq: Dict[str, Seq]): + """ + Write an orf bed exon file. + + :param bed_orf: a bed file containing exons orf + :param dic_seq:a dictionary containing chromosome sequences. + """ + id_col = 'id_exon' + dic = {id_col: []} + num_lines = sum(1 for _ in bed_orf.open("r")) + with bed_orf.open("r") as infile: + pbar = tqdm(infile, desc=f'Processing {bed_orf.name}', total=num_lines) + for line in pbar: + exon: List = line.replace('\n', '').split('\t') + exon[1] = int(exon[1]) + exon[2] = int(exon[2]) + if (exon[2] - exon[1]) % 3 != 0: + msg = f"The length of {exon[3]} is not a multiple of 3 !" + logging.exception(msg) + raise IndexError(msg) + nt_seq, aa_seq = mytranslate(dic_seq, [exon[0], exon[1], + exon[2], exon[5]]) + if len(aa_seq) > 0 and "*" not in aa_seq: + dic_codon = get_codon_freq(nt_seq, Config_freq.codon_list) + dic_aa = get_aa_freq(aa_seq, Config_freq.aa_list) + dic_prop = get_mean_propensity_score(aa_seq, + Config_freq.property_dic) + dic_freq = dict(dic_codon, **dic_aa, **dic_prop) + dic = concat_dic(dic, dic_freq, exon[3], id_col) + df = pd.DataFrame(dic) + df = df.melt(id_vars=[id_col], var_name="ft", value_name='frequency') + df['ft_type'] = df['ft'].apply(get_orf_ft_type) + return df + + +def load_hg19(hg19: Path) -> Dict[str, Seq]: + """ + Load hg19 genome. + + :param hg19: The human genome + :return: The sequence by chromosome in hg19 + """ + dic = {} + for record in SeqIO.parse(hg19, 'fasta'): + dic[record.id] = record.seq + return dic + + +def create_or_load_freq_table() -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Create frequency tables for gene and exons. + """ + if not Config_freq.output_exon_file.is_file() \ + or not Config_freq.output_gene_file.is_file(): + logging.debug('loading hg19 genome') + dic_seq = load_hg19(Config_freq.hg19) + if not Config_freq.output_exon_file.is_file(): + logging.debug('get frequency table for exon') + df_exon = get_freq_table(Config.bed_exon, dic_seq) + logging.debug(df_exon.head()) + logging.debug('Get orf frequency for exon') + df_orf_exon = get_orf_freq_table(Config_freq.exon_orf_bed, dic_seq) + logging.debug(df_orf_exon.head()) + df_exon = pd.concat([df_exon, df_orf_exon], axis=0, ignore_index=True) + else: + logging.debug('Loading exon_freq file ...') + df_exon = pd.read_csv(Config_freq.output_exon_file, sep="\t") + if not Config_freq.output_gene_file.is_file(): + logging.debug('get frequency table gene') + df_gene = get_freq_table(Config.bed_gene, dic_seq) + logging.debug(df_gene.head()) + else: + df_gene = pd.read_csv(Config_freq.output_gene_file, sep="\t") + return df_exon, df_gene + + +def fill_frequency_tables(logging_level: str = 'DISABLE'): + """ + Fill the frequency tables of exon and gene. + + :param logging_level: The level of information to display + """ + logging_def(Config_freq.output, __file__, logging_level) + df_exon, df_gene = create_or_load_freq_table() + exit(1) + logging.debug('Filling cin_exon_frequency') + populate_df(table='cin_exon_frequency', df=df_exon, clean='y') + logging.debug('Filling cin_gene_frequency') + populate_df(table='cin_gene_frequency', df=df_gene, clean='y') + + +if __name__ == "__main__": + fill_frequency_tables('DEBUG') diff --git a/src/db_utils/frequency_scripts/frequency_function.py b/src/db_utils/frequency_scripts/frequency_function.py new file mode 100644 index 0000000000000000000000000000000000000000..b8b6075f59ac5cfce6f405636ed1c2e3a5b7da52 --- /dev/null +++ b/src/db_utils/frequency_scripts/frequency_function.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +# -*- coding: UTF-8 -*- + +""" +Description: Contains function dedicated to compute the nucleotide \ +frequency of a sequence. +""" + +from .config_freq import Config_freq, full_defined +from typing import Iterable, Dict, List +import logging +from Bio.Seq import Seq +import numpy as np + + +def frequencies(sequence: str, nt_list: Iterable[str]) -> Dict[str, float]: + """ + Compute the frequencies of ``sequence`` for every nt in ``nt_list. + + :param sequence: (str) a nucleotide sequence + :param nt_list: (a list of nt) + :return: (dic) contains the frequence of every nucleotide + """ + if not full_defined(sequence): + return {nt: np.nan for nt in nt_list} + dic = {nt: 0. for nt in nt_list} + seql = len(sequence) + for i, n in enumerate(sequence): + for nt in nt_list: + ntl = len(nt) + if ntl == 1: + if nt in ["S", "W", "K", "M", "R", "Y"]: + if n in Config_freq.iupac_dic[nt]: + dic[nt] += 1 / seql * 100 + else: + if n == nt: + dic[nt] += 1 / seql * 100 + else: + if sequence[i:i + ntl] == nt: + dic[nt] += 1 / (seql - ntl + 1) * 100 + for nt in nt_list: + dic[nt] = round(dic[nt], 5) + return dic + + +def compute_dic(dic_seq: Dict[str, Seq], coord: List[str], + nt_list: Iterable[str]) -> Dict[str, float]: + """ + Get the frequencies of a sequence. + + :param dic_seq: (dictionary of seq object) a genome + :param coord: (tuple of coordinate) + :param nt_list: (str) list of nucleotide of interest + :return: (dic) contains the frequence of every nucleotide + """ + sequence = dic_seq[coord[0]][int(coord[1]): int(coord[2])] + if coord[3] == "-": + sequence = sequence.reverse_complement() + return frequencies(sequence, nt_list) + + +def concat_dic(main_dic: Dict[str, List], sub_dic: Dict[str, float], + region: str, name_col: str) -> Dict[str, List]: + """ + Add the data in sub_dic into main_dic. + + :param main_dic: The dictionary that will contains the + :param sub_dic: The dictionary containing nucleotide frequency of region \ + `region` + :param region: The region with the nucleotide frequencies indicated by \ + sub_dic. + :param name_col: The name of the column where the name of the regions \ + are stored in main_dic + :return: main_dic with the frequency data of region `region` + """ + main_dic[name_col].append(region) + for k in sub_dic.keys(): + if k not in main_dic.keys(): + main_dic[k] = [sub_dic[k]] + else: + main_dic[k].append(sub_dic[k]) + return main_dic + + +def get_ft_type(ft: str) -> str: + """ + From a feature get the feature type of interest. + + :param ft: The name of the feature of interest + :return: The feature type of interest + """ + if len(ft) == 1: + return 'nt' + elif len(ft) == 2: + return 'dnt' + elif len(ft) == 3: + return 'tnt' + else: + msg = f'The len of parameter feature should be 1, 2, 3 not {len(ft)} !' + logging.exception(msg) + raise NameError(msg) diff --git a/src/db_utils/frequency_scripts/orf_frequency_functions.py b/src/db_utils/frequency_scripts/orf_frequency_functions.py new file mode 100644 index 0000000000000000000000000000000000000000..c0dc41c76dae0e2ffb0b0886e0b3a125ea6eb01a --- /dev/null +++ b/src/db_utils/frequency_scripts/orf_frequency_functions.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 + +# -*- coding: UTF-8 -*- + +""" +Description: This script contains function to handle ORF +""" + + +from typing import List, Dict, Tuple +from Bio.Seq import Seq +from .config_freq import full_defined, coding_full_defined +import numpy as np +import logging + + +def get_codon_freq(nt_seq: str, codon_list: List[str]) -> Dict[str, float]: + """ + Get the frequency of every amino acid. + + :param nt_seq: an amino acid sequence + :param codon_list: a list of codon + :return: a dictionary of float value corresponding to condon frequency \ + in nt_seq. + """ + if not full_defined(nt_seq): + return {codon: np.nan for codon in codon_list} + dic = {} + codon_seq = [nt_seq[i:i+3] for i in range(0, len(nt_seq) - 2, 3)] + for codon in codon_list: + dic[codon] = round(codon_seq.count(codon) / len(codon_seq) * 100, 5) + return dic + + +def get_aa_freq(aa_seq: str, aa_list: List[str]) -> Dict[str, float]: + """ + Get the frequency of every amino acid. + + :param aa_seq: an amino acid sequence + :param aa_list: a list of amino acid + :return: Frequency of each amino acid + """ + if not coding_full_defined(aa_seq): + return {aa: np.nan for aa in aa_list} + dic = {} + for aa in aa_list: + dic[aa] = round(aa_seq.count(aa) / len(aa_seq) * 100, 5) + return dic + + +def get_mean_propensity_score(aa_seq: str, + property_scale: Dict[str, Dict[str, float]] + ) -> Dict[str, float]: + """ + Get the mean score for each scale. + + :param aa_seq: an amino acid sequence + :param property_scale: link each scale to it's property. + :return: The mean property values for the sequence aa_seq + """ + if not coding_full_defined(aa_seq): + return {s: np.nan for s in property_scale.keys()} + dic = {} + for scale in property_scale.keys(): + count = 0 + for aa in aa_seq: + count += property_scale[scale][aa] + dic[scale] = round((count / len(aa_seq)), 5) + return dic + + +def mytranslate(dic_seq: Dict[str, Seq], coordinates: List[str] + ) -> Tuple[str, str]: + """ + Translate the sequence given by coordinates. + + :param dic_seq: a dictionary containing chromosome. + :param coordinates: chr, start, stop strand + :return: the sequence and the sequence translated + """ + ntseq = dic_seq[str(coordinates[0])][int(coordinates[1]): + int(coordinates[2])] + if coordinates[3] == "+": + seq = ntseq.translate(table=1) + else: + seq = ntseq.reverse_complement().translate(table=1) + ntseq = ntseq.reverse_complement() + if seq[-1] == "*": + seq = seq[:-1] + return str(ntseq), str(seq) + + +def get_orf_ft_type(ft: str) -> str: + """ + From a feature get the feature type of interest. + + :param ft: The name of the feature of interest + :return: The feature type of interest + """ + if len(ft) == 1: + return 'aa' + elif len(ft) == 3: + return 'codon' + elif len(ft) > 3: + return 'properties' + else: + msg = f'The len of parameter feature should be 1, 3, or >3' \ + f' not {len(ft)} !' + logging.exception(msg) + raise NameError(msg) \ No newline at end of file