diff --git a/src/find_interaction_cluster/radomization_test_ppi.py b/src/find_interaction_cluster/radomization_test_ppi.py new file mode 100644 index 0000000000000000000000000000000000000000..fa046d4ae7a8041a5f5794b3fa0daa33b32e1b37 --- /dev/null +++ b/src/find_interaction_cluster/radomization_test_ppi.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 + +# -*- coding: UTF-8 -*- + +""" +Description: Contains all the functions needed to make randomisation tests. +""" + + +import pandas as pd +import numpy as np +from doctest import testmod +from typing import Dict, Tuple +import logging +from tqdm import tqdm +from statsmodels.stats.multitest import multipletests + + +class IterationNumberError(Exception): + pass + + +def get_ppi_community_gene(df: pd.DataFrame) -> np.array: + """ + From a dataframe containing ppi communities return an array of \ + the gene id found in those community + + :param df: dataframe containing ppi communities + :return: An array containing genes belonging to PPI community + + >>> df_test = pd.DataFrame({"community": ["C1", "C2"], "node": [3, 2], + ... "edges": [3, 1], "cov": [0.77, 0.77], "modularity": [0.5, 0.5], + ... "genes": ["1, 2, 3", "5, 7"], "project": ["Global-weight-3"] * 2}) + >>> get_ppi_community_gene(df_test) + array([1, 2, 3, 5, 7]) + """ + gene_list = [] + for i in range(df.shape[0]): + gene_list += df.iloc[i, :]['genes'].split(", ") + return np.unique(np.array(gene_list, dtype=int)) + + +def get_dna_community_gene(df: pd.DataFrame) -> Dict[str, np.array]: + """ + Create a dictionary containing gene communities at DNA level. + + :param df: A dataframe containing the community at dna level + :return: A dictionary linking each community to the genes it contains + + >>> df_test = pd.DataFrame({"community": ["C1", "C2"], "node": [3, 2], + ... "edges": [3, 1], "cov": [0.77, 0.77], "modularity": [0.5, 0.5], + ... "genes": ["1, 2, 3", "5, 7"], "project": ["Global-weight-3"] * 2}) + >>> get_dna_community_gene(df_test) + {'C1': array([1, 2, 3]), 'C2': array([5, 7])} + >>> df_test = pd.DataFrame({"community": ["C1", "C1"], "node": [3, 2], + ... "edges": [3, 1], "cov": [0.77, 0.77], "modularity": [0.5, 0.5], + ... "genes": ["1, 2, 3", "5, 7"], "project": ["Global-weight-3"] * 2}) + >>> get_dna_community_gene(df_test) + Traceback (most recent call last): + ... + ValueError: A community name is duplicated + """ + dic_dna_gene = {} + if df.shape[0] != len(df['community'].unique()): + raise ValueError("A community name is duplicated") + for i in range(df.shape[0]): + ms = df.iloc[i, :] + dic_dna_gene[ms["community"]] = np.array(ms["genes"].split(', '), + dtype=int) + return dic_dna_gene + + +def get_common_genes(dna_gene: np.array, ppi_gene: np.array) -> int: + """ + Return the number of gene common between ppi_gene and size_ppi. + + :param ppi_gene: A community of gene whose protein product interact \ + between each other. + :param dna_gene: A community of gene at dna level + :return: The number of common exons between the two lists + + >>> get_common_genes(np.array([1, 5, 9, 15]), np.array([5, 9, 15, 17])) + 3 + >>> get_common_genes(np.array([1, 5, 9, 15]), np.array([5, 9, 15, 1])) + 4 + >>> get_common_genes(np.array([1, 5, 9, 15]), np.array([18, 19, 115, 0])) + 0 + """ + return len(np.intersect1d(dna_gene, ppi_gene)) + + +def get_list_common_gene(community: str, dic_dna_gene: Dict[str, np.array], + ppi_size: int, ppi_gene: np.array, iteration: int + ) -> np.array: + """ + Return the number of common gene between genes in the community \ + `community`and a sublist of randomly chosen genes in `ppi_gene` of \ + size `ppi_size`. + + :param community: A name of a gene community at dna level + :param dic_dna_gene: The dictionary containing gene interacting at dna \ + level + :param ppi_size: The size of the ppi_gene subsample + :param ppi_gene: All the gene interacting at protein level + :param iteration: The number of time we want to repeat the process + :return: The number of gene in common between the genes in community \ + `community` and the subsample of ppi_size + + >>> dna_gene = {"C1": [1, 2, 3, 4, 5]} + >>> ppi_g = list(range(1, 51)) + >>> get_list_common_gene("C1", dna_gene, 10, ppi_g, 5) + array([2, 1, 0, 1, 1]) + >>> ppi_g = list(range(1, 1001)) + >>> get_list_common_gene("C1", dna_gene, 100, ppi_g, 5) + array([0, 0, 1, 0, 0]) + >>> ppi_g = list(range(1, 11)) + >>> get_list_common_gene("C1", dna_gene, 5, ppi_g, 5) + array([3, 2, 2, 4, 3]) + >>> get_list_common_gene("C1", dna_gene, 10, ppi_g, 5) + array([5, 5, 5, 5, 5]) + """ + np.random.seed(1) + common = [ + get_common_genes( + dic_dna_gene[community], + np.random.choice(ppi_gene, ppi_size, replace=False), + ) + for _ in range(iteration) + ] + return np.array(common) + + +def get_pvalue(values: np.array, target: float, iteration: int + ) -> Tuple[float, str]: + """ + Return The p-value and the regulation + :param values: The list of control values + :param target: The test value + :param iteration: The iteration of interest + :return: The p-value and the regulation + >>> get_pvalue(np.arange(10), 1.75, 10) + (0.2, ' . ') + >>> get_pvalue(np.arange(10), -1, 10) + (0.0, ' . ') + >>> get_pvalue(np.arange(10), 9.75, 10) + (0.0, ' . ') + >>> get_pvalue(np.arange(10), 8.8, 10) + (0.1, ' . ') + >>> get_pvalue(np.arange(100), 98.8, 100) + (0.01, ' + ') + >>> get_pvalue(np.arange(100), 100, 100) + (0.0, ' + ') + >>> get_pvalue(np.arange(100), -1, 100) + (0.0, ' - ') + >>> get_pvalue(np.arange(0, 0.10, 0.01), 0.05, 10) + (0.5, ' . ') + """ + values = np.sort(values) + val_len = len(values) + if val_len != iteration: + msg = f"Error the length of values vector {len(values)} " \ + f"is different than iteration {iteration}" + logging.exception(msg) + raise IterationNumberError(msg) + idx = values[values <= target].size + impov = idx / val_len + idx = values[values >= target].size + enrich = idx / val_len + regulation = " . " + if impov < enrich: + pval = impov + if pval <= 0.05: + regulation = " - " + else: + pval = enrich + if pval <= 0.05: + regulation = " + " + if iteration < 20: + regulation = " . " + return pval, regulation + + +def update_overlap_df(df_overlap: pd.DataFrame, + dic_dna_gene: Dict[str, np.array], + ppi_gene: np.array, iteration: int + ) -> pd.DataFrame: + """ + + :param df_overlap: A dataframe containing every gene community at dna \ + level and the number of common gene with a gne community at protein \ + level + :param dic_dna_gene: The dictionary containing gene interacting at dna \ + level + :param ppi_gene: All the gene interacting at protein level + :param iteration: The number of time we want to repeat the process + :return: The updated Df (column p-value, p-adjust and regulation + + >>> do = pd.DataFrame({"DNA_community": ["C1", "C2", "C3"], + ... "community_size": [2, 3, 2], "nb_com-ppi": [2, 3, 1], + ... "size_com-ppi": [10, 10, 10]}) + >>> dna_gene = {"C1": [1, 2], "C2": [3, 4, 5], "C3": [6, 7]} + >>> ppi_g = list(range(11)) + >>> update_overlap_df(do, dna_gene, ppi_g, 100).iloc[:, + ... [0, 1, 2, 4, 5]] + DNA_community community_size nb_com-ppi p-value p-adjust + 0 C1 2 2 0.79 0.79 + 1 C2 3 3 0.72 0.79 + 2 C3 2 1 0.18 0.54 + >>> d = update_overlap_df(do, dna_gene, list(range(1000)), 100) + >>> d.iloc[:, [0, 1, 2, 4, 5]] + DNA_community community_size nb_com-ppi p-value p-adjust + 0 C1 2 2 0.01 0.01 + 1 C2 3 3 0.00 0.00 + 2 C3 2 1 0.00 0.00 + >>> d["regulation"].to_list() == [" + "] * 3 + True + """ + pval_list = [] + reg_list = [] + for i in tqdm(range(df_overlap.shape[0])): + values = get_list_common_gene(df_overlap.iloc[i, :]["DNA_community"], + dic_dna_gene, + df_overlap.iloc[i, :]["size_com-ppi"], + ppi_gene, + iteration) + pval, reg = get_pvalue(values, df_overlap.iloc[i, :]["nb_com-ppi"], + iteration) + pval_list.append(pval) + reg_list.append(reg) + df_overlap["p-value"] = pval_list + df_overlap["p-adjust"] = multipletests(pval_list, alpha=0.05, + method='fdr_bh', + is_sorted=False, + returnsorted=False)[1] + df_overlap["regulation"] = reg_list + return df_overlap + + +if __name__ == "__main__": + testmod()