check_samplesheet.py

    #!/usr/bin/env python
    
    
    """Provide a command line tool to validate and transform tabular samplesheets."""
    
    
    import argparse
    import csv
    import logging
    import sys
    from collections import Counter
    from pathlib import Path
    
    logger = logging.getLogger()
    
    
    class RowChecker:
        """
        Define a service that can validate and transform each given row.
    
        Attributes:
            modified (list): A list of dicts, where each dict corresponds to a previously
                validated and transformed row. The order of rows is maintained.
    
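        Example:
            A minimal sketch of the intended call sequence (illustrative only;
            in this script the class is driven by ``check_samplesheet``)::

                checker = RowChecker()
                checker.validate_and_transform(row)  # once per parsed row
                checker.validate_unique_samples()  # after all rows were seen
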
        """
    
        VALID_FORMATS = (
            ".fq.gz",
            ".fastq.gz",
        )
    
        def __init__(
            self,
            sample_col="sample",
            first_col="fastq_1",
            second_col="fastq_2",
            single_col="single_end",
            **kwargs,
        ):
            """
            Initialize the row checker with the expected column names.
    
            Args:
                sample_col (str): The name of the column that contains the sample name
                    (default "sample").
                first_col (str): The name of the column that contains the first (or only)
                    FASTQ file path (default "fastq_1").
                second_col (str): The name of the column that contains the second (if any)
                    FASTQ file path (default "fastq_2").
                single_col (str): The name of the new column that will be inserted and
                    records whether the sample contains single- or paired-end sequencing
                    reads (default "single_end").
    
            """
            super().__init__(**kwargs)
            self._sample_col = sample_col
            self._first_col = first_col
            self._second_col = second_col
            self._single_col = single_col
            self._seen = set()
            self.modified = []
    
        def validate_and_transform(self, row):
            """
            Perform all validations on the given row and insert the read pairing status.
    
            Args:
                row (dict): A mapping from column headers (keys) to elements of that row
                    (values).
    
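            Example:
                Illustrative effect on a paired-end row (the file names here are
                hypothetical)::

                    row = {"sample": "S 1", "fastq_1": "a_1.fastq.gz",
                           "fastq_2": "a_2.fastq.gz"}
                    checker.validate_and_transform(row)
                    # Now row["sample"] == "S_1" and row["single_end"] is False.
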
            """
            self._validate_sample(row)
            self._validate_first(row)
            self._validate_second(row)
            self._validate_pair(row)
            self._seen.add((row[self._sample_col], row[self._first_col]))
            self.modified.append(row)
    
        def _validate_sample(self, row):
            """Assert that the sample name exists and convert spaces to underscores."""
            if len(row[self._sample_col]) <= 0:
                raise AssertionError("Sample input is required.")
            # Sanitize samples slightly.
            row[self._sample_col] = row[self._sample_col].replace(" ", "_")
    
        def _validate_first(self, row):
            """Assert that the first FASTQ entry is non-empty and has the right format."""
            if len(row[self._first_col]) <= 0:
                raise AssertionError("At least the first FASTQ file is required.")
            self._validate_fastq_format(row[self._first_col])
    
        def _validate_second(self, row):
            """Assert that the second FASTQ entry has the right format if it exists."""
            if len(row[self._second_col]) > 0:
                self._validate_fastq_format(row[self._second_col])
    
        def _validate_pair(self, row):
            """Assert that read pairs have the same file extension. Report pair status."""
            if row[self._first_col] and row[self._second_col]:
                row[self._single_col] = False
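                # ``Path.suffixes`` keeps every dotted component, so comparing the
                # last two elements catches e.g. ['.fastq', '.gz'] vs ['.fq', '.gz'].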
                first_col_suffix = Path(row[self._first_col]).suffixes[-2:]
                second_col_suffix = Path(row[self._second_col]).suffixes[-2:]
                if first_col_suffix != second_col_suffix:
                    raise AssertionError("FASTQ pairs must have the same file extensions.")
            else:
                row[self._single_col] = True
    
        def _validate_fastq_format(self, filename):
            """Assert that a given filename has one of the expected FASTQ extensions."""
            if not any(filename.endswith(extension) for extension in self.VALID_FORMATS):
                raise AssertionError(
                    f"The FASTQ file has an unrecognized extension: {filename}\n"
                    f"It should be one of: {', '.join(self.VALID_FORMATS)}"
                )
    
        def validate_unique_samples(self):
            """
            Assert that the combination of sample name and FASTQ filename is unique.
    
            In addition to the validation, also rename all samples to have a suffix
            of _T{n}, where n is the number of times the same sample name occurs
            with different FASTQ files, e.g., for multiple runs per experiment.
    
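            Example:
                Two rows for sample ``S1`` that point to different FASTQ files pass
                the check and are renamed (hypothetical file names)::

                    checker = RowChecker()
                    for name in ("run1_1.fastq.gz", "run2_1.fastq.gz"):
                        checker.validate_and_transform(
                            {"sample": "S1", "fastq_1": name, "fastq_2": ""}
                        )
                    checker.validate_unique_samples()
                    # [row["sample"] for row in checker.modified] == ["S1_T1", "S1_T2"]
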
            """
            if len(self._seen) != len(self.modified):
                raise AssertionError("The pair of sample name and FASTQ must be unique.")
            seen = Counter()
            for row in self.modified:
                sample = row[self._sample_col]
                seen[sample] += 1
                row[self._sample_col] = f"{sample}_T{seen[sample]}"
    
    
    def read_head(handle, num_lines=10):
        """Read the specified number of lines from the current position in the file."""
        lines = []
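        # Stop once num_lines lines have been collected; e.g., with lines
        # 'a\n', 'b\n', 'c\n' and num_lines=2 this returns 'a\nb\n'.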
        for idx, line in enumerate(handle):
            if idx == num_lines:
                break
            lines.append(line)
        return "".join(lines)
    
    
    def sniff_format(handle):
        """
        Detect the tabular format.
    
        Args:
            handle (text file): A handle to a `text file`_ object. The read position
                is expected to be at the beginning (index 0).
    
        Returns:
            csv.Dialect: The detected tabular format.
    
        .. _text file:
            https://docs.python.org/3/glossary.html#term-text-file
    
        """
        peek = read_head(handle)
        handle.seek(0)
        sniffer = csv.Sniffer()
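        # E.g., sniffing the peek 'a,b,c\n1,2,3\n' yields a dialect whose
        # delimiter is ',', which csv.DictReader can consume directly.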
        dialect = sniffer.sniff(peek)
        return dialect
    
    
    def check_samplesheet(file_in, file_out):
        """
        Check that the tabular samplesheet has the structure expected by nf-core pipelines.
    
        Validate the general shape of the table, expected columns, and each row. Also add
        an additional column which records whether one or two FASTQ reads were found.
    
        Args:
            file_in (pathlib.Path): The given tabular samplesheet. The format can be
                CSV, TSV, or any other tabular format automatically recognized by
                ``csv.Sniffer``.
            file_out (pathlib.Path): Where the validated and transformed samplesheet should
                be created; always in CSV format.
    
        Example:
            This function checks that the samplesheet has the following structure;
            see also the `viral recon samplesheet`_::
    
                sample,fastq_1,fastq_2
                SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz
                SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz
                SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq.gz,
    
        .. _viral recon samplesheet:
            https://raw.githubusercontent.com/nf-core/test-datasets/viralrecon/samplesheet/samplesheet_test_illumina_amplicon.csv
    
        """
        required_columns = {"sample", "fastq_1", "fastq_2"}
        # See https://docs.python.org/3.9/library/csv.html#id3 to read up on `newline=""`.
        with file_in.open(newline="") as in_handle:
            reader = csv.DictReader(in_handle, dialect=sniff_format(in_handle))
            # Validate the existence of the expected header columns.
            if not required_columns.issubset(reader.fieldnames):
                req_cols = ", ".join(required_columns)
                logger.critical(f"The sample sheet **must** contain these column headers: {req_cols}.")
                sys.exit(1)
            # Validate each row.
            checker = RowChecker()
            for i, row in enumerate(reader):
                try:
                    checker.validate_and_transform(row)
                except AssertionError as error:
                    logger.critical(f"{str(error)} On line {i + 2}.")
                    sys.exit(1)
            checker.validate_unique_samples()
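        # Rebuild the output header with the new pairing-status column at index 1,
        # i.e., right after "sample" in the documented column layout.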
        header = list(reader.fieldnames)
        header.insert(1, "single_end")
        # See https://docs.python.org/3.9/library/csv.html#id3 to read up on `newline=""`.
        with file_out.open(mode="w", newline="") as out_handle:
            writer = csv.DictWriter(out_handle, header, delimiter=",")
            writer.writeheader()
            for row in checker.modified:
                writer.writerow(row)
    
    
    def parse_args(argv=None):
        """Define and immediately parse command line arguments."""
        parser = argparse.ArgumentParser(
            description="Validate and transform a tabular samplesheet.",
            epilog="Example: python check_samplesheet.py samplesheet.csv samplesheet.valid.csv",
        )
        parser.add_argument(
            "file_in",
            metavar="FILE_IN",
            type=Path,
            help="Tabular input samplesheet in CSV or TSV format.",
        )
        parser.add_argument(
            "file_out",
            metavar="FILE_OUT",
            type=Path,
            help="Transformed output samplesheet in CSV format.",
        )
        parser.add_argument(
            "-l",
            "--log-level",
            help="The desired log level (default WARNING).",
            choices=("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"),
            default="WARNING",
        )
        return parser.parse_args(argv)
    
    
    def main(argv=None):
        """Coordinate argument parsing and program execution."""
        args = parse_args(argv)
        logging.basicConfig(level=args.log_level, format="[%(levelname)s] %(message)s")
        if not args.file_in.is_file():
            logger.error(f"The given input file {args.file_in} was not found!")
            sys.exit(2)
        args.file_out.parent.mkdir(parents=True, exist_ok=True)
        check_samplesheet(args.file_in, args.file_out)
    
    
    if __name__ == "__main__":
        sys.exit(main())