topmark.pipeline.steps.sniffer

Pre-read file sniffer.

This module implements the SnifferStep, a lightweight pre-read pipeline step that inspects files between resolution and full text loading. It performs:

  • existence and permission checks
  • fast binary sniff using a NUL-byte heuristic
  • strict UTF-8 validation on small chunks
  • BOM + shebang ordering checks for shebang-aware file types
  • raw newline histogram construction (LF/CRLF/CR) with mixed-newline detection
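
The binary and UTF-8 checks listed above can be sketched as follows. This is a minimal illustration, not TopMark's implementation: the function name `sniff_text`, the chunk size, and the string return values are assumptions made for the example.

```python
import codecs
import io
from typing import BinaryIO


def sniff_text(stream: BinaryIO, chunk_size: int = 4096) -> str:
    """Classify a byte stream as "text", "binary", or "decode_error".

    Hypothetical helper for illustration only; not part of TopMark.
    """
    # A strict incremental decoder buffers partial multi-byte sequences, so a
    # character split across a chunk boundary is not reported as an error.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="strict")
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        # NUL-byte heuristic: text files almost never contain 0x00.
        if b"\x00" in chunk:
            return "binary"
        try:
            decoder.decode(chunk)
        except UnicodeDecodeError:
            return "decode_error"
    try:
        # final=True rejects a multi-byte sequence truncated at end of input.
        decoder.decode(b"", final=True)
    except UnicodeDecodeError:
        return "decode_error"
    return "text"


# A two-byte chunk size splits the encoded "é" across reads on purpose.
print(sniff_text(io.BytesIO("héllo\n".encode("utf-8")), chunk_size=2))
print(sniff_text(io.BytesIO(b"ab\x00cd")))
```

Using an incremental decoder rather than decoding each chunk independently avoids false decode errors when a chunk boundary falls inside a multi-byte character.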

The main step entrypoint is SnifferStep.run, which delegates to several helpers:

  • _count_newlines - counts LF/CRLF/CR sequences with CR/LF carry handling
  • inspect_bom_shebang - inspects the prefix for UTF-8 BOM and shebang ordering
  • _commit_newline_stats - populates newline histogram and derived stats on the context
  • _sniff_stream - orchestrates byte-level sniffing and returns an optional terminal FsStatus that SnifferStep.run applies
Sets
  • FsStatus → {OK, EMPTY, NOT_FOUND, NO_READ_PERMISSION, UNREADABLE, NO_WRITE_PERMISSION, BINARY, UNICODE_DECODE_ERROR, BOM_BEFORE_SHEBANG, MIXED_LINE_ENDINGS}
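
The CR/LF carry handling mentioned for _count_newlines can be illustrated with a small sketch. The real helper's signature is not shown on this page, so `count_newlines_sketch` and `is_mixed` below are hypothetical names, and the logic is one plausible way to count terminators when a CRLF pair is split across two chunks.

```python
from typing import Iterable


def count_newlines_sketch(chunks: Iterable[bytes]) -> dict[str, int]:
    """Count LF, CRLF, and CR terminators across byte chunks (illustrative)."""
    hist = {"LF": 0, "CRLF": 0, "CR": 0}
    pending_cr = False  # True when the previous chunk ended with a CR
    for chunk in chunks:
        i, n = 0, len(chunk)
        if pending_cr and n:
            if chunk[0] == 0x0A:
                hist["CRLF"] += 1  # CR at end of last chunk + LF here
                i = 1
            else:
                hist["CR"] += 1  # the carried CR was a bare CR
            pending_cr = False
        while i < n:
            byte = chunk[i]
            if byte == 0x0D:
                if i + 1 == n:
                    pending_cr = True  # decide once the next chunk arrives
                elif chunk[i + 1] == 0x0A:
                    hist["CRLF"] += 1
                    i += 2
                    continue
                else:
                    hist["CR"] += 1
            elif byte == 0x0A:
                hist["LF"] += 1
            i += 1
    if pending_cr:
        hist["CR"] += 1  # the input ended with a bare CR
    return hist


def is_mixed(hist: dict[str, int]) -> bool:
    """Return True when more than one newline style occurs."""
    return sum(1 for count in hist.values() if count > 0) > 1


hist = count_newlines_sketch([b"a\r", b"\nb\nc\r", b"d\r"])
print(hist, is_mixed(hist))
```

Without the carry flag, a CRLF split across a chunk boundary would be counted as one CR plus one LF, which would wrongly flag a pure CRLF file as mixed.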

SnifferStep

SnifferStep()

Bases: BaseStep

Pre-read checks (existence/perm, binary/UTF-8, BOM/shebang, newline mix).

Performs fast, byte-level checks and sets FsStatus. It does not load the full text image; ReaderStep remains authoritative for ContentStatus.

Axes written
  • fs
Sets
  • FsStatus: {PENDING, OK, EMPTY, NOT_FOUND, NO_READ_PERMISSION, UNREADABLE, NO_WRITE_PERMISSION, BINARY, BOM_BEFORE_SHEBANG, UNICODE_DECODE_ERROR, MIXED_LINE_ENDINGS}
Source code in src/topmark/pipeline/steps/sniffer.py
def __init__(self) -> None:
    super().__init__(
        name=self.__class__.__name__,
        primary_axis=Axis.FS,
        axes_written=(Axis.FS,),
    )

may_proceed

may_proceed(ctx)

Determine if processing can proceed to the read step.

Processing can proceed if:

  • The file was successfully resolved (ctx.status.resolve is RESOLVED)
  • A file type is present (ctx.file_type is not None)
  • A header processor is available (ctx.header_processor is not None)

Note

The file system status (ctx.status.fs) is not strictly required here, to allow tests to skip the sniffer and invoke the reader directly. In such cases, the reader is the definitive authority for content checks (existence, permissions, binary/text, etc).

Parameters:

Name | Type | Description | Default
ctx | ProcessingContext | The processing context for the current file. | required

Returns:

Type | Description
bool | True if ctx.status.resolve == RESOLVED, ctx.file_type and ctx.header_processor are set.

Source code in src/topmark/pipeline/steps/sniffer.py
def may_proceed(self, ctx: ProcessingContext) -> bool:
    """Determine if processing can proceed to the read step.

    Processing can proceed if:
    - The file was successfully resolved (ctx.status.resolve is RESOLVED)
    - A file type is present (ctx.file_type is not None)
    - A header processor is available (ctx.header_processor is not None)

    Note:
        The file system status (`ctx.status.fs`) is not strictly required here,
        to allow tests to skip the sniffer and invoke the reader directly. In such
        cases, the reader is the definitive authority for content checks (existence,
        permissions, binary/text, etc).

    Args:
        ctx: The processing context for the current file.

    Returns:
        True if `ctx.status.resolve == RESOLVED`, `ctx.file_type` and `ctx.header_processor`
        are set.
    """
    if ctx.is_halted:
        return False

    return ctx.status.resolve == ResolveStatus.RESOLVED

run

run(ctx)

Lightweight I/O step between resolver and reader.

Responsibilities:

  • Confirm file exists and is readable.
  • Fast text-vs-binary sniff (NUL bytes, incremental UTF-8 decode of tiny chunks).
  • BOM + shebang ordering check for shebang-aware file types.
  • Quick newline histogram (bytes-level) and strict mixed-newlines skip.
  • Establish a tentative newline_style (dominant or default to LF) without loading full text.

Notes:

  • Does not populate ctx.file_lines; that is the reader's job.
  • If this step sets a non-RESOLVED file status, later steps will early-return.
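
The ordering of the existence, permission, and empty-file checks can be summarised in a standalone sketch. The function `precheck` and its string results are hypothetical stand-ins for the step's FsStatus handling; the sketch omits diagnostics, halting, and hints.

```python
import os
from pathlib import Path


def precheck(path: Path, apply_changes: bool = False) -> str:
    """Return a status name for the cheap checks done before any read.

    Hypothetical helper for illustration; mirrors only the order of checks.
    """
    try:
        st = path.stat()  # one syscall covers existence and size
    except FileNotFoundError:
        return "NOT_FOUND"
    except PermissionError:
        return "NO_READ_PERMISSION"
    # Write access only matters when changes will be applied.
    if apply_changes and not os.access(path, os.W_OK):
        return "NO_WRITE_PERMISSION"
    if st.st_size == 0:
        return "EMPTY"
    return "OK"


print(precheck(Path("definitely-missing-file.txt")))
```

Checking write access before reading any bytes lets an apply run fail fast on read-only files instead of after the full pipeline has done its work.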

Source code in src/topmark/pipeline/steps/sniffer.py
def run(self, ctx: ProcessingContext) -> None:
    """Lightweight I/O step between resolver and reader.

    Responsibilities:
    - Confirm file exists and is readable.
    - Fast text-vs-binary sniff (NUL bytes, incremental UTF-8 decode of tiny chunks).
    - BOM + shebang ordering check for shebang-aware file types.
    - Quick newline histogram (bytes-level) and strict mixed-newlines skip.
    - Establish a tentative newline_style (dominant or default to LF) without loading full text.

    Notes:
    - Does **not** populate `ctx.file_lines`; that is the reader's job.
    - If this step sets a non-RESOLVED file status, later steps will early-return.
    """
    apply: bool = (
        False if ctx.run_options.apply_changes is None else ctx.run_options.apply_changes
    )
    ctx.status.fs = FsStatus.PENDING

    # Existence / permission
    try:
        st: stat_result = ctx.path.stat()
    except FileNotFoundError:
        ctx.status.fs = FsStatus.NOT_FOUND
        reason: str = f"File not found: {ctx.path}"
        ctx.request_halt(reason=reason, at_step=self)
        return
    except PermissionError as e:
        ctx.status.fs = FsStatus.NO_READ_PERMISSION
        reason = f"Permission denied: {e}"
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    # Get the path's modification timestamp
    ctx.timestamp = get_path_mtime_utc(path=ctx.path)

    # Apply mode: check write permission upfront
    if apply is True and not os.access(ctx.path, os.W_OK):
        ctx.status.fs = FsStatus.NO_WRITE_PERMISSION
        ctx.diagnostics.add_error("Permission denied: cannot write to file")
        return

    if st.st_size == 0:
        ctx.status.fs = FsStatus.EMPTY

        # If policy does NOT allow inserting headers into empty files for this type,
        # attach a non-terminal hint explaining how to enable it.
        if ctx.file_type is not None and not allow_insert_into_empty_like(ctx):
            file_type: FileType = ctx.file_type
            table_name: str = f"policy_by_type.{file_type.local_key}"
            ctx.hint(
                axis=Axis.FS,
                code=KnownCode.FS_EMPTY,
                cluster=Cluster.BLOCKED_POLICY,
                message="Empty file skipped by default.",
                detail=(
                    f"{file_type.description}:\n"
                    "To allow headers in empty "
                    f"{file_type.local_key} files, add the following "
                    "to your TopMark configuration:\n"
                    f"  [{table_name}]\n"
                    "  allow_header_in_empty_files = true\n"
                    f"(for pyproject.toml, use [tool.topmark.{table_name}])"
                ),
                terminal=False,
            )
        else:
            ctx.diagnostics.add_info("File is empty.")

        return

    # Read a small prefix to check BOM/shebang and begin newline counting
    try:
        fs_status: FsStatus | None = _sniff_stream(ctx)
        if fs_status is not None:
            # Final state
            ctx.status.fs = fs_status
            return
    except FileNotFoundError:
        ctx.status.fs = FsStatus.NOT_FOUND
        reason = f"File not found: {ctx.path}"
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return
    except PermissionError as e:
        ctx.status.fs = FsStatus.NO_READ_PERMISSION
        reason = f"Permission denied: {e}"
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return
    except (OSError, UnicodeError, ValueError) as e:
        ctx.status.fs = FsStatus.UNREADABLE
        reason = f"Error while sniffing: {e}"
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    # Keep status RESOLVED so the reader proceeds.
    ctx.status.fs = FsStatus.OK
    return

hint

hint(ctx)

Attach sniff outcome hints (non-binding).

Parameters:

Name | Type | Description | Default
ctx | ProcessingContext | The processing context. | required
Source code in src/topmark/pipeline/steps/sniffer.py
def hint(self, ctx: ProcessingContext) -> None:
    """Attach sniff outcome hints (non-binding).

    Args:
        ctx: The processing context.
    """
    st: FsStatus = ctx.status.fs

    # May proceed to next step (always):
    if st == FsStatus.OK:
        # Implies ctx.status.resolve == ResolveStatus.RESOLVED
        pass  # healthy, no hint
    # May proceed to next step (policy):
    elif st == FsStatus.EMPTY:
        # Implies ctx.status.resolve == ResolveStatus.RESOLVED
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.CONTENT_EMPTY_FILE,
            cluster=Cluster.BLOCKED_POLICY,
            message="empty file",
        )
    elif st == FsStatus.BOM_BEFORE_SHEBANG:
        # Implies ctx.status.resolve == ResolveStatus.RESOLVED
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.FS_BOM_BEFORE_SHEBANG,
            cluster=Cluster.BLOCKED_POLICY,
            message="UTF-8 BOM before shebang",
        )
    elif st == FsStatus.MIXED_LINE_ENDINGS:
        # Implies ctx.status.resolve == ResolveStatus.RESOLVED
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.CONTENT_SKIPPED_MIXED,
            cluster=Cluster.BLOCKED_POLICY,
            message="mixed line endings",
        )
    # Stop processing:
    elif st == FsStatus.NOT_FOUND:
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.FS_NOT_FOUND,
            cluster=Cluster.SKIPPED,
            message="file not found",
            terminal=True,
        )
    elif st == FsStatus.NO_READ_PERMISSION:
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.FS_UNREADABLE,
            cluster=Cluster.SKIPPED,
            message="permission denied",
            terminal=True,
        )
    elif st == FsStatus.UNREADABLE:
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.FS_UNREADABLE,
            cluster=Cluster.SKIPPED,
            message="read error",
            terminal=True,
        )
    elif st == FsStatus.NO_WRITE_PERMISSION:
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.FS_UNWRITABLE,
            cluster=Cluster.SKIPPED,
            message="no write permission",
            terminal=True,
        )
    elif st == FsStatus.BINARY:
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.CONTENT_NOT_SUPPORTED,
            cluster=Cluster.SKIPPED,
            message="binary file",
            terminal=True,
        )
    elif st == FsStatus.UNICODE_DECODE_ERROR:
        ctx.hint(
            axis=Axis.FS,
            code=KnownCode.CONTENT_ENCODING_ERROR,
            cluster=Cluster.SKIPPED,
            message="Unicode decode error",
            terminal=True,
        )
    elif st == FsStatus.PENDING:
        # sniffer did not complete
        ctx.request_halt(reason=f"{self.__class__.__name__} did not set state.", at_step=self)

inspect_bom_shebang

inspect_bom_shebang(first_bytes)

Inspect the initial bytes for BOM and shebang ordering.

This helper performs a lightweight inspection of the first few bytes of a file to determine:

  • whether a UTF-8 BOM is present
  • whether a shebang (#!) is present
  • whether the shebang appears immediately after a BOM (i.e. at byte offset 3), which is the problematic ordering for POSIX shebang recognition.

It is intentionally pure and does not mutate the processing context or apply any policy decisions. Callers are responsible for updating ProcessingContext flags (leading_bom, has_shebang) and deciding whether a given BOM/shebang combination should be treated as a policy violation for the current file type.

Parameters:

Name | Type | Description | Default
first_bytes | bytes | The first bytes of the file being inspected. | required

Returns:

Type | Description
tuple[bool, bool, bool] | A tuple (has_bom, has_shebang, shebang_after_bom) where:

  • has_bom is True when a UTF-8 BOM is present at the start.
  • has_shebang is True when a shebang is present either at byte 0 or immediately after the BOM.
  • shebang_after_bom is True when the shebang starts at byte 3, directly following the BOM.
Source code in src/topmark/pipeline/steps/sniffer.py
def inspect_bom_shebang(first_bytes: bytes) -> tuple[bool, bool, bool]:
    """Inspect the initial bytes for BOM and shebang ordering.

    This helper performs a lightweight inspection of the first few bytes of
    a file to determine:

    - whether a UTF-8 BOM is present
    - whether a shebang (``#!``) is present
    - whether the shebang appears immediately after a BOM (i.e. at byte
      offset 3), which is the problematic ordering for POSIX shebang
      recognition.

    It is intentionally pure and does not mutate the processing context or
    apply any policy decisions. Callers are responsible for updating
    ``ProcessingContext`` flags (``leading_bom``, ``has_shebang``) and
    deciding whether a given BOM/shebang combination should be treated as a
    policy violation for the current file type.

    Args:
        first_bytes: The first bytes of the file being inspected.

    Returns:
        A tuple ``(has_bom, has_shebang, shebang_after_bom)`` where:

        - ``has_bom`` is True when a UTF-8 BOM is present at the start.
        - ``has_shebang`` is True when a shebang is present either at byte 0
            or immediately after the BOM.
        - ``shebang_after_bom`` is True when the shebang starts at byte 3,
            directly following the BOM.
    """
    has_bom: bool = first_bytes.startswith(b"\xef\xbb\xbf")
    starts_with_shebang: bool = first_bytes.startswith(b"#!")
    shebang_after_bom: bool = has_bom and first_bytes[3:5] == b"#!"
    has_shebang: bool = starts_with_shebang or shebang_after_bom
    return has_bom, has_shebang, shebang_after_bom
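
As a usage illustration, the snippet below repeats the function body from the source listing (without the docstring) so that it runs standalone, then calls it on a few representative prefixes. The sample inputs are made up for the example.

```python
def inspect_bom_shebang(first_bytes: bytes) -> tuple[bool, bool, bool]:
    # Body reproduced from the source listing so the example is self-contained.
    has_bom = first_bytes.startswith(b"\xef\xbb\xbf")
    starts_with_shebang = first_bytes.startswith(b"#!")
    shebang_after_bom = has_bom and first_bytes[3:5] == b"#!"
    has_shebang = starts_with_shebang or shebang_after_bom
    return has_bom, has_shebang, shebang_after_bom


# Plain shebang at byte 0: the normal, valid case.
print(inspect_bom_shebang(b"#!/bin/sh\n"))              # (False, True, False)
# BOM followed by a shebang: the ordering the sniffer flags.
print(inspect_bom_shebang(b"\xef\xbb\xbf#!/bin/sh\n"))  # (True, True, True)
# BOM without a shebang: nothing to flag on ordering grounds.
print(inspect_bom_shebang(b"\xef\xbb\xbfimport os\n"))  # (True, False, False)
# Empty prefix: all flags are False.
print(inspect_bom_shebang(b""))                         # (False, False, False)
```

Because the function only returns flags, the caller decides whether a True shebang_after_bom should become BOM_BEFORE_SHEBANG for the current file type.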