Source code for lector.csv.preambles

"""Detectors of preambles in CSV files.

This is generally a chicken-and-egg-type situation. Do detect generic preambles robustly and
efficiently, it would really help to know the CSV dialect, or at least the delimiter. But to detect
the dialect/delimiter correctly, we need to ignore/(skip) the preamble. Detectors may therefore
rely on (somtimes) overly simplistic heuristics implicitly assuming a certain dialect.
"""
from __future__ import annotations

import csv
from abc import ABC, abstractmethod
from collections.abc import Iterable
from dataclasses import dataclass, field
from itertools import islice
from typing import TextIO

from ..log import LOG


@dataclass
[docs] class PreambleDetector(ABC): """Base class for detecting preambles (initial junk) in a CSV buffer."""
[docs] n_rows: int = 100
@abstractmethod
[docs] def detect(self, buffer: TextIO) -> int: """Detect preamble and return number of lines to skip."""
[docs] class Preambles: """Registry to manage preamble detectors."""
[docs] DETECTORS = {}
@classmethod
[docs] def register(cls, registered: type) -> type: cls.DETECTORS[registered.__name__] = registered return registered
@classmethod
[docs] def detect( cls, buffer: TextIO, detectors: Iterable[PreambleDetector] | None = None, log: bool = False, ) -> int: """Get result of first preamble detector matching the csv buffer. Matching here means detecting more than 0 rows of preamble text, and result is the number of rows to skip. If no detectors are provided (as ordered sequence), all registered detector classes are tried in registered order and using default parameters. """ cursor = buffer.tell() if detectors is None: detectors = (det() for det in cls.DETECTORS.values()) for detector in detectors: skiprows = detector.detect(buffer) if skiprows: if log: name = detector.__class__.__name__ msg = f"'{name}' matches CSV buffer: detected {skiprows} rows to skip." LOG.info(msg) return skiprows buffer.seek(cursor) return 0
@Preambles.register @dataclass
[docs] class Brandwatch(PreambleDetector): """Detect CSV files exported from Brandwatch. Brandwatch uses the comma as separator and includes a row of commas only to separate preamble texts from the CSV table as such. """
[docs] def detect(self, buffer: TextIO) -> int: rows = [row.strip() for row in islice(buffer, self.n_rows)] for i, row in enumerate(rows): if len(row) > 0 and all(x == "," for x in row): return i + 1 return 0
@Preambles.register @dataclass
[docs] class Fieldless(PreambleDetector): """Detects initial rows that don't contain any delimited fields. Tries parsing buffer using Python's built-in csv functionality, assuming as delimiter the most frequent character amongst those configured via ``delimiters``. Given this delimiter, the parser detects N initial lines containing a single field only, followed by at least one line containing multiple fields. N is then the number of rows to skip. """
[docs] delimiters: str | list[str] = field(default_factory=lambda: [",", ";", "\t"])
[docs] def detect_with_delimiter(self, buffer: TextIO, delimiter: str) -> int: """Count how many consecutive initial fieldless rows we have given specific delimiter.""" reader = csv.reader( islice(buffer, self.n_rows), delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL, doublequote=True, skipinitialspace=True, ) for row in reader: if len(row) > 1: return reader.line_num - 1 return 0
[docs] def detect(self, buffer: TextIO) -> int: """Count consecutive initial fieldless rows given the most frequent delimiter.""" cursor = buffer.tell() delimiters = [self.delimiters] if isinstance(self.delimiters, str) else self.delimiters text = "".join(islice(buffer, self.n_rows)) counts = {delim: text.count(delim) for delim in delimiters} delimiter = max(counts.items(), key=lambda item: item[1])[0] buffer.seek(cursor) return self.detect_with_delimiter(buffer, delimiter)
@Preambles.register @dataclass