Skip to content

topmark.pipeline.machine.envelopes

topmark / pipeline / machine / envelopes

Machine-output envelope builders for processing and probe commands.

This module composes machine-readable output shapes for pipeline runs (topmark check, topmark strip, and topmark probe).

It is intentionally: - console-/Click-free (no printing, no CLI concerns) - serialization-free (no json.dumps)

Responsibilities: - JSON: build a single top-level envelope containing meta, config, config_diagnostics, and either processing data (results / summary) or probe data (probes). - NDJSON: yield a stream of per-record mappings following the project's NDJSON contract (Pattern A: every record includes kind and meta), starting with config prefix records and followed by processing records (result / summary) or probe records (probe).

Where config diagnostics are included, this module exposes the flattened compatibility view derived from staged config-validation logs.

See Also: - topmark.pipeline.machine.payloads: domain payload fragments for processing and probe runs. - topmark.core.machine.envelopes: shared envelope/record helpers (build_json_envelope, build_ndjson_record, config prefix and diagnostic record generators).

build_probe_results_json_envelope

build_probe_results_json_envelope(
    *, meta, config, resolved_toml, results
)

Build the JSON envelope for resolution probe results.

Parameters:

Name Type Description Default
meta MetaPayload

Shared metadata payload (tool/version).

required
config FrozenConfig

The effective FrozenConfig instance used for the run.

required
resolved_toml ResolvedTopmarkTomlSources

Resolved TOML sources used to build the optional layered provenance export.

required
results list[ProcessingContext]

Ordered list of probe contexts. The list may include normal file-backed probe contexts and synthetic contexts for explicit inputs filtered before file-type probing.

required

Returns:

Type Description
dict[str, object]

JSON-serializable envelope mapping (not yet serialized to JSON).

Source code in src/topmark/pipeline/machine/envelopes.py
def build_probe_results_json_envelope(
    *,
    meta: MetaPayload,
    config: FrozenConfig,
    resolved_toml: ResolvedTopmarkTomlSources,
    results: list[ProcessingContext],
) -> dict[str, object]:
    """Build the JSON envelope for resolution probe results.

    Args:
        meta: Shared metadata payload (`tool`/`version`).
        config: The effective [`FrozenConfig`][topmark.config.model.FrozenConfig]
            instance used for the run.
        resolved_toml: Resolved TOML sources used to build the optional layered
            provenance export.
        results: Ordered list of probe contexts. The list may include normal
            file-backed probe contexts and synthetic contexts for explicit inputs
            filtered before file-type probing.

    Returns:
        JSON-serializable envelope mapping (not yet serialized to JSON).
    """
    # Prepare config payloads once, including flattened compatibility diagnostics.
    cfg_payload: ConfigPayload = build_config_payload(
        config,
        resolved_toml=resolved_toml,
    )
    cfg_diag_payload: ConfigDiagnosticsPayload = build_config_diagnostics_payload(config)
    # Probe payloads include both normal file-type probe results and synthetic
    # filtered results for explicit inputs removed during discovery.
    probe_payloads: list[dict[str, object]] = list(iter_probe_results_payload_items(results))

    return build_json_envelope(
        meta=meta,
        **{
            ConfigKey.CONFIG.value: cfg_payload,
            ConfigKey.CONFIG_DIAGNOSTICS.value: cfg_diag_payload,
            PipelineKey.PROBES.value: probe_payloads,
        },
    )

iter_probe_results_ndjson_records

iter_probe_results_ndjson_records(
    *, meta, config, resolved_toml, results
)

Yield NDJSON records for resolution probe results.

Parameters:

Name Type Description Default
meta MetaPayload

Shared metadata payload (tool/version).

required
config FrozenConfig

Effective configuration instance.

required
resolved_toml ResolvedTopmarkTomlSources

ResolvedTopmarkTomlSources,

required
results list[ProcessingContext]

Ordered list of probe contexts. The list may include normal file-backed probe contexts and synthetic contexts for explicit inputs filtered before file-type probing.

required

Yields:

Type Description
dict[str, object]

Shaped NDJSON record mappings for config prefix records, config diagnostics,

dict[str, object]

and one kind="probe" record per probe context.

Source code in src/topmark/pipeline/machine/envelopes.py
def iter_probe_results_ndjson_records(
    *,
    meta: MetaPayload,
    config: FrozenConfig,
    resolved_toml: ResolvedTopmarkTomlSources,
    results: list[ProcessingContext],
) -> Iterator[dict[str, object]]:
    """Yield NDJSON records for resolution probe results.

    Args:
        meta: Shared metadata payload (`tool`/`version`).
        config: Effective configuration instance.
        resolved_toml: ResolvedTopmarkTomlSources,
        results: Ordered list of probe contexts. The list may include normal
            file-backed probe contexts and synthetic contexts for explicit inputs
            filtered before file-type probing.

    Yields:
        Shaped NDJSON record mappings for config prefix records, config diagnostics,
        and one `kind="probe"` record per probe context.
    """
    # Prepare config payloads once, including flattened compatibility diagnostics.
    cfg_payload: ConfigPayload = build_config_payload(
        config,
        resolved_toml=resolved_toml,
    )
    cfg_diag_payload: ConfigDiagnosticsPayload = build_config_diagnostics_payload(config)

    yield from iter_config_prefix_ndjson_records(
        meta=meta,
        config=config,
        resolved_toml=resolved_toml,
        cfg_payload=cfg_payload,
        cfg_diag_payload=cfg_diag_payload,
    )

    flattened_diagnostics: FrozenDiagnosticLog = config.validation_logs.flattened()
    yield from iter_diagnostic_ndjson_records(
        meta=meta,
        domain=MachineDomain.CONFIG,
        diagnostics=flattened_diagnostics,
    )

    # One probe record per context, including synthetic filtered contexts.
    for payload in iter_probe_results_payload_items(results):
        yield build_ndjson_record(
            kind=PipelineKind.PROBE,
            meta=meta,
            payload=payload,
        )

build_processing_results_json_envelope

build_processing_results_json_envelope(
    *, meta, config, resolved_toml, results, summary_mode
)

Build the JSON envelope for processing results (check/strip).

Detail mode (summary_mode=False) produces a per-file results list:

    {
      "meta": ...,
      "config": <ConfigPayload>,
      "config_diagnostics": <ConfigDiagnosticsPayload>,
      "results": [ <per-file result dict> ... ]
    }

Summary mode (summary_mode=True) produces a flat summary-row list:

    {
      "meta": ...,
      "config": <ConfigPayload>,
      "config_diagnostics": <ConfigDiagnosticsPayload>,
      "summary": [
        {"outcome": "unchanged", "reason": "no changes needed", "count": 3},
        {"outcome": "would insert", "reason": "header missing, changes found", "count": 1}
      ]
    }

Parameters:

Name Type Description Default
meta MetaPayload

Shared metadata payload (tool/version).

required
config FrozenConfig

The effective FrozenConfig instance used for the run.

required
resolved_toml ResolvedTopmarkTomlSources

ResolvedTopmarkTomlSources,

required
results list[ProcessingContext]

Ordered list of per-file processing contexts.

required
summary_mode bool

If True, emit flat summary rows instead of per-file results.

required

Returns:

Type Description
dict[str, object]

A JSON-serializable envelope mapping (not yet serialized to a JSON string).

Source code in src/topmark/pipeline/machine/envelopes.py
def build_processing_results_json_envelope(
    *,
    meta: MetaPayload,
    config: FrozenConfig,
    resolved_toml: ResolvedTopmarkTomlSources,
    results: list[ProcessingContext],
    summary_mode: bool,
) -> dict[str, object]:
    """Build the JSON envelope for processing results (`check`/`strip`).

    Detail mode (`summary_mode=False`) produces a per-file results list:

    ```json
        {
          "meta": ...,
          "config": <ConfigPayload>,
          "config_diagnostics": <ConfigDiagnosticsPayload>,
          "results": [ <per-file result dict> ... ]
        }
    ```

    Summary mode (`summary_mode=True`) produces a flat summary-row list:

    ```json
        {
          "meta": ...,
          "config": <ConfigPayload>,
          "config_diagnostics": <ConfigDiagnosticsPayload>,
          "summary": [
            {"outcome": "unchanged", "reason": "no changes needed", "count": 3},
            {"outcome": "would insert", "reason": "header missing, changes found", "count": 1}
          ]
        }
    ```

    Args:
        meta: Shared metadata payload (`tool`/`version`).
        config: The effective [`FrozenConfig`][topmark.config.model.FrozenConfig]
            instance used for the run.
        resolved_toml: ResolvedTopmarkTomlSources,
        results: Ordered list of per-file processing contexts.
        summary_mode: If True, emit flat summary rows instead of per-file results.

    Returns:
        A JSON-serializable envelope mapping (not yet serialized to a JSON string).
    """
    # Prepare config payloads once, including flattened compatibility diagnostics.
    cfg_payload: ConfigPayload = build_config_payload(
        config,
        resolved_toml=resolved_toml,
    )
    cfg_diag_payload: ConfigDiagnosticsPayload = build_config_diagnostics_payload(config)

    # Envelope: meta + config + config diagnostics + results
    # OR meta + config + config diagnostics + summary
    # results_payload is {"results": [...]} or {"summary": {...}}
    if summary_mode:
        summary_list: list[OutcomeSummaryRow] = build_processing_results_summary_rows_payload(
            results,
        )
        payload: dict[str, object] = {
            ConfigKey.CONFIG.value: cfg_payload,
            ConfigKey.CONFIG_DIAGNOSTICS.value: cfg_diag_payload,
            PipelineKey.SUMMARY.value: summary_list,
        }
    else:
        results_iter: Iterator[dict[str, object]] = iter_processing_results_payload_items(
            results,
        )
        payload = {
            ConfigKey.CONFIG.value: cfg_payload,
            ConfigKey.CONFIG_DIAGNOSTICS.value: cfg_diag_payload,
            PipelineKey.RESULTS.value: list(results_iter),
        }

    envelope: dict[str, object] = build_json_envelope(
        meta=meta,
        **payload,
    )

    return envelope

iter_processing_results_ndjson_records

iter_processing_results_ndjson_records(
    *, meta, config, resolved_toml, results, summary_mode
)

Yield NDJSON records for processing results (check/strip).

The NDJSON stream is emitted in a stable order:

1) kind="config" record 2) kind="config_diagnostics" record (counts-only) 3) zero or more kind="diagnostic" records for flattened compatibility config diagnostics 4) then either: - summary mode: one kind="summary" record per (outcome, reason) bucket - detail mode: one kind="result" record per processed file

Parameters:

Name Type Description Default
meta MetaPayload

Shared metadata payload (tool/version).

required
config FrozenConfig

Effective configuration instance.

required
resolved_toml ResolvedTopmarkTomlSources

ResolvedTopmarkTomlSources,

required
results list[ProcessingContext]

Ordered list of per-file processing contexts.

required
summary_mode bool

Whether to emit summary records instead of per-file result records.

required

Yields:

Type Description
dict[str, object]

Shaped NDJSON record mappings (not yet serialized). Each yielded record

dict[str, object]

includes kind and meta and follows the project's NDJSON envelope contract.

Source code in src/topmark/pipeline/machine/envelopes.py
def iter_processing_results_ndjson_records(
    *,
    meta: MetaPayload,
    config: FrozenConfig,
    resolved_toml: ResolvedTopmarkTomlSources,
    results: list[ProcessingContext],
    summary_mode: bool,
) -> Iterator[dict[str, object]]:
    """Yield NDJSON records for processing results (`check`/`strip`).

    The NDJSON stream is emitted in a stable order:

    1) `kind="config"` record
    2) `kind="config_diagnostics"` record (counts-only)
    3) zero or more `kind="diagnostic"` records for flattened compatibility config diagnostics
    4) then either:
    - summary mode: one `kind="summary"` record per `(outcome, reason)` bucket
    - detail mode: one `kind="result"` record per processed file

    Args:
        meta: Shared metadata payload (`tool`/`version`).
        config: Effective configuration instance.
        resolved_toml: ResolvedTopmarkTomlSources,
        results: Ordered list of per-file processing contexts.
        summary_mode: Whether to emit summary records instead of per-file result records.

    Yields:
        Shaped NDJSON record mappings (not yet serialized). Each yielded record
        includes `kind` and `meta` and follows the project's NDJSON envelope contract.
    """
    # Prepare config payloads once, including flattened compatibility diagnostics.
    cfg_payload: ConfigPayload = build_config_payload(
        config,
        resolved_toml=resolved_toml,
    )
    cfg_diag_payload: ConfigDiagnosticsPayload = build_config_diagnostics_payload(config)

    yield from iter_config_prefix_ndjson_records(
        meta=meta,
        config=config,
        resolved_toml=resolved_toml,
        cfg_payload=cfg_payload,
        cfg_diag_payload=cfg_diag_payload,
    )

    # One diagnostic per line
    flattened_diagnostics: FrozenDiagnosticLog = config.validation_logs.flattened()
    yield from iter_diagnostic_ndjson_records(
        meta=meta,
        domain=MachineDomain.CONFIG,
        diagnostics=flattened_diagnostics,
    )

    if summary_mode:
        for record in iter_processing_results_summary_entries(results):
            yield build_ndjson_record(
                kind=PipelineKind.SUMMARY,
                meta=meta,
                payload=record,
            )
    else:
        for r in results:
            yield build_ndjson_record(
                kind=PipelineKind.RESULT,
                meta=meta,
                payload=r.to_dict(),
            )