topmark.pipeline.steps.base

Base class for class-based pipeline steps.

The engine and CLI invoke steps as callables. BaseStep implements the common lifecycle:

ctx = step(ctx)  # internally: may_proceed → run? → hint
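The lifecycle order above can be sketched as follows. This is a minimal illustration, not the actual BaseStep implementation: `FakeContext` and `SketchStep` are hypothetical stand-ins, and the choice to call hint() even when run() is skipped is an assumption read from the `may_proceed → run? → hint` comment.

```python
from dataclasses import dataclass, field


@dataclass
class FakeContext:
    """Hypothetical stand-in for ProcessingContext (not the real class)."""

    halted: bool = False
    log: list[str] = field(default_factory=list)


class SketchStep:
    """Hypothetical step showing the may_proceed -> run? -> hint order."""

    def may_proceed(self, ctx: FakeContext) -> bool:
        # Gate: do not run once the context is marked as halted.
        return not ctx.halted

    def run(self, ctx: FakeContext) -> None:
        ctx.log.append("run")

    def hint(self, ctx: FakeContext) -> None:
        ctx.log.append("hint")

    def __call__(self, ctx: FakeContext) -> FakeContext:
        # run() is skipped when the gate is closed.
        if self.may_proceed(ctx):
            self.run(ctx)
        # Assumption: hint() is called even when run() was skipped.
        self.hint(ctx)
        return ctx


step = SketchStep()
ran = step(FakeContext())
skipped = step(FakeContext(halted=True))
```

Under these assumptions, `ran.log` records both calls while `skipped.log` records only the hint, which matches the optional `run?` in the lifecycle comment.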

Design goals

  • Single place for per-step bookkeeping (invocation counts, tracing hooks).
  • Clear separation of concerns: step gating, mutation, advisory hints.
  • Zero impact on final classification: the outcome is derived elsewhere.

BaseStep dataclass

BaseStep(*, name, primary_axis, axes_written=())

Reusable foundation for pipeline steps.

Subclass this to implement a concrete step by overriding may_proceed(), run(), and optionally hint(). Do not override __call__ unless you need custom lifecycle behavior.

Attributes:

    name (str): Fully qualified, stable step identifier for logs/tracing.
    primary_axis (Axis | None): The axis this step "represents" in summaries.
    axes_written (tuple[Axis, ...]): Status axes this step is allowed to write (e.g. ("fs",)).

may_proceed

may_proceed(ctx)

Return whether the step should run given the current context.

Default: True (always run). Override in subclasses to respect pipeline gates.

Parameters:

    ctx (ProcessingContext): The mutable processing context. Required.

Returns:

    bool: True to run run(), False to skip.

Source code in src/topmark/pipeline/steps/base.py
def may_proceed(self, ctx: ProcessingContext) -> bool:
    """Return whether the step should run given the current context.

    Default: ``True`` (always run). Override in subclasses to respect
    pipeline gates.

    Args:
        ctx: The mutable processing context.

    Returns:
        True to run ``run()``, False to skip.
    """
    return True

run

run(ctx)

Perform the step's primary work, mutating ctx in place.

Subclasses must implement this method and only write to declared axes.

Parameters:

    ctx (ProcessingContext): The mutable processing context. Required.

Source code in src/topmark/pipeline/steps/base.py
def run(self, ctx: ProcessingContext) -> None:
    """Perform the step's primary work, mutating ``ctx`` in place.

    Subclasses must implement this method and only write to declared axes.

    Args:
        ctx: The mutable processing context.
    """
    pass

hint

hint(ctx)

Attach non-binding hints/telemetry to ctx (optional).

This method should never influence the final outcome directly.

Parameters:

    ctx (ProcessingContext): The mutable processing context. Required.

Source code in src/topmark/pipeline/steps/base.py
def hint(self, ctx: ProcessingContext) -> None:
    """Attach non-binding hints/telemetry to ``ctx`` (optional).

    This method should never influence the final outcome directly.

    Args:
        ctx: The mutable processing context.
    """
    pass