Skip to content
Snippets Groups Projects
community_calibration.py 11.2 KiB
Newer Older
#!/usr/bin/env python3

# -*- coding: UTF-8 -*-

"""
Description: The goal of this program is to create communites \
of genes or exons with HipMCL by changing the inflation parameter \
and see which values of inflation parameter gives the best result \
in term of modularity, number of communities detected, size of those \
communities
"""

import sqlite3
from .config import ConfigGraph
from typing import List
import logging
import pandas as pd
import numpy as np
import lazyparser as lp
from pathlib import Path
import seaborn as sns
from math import log
from .community_finder import logging_def, get_project_colocalisation, \
    write_interaction_file, create_graph, find_communities


def get_out_name(weight: int, global_weight: int, inflation: float,
                 project: str = "", same_gene=True, feature: str = "exon",
                 use_weight: bool = False):
    """
    return the output file where the communities are stored.

    :param project: The name of the project of interest
    :param weight: The minimum weight of interaction to consider
    :param global_weight: The global weight to consider. if \
    the global weight is equal to 0 then then density figure are calculated \
    by project, else all projet are merge together and the interaction \
    seen in `global_weight` project are taken into account
    :param inflation: The inflation parameter of HipMCL program
    :param same_gene: Say if we consider as co-localised, exons within the \
    same gene (True) or not (False) (default False)
    :param feature: The feature we want to analyse (default 'exon')
    :param use_weight: Say if we want to write the weight into the result file.
    :return:
    """
    w = "weigthed" if use_weight else "unweigthed"
    if global_weight != 0:
        project = f"global-weight-{global_weight}"
    return ConfigGraph.community_calibration_folder / "community_files" / \
        f"{project}_weight-{weight}_same_gene-{same_gene}_{feature}_" \
        f"{inflation}_{w}.txt"


def get_figname(weight: int, global_weight: int,
                project: str = "", same_gene=True, feature: str = "exon",
                use_weight: bool = False):
    """
    return the output file where the communities are stored.

    :param project: The name of the project of interest
    :param weight: The minimum weight of interaction to consider
    :param global_weight: The global weight to consider. if \
    the global weight is equal to 0 then then density figure are calculated \
    by project, else all projet are merge together and the interaction \
    seen in `global_weight` project are taken into account
    :param same_gene: Say if we consider as co-localised, exons within the \
    same gene (True) or not (False) (default False)
    :param feature: The feature we want to analyse (default 'exon')
    :param use_weight: Say if we want to write the weight into the result file.
    :return:
    """
    w = "weigthed" if use_weight else "unweigthed"
    if global_weight != 0:
        project = f"global-weight-{global_weight}"
    return ConfigGraph.community_calibration_folder / \
        f"{project}_weight-{weight}_same_gene-{same_gene}_{feature}_{w}.pdf"


def community_finder(weight: int, global_weight: int, inflation: float,
                     project: str = "", same_gene=True, feature: str = "exon",
                     use_weight: bool = False):
    """
    Find communities inside co-localisation between exons found in \
    a ChIA-PET project.

    :param project: The name of the project of interest
    :param weight: The minimum weight of interaction to consider
    :param global_weight: The global weight to consider. if \
    the global weight is equal to 0 then then density figure are calculated \
    by project, else all projet are merge together and the interaction \
    seen in `global_weight` project are taken into account
    :param inflation: The inflation parameter of HipMCL program
    :param same_gene: Say if we consider as co-localised, exons within the \
    same gene (True) or not (False) (default False)
    :param use_weight: Say if we want to write the weight into the result file.
    :param feature: The feature we want to analyse (default 'exon')
    """
    inflation = round(inflation, 2)
    logging.info(f"Working with inflation {inflation}")
    outfile = get_out_name(weight, global_weight, inflation,
                           project, same_gene, feature, use_weight)
    outfile.parent.mkdir(exist_ok=True, parents=True)
    if outfile.is_file():
        return pd.read_csv(outfile, sep="\t")
    cnx = sqlite3.connect(ConfigGraph.db_file)
    interaction = get_project_colocalisation(cnx, project, weight,
                                             global_weight, same_gene, True,
                                             level=feature)
    outfileg, result_file = write_interaction_file(interaction, project,
                                                   weight, global_weight,
                                                   same_gene, feature=feature,
                                                   use_weight=use_weight)
    if result_file.is_file():
        result_file.unlink()
    graph = create_graph(interaction)
    df, dic_community = find_communities(graph, project, outfileg, result_file,
                                         feature, inflation=inflation,
                                         compute_ec_cov=False)
    logging.debug('Writing results ...')
    df.to_csv(outfile, sep="\t", index=False)
    return df


def create_dataframe(list_df: List[pd.DataFrame], list_inflation: np.array
                     ) -> pd.DataFrame:
    """
    Create a pandas dataframe indicating for each inflation, the number \
    of community created and the modularity of the communities.

    :param list_df: A list of dataframe of community
    :param list_inflation: The list of inflation values
    :return: A dataframe containing the inflation column, the \
    number of community created, the modularity of those community and \
    the median size of the community

    >>> d = pd.DataFrame({"community": ["C1", "C2"], "nodes": [10, 20],
    ... "edges": [5, 7], "modularity": [0.95, 0.95]})
    >>> create_dataframe([d, d], [1.5, 2])
      inflation  modularity  community_number  median_size
    0       1.5        0.95                 2         15.0
    1       2.0        0.95                 2         15.0
    """
    dic = {"inflation": list_inflation, "modularity": [],
           "community_number": [], "median_size": []}
    for n, i in enumerate(list_inflation):
        dic["modularity"].append(list_df[n]["modularity"][0])
        dic["community_number"].append(list_df[n].shape[0])
        dic["median_size"].append(np.median(list_df[n]["nodes"].to_list()))

    df = pd.DataFrame(dic)
    df["inflation"] = df["inflation"].round(2).astype(str)
    return df


def create_community_size_dataframe(list_df: List[pd.DataFrame],
                                    list_inflation: np.array
                                    ) -> pd.DataFrame:
    """
    Create a pandas dataframe indicating for each inflation, the \
    size of each communities

    :param list_df: A list of dataframe of community
    :param list_inflation: The list of inflation values
    :return: A dataframe containing the inflation column, the \
    size of each community

    >>> d = pd.DataFrame({"community": ["C1", "C2"], "nodes": [10, 20],
    ... "edges": [5, 7], "modularity": [0.95, 0.95]})
    >>> create_community_size_dataframe([d, d], [1.5, 2])
       inflation  community_size
    0        1.5              10
    1        1.5              20
    2        2.0              10
    3        2.0              20
    """
    dic = {"inflation": [], "community_size": []}
    for n, i in enumerate(list_inflation):
        cdf = list_df[n]
        vect_size = cdf["nodes"].to_list()
        dic["community_size"] += vect_size
        dic["inflation"] += [i] * len(vect_size)
    df = pd.DataFrame(dic)
    df["inflation"] = df["inflation"].round(2).astype(str)
    return df


def create_scatter(df_infl: pd.DataFrame, fig_name: Path) -> None:
    """
    Create a figure indicating the modularity of communities for each \
    inflation parameter.

    :param df_infl: A dataframe containing a column inflation, modularity \
    community_number and median_size
    :param fig_name: The file where the figure will be stored
    """
    sns.set(context="poster")
    g = sns.catplot(x="inflation", y="modularity", data=df_infl, aspect=1.7,
                    height=12, palette=["red"] * df_infl.shape[0],
                    kind="point")
    xrange = g.ax.get_xlim()
    ax2 = g.ax.twinx()
    df_infl.plot(x="inflation", y="community_number",
                 color=(0.2, 0.8, 0.2, 0.4), ax=ax2,
                 kind="scatter", zorder=55, s=275)
    ax2.set_ylabel('community_number', color="green")
    ax2.tick_params(axis='y', labelcolor="green")
    ax2.grid(False)
    ax3 = g.ax.twinx()
    ax3.set_ylabel('median_size', color="purple")
    ax3.spines["right"].set_position(("axes", 1.1))
    ax3.spines["right"].set_visible(True)
    df_infl.plot(x="inflation", y="median_size",
                 color=(0.8, 0.2, 0.8, 0.4), ax=ax3,
                 kind="scatter", zorder=55, s=275)
    ax3.tick_params(axis='y', labelcolor="purple")
    ax3.grid(False)
    g.ax.set_xlim(xrange)
    g.savefig(fig_name)


def create_community_size_fig(df_infl: pd.DataFrame, fig_name: Path) -> None:
    """
    Create a figure indicating the size of communities for each \
    inflation parameter.

    :param df_infl: A dataframe containing a column inflation, and \
    community_size
    :param fig_name: The file where the figure will be stored
    """
    sns.set(context="poster")
    df_infl["community_size"] = df_infl["community_size"].astype(float)
    df_infl = df_infl[df_infl["community_size"] > 0]
    df_infl["community_size"] = [log(x) for x in
                                 df_infl["community_size"].to_list()]
    g = sns.catplot(x="inflation", y="community_size", data=df_infl,
                    aspect=1.7, height=12, kind="violin")
    g.savefig(fig_name)


@lp.parse(weight=range(1, 11), global_weight=range(11),
          feature=('gene', 'exon'), istart="1.1 <= istart < 2.5",
          istop="1.1 < istop <= 2.5", istep="0 < istep <= 1")
def make_calibration(weight: int, global_weight: int, istart: float =1.1,
                     istop: float=2.5, istep: float=0.1, project: str = "",
                     same_gene=True, feature: str = "exon",
                     use_weight: bool=False, logging_level: str = "INFO"):
    logging_def(ConfigGraph.output_folder, __file__, logging_level)
    inflations = np.arange(istart, istop + istep, istep)
    list_df = [
        community_finder(weight, global_weight, i, project, same_gene, feature,
                         use_weight=use_weight)
        for i in inflations
    ]
    df_infl = create_dataframe(list_df, inflations)
    df_size = create_community_size_dataframe(list_df, inflations)
    figname = get_figname(weight, global_weight, project, same_gene, feature,
                          use_weight)
    create_scatter(df_infl, figname)
    create_community_size_fig(df_size, figname.parent /
                              figname.name.replace(".pdf", "_sizes.pdf"))


if __name__ == "__main__":
    make_calibration()