diff --git a/src/find_interaction_cluster/graph_figures/create_community_node_graph.py b/src/find_interaction_cluster/graph_figures/create_community_node_graph.py
new file mode 100644
index 0000000000000000000000000000000000000000..63222d817c35ec57e7514e4e9bca2e62d7ec72b7
--- /dev/null
+++ b/src/find_interaction_cluster/graph_figures/create_community_node_graph.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python3
+
+# -*- coding: UTF-8 -*-
+
+"""
+Description: The goal of this script is to create a community node \
+graph and store it inside a json file.
+"""
+
+
+from .graph_functions import recover_json_graph_of_interest, Parameters, \
+    load_graphic
+from typing import Tuple, Dict, List
+import networkx as nx
+import itertools
+import pandas as pd
+from pathlib import Path
+from .config_graph_figure import Config
+import json
+from pyvis.network import Network
+import lazyparser as lp
+from tqdm import tqdm
+from math import log10, log2
+
+
+def connected_nodes(g: nx.Graph, node1: str, node2: str) -> bool:
+    """
+    Return True if node1 and node2 are connected by an edge, False otherwise.
+
+    :param g: A networkx graph
+    :param node1: A node in the graph
+    :param node2: Another node in the graph
+    :return: True if the nodes are connected through an edge
+
+    >>> g = nx.Graph()
+    >>> g.add_nodes_from(list("ABCDEF"))
+    >>> g.add_edges_from([("A", "B"), ("A", "C"), ("D", "E"), ("D", "F"),
+    ...                   ("C", "B"), ("E", "F"), ("B", "D"), ("C", "F")])
+    >>> {f"{x}-{y}": connected_nodes(g, x, y)
+    ...  for x, y in list(itertools.combinations(list("ABCDEF"), 2))} == {
+    ... 'A-B': True,
+    ... 'A-C': True, 'A-D': False, 'A-E': False, 'A-F': False, 'B-C': True,
+    ... 'B-D': True, 'B-E': False, 'B-F': False, 'C-D': False, 'C-E': False,
+    ... 'C-F': True, 'D-E': True, 'D-F': True, 'E-F': True}
+    True
+    """
+    return g.get_edge_data(node1, node2) is not None
+
+
+def get_community_interaction_weight(g: nx.Graph, dic_com: Dict[str, List],
+                                     community1: str, community2: str
+                                     ) -> Tuple[str, str, int]:
+    """
+    Compute the interaction weight (number of edges) linking two communities.
+    :param g: A networkx graph
+    :param dic_com: A dictionary containing communities and \
+    the nodes of g contained in this community
+    :param community1: The name of community1 (must be a key of dic_com).
+    :param community2: The name of community2 (must be a key of dic_com).
+    :return: A tuple containing the communities name and the weight of \
+    the interactions.
+    >>> g = nx.Graph()
+    >>> g.add_nodes_from(list("ABCDEF"))
+    >>> g.add_edges_from([("A", "B"), ("A", "C"), ("D", "E"), ("D", "F"),
+    ...                   ("C", "B"), ("E", "F"), ("B", "D"), ("C", "F")])
+    >>> d = {"C1": list("ABC"), "C2": list("DEF")}
+    >>> get_community_interaction_weight(g, d, "C1", "C2")
+    ('C1', 'C2', 2)
+    """
+    prod = itertools.product(dic_com[community1], dic_com[community2])
+    val = sum(connected_nodes(g, node1, node2) for node1, node2 in prod)
+    return (community1, community2, val)
+
+
+def compute_edges(g: nx.Graph, dic_com: Dict[str, List],
+                  ) -> List[Tuple[str, str, int]]:
+    """
+    Compute the interaction weight between every pair of communities.
+    :param g: A networkx graph
+    :param dic_com: A dictionary containing communities and \
+    the nodes of g contained in this community
+    :return: The list of interactions between communities
+    >>> g = nx.Graph()
+    >>> g.add_nodes_from(list("ABCDEF"))
+    >>> g.add_edges_from([("A", "B"), ("A", "C"), ("D", "E"), ("D", "F"),
+    ...                   ("C", "B"), ("E", "F"), ("B", "D"), ("C", "F")])
+    >>> d = {"C1": list("ABC"), "C2": list("DF"), "C3": ["E"]}
+    >>> compute_edges(g, d)
+    [('C1', 'C2', 2), ('C2', 'C3', 2)]
+    """
+    pbar = tqdm(list(itertools.combinations(list(dic_com.keys()), 2)))
+    val = [get_community_interaction_weight(g, dic_com, c1, c2)
+           for c1, c2 in pbar]
+    return [v for v in val if v[2] != 0]
+
+
+def create_graph(list_nodes: List[str], list_edges: List[Tuple],
+                 sizes: List[int]) -> nx.Graph:
+    """
+    Create a community-level graph.
+
+    :param list_nodes: A list of nodes (the communities)
+    :param list_edges: A list of edges (interactions between communities)
+    :param sizes: The list of community sizes
+    :return: The community-level network
+    """
+    g = nx.Graph()
+    nodes = [(n, {"node_size": s, "title": str(s), "size": log2(s)})
+             for n, s in zip(list_nodes, sizes)]
+    g.add_nodes_from(nodes)
+    g.add_weighted_edges_from(list_edges)
+    for a, b, c in list_edges:
+        g.edges[a, b]["title"] = str(g.edges[a, b]["weight"])
+        g.edges[a, b]["width"] = log10(g.edges[a, b]["weight"])
+    return g
+
+
+def create_dicom(file_com: Path, feature: str, min_community_size: int = 10
+                 ) -> Dict[str, List]:
+    """
+    Create a dictionary linking communities to their nodes.
+
+    :param file_com: A file containing communities
+    :param feature: The kind of feature of interest
+    :param min_community_size: The minimum size used to consider communities
+    :return: A dictionary linking communities to their nodes.
+
+    >>> create_dicom(Config.tests_files / "test_community_file.txt",
+    ...              "gene") == {'C1': ['415', '416', '421', '422', '423', '433',
+    ... '441', '475', '481', '502', '511'], 'C2': ['10123', '10209', '8812',
+    ... '9140', '9166']}
+    True
+    """
+    df = pd.read_csv(file_com, sep="\t")
+    df = df[df["nodes"] >= min_community_size].copy()
+    list_com = df["community"].to_list()
+    list_ft = df[f"{feature}s"].to_list()
+    return {c: f.split(", ") for c, f in zip(list_com, list_ft)}
+
+
+@lp.parse
+def create_community_sized_graph(project: str, weight: int, global_weight: int,
+                                 same_gene: bool, inflation: float,
+                                 cell_line: str, feature: str,
+                                 min_community_size: int = 10) -> Path:
+    """
+    :param project: A project name of interest. Used only if \
+    global_weight is 0
+    :param weight: The weight of interaction to consider
+    :param global_weight: The global weight to consider. If \
+    the global weight is equal to 0 then the figures are computed \
+    by project, else all projects are merged together and the \
+    interactions seen in `global_weight` projects are taken into account
+    :param same_gene: Whether exons located within the same gene are \
+    considered as co-localised (True) or not (False)
+    :param inflation: The inflation parameter
+    :param cell_line: Interactions are only selected from projects made \
+    on a specific cell line (ALL to disable this filter)
+    :param feature: The feature to analyse
+    :param min_community_size: The minimum size used to consider communities
+    """
+    p = Parameters(project, weight, global_weight, same_gene, inflation,
+                   cell_line, feature)
+    graph_file, comm_file = recover_json_graph_of_interest(p)
+    outfile_json = comm_file.parent / \
+        f"community_min-size={min_community_size}_level_graph.json"
+    if outfile_json.is_file():
+        return outfile_json
+    dic_com = create_dicom(comm_file, feature, min_community_size)
+    graph = load_graphic(graph_file)
+    edges = compute_edges(graph, dic_com)
+    tmp = pd.read_csv(comm_file, sep="\t")
+    nodes = tmp.loc[tmp["nodes"] >= min_community_size, "community"].to_list()
+    sizes = tmp.loc[tmp["nodes"] >= min_community_size, "nodes"].to_list()
+    g = create_graph(nodes, edges, sizes)
+    json_graph = nx.json_graph.node_link_data(g)
+    outfile_html = comm_file.parent / \
+        f"community_min-size={min_community_size}_level_graph.html"
+    json.dump(json_graph, outfile_json.open('w'), indent=2)
+    net = Network(width="100%", height="100%")
+    net.from_nx(g)
+    net.hrepulsion()
+    net.toggle_physics(False)
+    net.show_buttons(filter_=["nodes", "edges", "physics"])
+    net.write_html(str(outfile_html))
+    return outfile_json
+
+
+
+
+
+if __name__ == "__main__":
+    create_community_sized_graph()
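
Not part of the patch: a minimal sketch of how the node-link json written by `create_community_sized_graph` could be read back for a quick check. The file name below is an assumption based on the naming scheme in the patch (default `min_community_size` of 10) and on the file being read from the current directory.

```python
# Illustrative only: reload the community-level graph produced by the script.
import json

from networkx.readwrite import json_graph

# Assumed output name for min_community_size=10; adjust the path as needed.
with open("community_min-size=10_level_graph.json") as handle:
    data = json.load(handle)

# Rebuild the nx.Graph from the node-link data dumped by node_link_data().
g = json_graph.node_link_graph(data)
print(f"{g.number_of_nodes()} communities, "
      f"{g.number_of_edges()} inter-community edges")
for com1, com2, attrs in g.edges(data=True):
    # "weight" was set by add_weighted_edges_from() in create_graph().
    print(f"{com1} -- {com2}: weight {attrs['weight']}")
```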