topmark.pipeline.protocols

Type contracts for pipeline steps (engine-facing).

This module defines the minimal protocol that all pipeline steps must implement. Steps are instantiated objects that are callable; the engine invokes step(ctx) where ctx is the pipeline context (typically ProcessingContext).

Lifecycle

1) The runner calls step.may_proceed(ctx) to gate execution.
2) If allowed, it calls step.run(ctx), which mutates ctx in place.
3) Regardless, it calls step.hint(ctx) so a step can attach non-binding reason/telemetry hints (diagnostics).

Final outcome classification is handled centrally, not by steps.
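The lifecycle can be sketched as a small driver function. This is only an illustration, not the actual runner: DemoContext, DemoStep, and invoke are hypothetical names, and the real context is typically ProcessingContext.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class DemoContext:
    """Hypothetical stand-in for the pipeline context."""

    log: list[str] = field(default_factory=list)
    skip: bool = False


class DemoStep:
    """Hypothetical step exposing the three lifecycle methods."""

    name = "demo.step"

    def may_proceed(self, ctx: DemoContext) -> bool:
        # Gate: decline to run when the context asks for a skip.
        return not ctx.skip

    def run(self, ctx: DemoContext) -> None:
        # Mutate the context in place.
        ctx.log.append("run")

    def hint(self, ctx: DemoContext) -> None:
        # Attach a non-binding diagnostic note.
        ctx.log.append("hint")


def invoke(step: DemoStep, ctx: DemoContext) -> None:
    """Gate, run if allowed, then call hint regardless of the gate result."""
    if step.may_proceed(ctx):
        step.run(ctx)
    step.hint(ctx)


ran = DemoContext()
invoke(DemoStep(), ran)

skipped = DemoContext(skip=True)
invoke(DemoStep(), skipped)
```

In this sketch a skipped step still gets its hint call, which matches the "Regardless" wording of the third lifecycle stage.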

Attributes:

name : str
    Fully qualified name used for tracing/telemetry.
axes_written : tuple[Axis, ...]
    Declares which status axes this step is allowed to set (e.g. ("fs", "content")).

Steps also declare the view slots they consume. The runner uses this metadata
to release consumed view payloads after the last remaining consumer has run,
without keying pruning behavior to step-name strings.
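One way such metadata could drive pruning is to compute, for each slot, the index of its last consumer. The sketch below assumes a hypothetical consumes attribute and made-up slot names; the attribute name and the runner's actual release logic are not shown in this module.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SlotStep:
    """Hypothetical step record; `consumes` is an assumed attribute name."""

    name: str
    consumes: frozenset[str] = frozenset()


def release_plan(steps: list[SlotStep]) -> dict[int, set[str]]:
    """Map each step index to the slots that are safe to release after it runs."""
    last_consumer: dict[str, int] = {}
    for index, step in enumerate(steps):
        for slot in step.consumes:
            # Later consumers overwrite earlier ones, leaving the last index.
            last_consumer[slot] = index

    plan: dict[int, set[str]] = {}
    for slot, index in last_consumer.items():
        plan.setdefault(index, set()).add(slot)
    return plan


# Slot names here are illustrative only.
pipeline = [
    SlotStep("demo.read"),
    SlotStep("demo.scan", frozenset({"image"})),
    SlotStep("demo.compare", frozenset({"image", "header"})),
    SlotStep("demo.write", frozenset({"updated"})),
]
plan = release_plan(pipeline)
```

Because the plan is derived from declared slots, renaming a step does not change when a payload is released.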

Step

Bases: Protocol[Ctx]

Protocol for a single pipeline step.

A step is a callable object that mutates a ProcessingContext and declares which status axes it is responsible for. Implementations typically subclass topmark.pipeline.steps.base.BaseStep.

Notes

Within this module the protocol is used to keep a log of step instances, not to type-check step execution. The concrete type that matters elsewhere is Step[ProcessingContext] (in ProcessingContext.steps and pipeline declarations). Here, "a step of any context" is sufficient.
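The "step of any context" idea can be illustrated with a generic protocol and an alias parameterized by Any. This is a simplified sketch: the protocol below is a reduced local copy for illustration, and defining AnyStep as Step[Any] is an assumption about how the alias is built.

```python
from __future__ import annotations

from typing import Any, Protocol, TypeVar

Ctx = TypeVar("Ctx", contravariant=True)


class Step(Protocol[Ctx]):
    """Reduced local copy of the step protocol, for illustration only."""

    name: str

    def may_proceed(self, ctx: Ctx) -> bool: ...

    def run(self, ctx: Ctx) -> None: ...

    def hint(self, ctx: Ctx) -> None: ...


# Assumption: an alias meaning "a step, whatever its context type".
AnyStep = Step[Any]


class CounterStep:
    """Hypothetical step whose context is a plain dict."""

    name = "demo.counter"

    def may_proceed(self, ctx: dict[str, int]) -> bool:
        return True

    def run(self, ctx: dict[str, int]) -> None:
        ctx["runs"] = ctx.get("runs", 0) + 1

    def hint(self, ctx: dict[str, int]) -> None:
        return None


executed: list[AnyStep] = []


def record(step: AnyStep) -> None:
    """Log a step instance without caring about its context type."""
    executed.append(step)


record(CounterStep())
```

CounterStep never inherits from Step; it matches structurally, which is what lets a log hold steps written for different context types.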

may_proceed

may_proceed(ctx)

Return whether the step should run given the current context.

Parameters:

Name  Type  Description                      Default
ctx   Ctx   The mutable processing context.  required

Returns:

Type  Description
bool  True if the step may run; False to skip this step.

Source code in src/topmark/pipeline/protocols.py
def may_proceed(self, ctx: Ctx) -> bool:
    """Return whether the step should run given the current context.

    Args:
        ctx: The mutable processing context.

    Returns:
        True if the step may run; False to skip this step.
    """
    ...

run

run(ctx)

Execute the step, mutating the context in place.

Implementations must only write to axes they own (as declared in axes_written) and must not raise for expected control flow. I/O or parser errors should update the appropriate status axis and diagnostics.

Parameters:

Name  Type  Description                      Default
ctx   Ctx   The mutable processing context.  required

Source code in src/topmark/pipeline/protocols.py
def run(self, ctx: Ctx) -> None:
    """Execute the step, mutating the context in place.

    Implementations must **only** write to axes they own (as declared in
    ``axes_written``) and must not raise for expected control flow. I/O or
    parser errors should update the appropriate status axis and diagnostics.

    Args:
        ctx: The mutable processing context.
    """
    ...
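As a rough illustration of the "do not raise for expected control flow" rule, the sketch below records a parse failure on a status axis and in diagnostics instead of letting the exception escape. DemoContext, its status and diagnostics fields, and the axis values are all hypothetical; the real ProcessingContext may be shaped differently.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class DemoContext:
    """Hypothetical context with a status map and a diagnostics list."""

    raw: str
    status: dict[str, str] = field(default_factory=dict)
    diagnostics: list[str] = field(default_factory=list)
    value: int | None = None


class ParseIntStep:
    """Hypothetical step that owns only the "content" axis."""

    name = "demo.parse_int"
    axes_written = ("content",)

    def may_proceed(self, ctx: DemoContext) -> bool:
        return True

    def run(self, ctx: DemoContext) -> None:
        try:
            ctx.value = int(ctx.raw)
        except ValueError as exc:
            # Expected failure: record it on the owned axis, do not raise.
            ctx.status["content"] = "unparseable"
            ctx.diagnostics.append(f"{self.name}: {exc}")
            return
        ctx.status["content"] = "ok"

    def hint(self, ctx: DemoContext) -> None:
        return None


good = DemoContext(raw="42")
bad = DemoContext(raw="forty-two")
for demo_ctx in (good, bad):
    ParseIntStep().run(demo_ctx)
```

Both calls return normally; the difference between success and failure is visible only through the context.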

hint

hint(ctx)

Attach non-binding hints/telemetry to the context.

A step can add structured reason hints or metrics to aid later human-readable summaries. This must not change final outcome classification.

Parameters:

Name  Type  Description                      Default
ctx   Ctx   The mutable processing context.  required

Source code in src/topmark/pipeline/protocols.py
def hint(self, ctx: Ctx) -> None:
    """Attach non-binding hints/telemetry to the context.

    A step can add structured reason hints or metrics to aid later
    human-readable summaries. This must not change final outcome
    classification.

    Args:
        ctx: The mutable processing context.
    """
    ...
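A hint implementation might look like the following sketch, which appends a reason string and leaves the outcome untouched. The hints and outcome fields are hypothetical placeholders for whatever the real context exposes.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class DemoContext:
    """Hypothetical context; `outcome` stands in for central classification."""

    outcome: str = "unchanged"
    hints: list[str] = field(default_factory=list)


class CompareStep:
    """Hypothetical step that only contributes a reason hint."""

    name = "demo.compare"

    def may_proceed(self, ctx: DemoContext) -> bool:
        return True

    def run(self, ctx: DemoContext) -> None:
        return None

    def hint(self, ctx: DemoContext) -> None:
        # Add context for summaries; never touch ctx.outcome here.
        ctx.hints.append(f"{self.name}: header already up to date")


hint_ctx = DemoContext()
outcome_before = hint_ctx.outcome
CompareStep().hint(hint_ctx)
```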

StepContext

Bases: Protocol

Minimum context surface required by the step lifecycle (BaseStep/runner).

steps property

steps

Executed steps (context type not relevant here).

is_halted property

is_halted

Return True if a step has requested an early halt for this file.

request_halt

request_halt(reason, at_step)

Record an early halt requested by a step.

Source code in src/topmark/pipeline/protocols.py
def request_halt(self, reason: str, at_step: AnyStep) -> None:
    """Record an early halt requested by a step."""
    ...
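A minimal object satisfying this surface, and one way a runner might honor a halt, could look like the sketch below. DemoContext, halt_reason, run_all, and both step classes are hypothetical; stopping the loop once is_halted is true is an assumption about runner behavior, not something this module specifies.

```python
from __future__ import annotations


class DemoContext:
    """Hypothetical context exposing steps, is_halted, and request_halt."""

    def __init__(self) -> None:
        self._steps: list[object] = []
        self.halt_reason: str | None = None

    @property
    def steps(self) -> list[object]:
        return self._steps

    @property
    def is_halted(self) -> bool:
        return self.halt_reason is not None

    def request_halt(self, reason: str, at_step: object) -> None:
        self.halt_reason = reason


class GuardStep:
    name = "demo.guard"

    def may_proceed(self, ctx: DemoContext) -> bool:
        return True

    def run(self, ctx: DemoContext) -> None:
        # Ask the pipeline to stop processing this file.
        ctx.request_halt("binary file", self)

    def hint(self, ctx: DemoContext) -> None:
        return None


class LaterStep(GuardStep):
    name = "demo.later"

    def run(self, ctx: DemoContext) -> None:
        raise AssertionError("should not run after a halt")


def run_all(steps: list[GuardStep], ctx: DemoContext) -> None:
    """Run steps in order, stopping once a halt has been requested."""
    for step in steps:
        if ctx.is_halted:
            break
        if step.may_proceed(ctx):
            step.run(ctx)
            ctx.steps.append(step)
        step.hint(ctx)


halt_ctx = DemoContext()
run_all([GuardStep(), LaterStep()], halt_ctx)
```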