
topmark.pipeline.protocols


Type contracts for pipeline steps (engine-facing).

This module defines the minimal protocols that pipeline steps (Step) and the contexts they operate on (StepContext) must implement. Steps are callable instances; the engine invokes step(ctx), where ctx is the pipeline context (typically ProcessingContext).

Lifecycle

1) The runner calls step.may_proceed(ctx) to gate execution.
2) If allowed, it calls step.run(ctx), which mutates ctx in place.
3) Whether or not the step ran, it calls step.hint(ctx) so the step can attach non-binding reason/telemetry hints (diagnostics).

Final outcome classification is handled centrally, not by steps.
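The lifecycle can be sketched as a simple runner loop. This is an illustration, not topmark's runner: `run_steps` and `RecordingStep` are hypothetical names, the `Step` protocol is simplified to type the context as `Any`, and a plain list stands in for the context so the call order is visible.

```python
from typing import Any, Protocol


class Step(Protocol):
    """Simplified stand-in for the Step protocol (context typed as Any)."""

    def may_proceed(self, ctx: Any) -> bool: ...
    def run(self, ctx: Any) -> None: ...
    def hint(self, ctx: Any) -> None: ...


def run_steps(steps: list[Step], ctx: Any) -> None:
    """Hypothetical runner following the three documented lifecycle phases."""
    for step in steps:
        if step.may_proceed(ctx):  # 1) gate execution
            step.run(ctx)          # 2) mutate ctx in place
        step.hint(ctx)             # 3) always attach hints, even when skipped


class RecordingStep:
    """Hypothetical step that logs each lifecycle call into the list context."""

    def __init__(self, label: str, allowed: bool) -> None:
        self.label = label
        self.allowed = allowed

    def may_proceed(self, ctx: list[str]) -> bool:
        ctx.append(f"{self.label}.may_proceed")
        return self.allowed

    def run(self, ctx: list[str]) -> None:
        ctx.append(f"{self.label}.run")

    def hint(self, ctx: list[str]) -> None:
        ctx.append(f"{self.label}.hint")


calls: list[str] = []
run_steps([RecordingStep("a", allowed=True), RecordingStep("b", allowed=False)], calls)
print(calls)
# ['a.may_proceed', 'a.run', 'a.hint', 'b.may_proceed', 'b.hint']
```

Step "b" is skipped by its gate, yet its hint still runs, as in phase 3; any outcome classification would happen after the loop, outside the steps.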

Attributes:

name : str
    Fully qualified name used for tracing/telemetry.
axes_written : tuple[Axis, ...]
    Declares which status axes this step is allowed to set (e.g. ("fs", "content")).
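A step declares these two attributes alongside the three lifecycle methods. The sketch below assumes `Axis` is a `Literal` of axis names (topmark's real definition may differ) and uses a hypothetical `ReadFileStep` with a plain `dict` as context. Because a `typing.Protocol` is matched structurally, the class conforms without inheriting from it; `runtime_checkable` is added here only so `isinstance` can demonstrate that, and it checks that the members exist, not their signatures.

```python
from typing import Any, Literal, Protocol, TypeVar, runtime_checkable

# Assumption: axes are modeled as string literals; topmark's Axis type may differ.
Axis = Literal["fs", "content"]

Ctx = TypeVar("Ctx", contravariant=True)


@runtime_checkable
class Step(Protocol[Ctx]):
    """Simplified stand-in for the documented Step protocol."""

    name: str
    axes_written: tuple[Axis, ...]

    def may_proceed(self, ctx: Ctx) -> bool: ...
    def run(self, ctx: Ctx) -> None: ...
    def hint(self, ctx: Ctx) -> None: ...


class ReadFileStep:
    """Hypothetical step; conforms to Step without subclassing it."""

    name = "example.ReadFileStep"
    axes_written: tuple[Axis, ...] = ("fs",)

    def may_proceed(self, ctx: dict[str, Any]) -> bool:
        return True

    def run(self, ctx: dict[str, Any]) -> None:
        ctx["fs"] = "ok"  # writes only the axis declared in axes_written

    def hint(self, ctx: dict[str, Any]) -> None:
        pass


step = ReadFileStep()
print(isinstance(step, Step))        # True
print(step.name, step.axes_written)  # example.ReadFileStep ('fs',)
```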

Step

Bases: Protocol[Ctx]

Protocol for a single pipeline step.

A step is a callable object that mutates a ProcessingContext and declares which status axes it is responsible for. Implementations typically subclass topmark.pipeline.steps.base.BaseStep.

Notes

In this context we're not type-checking step execution; we're keeping a log of step instances. The concrete type we care about elsewhere is Step[ProcessingContext] (in ProcessingContext.steps and pipeline declarations). Here, it's fine to say "step of any context".

may_proceed

may_proceed(ctx)

Return whether the step should run given the current context.

Parameters:

Name  Type  Description                      Default
ctx   Ctx   The mutable processing context.  required

Returns:

Type  Description
bool  True if the step may run; False to skip this step.

Source code in src/topmark/pipeline/protocols.py
def may_proceed(self, ctx: Ctx) -> bool:
    """Return whether the step should run given the current context.

    Args:
        ctx: The mutable processing context.

    Returns:
        True if the step may run; False to skip this step.
    """
    ...
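As a minimal sketch of a gate, assume a hypothetical `ToyContext` with an `is_halted` flag and an `fs_status` field written by an earlier step (neither field layout is taken from topmark's real ProcessingContext). The hypothetical `ReadContentStep` proceeds only when the context is not halted and the file was resolved successfully.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyContext:
    """Hypothetical context; field names are illustrative, not topmark's."""

    is_halted: bool = False
    fs_status: str | None = None  # set by an earlier, filesystem-owning step


class ReadContentStep:
    """Hypothetical step that only makes sense after the file was resolved."""

    def may_proceed(self, ctx: ToyContext) -> bool:
        # In this sketch the gate only inspects the context; it never mutates it.
        if ctx.is_halted:
            return False
        return ctx.fs_status == "ok"


step = ReadContentStep()
print(step.may_proceed(ToyContext(fs_status="ok")))                  # True
print(step.may_proceed(ToyContext(fs_status="unreadable")))          # False
print(step.may_proceed(ToyContext(is_halted=True, fs_status="ok")))  # False
```

Keeping may_proceed free of side effects is a choice made for this sketch; the protocol itself only requires a bool result.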

run

run(ctx)

Execute the step, mutating the context in place.

Implementations must only write to axes they own (as declared in axes_written) and must not raise for expected control flow. I/O or parser errors should update the appropriate status axis and diagnostics.

Parameters:

Name  Type  Description                      Default
ctx   Ctx   The mutable processing context.  required

Source code in src/topmark/pipeline/protocols.py
def run(self, ctx: Ctx) -> None:
    """Execute the step, mutating the context in place.

    Implementations must **only** write to axes they own (as declared in
    ``axes_written``) and must not raise for expected control flow. I/O or
    parser errors should update the appropriate status axis and diagnostics.

    Args:
        ctx: The mutable processing context.
    """
    ...
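The run contract (write only owned axes, do not raise for expected failures) can be illustrated with a hypothetical file-reading step. The context fields `path`, `fs_status`, `text`, and `diagnostics`, and the status values "ok" and "unreadable", are assumptions for this sketch, not topmark's actual names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical context with one status axis and a diagnostics list."""

    path: str
    fs_status: str | None = None  # the only axis ReadFileStep owns
    text: str | None = None
    diagnostics: list[str] = field(default_factory=list)


class ReadFileStep:
    """Hypothetical step that owns the "fs" axis only."""

    axes_written = ("fs",)

    def run(self, ctx: ToyContext) -> None:
        try:
            with open(ctx.path, encoding="utf-8") as fh:
                ctx.text = fh.read()
        except (OSError, UnicodeDecodeError) as exc:
            # Expected failure: record it on the owned axis and in diagnostics
            # instead of letting the exception escape to the runner.
            ctx.fs_status = "unreadable"
            ctx.diagnostics.append(f"cannot read {ctx.path}: {exc}")
            return
        ctx.fs_status = "ok"


ctx = ToyContext(path="does/not/exist.txt")
ReadFileStep().run(ctx)      # no exception escapes
print(ctx.fs_status)         # unreadable
print(len(ctx.diagnostics))  # 1
```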

hint

hint(ctx)

Attach non-binding hints/telemetry to the context.

A step can add structured reason hints or metrics to aid later human-readable summaries. This must not change final outcome classification.

Parameters:

Name  Type  Description                      Default
ctx   Ctx   The mutable processing context.  required

Source code in src/topmark/pipeline/protocols.py
def hint(self, ctx: Ctx) -> None:
    """Attach non-binding hints/telemetry to the context.

    A step can add structured reason hints or metrics to aid later
    human-readable summaries. This must not change final outcome
    classification.

    Args:
        ctx: The mutable processing context.
    """
    ...
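A minimal sketch of a hint, assuming a hypothetical context that carries a `hints` list of small dicts; topmark's real hint structure is not shown on this page. The hypothetical step reads its status axis but only appends to `hints`, so the status that final classification depends on is left untouched.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical context; names are illustrative only."""

    fs_status: str | None = None
    hints: list[dict[str, str]] = field(default_factory=list)


class ReadFileStep:
    """Hypothetical step that explains its outcome via hints."""

    name = "example.ReadFileStep"

    def hint(self, ctx: ToyContext) -> None:
        # Never writes status axes; it only appends to ctx.hints.
        if ctx.fs_status is None:
            reason = "skipped"
        elif ctx.fs_status == "ok":
            return  # nothing worth reporting
        else:
            reason = f"fs={ctx.fs_status}"
        ctx.hints.append({"step": self.name, "reason": reason})


ctx = ToyContext(fs_status="unreadable")
ReadFileStep().hint(ctx)
print(ctx.fs_status)  # unreadable (unchanged)
print(ctx.hints)      # [{'step': 'example.ReadFileStep', 'reason': 'fs=unreadable'}]
```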

StepContext

Bases: Protocol

Minimum context surface required by the step lifecycle (BaseStep/runner).

steps property

steps

Executed steps (context type not relevant here).

is_halted property

is_halted

Return True if a step has requested an early halt for this file.

request_halt

request_halt(reason, at_step)

Record an early halt requested by a step.

Source code in src/topmark/pipeline/protocols.py
def request_halt(self, reason: str, at_step: AnyStep) -> None:
    """Record an early halt requested by a step."""
    ...
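The StepContext surface can be satisfied by a small class. In this sketch, `ToyContext`, `StopStep`, the `halt_reason`/`halted_by` attributes, and the "first request wins" rule are all assumptions. `AnyStep` is modeled as `Step[Any]` to match the "step of any context" wording in the Step notes, but its real definition is not shown on this page.

```python
from __future__ import annotations

from typing import Any, Protocol, TypeVar

Ctx = TypeVar("Ctx", contravariant=True)


class Step(Protocol[Ctx]):
    """Stand-in for the Step protocol, trimmed to what this sketch uses."""

    name: str

    def run(self, ctx: Ctx) -> None: ...


# Assumption: AnyStep aliases "a step of any context type".
AnyStep = Step[Any]


class ToyContext:
    """Hypothetical class providing the StepContext surface."""

    def __init__(self) -> None:
        self._steps: list[AnyStep] = []
        self.halt_reason: str | None = None
        self.halted_by: str | None = None

    @property
    def steps(self) -> list[AnyStep]:
        """Executed steps, in execution order."""
        return self._steps

    @property
    def is_halted(self) -> bool:
        return self.halt_reason is not None

    def request_halt(self, reason: str, at_step: AnyStep) -> None:
        # Sketch choice: the first request wins; later ones are ignored.
        if self.halt_reason is None:
            self.halt_reason = reason
            self.halted_by = at_step.name


class StopStep:
    """Hypothetical step that always asks the pipeline to stop."""

    name = "example.StopStep"

    def run(self, ctx: ToyContext) -> None:
        ctx.request_halt("unsupported file type", at_step=self)


ctx = ToyContext()
step = StopStep()
ctx.steps.append(step)  # assumption: a runner records executed steps like this
step.run(ctx)
print(ctx.is_halted, ctx.halt_reason, ctx.halted_by)
# True unsupported file type example.StopStep
```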