Source code for lector.utils

"""Common helpers to work with pyarrow objects."""
from __future__ import annotations

import json
from collections import namedtuple
from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from functools import singledispatch
from time import perf_counter
from typing import Callable, Union

import pyarrow as pa
from pyarrow import (
    Array,
    ChunkedArray,
    DataType,
    Schema,
    Table,
    type_for_alias,  # noqa: F401
)
from pyarrow import compute as pac
from pyarrow import types as pat
from pyarrow.lib import ensure_type  # noqa: F401

try:
    import pandas as pd
except Exception:
    # Any failure while importing pandas (not only ImportError) disables the
    # pandas-specific helpers defined further down in this module.
    PANDAS_INSTALLED = False
else:
    PANDAS_INSTALLED = True
Number = Union[int, float]

Limit = namedtuple("Limit", "min,max")

# Signed types first, then unsigned, each ordered by bit width (8 -> 64).
INT_LIMITS: dict[str, Limit] = {
    **{f"int{bits}": Limit(-(2 ** (bits - 1)), 2 ** (bits - 1) - 1) for bits in (8, 16, 32, 64)},
    **{f"uint{bits}": Limit(0, 2**bits - 1) for bits in (8, 16, 32, 64)},
}
"""Minimum and maximum for each integer subtype."""
MISSING_STRINGS: set[str] = {
    "#N/A",
    "#N/A N/A",
    "#NA",
    "-1.#IND",
    "-1.#INF",
    "-1.#QNAN",
    "1.#IND",
    "1.#INF",
    "1.#INF000000",
    "1.#QNAN",
    "-NaN",
    "-nan",
    "<NA>",
    "N/A",
    "n/a",
    "NA",
    "NAN",
    "NaN",
    "nan",
    "NULL",
    "Null",
    "null",
    # Would expect this to happen automatically, but not the case
    # (at least when Arrow reads CSV with types="string")
    "",
}
"""Extension of pandas and arrow default missing values."""


@contextmanager
def reset_buffer(buffer):
    """Remember the buffer's current position and restore it on exit.

    The position is restored in a ``finally`` clause, so it is reset even when
    the body of the ``with`` statement raises (e.g. a failed parse attempt).

    Args:
        buffer: Any object with ``tell()`` and ``seek(pos)`` methods.
    """
    cursor = buffer.tell()
    try:
        yield
    finally:
        buffer.seek(cursor)
def smallest_int_type(vmin: Number, vmax: Number) -> str | None:
    """Return the name of the narrowest integer type covering [vmin, vmax].

    A non-negative ``vmin`` is matched against unsigned types, anything else
    against signed ones. Only 8, 16 and 32 bit types are tried; if none of
    them can hold both bounds, ``None`` is returned.
    """
    if vmin >= 0:
        candidates = ("uint8", "uint16", "uint32")
    else:
        candidates = ("int8", "int16", "int32")
    fitting = (
        name
        for name in candidates
        if vmin >= INT_LIMITS[name].min and vmax <= INT_LIMITS[name].max
    )
    return next(fitting, None)
def dtype_name(arr: Array):
    """Return a pandas-compatible type name for ``arr``.

    Integer arrays that contain nulls get the capitalized name of pandas'
    nullable extension dtype (``int8`` -> ``Int8``, ``uint8`` -> ``UInt8``);
    every other array keeps Arrow's own type name.
    """
    name = str(arr.type)
    needs_extension = pat.is_integer(arr.type) and arr.null_count > 0
    if not needs_extension:
        return name
    return name.replace("i", "I").replace("u", "U")
def min_max(arr: Array, skip_nulls: bool = True) -> tuple[Number, Number]:
    """Return ``(minimum, maximum)`` of ``arr`` as plain Python values.

    ``skip_nulls`` is forwarded unchanged to ``pyarrow.compute.min_max``.
    """
    extremes = pac.min_max(arr, skip_nulls=skip_nulls).as_py()
    lowest = extremes["min"]
    highest = extremes["max"]
    return lowest, highest
def proportion_valid(arr: Array) -> float:
    """Proportion of non-null values in array.

    Returns 0.0 for an empty array instead of dividing by zero, in line with
    ``proportion_unique`` and ``proportion_trueish``.
    """
    size = len(arr)
    if size == 0:
        return 0.0
    return (size - arr.null_count) / size
def proportion_unique(arr: Array) -> float:
    """Share of distinct values among the non-null entries of ``arr``.

    Returns 0 when the array has no non-null entries at all.
    """
    valid_count = len(arr) - arr.null_count
    if not valid_count:
        return 0
    distinct_count = pac.count_distinct(arr, mode="only_valid").as_py()
    return distinct_count / valid_count
def proportion_trueish(arr: Array) -> float:
    """Fraction of entries in a boolean array that sum up as true.

    The denominator is the full length of ``arr`` (null entries included).
    An empty array yields 0.
    """
    size = len(arr)
    if not size:
        # Nothing to look at still means no trueish values.
        return 0
    total = pac.sum(arr).as_py()
    # The sum may come back as None (null); count that as zero trueish values.
    if total is None or not total:
        total = 0
    return total / size
def proportion_equal(arr1: Array, arr2: Array, ignore_nulls=True) -> float:
    """Proportion of element-wise equal values in ``arr1`` and ``arr2``.

    With ``ignore_nulls`` (the default), null comparison results are dropped
    before computing the proportion. Otherwise they are kept and count as not
    equal (falsish).
    """
    matches = pac.equal(arr1, arr2)
    considered = matches.drop_null() if ignore_nulls else matches
    return proportion_trueish(considered)
def empty_to_null(arr: Array) -> Array:
    """Return ``arr`` with every empty string replaced by a null."""
    return pac.if_else(pac.equal(arr, ""), None, arr)
def sorted_value_counts(arr: Array, order: str = "descending", top_n: int | None = None) -> Array:
    """Value counts of ``arr`` sorted by count.

    Arrow's built-in ``value_counts`` doesn't allow sorting, so its result is
    reordered here.

    Args:
        arr: Array whose values are counted.
        order: Sort order of the counts, passed to
            ``pyarrow.compute.array_sort_indices`` ("descending" or "ascending").
        top_n: If given, only the first ``top_n`` entries after sorting are kept.

    Returns:
        The struct array produced by ``value_counts`` (it has a ``counts``
        field), reordered by count.
    """
    valcnt = arr.value_counts()
    counts = valcnt.field("counts")
    # Previously the ``order`` argument was shadowed and ignored; honor it now.
    indices = pac.array_sort_indices(counts, order=order)
    if top_n is not None:
        indices = indices[:top_n]
    return valcnt.take(indices)
def map_values(arr: Array, map: dict, unknown: str = "keep") -> Array:
    """Map each value of ``arr`` through the dict ``map`` (slow, pure Python).

    Arrow has no native compute function for this yet. Values missing from
    ``map`` are kept as-is when ``unknown == "keep"``, otherwise replaced by
    None. For now the result is assumed to keep the type of ``arr``.
    """
    keep_unknown = unknown == "keep"
    mapped = []
    for value in arr.to_pylist():
        fallback = value if keep_unknown else None
        mapped.append(map.get(value, fallback))
    return pa.array(mapped, type=arr.type)
def categories(array: Array | ChunkedArray) -> Array:
    """Return the categories (dictionary values) of a dictionary-typed array.

    For a ChunkedArray the chunk dictionaries are unified first, so the first
    chunk's dictionary holds all categories. A ChunkedArray without any chunks
    yields an empty array of the dictionary's value type.

    Raises:
        TypeError: If ``array`` is not of dictionary type.
    """
    if not pat.is_dictionary(array.type):
        raise TypeError("Must have an array with dictionary type!")

    if isinstance(array, ChunkedArray):
        if array.num_chunks == 0:
            # chunk(0) would raise IndexError; there are simply no categories.
            return pa.array([], type=array.type.value_type)
        array = array.unify_dictionaries()
        return array.chunk(0).dictionary

    return array.dictionary
def is_stringy(type: DataType) -> bool:
    """Check whether a data type is stringy (string, or dictionary of strings)."""
    if pat.is_dictionary(type):
        return pat.is_string(type.value_type)
    return pat.is_string(type)
def with_flatten(arr: Array, func: Callable):
    """Apply a compute function to all elements of a list array's sub-lists.

    ``func`` receives the list array's child values and must return an array
    of the same length (an element-wise transform). The result is re-wrapped
    into lists using the original offsets, and null list entries are restored.
    """
    isna = pac.is_null(arr)
    # ``arr.offsets`` index into ``arr.values``: the backing child array,
    # including any slice offset and values behind null lists. ``list_flatten``
    # skips both, which misaligns the values with the offsets for sliced arrays.
    transformed = func(arr.values)
    nested = pa.ListArray.from_arrays(arr.offsets, transformed)
    # Offsets carry no validity bitmap, so re-apply the original nulls.
    return pac.if_else(isna, None, nested)
def schema_diff(s1: Schema, s2: Schema) -> dict[str, tuple[DataType, DataType]]:
    """Map each field name of ``s1`` whose type differs in ``s2`` to both types.

    Every field of ``s1`` is looked up in ``s2`` by name via ``s2.field``.
    Values are ``(type_in_s1, type_in_s2)``.
    """
    pairs = ((mine, s2.field(mine.name)) for mine in s1)
    return {
        mine.name: (mine.type, theirs.type)
        for mine, theirs in pairs
        if mine.type != theirs.type
    }
def encode_metadata(d: dict):
    """Turn a dict into the bytes-to-bytes mapping Arrow expects as metadata.

    Keys are UTF-8 encoded; values are JSON-serialized and then UTF-8 encoded.
    """
    encoded = {}
    for key, value in d.items():
        raw_key = key.encode("utf-8")
        encoded[raw_key] = json.dumps(value).encode("utf-8")
    return encoded
def decode_metadata(d: dict):
    """Turn Arrow's bytes-to-bytes metadata back into a regular dict.

    Keys are UTF-8 decoded; values are UTF-8 decoded and parsed as JSON.
    """
    decoded = {}
    for raw_key, raw_value in d.items():
        key = raw_key.decode("utf-8")
        decoded[key] = json.loads(raw_value.decode("utf-8"))
    return decoded
class Timer:
    """Context manager that times its ``with`` body using ``perf_counter``.

    On exit it sets ``start``, ``end`` and ``elapsed`` (``end - start``).
    ``__exit__`` returns None, so exceptions from the body still propagate,
    but the timings are recorded first.
    """

    def __enter__(self):
        self.start = perf_counter()
        return self

    def __exit__(self, type, value, traceback):
        finished = perf_counter()
        self.end = finished
        self.elapsed = finished - self.start
if PANDAS_INSTALLED:
    # Arrow currently doesn't have any way to map its integer types to pandas
    # extension dtypes depending on whether a column has missing values or not

    @singledispatch
    def to_pandas(array: Array):
        """Convert an Arrow array to pandas, using pandas extension dtypes.

        - string -> pandas "string" dtype
        - boolean -> pandas "boolean" dtype
        - integer with nulls -> nullable integer dtype (e.g. "Int64", "UInt8"),
          named via ``dtype_name``
        - anything else -> Arrow's default ``to_pandas`` conversion
        """
        atype = array.type
        if pat.is_string(atype):
            return array.to_pandas().astype("string")
        if pat.is_boolean(atype):
            return array.to_pandas().astype("boolean")
        if pat.is_integer(atype) and array.null_count > 0:
            # integer_object_nulls=True keeps ints as Python objects with None
            # for nulls before casting to the nullable extension dtype.
            return array.to_pandas(integer_object_nulls=True).astype(dtype=dtype_name(array))
        return array.to_pandas()

    @to_pandas.register
    def _(table: Table):
        """Convert a table column by column, keeping the table's column names."""
        if table.num_columns == 0:
            # pd.concat raises on an empty list; return an empty frame instead.
            return pd.DataFrame(index=pd.RangeIndex(table.num_rows))
        columns = [to_pandas(array) for array in table]
        df = pd.concat(columns, axis=1)
        df.columns = table.column_names
        return df
def uniquify(items: Sequence[str]) -> Iterator[str]:
    """Yield the items, adding ``_<n>`` suffixes where needed so each is unique.

    The first occurrence of a name is yielded unchanged. Later duplicates get
    the smallest suffix ``_1``, ``_2``, ... that has not been produced yet.
    """
    seen = set()
    # Last suffix used per original item. Every smaller suffix for that item is
    # already in ``seen`` (the set only grows), so the search resumes from here.
    # Output is unchanged, but repeated names no longer cost O(n^2).
    last_suffix: dict[str, int] = {}
    for item in items:
        newitem = item
        suffix = last_suffix.get(item, 0)
        while newitem in seen:
            suffix += 1
            newitem = f"{item}_{suffix}"
        last_suffix[item] = suffix
        seen.add(newitem)
        yield newitem