Skip to content
Snippets Groups Projects
Commit bd886a9d authored by alapendr's avatar alapendr
Browse files

initial src/ commit

parent e5cdb6e5
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#Creation of the database using Sqlite3 and SQL commands
import sqlite3
# Create (if missing) every table of the ChIA-PET database.
# Each statement uses CREATE TABLE IF NOT EXISTS, so running the script again
# on an existing database leaves the existing tables untouched.
# NOTE(review): SQLite only enforces the FOREIGN KEY clauses below on
# connections that enable `PRAGMA foreign_keys = ON` - confirm that the code
# writing into this database does so.
conn = sqlite3.connect("../../Results/Db_creation/chia_pet_database.db")
try:
    c = conn.cursor()
    ###########################
    # Create table - gin_gene #
    ###########################
    c.execute('''CREATE TABLE IF NOT EXISTS gin_gene
([name] VARCHAR(30) NOT NULL,
[id] INT NOT NULL,
[chromosome] VARCHAR(2) NOT NULL,
[start] INT NOT NULL,
[stop] INT NOT NULL,
[strand] VARCHAR(1) NOT NULL,
PRIMARY KEY ([id]))''')
    ###########################
    # Create table - gin_exon #
    ###########################
    c.execute('''CREATE TABLE IF NOT EXISTS gin_exon
([id] VARCHAR(30) NOT NULL,
[pos] INT NOT NULL,
[name] VARCHAR(45) NOT NULL,
[id_gene] INT NOT NULL,
[chromosome] VARCHAR(2) NOT NULL,
[start] INT NOT NULL,
[stop] INT NOT NULL,
[strand] VARCHAR(1) NOT NULL,
PRIMARY KEY ([id]),
FOREIGN KEY ([id_gene]) REFERENCES gin_gene([id]))''')
    ##############################
    # Create table - gin_project #
    ##############################
    # NOTE(review): the column name "antiboby" looks like a typo of
    # "antibody"; it is kept as is because renaming it changes the schema.
    c.execute('''CREATE TABLE IF NOT EXISTS gin_project
([id] INT NOT NULL,
[id_sample] VARCHAR(45) NOT NULL,
[id_project] VARCHAR(45) NOT NULL,
[database] VARCHAR(45) NOT NULL,
[name] VARCHAR(45) NULL,
[description] TEXT NULL,
[antiboby] VARCHAR(45) NOT NULL,
[cell_line] VARCHAR(45) NOT NULL,
[institute] VARCHAR(45) NULL,
[citation] VARCHAR(20) NULL,
PRIMARY KEY ([id]))''')
    #####################################
    # Create table - gin_gene_frequency #
    #####################################
    c.execute('''CREATE TABLE IF NOT EXISTS gin_gene_frequency
([id] INT NOT NULL,
[ft] VARCHAR(3) NOT NULL,
[id_gene] INT NOT NULL,
[frequency] FLOAT NULL,
PRIMARY KEY ([id]),
FOREIGN KEY ([id_gene]) REFERENCES gin_gene([id]))''')
    #####################################
    # Create table - gin_exon_frequency #
    #####################################
    c.execute('''CREATE TABLE IF NOT EXISTS gin_exon_frequency
([id] INT NOT NULL,
[ft] VARCHAR(3) NOT NULL,
[id_exon] VARCHAR(30) NOT NULL,
[frequency] FLOAT NULL,
PRIMARY KEY ([id]),
FOREIGN KEY ([id_exon]) REFERENCES gin_exon([id]))''')
    #######################################
    # Create table - gin_exon_interaction #
    #######################################
    c.execute('''CREATE TABLE IF NOT EXISTS gin_exon_interaction
([id] INT NOT NULL,
[force] INT NOT NULL,
[exon1] VARCHAR(30) NOT NULL,
[exon2] VARCHAR(30) NOT NULL,
[id_project] INT NOT NULL,
[level] VARCHAR(25) NOT NULL,
PRIMARY KEY ([id]),
FOREIGN KEY ([exon1]) REFERENCES gin_exon([id]),
FOREIGN KEY ([exon2]) REFERENCES gin_exon([id]),
FOREIGN KEY ([id_project]) REFERENCES gin_project([id]))''')
    #######################################
    # Create table - gin_gene_interaction #
    #######################################
    c.execute('''CREATE TABLE IF NOT EXISTS gin_gene_interaction
([id] INT NOT NULL,
[force] INT NOT NULL,
[gene1] INT NOT NULL,
[gene2] INT NOT NULL,
[id_project] INT NOT NULL,
[level] VARCHAR(25) NOT NULL,
PRIMARY KEY ([id]),
FOREIGN KEY ([gene1]) REFERENCES gin_gene([id]),
FOREIGN KEY ([gene2]) REFERENCES gin_gene([id]),
FOREIGN KEY ([id_project]) REFERENCES gin_project([id]))''')
    ############################################
    # Create table - gin_project_splicing_lore #
    ############################################
    c.execute('''CREATE TABLE IF NOT EXISTS gin_project_splicing_lore
([id] INT NOT NULL,
[project_name] VARCHAR(45) NULL,
[source_db] VARCHAR(45) NOT NULL,
[db_id_project] VARCHAR(15) NOT NULL,
[sf_name] VARCHAR(45) NOT NULL,
[cl_name] VARCHAR(45) NOT NULL,
PRIMARY KEY ([id]))''')
    ############################
    # Create table - ase_event #
    ############################
    c.execute('''CREATE TABLE IF NOT EXISTS ase_event
([id] INT NOT NULL,
[id_project] INT NOT NULL,
[gene_id] INT NOT NULL,
[pos] INT NOT NULL,
[exon_id] VARCHAR(30) NOT NULL,
[delta_psi] FLOAT NULL,
[pvalue] FLOAT NULL,
[pvalue_glm_cor] FLOAT NULL,
PRIMARY KEY ([id]),
FOREIGN KEY ([exon_id]) REFERENCES gin_exon([id]),
FOREIGN KEY ([gene_id]) REFERENCES gin_gene([id]),
FOREIGN KEY ([id_project]) REFERENCES gin_project_splicing_lore([id]))''')
    conn.commit()
finally:
    # Release the database file even when one of the statements fails.
    conn.close()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Description: Populate the gin database
"""
from .config import Config, logging_def
from .creation_of_exon_table import get_ctrl, get_exon_table
from .create_freq_table import create_freq_table
from .populate_database import populate
import logging
def launcher(logging_level: str = "INFO"):
    """
    Fill the database.

    Only the third table of ``Config.tables`` is populated at the moment;
    the steps building the intermediate files and filling the two first
    tables are kept below as comments.

    :param logging_level: The level of information to display. It is used \
    to set up logging here and is forwarded to the populate step.
    """
    # The output folder has to exist before logging is configured, because
    # logging_def is given this folder as the location of its log file.
    Config.output.mkdir(exist_ok=True)
    logging_def(Config.output, logging_level)
    # logging.info(f"Creation of {Config.ctrl_exon_file} file")
    # get_ctrl(Config.exon_intern)
    # logging.info(f"Creation of {Config.exon_file} file")
    # get_exon_table(Config.ctrl_exon_file, Config.gene_file, logging_level)
    # logging.info(f"Creation of {Config.frequency_file} file")
    # create_freq_table(Config.bed_orf, Config.bed_exon, Config.ctrl_exon_file,
    #                   logging_level)
    #
    # Call the function stored under ``__wrapped__`` so that the arguments
    # are given directly from Python (presumably this bypasses the
    # command-line parsing added by the decorator of ``populate``).
    mpopulate = populate.__wrapped__
    # logging.info(f"Filling {Config.tables[0]} table")
    # mpopulate(Config.tables[0], Config.gene_file, "y", logging_level)
    # logging.info(f"Filling {Config.tables[1]} table")
    # mpopulate(Config.tables[1], Config.exon_file, "y", logging_level)
    logging.info(f"Filling {Config.tables[2]} table")
    mpopulate(Config.tables[2], Config.frequency_file, "y", logging_level)
# Run the whole filling procedure with the most verbose logging level.
launcher(logging_level="DEBUG")
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Description: Configuration class
"""
from pathlib import Path
from typing import Dict, Any
import logging
import logging.config
class Config:
    """
    Paths and table names shared by the modules of the db_utils submodule.

    Every path is anchored on the folder located one level above the folder
    containing this file.
    """
    # Database receiving the data
    db_file = Path(__file__).parents[1].joinpath("db.sqlite3")
    # Names of the tables that are allowed to be populated
    tables = ["gin_gene", "gin_exon", "gin_frequency", "gin_interaction"]
    # Input bed files
    bed_orf = Path(__file__).parents[1].joinpath("data", "exon_orf.bed")
    bed_exon = Path(__file__).parents[1].joinpath("data", "exon_freq.bed")
    # Folder holding the generated files (and the log file)
    output = Path(__file__).parents[1].joinpath("results")
    exon_intern = Path(__file__).parents[1].joinpath("data",
                                                     "exon_intern.bed")
    # Generated files
    ctrl_exon_file = output.joinpath("CTRL_exons.txt")
    exon_file = output.joinpath("exons.txt")
    gene_file = Path(__file__).parents[1].joinpath("data", "genes.csv")
    frequency_file = output.joinpath("frequency.txt")
class LoggingLevelError(Exception):
    """Raised when the requested logging level is not a supported one."""
def logging_def(output: Path, level: str = "INFO"):
    """
    Define a logging at the current level of the script.

    The log file is named after this file (with a ``.log`` extension) and is
    created inside ``output``. Nothing is configured when ``level`` is
    ``DISABLE``.

    :param output: Folder where the result will be created
    :param level: The log level
    :raises LoggingLevelError: If ``level`` is neither a supported level \
    nor ``DISABLE``
    """
    possible_levels = ["INFO", "DEBUG", "ERROR", "WARNING", "CRITICAL"]
    if level == "DISABLE":
        return
    if level not in possible_levels:
        raise LoggingLevelError(f"Logging level unknown : choose from "
                                f"{possible_levels} or DISABLE to disable the "
                                f"initialisation of logging in {__file__}")
    # The file handler opens its log file while the configuration is being
    # applied, so the destination folder must already exist.
    output.mkdir(parents=True, exist_ok=True)
    # Swap only the extension; a plain text replacement would also alter a
    # ".py" found in the middle of the file name.
    basename = Path(__file__).with_suffix(".log").name
    LOGGING_CONFIG["handlers"]["file"]["filename"] = output / basename
    LOGGING_CONFIG["loggers"][""]["level"] = level
    logging.config.dictConfig(LOGGING_CONFIG)
# Dictionary given to logging.config.dictConfig. The file name of the "file"
# handler and the level of the root logger are overwritten by logging_def
# before the configuration is applied.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": True,
    "formatters": {
        # Detailed format, used for the log file
        "complex": {
            "format": "%(filename)s:%(lineno)s:%(funcName)s():%(asctime)s"
                      " - %(levelname)s - %(message)s",
        },
        # Message only, used for the console
        "simple": {
            "format": "%(message)s",
        },
    },
    "handlers": {
        "default": {
            "level": "NOTSET",
            "formatter": "simple",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",  # Default is stderr
        },
        "file": {
            "level": "NOTSET",
            "formatter": "complex",
            "class": "logging.FileHandler",
            "filename": "test.log",
            "mode": "w",
        },
    },
    "loggers": {
        "": {  # root logger
            "handlers": ["default", "file"],
            "level": "NOTSET",
            "propagate": True,
        },
    },
}  # type: Dict[str, Any]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Description: The goal of this script is to create the tables of frequencies \
that will be next injected into the web database
"""
from pathlib import Path
import pandas as pd
from .creation_of_exon_table import get_ctrl_exon
from .config import Config, logging_def
import logging
import sys
def load_bed(bed_file: Path) -> pd.DataFrame:
    """
    Read a 7-column bed file and keep the exon names and frequencies.

    :param bed_file: A tabulated bed file without header line, whose 7th \
    column holds a Python literal describing the frequencies in the exon.
    :return: A dataframe with the columns ``name``, ``freq`` (the evaluated \
    7th column) and ``type`` (``orf`` when the file name contains "orf", \
    ``exon`` otherwise).
    """
    bed_kind = "orf" if "orf" in bed_file.name else "exon"
    columns = ["chr", "start", "stop", "name", "score", "strand", "freq"]
    bed = pd.read_csv(bed_file, sep="\t", names=columns)
    bed = bed.loc[:, ["name", "freq"]]
    bed["type"] = [bed_kind] * len(bed)
    # NOTE(review): eval executes whatever the 7th column contains; only
    # feed trusted files, or consider ast.literal_eval if the column always
    # holds plain literals - confirm against the real data.
    bed["freq"] = bed["freq"].apply(eval)
    return bed
def get_ft_type(bed_type: str, feature: str) -> str:
    """
    Give the category of a feature from its length and the kind of bed.

    :param bed_type: The type of a bed : orf or exon
    :param feature: The feature of interest
    :return: For an ``orf`` bed: ``aa`` for a one-character feature, ``ft`` \
    otherwise. For any other bed: ``nt``, ``dnt`` or ``tnt`` for a feature \
    of one, two or any other number of characters.
    """
    size = len(feature)
    if bed_type == "orf":
        return "aa" if size == 1 else "ft"
    if size == 1:
        return "nt"
    if size == 2:
        return "dnt"
    return "tnt"
def create_table(df: pd.DataFrame) -> pd.DataFrame:
    """
    Create the table that will be inserted in gin database.

    Each (exon, feature) pair found in the ``freq`` dictionaries becomes one
    row of the result. The progression is written on the standard output,
    once per percentage step.

    :param df: A dataframe with the columns ``name`` (exon), ``type`` \
    (kind of bed the row comes from) and ``freq`` (dictionary linking each \
    feature to its frequency)
    :return: A dataframe with the columns ``index``, ``ft_type``, ``ft``, \
    ``exon_id`` and ``frequency``
    """
    dic = {"ft": [], "frequency": [], "exon_id": [], "ft_type": []}
    tot = len(df)
    # Walking the three columns side by side avoids building one Series per
    # row; an empty dataframe is not indexed at all.
    rows = zip(df["name"], df["type"], df["freq"]) if tot else ()
    last_percent = None
    for i, (exon_id, bed_type, freqs) in enumerate(rows):
        percent = round((i + 1) / (tot / 100))
        # Only refresh the display when the percentage changes
        if percent != last_percent:
            sys.stdout.write(f"Progression : "
                             f" {percent} % \r")
            last_percent = percent
        for ft, frequency in freqs.items():
            dic["ft"].append(ft)
            dic["frequency"].append(frequency)
            dic["exon_id"].append(exon_id)
            dic["ft_type"].append(get_ft_type(bed_type, ft))
    table = pd.DataFrame(dic)
    # reset_index adds the row number as a first column named "index"
    return table[["ft_type", "ft", "exon_id", "frequency"]].reset_index()
def create_freq_table(bed_orf: Path, bed_exon: Path, ctrl_exon: Path,
                      logging_level: str = "DISABLE") -> None:
    """
    From two bed files whose 7th column corresponds to the frequencies of \
    an exon, compute the table that will be used to populate the gin database.

    The result is written, tab-separated, in ``Config.frequency_file``.

    :param bed_orf: A bed file containing only ORF for exons and \
    a 7th column corresponding to the frequency in amino acid or \
    feature in the peptide coded by the exons
    :param bed_exon: A bed file containing exons with a 7th column \
    containing the frequency of nucleotides, di-nucleotides, tri-nucleotides.
    :param ctrl_exon: The file containing control exons
    :param logging_level: The level of information to display
    """
    logging_def(Config.output, logging_level)
    logging.debug(f"Loading {bed_orf}")
    df_orf = load_bed(bed_orf)
    logging.debug(f"Loading {bed_exon}")
    df_exon = load_bed(bed_exon)
    logging.debug("Concatenating load file and filtering on ctrl exons")
    df = pd.concat([df_exon, df_orf], axis=0, ignore_index=True)
    # The separate frames are not needed anymore
    del df_exon
    del df_orf
    ctrl_exon_list = get_ctrl_exon(ctrl_exon)
    df = df.loc[df.name.isin(ctrl_exon_list), :]
    logging.debug(df.head())
    logging.debug("Creating Frequency table")
    final_table = create_table(df)
    logging.debug(final_table.head())
    logging.debug(f"Saving table to {Config.frequency_file}")
    final_table.columns = ["id", "ft_type", "ft", "exon_id", "frequency"]
    # The "id" column already numbers the rows: writing the dataframe index
    # as well would add an unnamed extra column to the file.
    final_table.to_csv(Config.frequency_file, sep="\t", index=False)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Description: Create the exon_ctrl file and the exon file
"""
from pathlib import Path
import pandas as pd
from typing import List
from .config import Config, logging_def
import logging
def get_ctrl_exon(mfile: Path) -> List[str]:
    """
    Read the control exons stored in mfile, one per line.

    :param mfile: A file containing control exons
    :return: The list of control exons in mfile
    """
    return mfile.read_text().splitlines()
def get_ctrl(bed_file: Path) -> None:
    """
    Create the file containing the control exons.

    The names of the exons longer than 2 (``stop - start > 2``) are written,
    one per line, in ``Config.ctrl_exon_file``. Nothing is returned.

    :param bed_file: A bed file containing intern exons
    """
    # NOTE(review): without header=None, the first line of the file is taken
    # as a header and is not part of the data - confirm the bed file really
    # starts with a header line.
    bed = pd.read_csv(bed_file, sep="\t")
    bed.columns = ["chr", "start", "stop", "name", "score", "strand"]
    long_enough = (bed["stop"] - bed["start"]) > 2
    exon_names = bed.loc[long_enough, "name"]
    exon_names.to_csv(Config.ctrl_exon_file, header=False, index=False)
def get_exon_table(ctrl_exon: Path, gene_tab: Path,
                   logging_level: str = "DISABLE"):
    """
    Build the exon table and write it in ``Config.exon_file``.

    Exon identifiers are split on "_": the first part is used as gene id and
    the second one as position. The exon name is the gene symbol followed
    by "_" and the position.

    :param ctrl_exon: A file containing control exons
    :param gene_tab: A file containing FasterDB gene display \
    in two columns: gene_symbol and fasterdb id.
    :param logging_level: The level of display
    """
    def exon_name(row) -> str:
        # Gene symbol and exon position joined by an underscore
        return f"{row.gene}_{row.pos}"

    logging_def(Config.output, logging_level)
    exon_ids = get_ctrl_exon(ctrl_exon)

    logging.debug("Creating dataframe of control exons")
    exons = pd.DataFrame({"id": exon_ids})
    logging.debug(f"{exons.head()}\nshape:{exons.shape}")

    logging.debug("Creation of a pos column")
    exons["pos"] = exons["id"].map(lambda ident: int(ident.split("_")[1]))
    logging.debug(f"{exons.head()}\nshape:{exons.shape}")

    logging.debug("Creation of a gene_id column")
    exons["gene_id"] = exons["id"].map(lambda ident: int(ident.split("_")[0]))
    logging.debug(f"{exons.head()}\nshape:{exons.shape}")

    logging.debug("Addition of a gene column")
    genes = pd.read_csv(gene_tab, sep="\t")
    genes.columns = ["gene", "gene_id"]
    # Joined on the column shared by both frames (gene_id)
    exons = pd.merge(exons, genes)
    logging.debug(f"{exons.head()}\nshape:{exons.shape}")

    logging.debug("Addition of a name column")
    exons["name"] = exons.apply(exon_name, axis=1)
    exons = exons.loc[:, ["pos", "name", "id", "gene_id"]]
    logging.debug(f"{exons.head()}\nshape:{exons.shape}")
    exons.to_csv(Config.exon_file, sep="\t", index=False)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Description: This file can be used to populate the database of the website.
"""
import sqlite3
from .config import Config, logging_def
from pathlib import Path
from typing import List, Tuple
import lazyparser as lp
import logging
import pandas as pd
class TableNameError(Exception):
    """Raised when the requested table is not listed in the configuration."""
class ColumnsNumberError(Exception):
    """Raised when a file and its table have a different number of columns."""
class ColumnsNameError(Exception):
    """Raised when the column names of a file and its table differ."""
def get_number_columns(table: str, cnx: sqlite3.Connection) -> List[str]:
    """
    Get the names of the columns in the table ``table``.

    :param table: The name of the table in the database in which we want \
    to add data.
    :param cnx: Connection to the database
    :return: The names of the columns, in the order given by the database. \
    The list is empty when the table does not exist.
    """
    # The table name is written inside the statement, so it is quoted as an
    # identifier (inner double quotes doubled) to keep the statement valid
    # whatever characters the name contains.
    quoted = '"{}"'.format(table.replace('"', '""'))
    cursor = cnx.cursor()
    try:
        cursor.execute(f"PRAGMA table_info({quoted});")
        # The second field of each row returned by table_info is the name
        return [col[1] for col in cursor.fetchall()]
    finally:
        cursor.close()
def check_file(table: str, file: Path, cnx: sqlite3.Connection) -> List[Tuple]:
    """
    Check that the file has the same columns as the table ``table`` and \
    return its rows.

    :param table: The name of the table in the database in which we want \
    to add data.
    :param file: A tabulated file, with a header line, containing the data \
    to insert in the table ``table``.
    :param cnx: Connection to the database
    :return: The rows in file, as tuples whose values follow the order of \
    the columns of the table.
    :raises ColumnsNumberError: If the file and the table do not have the \
    same number of columns
    :raises ColumnsNameError: If the file and the table do not have the \
    same column names
    """
    column_names = get_number_columns(table, cnx)
    df = pd.read_csv(file, sep="\t")
    if len(df.columns) != len(column_names):
        msg = "Wrong number of columns"
        # No exception is being handled here, hence error() and not
        # exception(), which would append an empty traceback.
        logging.error(msg)
        raise ColumnsNumberError(msg)
    if sorted(df.columns) != sorted(column_names):
        msg = f"some of the columns name in your tabulated " \
              f"file {list(df.columns)} " \
              f"and the columns name of the database table {table} : " \
              f"{column_names} differs"
        logging.error(msg)
        raise ColumnsNameError(msg)
    # Reorder the columns of the file like those of the table
    df = df[column_names]
    # Plain tuples, as announced by the signature
    return list(df.itertuples(index=False, name=None))
def clean_table(table: str, cnx: sqlite3.Connection) -> None:
    """
    Delete every row of the table ``table`` and commit the deletion.

    :param table: The name of the table in the database in which we want \
    to add data.
    :param cnx: Connection to the database
    """
    # Connection.execute hands back the cursor it used, closed right away
    cnx.execute(f"DELETE FROM {table}").close()
    cnx.commit()
def insert_data(table: str, content: List[Tuple], cnx: sqlite3.Connection
                ) -> None:
    """
    Insert the data in the database and commit the insertion.

    Nothing is done when ``content`` is empty.

    :param table: The name of the table in the database in which we want \
    to add data.
    :param content: The content to inject in ``table``, one item per row. \
    Every row must hold one value per column of the table.
    :param cnx: Connection to the database
    """
    # len() is used rather than truthiness so that array-like contents are
    # accepted as well as lists.
    if len(content) == 0:
        return
    # One placeholder per value of a row
    v = ",".join("?" * len(content[0]))
    query = f"INSERT INTO {table} VALUES ({v});"
    cursor = cnx.cursor()
    try:
        cursor.executemany(query, content)
        cnx.commit()
    finally:
        cursor.close()
# NOTE: the layout of the docstring below is kept unchanged on purpose, as
# the lazyparser decorator presumably reads it to describe the arguments.
# A TableNameError is raised when the table is not listed in Config.tables.
@lp.parse(file="file", clean=["y", "Y", "n", "N"])
def populate(table: str, file: str, clean: str, logging_level: str =
             "DISABLE"):
    """
    Update the content of the database of the web interface.
    :param table: The name of the table in the database in which we want \
    to add data.
    :param file: A tabulated file containing the data to insert in the table \
    tab.
    :param clean: y to remove the data in the table, n else.
    :param logging_level: The level of information to display
    """
    logging_def(Config.output, logging_level)
    mfile = Path(file)
    # Short names are accepted: the "gin_" prefix is added when missing
    if "gin" not in table:
        table = f"gin_{table.lower()}"
    if table not in Config.tables:
        msg = f"The name {table} is not available." \
              f" If the table exist in the database, " \
              f"change the config file to add the table name " \
              f"wanted in 'tables' field"
        # No exception is being handled here, hence error() and not
        # exception(), which would append an empty traceback.
        logging.error(msg)
        raise TableNameError(msg)
    cnx = sqlite3.connect(Config.db_file)
    try:
        # The file is validated before anything is removed from the table
        logging.debug("Checking file ...")
        content = check_file(table, mfile, cnx)
        if clean.upper() == "Y":
            logging.debug("Cleaning table")
            clean_table(table, cnx)
        logging.debug("Inserting data ...")
        insert_data(table, content, cnx)
    finally:
        # Release the database file whatever happened
        cnx.close()
# Script entry point: populate is called without any argument; presumably
# its decorator reads them from the command line - TODO confirm.
if __name__ == "__main__":
    populate()
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Description: This file contains the dictionary used to \
configure Python's logging module.
"""
# Dictionary meant to be given to logging.config.dictConfig: messages go both
# to the standard output (message only) and to a log file (detailed format).
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": True,
    "formatters": {
        # Detailed format, used for the log file
        "complex": {
            "format": "%(filename)s:%(lineno)s:%(funcName)s():%(asctime)s"
                      " - %(levelname)s - %(message)s",
        },
        # Message only, used for the console
        "simple": {
            "format": "%(message)s",
        },
    },
    "handlers": {
        "default": {
            "level": "NOTSET",
            "formatter": "simple",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",  # Default is stderr
        },
        "file": {
            "level": "NOTSET",
            "formatter": "complex",
            "class": "logging.FileHandler",
            "filename": "test.log",
            "mode": "w",
        },
    },
    "loggers": {
        "": {  # root logger
            "handlers": ["default", "file"],
            "level": "NOTSET",
            "propagate": True,
        },
    },
}  # type: Dict[str, Any]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment