diff --git a/src/nt_composition/distance.py b/src/nt_composition/distance.py new file mode 100644 index 0000000000000000000000000000000000000000..86b846680bea8330da33de410c7db32df7865360 --- /dev/null +++ b/src/nt_composition/distance.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 + +# -*- coding: UTF-8 -*- + +""" +Description: Contains the function to create distance figures. +""" + +import numpy as np +from typing import Dict, Tuple, List, Optional +import logging +import pandas as pd +import seaborn as sns +import matplotlib.pyplot as plt +from ..fig_utils.stat_annot import add_stat_annotation +from .config import ConfigNt +from ..logging_conf import logging_def +import sqlite3 +from .make_nt_correlation import get_project_colocalisation, get_frequency_dic +from pathlib import Path +from itertools import product +import os +import multiprocessing as mp +import doctest +from tqdm import tqdm + + +class IterationNumberError(Exception): + pass + + +def get_pvalue(values: np.array, target: float, iteration: int + ) -> Tuple[float, str]: + """ + Return The p-value and the regulation + + :param values: The list of control values + :param target: The test value + :param iteration: The iteration of interest + :return: The p-value and the regulation + + >>> get_pvalue(np.arange(10), 1.75, 10) + (0.2, ' . ') + >>> get_pvalue(np.arange(10), -1, 10) + (0.0, ' . ') + >>> get_pvalue(np.arange(10), 9.75, 10) + (0.0, ' . ') + >>> get_pvalue(np.arange(10), 8.8, 10) + (0.1, ' . ') + >>> get_pvalue(np.arange(100), 98.8, 100) + (0.01, ' + ') + >>> get_pvalue(np.arange(100), 100, 100) + (0.0, ' + ') + >>> get_pvalue(np.arange(100), -1, 100) + (0.0, ' - ') + >>> get_pvalue(np.arange(0, 0.10, 0.01), 0.05, 10) + (0.5, ' . ') + """ + values = np.sort(values) + val_len = len(values) + if val_len != iteration: + msg = f"Error the length of values vector {len(values)} " \ + f"is different than iteration {iteration}" + logging.exception(msg) + raise IterationNumberError(msg) + idx = values[values <= target].size + impov = idx / val_len + idx = values[values >= target].size + enrich = idx / val_len + regulation = " . " + if impov < enrich: + pval = impov + if pval <= 0.05: + regulation = " - " + else: + pval = enrich + if pval <= 0.05: + regulation = " + " + if iteration < 20: + regulation = " . " + return pval, regulation + + +def randomize_colocalisation(exon: np.array, nb_interation: int): + """ + Create an array of random co-localization one the same group of exon. + + :param arr_interaction: array of interaction between exons. + :return: An array with random interaction + + >>> exons = np.array(['1_1', '2_1', '1_2', '2_2', '3_2']) + >>> res = randomize_colocalisation(exons, 6) + >>> ex_found = list(np.unique(res.flatten())) + >>> len(res) == 6 and len(res[0]) == 2 + True + >>> len(list(exons)) >= len(ex_found) + True + >>> len([a for a in ex_found if a in exons]) == len(ex_found) + True + """ + return np.random.choice(exon, [nb_interation, 2]) + + + +def compute_mean_distance(arr_interaction: np.array, + dic_freq: Dict[str, float]) -> float: + """ + Get the mean distances between each couple of co-localized exons. + + :param arr_interaction: array of interaction between exons. + :param dic_freq: The frequency dataframe. + :return: The density table + + >>> inter = np.asarray([['1_1', '2_1'], ['3_1', '1_1']]) + >>> d = {'1_1': 10, '2_1': 20, '3_1': 30} + >>> compute_mean_distance(inter, d) + 15.0 + >>> d = {'1_1': 10, '2_1': 20, '3_1': np.nan} + >>> compute_mean_distance(inter, d) + 10.0 + """ + # return float(np.nanmean([abs(dic_freq[exon1] - dic_freq[exon2]) + # for exon1, exon2 in arr_interaction])) + res = [] + for exon1, exon2 in arr_interaction: + if exon1 != exon2: + val = abs(dic_freq[exon1] - dic_freq[exon2]) + res.append(val) + return float(np.nanmean(res)) + + +def compute_controls_distances(arr_interaction: np.array, + dic_freq: Dict[str, float], + iteration: int): + """ + Compute the mean distance for `iteraction`number of co-localised exons. + + :param arr_interaction: array of interaction between exons. + :param dic_freq: The frequency dataframe. + :param iteration: The number of iteration of interest + :return: The number of interaction of interest + + >>> inter = np.asarray([['1_1', '2_1'], ['3_1', '1_1']]) + >>> d = {'1_1': 10, '2_1': 20, '3_1': 30} + >>> res = compute_controls_distances(inter, d, 10) + >>> len(res) + 10 + """ + list_val = [] + exon = np.unique(arr_interaction.flatten()) + nb_interaction = len(arr_interaction) + pbar = tqdm(range(iteration), position=int( + mp.current_process().name.replace("ForkPoolWorker-", ""))) + for _ in pbar: + list_val.append(compute_mean_distance( + randomize_colocalisation(exon, nb_interaction), dic_freq)) + return list_val + + +def add_in_dic(dic: Dict[str, List], iteration: Optional[int], + kind: Optional[str], distance: Optional[float]): + """ + + :param dic: The dictionary of distance data + :param iteration: The number of iteration of interest + :param kind: The kind of interaction of interaction + :param distance: The differance in nucleotide frequencies between couple \ + of exons. + :return: The dictionary updated + + + >>> d = {'iteration': [], 'kind': [], 'distance': []} + >>> d = add_in_dic(d, 0, 'LOL', 10) + >>> d == {'iteration': [1], 'kind': ['LOL'], 'distance': [10]} + True + """ + dic['iteration'].append(iteration + 1) + dic['kind'].append(kind) + dic['distance'].append(distance) + return dic + + +def create_distance_table(arr_interaction: np.array, + dic_freq: Dict[str, float], project: str, + iteration: int, ft: str, ft_type: str + ) -> pd.DataFrame: + """ + Create a dataframe with the distance information on a given nucleotides + + :param arr_interaction: array of interaction between exons. + :param dic_freq: The frequency dataframe. + :param iteration: The number of iteration of interest + :param ft: A feature + :param ft_type: the kind of feature of interest + :param project: The project of interest + :return: The dataframe of distance + """ + dic = {"iteration": [], 'kind': [], 'distance': []} + + # Add ctrl distance to the dictionary + ctrl_distance = compute_controls_distances(arr_interaction, dic_freq, + iteration) + for k, d in enumerate(ctrl_distance): + dic = add_in_dic(dic, k, 'CTRL', d) + + # Add ctrl distance to the dictionary + test_distance = compute_mean_distance(arr_interaction, dic_freq) + dic = add_in_dic(dic, 0, project, test_distance) + + # Add p-value data + pval = get_pvalue(ctrl_distance, test_distance, iteration)[0] + dic = add_in_dic(dic, np.nan, f"p = {pval:.2e}", np.nan) + dic['ft'] = [ft] * len(dic['iteration']) + dic['ft_type'] = [ft_type] * len(dic['iteration']) + return pd.DataFrame(dic) + + +def create_distance_figure(df: pd.DataFrame, project: str, figfile: Path, + ft: str, iteration: int): + """ + Create a barplot figure showing the nucleotide distance between \ + co-localised exons in a ChIA-PET project and what this distance would be \ + if those exons where randmly co-localized. + + :param df: The dataframe of nucleotide distance + :param project: The name of the project of interest + :param figfile: The file where the figure will be created + :param ft: The feature of interest + :param iteration: Number of iteration + """ + sns.set(context='talk') + pval = df.loc[-df['kind'].isin(['CTRL', project]), 'kind'].values[0] + pval = float(pval.replace('p = ', "")) + df = df.loc[df['kind'].isin(['CTRL', project]), :] + g = sns.catplot(x="kind", y="distance", data=df, ci="sd", kind="bar", + height=12, aspect=1.7) + add_stat_annotation(g.ax, data=df, x="kind", y="distance", + loc='inside', box_pair_list=[['CTRL', project, pval]], + linewidth=2) + plt.title(f"Difference of {ft} frequency distance between " + f"co-localized exons and\nthe one of those same exons " + f"if the co-localisation where random ({iteration} " + f"samples).") + g.savefig(figfile) + plt.close() + + +def create_figures(ft: str, ft_type: str, + project: str, weight: int, global_weight: int, + same_gene: bool, iteration: int, + logging_level: str = "DISABLE" + ) -> None: + """ + Create a distance figure for the project `project` for the nucleotide \ + `nt`. + + :param project: The chia-pet project of interest + :param ft: The feature used to build the figure + :param ft_type: The type of feature of interest + :param weight: The minimum weight of interaction to consider + :param global_weight: The global weight to consider. if \ + the global weight is equal to 0 then then density figure are calculated \ + by project, else all projet are merge together and the interaction \ + seen in `global_weight` project are taken into account + :param same_gene: Say if we consider as co-localised exon within the \ + same gene + :param logging_level: The level of information to display + :param iteration: The number of iteration + """ + logging_def(ConfigNt.interaction, __file__, logging_level) + outfile = ConfigNt.get_density_file(weight, global_weight, project, + ft_type, ft, fig=False, + kind="distance") + if not outfile.is_file(): + cnx = sqlite3.connect(ConfigNt.db_file) + arr_interaction = get_project_colocalisation(cnx, project, weight, + global_weight, same_gene) + dic_freq = get_frequency_dic(cnx, ft, ft_type) + df = create_distance_table(arr_interaction, dic_freq, project, + iteration, ft, ft_type) + df.to_csv(outfile, sep="\t", index=False) + figfile = ConfigNt.get_density_file(weight, global_weight, project, + ft_type, ft, fig=True, + kind="distance") + create_distance_figure(df, project, figfile, ft, iteration) + + +def execute_density_figure_function(project: str, + ft_type: str, ft: str, weight: int, + global_weight: int, + same_gene: bool, + iteration: int) -> None: + """ + Execute create_density_figure and organized the results in a dictionary. + + :param project: The project of interest + :param ft_type: The feature type of interest + :param ft: The feature of interest + :param weight: The minimum weight of interaction to consider + :param global_weight: The global weight to consider. if \ + the global weight is equal to 0 then then density figure are calculated \ + by project, else all projet are merge together and the interaction \ + seen in `global_weight` project are taken into account + :param same_gene: Say if we consider as co-localised exon within the \ + same gene + :param iteration: The number of iteration of interest to create \ + the control set. + """ + logging.info(f'Working on {project}, {ft_type}, {ft} - {os.getpid()}') + create_figures(ft, ft_type, project, weight, + global_weight, same_gene, iteration) + + +def create_all_distance_figures(ps: int, weight: int = 1, + global_weight: int = 0, ft_type: str = "nt", + same_gene: bool = True, + iteration: int = 10000, + logging_level: str = "DISABLE"): + """ + Make density figure for every selected projects. + + :param logging_level: The level of data to display. + :param weight: The weight of interaction to consider + :param global_weight: The global weight to consider. if \ + the global weight is equal to 0 then then density figure are calculated \ + by project, else all projet are merge together and the interaction \ + seen in `global_weight` project are taken into account + :param ft_type: The kind of feature to analyse + :param same_gene: Say if we consider as co-localised exon within the \ + same gene + :param iteration: Number of iteration for control exons + :param ps: The number of processes to create + """ + logging_def(ConfigNt.interaction, __file__, logging_level) + if global_weight == 0: + di = pd.read_csv(ConfigNt.get_interaction_file(weight), sep="\t") + projects = di.loc[di['interaction_count'] > 300, 'projects'].values + else: + projects = [f"Global_projects_{global_weight}"] + ft_list = ConfigNt.get_features(ft_type) + param = product(projects, ft_list, [ft_type]) + pool = mp.Pool(processes=ps) + processes = [] + for project, ft, ft_type in param: + args = [project, ft_type, ft, weight, global_weight, same_gene, + iteration] + processes.append(pool.apply_async(execute_density_figure_function, + args)) + for proc in processes: + proc.get(timeout=None) + pool.close() + pool.join() + + +if __name__ == "__main__": + doctest.testmod()