From 4e9a6161c8dca99964d08d583e101edcedddcc79 Mon Sep 17 00:00:00 2001
From: Fontrodona Nicolas <nicolas.fontrodona@ens-lyon.fr>
Date: Thu, 5 Mar 2020 14:27:11 +0100
Subject: [PATCH]  src/db_utils/frequency_scripts: script allowing to populate
 the frequency tables of chia-pet database

---
 src/db_utils/frequency_scripts/config_freq.py |  80 ++++++++++
 .../create_n_fill_exon_frequency_file.py      | 144 ++++++++++++++++++
 .../frequency_scripts/frequency_function.py   | 102 +++++++++++++
 .../orf_frequency_functions.py                | 110 +++++++++++++
 4 files changed, 436 insertions(+)
 create mode 100644 src/db_utils/frequency_scripts/config_freq.py
 create mode 100644 src/db_utils/frequency_scripts/create_n_fill_exon_frequency_file.py
 create mode 100644 src/db_utils/frequency_scripts/frequency_function.py
 create mode 100644 src/db_utils/frequency_scripts/orf_frequency_functions.py

diff --git a/src/db_utils/frequency_scripts/config_freq.py b/src/db_utils/frequency_scripts/config_freq.py
new file mode 100644
index 00000000..4f70c3c4
--- /dev/null
+++ b/src/db_utils/frequency_scripts/config_freq.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python3
+
+# -*- coding: UTF-8 -*-
+
+"""
+Description: Contain configuration variables used for computing \
+frequencies of exons, and gene
+"""
+
+from typing import List, Dict
+from ..config import Config
+import pandas as pd
+from pathlib import Path
+
+
+def full_defined(sequence: str) -> bool:
+    """
+    Says if all the nucleotide are well defined within the sequence
+
+    :param sequence: (string) nucleotide sequence
+    :return: (boolean) True if all nucleotide are well defined, False else
+    """
+    return "N" not in sequence
+
+
+def coding_full_defined(sequence: str) -> bool:
+    """
+    Says if sequence is an amino adic sequenc fully defined.
+
+    :param sequence: An amino acid sequence
+    :return: True if all amino acid are well defined, False else
+    """
+    for aa in sequence:
+        if aa in ['B', 'X', 'Z', 'J', 'U', 'O']:
+            return False
+    return True
+
+
+def compute_nt_list() -> List[str]:
+    """
+    compute the list of nucleotides, di-nucelotides and tri-nucleotides of \
+    interest.
+
+    :return: list of nucleotides, di-nucelotides and tri-nucleotides
+    """
+    nt = ["A", "C", "G", "T", "S", "W", "R", "Y"]
+    nt += [x + y for x in nt[0:4] for y in nt[0:4]]
+    nt += [x + y + z for x in nt[0:4] for y in nt[0:4] for z in nt[0:4]]
+    return nt
+
+
+def get_property(property_file: Path) -> Dict[str, Dict[str, float]]:
+    """
+
+    :param property_file: a file containing the scale
+    :return:  link each amino_acid \
+    to it's score for each property scale
+    """
+    df = pd.read_csv(property_file, sep="\t", index_col=0)
+    if not df.empty:
+        dic = df.to_dict()
+        return dic
+    else:
+        raise ValueError(f"The file {property_file} is empty !")
+
+
+class Config_freq:
+    """A class containing configurations variables used in this subfolder."""
+    property_file = Config.data / 'chosen_scales.csv'
+    exon_orf_bed = Config.data / 'bed' / 'exon_orf.bed'
+    output = Config.results / 'frequency_table'
+    output_exon_file = output / 'exon_freq.txt'
+    output_gene_file = output / 'gene_freq.txt'
+    iupac_dic = {"S": ["G", "C"], "W": ["A", "T"], "K": ["G", "T"],
+                 "M": ["A", "C"], "R": ["A", "G"], "Y": ["C", "T"]}
+    nt_list = compute_nt_list()
+    aa_list = list("RHKDESTNQCGPAVILMFYW")
+    codon_list = [nt for nt in nt_list if len(nt) == 3]
+    property_dic = get_property(property_file)
+    hg19 = Config.data / "Homo_sapiens.GRCh37.dna.primary_assembly.fa"
diff --git a/src/db_utils/frequency_scripts/create_n_fill_exon_frequency_file.py b/src/db_utils/frequency_scripts/create_n_fill_exon_frequency_file.py
new file mode 100644
index 00000000..4b6c8d38
--- /dev/null
+++ b/src/db_utils/frequency_scripts/create_n_fill_exon_frequency_file.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env python3
+
+# -*- coding: UTF-8 -*-
+
+"""
+Description: Create
+"""
+
+from Bio import SeqIO
+from Bio.Seq import Seq
+import logging
+import logging.config
+from typing import Dict, List, Tuple
+from .config_freq import Config_freq, Config
+from pathlib import Path
+import pandas as pd
+from .frequency_function import concat_dic, compute_dic, get_ft_type
+from .orf_frequency_functions import mytranslate, get_codon_freq, \
+    get_aa_freq, get_mean_propensity_score, get_orf_ft_type
+from ...logging_conf import logging_def
+from ..populate_database import populate_df
+from tqdm import tqdm
+
+
+def get_freq_table(bed_file: Path, dic_seq: Dict[str, Seq]) -> pd.DataFrame:
+    """
+    Create a table with new frequency files.
+
+    :param bed_file: a bed file
+    :param dic_seq: Dictionary of sequences
+    """
+    logging.info(f"Files {bed_file}")
+    id_col = f'id_{bed_file.name.replace(".bed", "")}'
+    dic = {id_col: []}
+    num_lines = sum(1 for _ in bed_file.open("r"))
+    with bed_file.open("r") as inbed:
+        pbar = tqdm(inbed, desc=f'Processing {bed_file.name}', total=num_lines)
+        for line in pbar:
+            if "#" not in line[0]:
+                line = line.replace("\n", "")
+                sline: List[str] = line.split("\t")
+                if int(sline[2]) - int(sline[1]) > 2:
+                    coord = [sline[0], sline[1], sline[2], sline[5]]
+                    dic_freq = compute_dic(dic_seq, coord, Config_freq.nt_list)
+                    dic = concat_dic(dic, dic_freq, sline[3], id_col)
+    df = pd.DataFrame(dic)
+    df = df.melt(id_vars=[id_col], var_name="ft", value_name='frequency')
+    df['ft_type'] = df['ft'].apply(get_ft_type)
+    return df
+
+
+def get_orf_freq_table(bed_orf: Path, dic_seq: Dict[str, Seq]):
+    """
+    Write an orf bed exon file.
+
+    :param bed_orf: a bed file containing exons orf
+    :param dic_seq:a dictionary containing chromosome sequences.
+    """
+    id_col = 'id_exon'
+    dic = {id_col: []}
+    num_lines = sum(1 for _ in bed_orf.open("r"))
+    with bed_orf.open("r") as infile:
+        pbar = tqdm(infile, desc=f'Processing {bed_orf.name}', total=num_lines)
+        for line in pbar:
+            exon: List = line.replace('\n', '').split('\t')
+            exon[1] = int(exon[1])
+            exon[2] = int(exon[2])
+            if (exon[2] - exon[1]) % 3 != 0:
+                msg = f"The length of {exon[3]} is not a multiple of 3 !"
+                logging.exception(msg)
+                raise IndexError(msg)
+            nt_seq, aa_seq = mytranslate(dic_seq, [exon[0], exon[1],
+                                                   exon[2], exon[5]])
+            if len(aa_seq) > 0 and "*" not in aa_seq:
+                dic_codon = get_codon_freq(nt_seq, Config_freq.codon_list)
+                dic_aa = get_aa_freq(aa_seq, Config_freq.aa_list)
+                dic_prop = get_mean_propensity_score(aa_seq,
+                                                     Config_freq.property_dic)
+                dic_freq = dict(dic_codon, **dic_aa, **dic_prop)
+                dic = concat_dic(dic, dic_freq, exon[3], id_col)
+    df = pd.DataFrame(dic)
+    df = df.melt(id_vars=[id_col], var_name="ft", value_name='frequency')
+    df['ft_type'] = df['ft'].apply(get_orf_ft_type)
+    return df
+
+
+def load_hg19(hg19: Path) -> Dict[str, Seq]:
+    """
+    Load hg19 genome.
+
+    :param hg19: The human genome
+    :return: The sequence by chromosome in hg19
+    """
+    dic = {}
+    for record in SeqIO.parse(hg19, 'fasta'):
+        dic[record.id] = record.seq
+    return dic
+
+
+def create_or_load_freq_table() -> Tuple[pd.DataFrame, pd.DataFrame]:
+    """
+    Create frequency tables for gene and exons.
+    """
+    if not Config_freq.output_exon_file.is_file() \
+            or not Config_freq.output_gene_file.is_file():
+        logging.debug('loading hg19 genome')
+        dic_seq = load_hg19(Config_freq.hg19)
+    if not Config_freq.output_exon_file.is_file():
+        logging.debug('get frequency table for exon')
+        df_exon = get_freq_table(Config.bed_exon, dic_seq)
+        logging.debug(df_exon.head())
+        logging.debug('Get orf frequency for exon')
+        df_orf_exon = get_orf_freq_table(Config_freq.exon_orf_bed, dic_seq)
+        logging.debug(df_orf_exon.head())
+        df_exon = pd.concat([df_exon, df_orf_exon], axis=0, ignore_index=True)
+    else:
+        logging.debug('Loading exon_freq file ...')
+        df_exon = pd.read_csv(Config_freq.output_exon_file, sep="\t")
+    if not Config_freq.output_gene_file.is_file():
+        logging.debug('get frequency table gene')
+        df_gene = get_freq_table(Config.bed_gene, dic_seq)
+        logging.debug(df_gene.head())
+    else:
+        df_gene = pd.read_csv(Config_freq.output_gene_file, sep="\t")
+    return df_exon, df_gene
+
+
+def fill_frequency_tables(logging_level: str = 'DISABLE'):
+    """
+    Fill the frequency tables of exon and gene.
+
+    :param logging_level: The level of information to display
+    """
+    logging_def(Config_freq.output, __file__, logging_level)
+    df_exon, df_gene = create_or_load_freq_table()
+    exit(1)
+    logging.debug('Filling cin_exon_frequency')
+    populate_df(table='cin_exon_frequency', df=df_exon, clean='y')
+    logging.debug('Filling cin_gene_frequency')
+    populate_df(table='cin_gene_frequency', df=df_gene, clean='y')
+
+
+if __name__ == "__main__":
+    fill_frequency_tables('DEBUG')
diff --git a/src/db_utils/frequency_scripts/frequency_function.py b/src/db_utils/frequency_scripts/frequency_function.py
new file mode 100644
index 00000000..b8b6075f
--- /dev/null
+++ b/src/db_utils/frequency_scripts/frequency_function.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+
+# -*- coding: UTF-8 -*-
+
+"""
+Description: Contains function dedicated to compute the nucleotide \
+frequency of a sequence.
+"""
+
+from .config_freq import Config_freq, full_defined
+from typing import Iterable, Dict, List
+import logging
+from Bio.Seq import Seq
+import numpy as np
+
+
+def frequencies(sequence: str, nt_list: Iterable[str]) -> Dict[str, float]:
+    """
+    Compute the frequencies of ``sequence`` for every nt in ``nt_list.
+
+    :param sequence: (str) a nucleotide sequence
+    :param nt_list: (a list of nt)
+    :return: (dic) contains the frequence of every nucleotide
+    """
+    if not full_defined(sequence):
+        return {nt: np.nan for nt in nt_list}
+    dic = {nt: 0. for nt in nt_list}
+    seql = len(sequence)
+    for i, n in enumerate(sequence):
+        for nt in nt_list:
+            ntl = len(nt)
+            if ntl == 1:
+                if nt in ["S", "W", "K", "M", "R", "Y"]:
+                    if n in Config_freq.iupac_dic[nt]:
+                        dic[nt] += 1 / seql * 100
+                else:
+                    if n == nt:
+                        dic[nt] += 1 / seql * 100
+            else:
+                if sequence[i:i + ntl] == nt:
+                    dic[nt] += 1 / (seql - ntl + 1) * 100
+    for nt in nt_list:
+        dic[nt] = round(dic[nt], 5)
+    return dic
+
+
+def compute_dic(dic_seq: Dict[str, Seq], coord: List[str],
+                nt_list: Iterable[str]) -> Dict[str, float]:
+    """
+    Get the frequencies of a sequence.
+
+    :param dic_seq: (dictionary of seq object) a genome
+    :param coord: (tuple of coordinate)
+    :param nt_list: (str) list of nucleotide of interest
+    :return: (dic) contains the frequence of every nucleotide
+    """
+    sequence = dic_seq[coord[0]][int(coord[1]): int(coord[2])]
+    if coord[3] == "-":
+        sequence = sequence.reverse_complement()
+    return frequencies(sequence, nt_list)
+
+
+def concat_dic(main_dic: Dict[str, List], sub_dic: Dict[str, float],
+               region: str, name_col: str) -> Dict[str, List]:
+    """
+    Add the data in sub_dic into main_dic.
+
+    :param main_dic: The dictionary that will contains the
+    :param sub_dic: The dictionary containing nucleotide frequency of region \
+    `region`
+    :param region: The region with the nucleotide frequencies indicated by \
+    sub_dic.
+    :param name_col: The name of the column where the name of the regions \
+    are stored in main_dic
+    :return: main_dic with the frequency data of region `region`
+    """
+    main_dic[name_col].append(region)
+    for k in sub_dic.keys():
+        if k not in main_dic.keys():
+            main_dic[k] = [sub_dic[k]]
+        else:
+            main_dic[k].append(sub_dic[k])
+    return main_dic
+
+
+def get_ft_type(ft: str) -> str:
+    """
+    From a feature get the feature type of interest.
+
+    :param ft: The name of the feature of interest
+    :return: The feature type of interest
+    """
+    if len(ft) == 1:
+        return 'nt'
+    elif len(ft) == 2:
+        return 'dnt'
+    elif len(ft) == 3:
+        return 'tnt'
+    else:
+        msg = f'The len of parameter feature should be 1, 2, 3 not {len(ft)} !'
+        logging.exception(msg)
+        raise NameError(msg)
diff --git a/src/db_utils/frequency_scripts/orf_frequency_functions.py b/src/db_utils/frequency_scripts/orf_frequency_functions.py
new file mode 100644
index 00000000..c0dc41c7
--- /dev/null
+++ b/src/db_utils/frequency_scripts/orf_frequency_functions.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+
+# -*- coding: UTF-8 -*-
+
+"""
+Description: This script contains function to handle ORF
+"""
+
+
+from typing import List, Dict, Tuple
+from Bio.Seq import Seq
+from .config_freq import full_defined, coding_full_defined
+import numpy as np
+import logging
+
+
+def get_codon_freq(nt_seq: str, codon_list: List[str]) -> Dict[str, float]:
+    """
+    Get the frequency of every amino acid.
+
+    :param nt_seq: an amino acid sequence
+    :param codon_list: a list of codon
+    :return: a dictionary of float value corresponding to condon frequency \
+    in nt_seq.
+    """
+    if not full_defined(nt_seq):
+        return {codon: np.nan for codon in codon_list}
+    dic = {}
+    codon_seq = [nt_seq[i:i+3] for i in range(0, len(nt_seq) - 2, 3)]
+    for codon in codon_list:
+        dic[codon] = round(codon_seq.count(codon) / len(codon_seq) * 100, 5)
+    return dic
+
+
+def get_aa_freq(aa_seq: str, aa_list: List[str]) -> Dict[str, float]:
+    """
+    Get the frequency of every amino acid.
+
+    :param aa_seq: an amino acid sequence
+    :param aa_list: a list of amino acid
+    :return: Frequency of each amino acid
+    """
+    if not coding_full_defined(aa_seq):
+        return {aa: np.nan for aa in aa_list}
+    dic = {}
+    for aa in aa_list:
+        dic[aa] = round(aa_seq.count(aa) / len(aa_seq) * 100, 5)
+    return dic
+
+
+def get_mean_propensity_score(aa_seq: str,
+                              property_scale: Dict[str, Dict[str, float]]
+                              ) -> Dict[str, float]:
+    """
+    Get the mean score for each scale.
+
+    :param aa_seq: an amino acid sequence
+    :param property_scale: link each scale to it's property.
+    :return: The mean property values for the sequence aa_seq
+    """
+    if not coding_full_defined(aa_seq):
+        return {s: np.nan for s in property_scale.keys()}
+    dic = {}
+    for scale in property_scale.keys():
+        count = 0
+        for aa in aa_seq:
+            count += property_scale[scale][aa]
+        dic[scale] = round((count / len(aa_seq)), 5)
+    return dic
+
+
+def mytranslate(dic_seq: Dict[str, Seq], coordinates: List[str]
+                ) -> Tuple[str, str]:
+    """
+    Translate the sequence given by coordinates.
+
+    :param dic_seq: a dictionary containing chromosome.
+    :param coordinates: chr, start, stop strand
+    :return: the sequence and the sequence translated
+    """
+    ntseq = dic_seq[str(coordinates[0])][int(coordinates[1]):
+                                         int(coordinates[2])]
+    if coordinates[3] == "+":
+        seq = ntseq.translate(table=1)
+    else:
+        seq = ntseq.reverse_complement().translate(table=1)
+        ntseq = ntseq.reverse_complement()
+    if seq[-1] == "*":
+        seq = seq[:-1]
+    return str(ntseq), str(seq)
+
+
+def get_orf_ft_type(ft: str) -> str:
+    """
+    From a feature get the feature type of interest.
+
+    :param ft: The name of the feature of interest
+    :return: The feature type of interest
+    """
+    if len(ft) == 1:
+        return 'aa'
+    elif len(ft) == 3:
+        return 'codon'
+    elif len(ft) > 3:
+        return 'properties'
+    else:
+        msg = f'The len of parameter feature should be 1, 3, or >3' \
+              f' not {len(ft)} !'
+        logging.exception(msg)
+        raise NameError(msg)
\ No newline at end of file
-- 
GitLab