topmark.pipeline.steps.planner

Plan step for inserting, replacing, or removing the TopMark header (view-based).

This step consumes views produced by earlier phases and emits an updated file image via ctx.views.updated. It respects comparer outcomes and strip fast-paths and never performs I/O.

Inputs
  • ctx.views.header (HeaderView) - existing header (range/lines/block).
  • ctx.views.build (BuilderView) - field dictionaries (not used directly).
  • ctx.views.render (RenderView) - expected header text to write.
  • ctx.views.image (FileImageView) - original file image.
Outputs
  • ctx.views.updated (UpdatedView) - updated file image (sequence/iterable).
  • ctx.status.plan (PlanStatus) - planned outcome (SKIPPED/REPLACED/INSERTED/REMOVED/PREVIEWED/FAILED).
Behavior
  • Strip fast-path: if status.strip == READY, keep the precomputed image in ctx.views.updated, reattach BOM if needed, mark plan=REMOVED (or PREVIEWED).
  • Already up-to-date: if status.comparison == UNCHANGED, set plan=SKIPPED and mirror the original image into ctx.views.updated.
  • Replace: if a header range is known, splice rendered header lines over that range.
  • Insert (text-based): prefer character-offset insertion when supported.
  • Insert (line-based): fallback using compute_insertion_anchor plus optional whitespace fixes.
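
The precedence of these cases can be sketched as a small dispatcher. This is a minimal illustration, not the planner's code: Plan and choose_plan are hypothetical names, the boolean parameters stand in for the status checks named above, and the "result identical to original" short-circuit for replace and insert is left out.

```python
from __future__ import annotations

from enum import Enum


class Plan(Enum):
    """Hypothetical stand-in for the planner's outcome values."""

    SKIPPED = "skipped"
    REPLACED = "replaced"
    INSERTED = "inserted"
    REMOVED = "removed"
    PREVIEWED = "previewed"


def choose_plan(
    strip_ready: bool,
    unchanged: bool,
    header_range: tuple[int, int] | None,
    apply: bool,
) -> Plan:
    """Pick the intended outcome using the precedence of the Behavior list."""
    if strip_ready:
        # Strip fast-path wins over everything else.
        intended = Plan.REMOVED
    elif unchanged:
        # Already up-to-date: never reported as a preview.
        return Plan.SKIPPED
    elif header_range is not None:
        # A known header range means the header is replaced in place.
        intended = Plan.REPLACED
    else:
        # No existing header: insert (text-based or line-based).
        intended = Plan.INSERTED
    # When changes are not applied, mutating outcomes are reported as previews.
    return intended if apply else Plan.PREVIEWED
```

With apply=False every mutating outcome collapses to PREVIEWED, while the up-to-date case stays SKIPPED either way.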

PlannerStep

PlannerStep()

Bases: BaseStep

Synthesize an updated image from compare/strip results (no I/O).

Consumes either a prepared strip image (fast path) or the rendered header text to construct a new file image in memory. This step does not perform any writes; it only sets the intended update/write statuses and populates ctx.views.updated for downstream patch/apply steps.

Sets

PlanStatus: {PENDING, PREVIEWED, INSERTED, REPLACED, REMOVED, SKIPPED, FAILED}

Notes

The writer performs the actual I/O. Policies (e.g., add-only/update-only) are respected here when deciding the intended action.

Source code in src/topmark/pipeline/steps/planner.py
def __init__(self) -> None:
    super().__init__(
        name=self.__class__.__name__,
        primary_axis=Axis.PLAN,
        axes_written=(Axis.PLAN,),
    )

may_proceed

may_proceed(ctx)

Return True if the planner can compute an updated image.

This gate only checks whether the planner has enough upstream information to synthesize an updated image. It deliberately avoids enforcing mutation policy here; policy checks (e.g. allow_insert_into_empty_like) are handled inside run().

Planner may proceed when
  • The pipeline has not been halted; and
  • Either a strip operation is ready, a rendered header exists, or the comparer detected a change.

Parameters:
  • ctx (ProcessingContext) - The processing context. Required.

Returns:
  • bool - True if the planner can attempt to compute an updated image.
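
The gate can be exercised in isolation with a stand-in for the context. The sketch below is an assumption-laden simplification: GateInputs and gate are hypothetical names, and the four booleans flatten the halted flag and the strip, render, and comparison statuses that the real method reads from ProcessingContext.

```python
from dataclasses import dataclass


@dataclass
class GateInputs:
    """Hypothetical flattening of the context fields the gate reads."""

    halted: bool = False
    strip_ready: bool = False  # stands in for status.strip == READY
    rendered: bool = False  # stands in for status.render == RENDERED
    changed: bool = False  # stands in for status.comparison == CHANGED


def gate(inputs: GateInputs) -> bool:
    """Return True when there is enough upstream information to plan."""
    if inputs.halted:
        # A halted pipeline never proceeds, whatever else is available.
        return False
    # Any one of the three signals is sufficient.
    return inputs.strip_ready or inputs.rendered or inputs.changed


print(gate(GateInputs(rendered=True)))  # one signal is enough
print(gate(GateInputs(halted=True, strip_ready=True)))  # halt overrides
print(gate(GateInputs()))  # nothing to plan from
```

Note that the gate says nothing about policy: a file may pass it and still end up SKIPPED inside run().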

Source code in src/topmark/pipeline/steps/planner.py
def may_proceed(self, ctx: ProcessingContext) -> bool:
    """Return True if the planner can compute an updated image.

    This gate only checks whether the planner has enough upstream
    information to synthesize an updated image. It deliberately avoids
    enforcing mutation policy here; policy checks (e.g.
    `allow_insert_into_empty_like`) are handled inside `run()`.

    Planner may proceed when:
        * The pipeline has not been halted; and
        * Either a strip operation is ready, a rendered header exists,
          or the comparer detected a change.

    Args:
        ctx: The processing context.

    Returns:
        True if the planner can attempt to compute an updated image.
    """
    if ctx.is_halted:
        return False

    outcome: bool = (
        # Strip fast-path
        ctx.status.strip == StripStatus.READY
        # Normal update: content OK or empty+policy allowed
        or ctx.status.render == RenderStatus.RENDERED
        or ctx.status.comparison == ComparisonStatus.CHANGED
    )
    return outcome

run

run(ctx)

Plan insert/replace/remove of the TopMark header for the current file (view-based).

Behavior by case:
  • Strip fast-path: When status.strip == READY, keep the precomputed ctx.views.updated.lines, reattach a BOM if the reader saw one, and mark plan=REMOVED (or PREVIEWED when not applying).
  • Already up-to-date: When status.comparison == UNCHANGED, do nothing, set plan=SKIPPED, and mirror the original image into ctx.views.updated.
  • Replace: If a header range is known (ctx.views.header.range), splice the rendered header lines (ctx.views.render.lines) over that range; reattach BOM if needed; if the result equals the original, set plan=SKIPPED; otherwise plan=REPLACED (or PREVIEWED).
  • Insert (text-based): If the processor provides a character offset, insert the rendered header text there (after optional prepare_header_for_insertion_text), reattach BOM if needed; if identical to original, SKIPPED; else INSERTED (or PREVIEWED).
  • Insert (line-based fallback): Use compute_insertion_anchor (line index), optionally adjust whitespace via prepare_header_for_insertion, reattach BOM if needed; set INSERTED (or PREVIEWED) unless the result is identical, in which case SKIPPED.
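
As an illustration of the replace case, the sketch below splices rendered header lines over an existing header range and detects the no-op situation. It assumes the range is a (start, end) pair of line indices with an inclusive end; splice_header and the sample lines are hypothetical, and BOM handling is omitted.

```python
def splice_header(
    original: list[str],
    rendered: list[str],
    header_range: tuple[int, int],
) -> list[str]:
    """Replace the lines in header_range (end inclusive) with rendered."""
    start, end = header_range
    return original[:start] + rendered + original[end + 1 :]


original = [
    "#!/usr/bin/env python\n",
    "# old header\n",
    "# old field\n",
    "print('hi')\n",
]

# Lines 1..2 hold the existing header; swap them for a one-line header.
updated = splice_header(original, ["# new header\n"], (1, 2))
print(updated)

# Splicing in the header that is already there yields an identical image,
# which is the situation reported as SKIPPED rather than REPLACED.
same = splice_header(original, original[1:3], (1, 2))
print(same == original)
```

Comparing the spliced result with the original is what lets a replace degrade to a no-op without any separate diffing step.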

Parameters:
  • ctx (ProcessingContext) - The processing context. Required.

Notes

This function performs no I/O; it only populates ctx.views.updated and ctx.status.plan for downstream patch/apply steps.
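
The text-based insert case works on the whole file as one string, entirely in memory. The following is a minimal sketch of that idea under stated assumptions: insert_at_offset is a hypothetical helper, the offset is taken as given (in the planner it comes from the header processor), and BOM reattachment and the logically-empty canonicalization are left out.

```python
def insert_at_offset(original_text: str, header_text: str, offset: int) -> str:
    """Insert header_text at a character offset and trim trailing blank lines."""
    new_text = original_text[:offset] + header_text + original_text[offset:]
    # Collapse trailing blank lines so the text ends with a single terminator.
    while new_text.endswith("\r\n\r\n"):
        new_text = new_text[:-2]  # drop one CRLF pair
    while new_text.endswith("\n\n") or new_text.endswith("\r\r"):
        new_text = new_text[:-1]  # drop one LF or CR
    return new_text


shebang = "#!/bin/sh\n"
script = shebang + "echo hi\n"

# Insert after the shebang line; the blank line after the header is kept
# because body text follows it.
with_header = insert_at_offset(script, "# header\n\n", len(shebang))
print(repr(with_header))

# In an empty file the header ends at EOF, so the trailing blank is dropped.
print(repr(insert_at_offset("", "# header\n\n", 0)))

# The updated image is then a list of lines with their terminators kept.
print(with_header.splitlines(keepends=True))
```

Keeping line terminators through splitlines(keepends=True) means joining the lines reproduces the text exactly, which is what makes the "identical to original" check reliable.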

Source code in src/topmark/pipeline/steps/planner.py
def run(self, ctx: ProcessingContext) -> None:
    """Plan insert/replace/remove of the TopMark header for the current file (view-based).

    Behavior by case:
    - **Strip fast-path**: When ``status.strip == READY``, keep the precomputed
        ``ctx.views.updated.lines``, reattach a BOM if the reader saw one, and mark
        ``plan=REMOVED`` (or ``PREVIEWED`` when not applying).
    - **Already up-to-date**: When ``status.comparison == UNCHANGED``, do nothing,
        set ``plan=SKIPPED``, and mirror the original image into ``ctx.views.updated``.
    - **Replace**: If a header range is known (``ctx.views.header.range``), splice
        the rendered header lines (``ctx.views.render.lines``) over that range; reattach BOM
        if needed; if the result equals the original, set ``plan=SKIPPED``; otherwise
        ``plan=REPLACED`` (or ``PREVIEWED``).
    - **Insert (text-based)**: If the processor provides a character offset, insert
        the rendered header text there (after optional ``prepare_header_for_insertion_text``),
        reattach BOM if needed; if identical to original, ``SKIPPED``; else ``INSERTED``.
    - **Insert (line-based fallback)**: Use ``compute_insertion_anchor`` (line index),
        optionally adjust whitespace via ``prepare_header_for_insertion``, reattach BOM
        if needed; set ``INSERTED`` unless the result is identical, in which case ``SKIPPED``.

    Args:
        ctx: The processing context.

    Notes:
        This function performs no I/O; it only populates ``ctx.views.updated`` and
        ``ctx.status.plan`` for downstream patch/apply steps.
    """
    logger.debug("ctx: %s", ctx)
    logger.debug("ctx.run_options.apply_changes = %s", ctx.run_options.apply_changes)

    apply: bool = (
        False if ctx.run_options.apply_changes is None else ctx.run_options.apply_changes
    )

    # Materialize original image once (list[str]) for splice operations.
    original_lines: list[str] = list(ctx.iter_image_lines())

    if ctx.status.content != ContentStatus.OK and not allow_insert_into_empty_like(ctx):
        ctx.status.plan = PlanStatus.SKIPPED
        reason: str = f"Could not update file (status: {ctx.status.content.value})."
        ctx.diagnostics.add_info(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    # TODO: enable updating based on future allow_XXX_by_policy() policy:
    if ctx.status.header in {
        HeaderStatus.MALFORMED_ALL_FIELDS,
        HeaderStatus.MALFORMED_SOME_FIELDS,
    }:
        ctx.status.plan = PlanStatus.SKIPPED
        ctx.views.updated = UpdatedView(lines=original_lines)
        reason = "Existing header has malformed fields; TopMark will not update it."
        ctx.diagnostics.add_warning(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    logger.debug("ctx: %s", ctx.to_dict())

    # --- strip fast-path (must run before any add/replace logic) ---
    if ctx.status.strip == StripStatus.READY:
        # Previous step computed updated_file_lines for a removal.
        updated_view: UpdatedView | None = ctx.views.updated
        if not updated_view or updated_view.lines is None:
            # TODO: check whether this can be the case when the file to be stripped
            # would become empty after stripping!
            logger.error(
                "Stripper might have omitted to set ctx.views.updated, "
                "or the resulting updated file might become empty after stripping. "
                "ctx.views.updated is %s, ctx.views.updated.file_lines is %s",
                "defined" if updated_view else "not set",
                "defined" if updated_view and updated_view.lines else "not set",
            )
            ctx.status.plan = PlanStatus.FAILED  # TODO FIXME
            reason = "No updated file lines available for stripping."
            ctx.request_halt(reason=reason, at_step=self)
            return

        # ✅ Preserve empty list as a valid updated image
        seq: Sequence[str] | Iterable[str] = updated_view.lines
        stripped_lines: list[str] = seq if isinstance(seq, list) else list(seq)

        # Re-attach BOM only if needed (no-op for empty)
        stripped_lines = _prepend_bom_to_lines_if_needed(stripped_lines, ctx)

        ctx.views.updated = UpdatedView(lines=stripped_lines)
        ctx.status.plan = PlanStatus.REMOVED if apply else PlanStatus.PREVIEWED
        return

    # If the comparer determined the file is already compliant, do nothing.
    if ctx.status.comparison == ComparisonStatus.UNCHANGED:
        ctx.status.plan = PlanStatus.SKIPPED
        # Preserve the original image as the "updated" content for downstream steps.
        ctx.views.updated = UpdatedView(lines=original_lines)
        logger.trace("Updater: no-op (comparison=UNCHANGED) for %s", ctx.path)
        return

    # Non-strip processing
    render_view: RenderView | None = ctx.views.render
    if render_view is None or render_view.lines is None:
        ctx.status.plan = PlanStatus.FAILED
        reason = "Cannot update header: no rendered header available"
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    rendered_expected_header_lines: list[str] = list(render_view.lines)

    if ctx.header_processor is None:
        ctx.status.plan = PlanStatus.FAILED
        reason = "Cannot update header: no header processor assigned"
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    ft: FileType | None = ctx.file_type
    checker: InsertChecker | None = ft.pre_insert_checker if ft else None

    # --- Pre-insert capability check (authoritative) --------------------------
    # Only evaluate here if the advisory checker did not run (UNEVALUATED).
    if checker is not None:
        if ctx.pre_insert_capability == InsertCapability.UNEVALUATED:
            try:
                view = PreInsertViewAdapter(ctx)
                result: InsertCheckResult = checker(view) or {}
            except Exception as exc:
                logger.exception(
                    "pre-insert checker failed for %s: %s", getattr(ft, "name", ft), exc
                )

                # Obtain the name of the checker
                checker_name = getattr(checker, "__qualname__", None) or getattr(
                    checker, "__name__", None
                )
                if checker_name is None:
                    checker_name = type(checker).__name__

                result = InsertCheckResult(
                    capability=InsertCapability.SKIP_OTHER,
                    reason=f"checker error: {checker_name}, {exc}",
                    origin=__name__,
                )

            # Persist the authoritative view for downstream bucketing/rendering
            cap: InsertCapability = result.get("capability", InsertCapability.OK)
            ctx.pre_insert_capability = cap
            ctx.pre_insert_reason = result.get("reason", ctx.pre_insert_reason)
            ctx.pre_insert_origin = result.get("origin", __name__)

            if cap != InsertCapability.OK:
                pre_insert_reason = ctx.pre_insert_reason or "pre-insert checker skipped update"
                origin = ctx.pre_insert_origin or __name__
                logger.debug(
                    "pre-insert: %s - %s", getattr(cap, "value", cap), pre_insert_reason
                )
                # Preserve original image; mark as skipped
                ctx.views.updated = UpdatedView(lines=original_lines)
                ctx.status.plan = PlanStatus.SKIPPED
                reason = f"{pre_insert_reason} (origin: {origin})"
                ctx.diagnostics.add_warning(reason)
                ctx.request_halt(reason=reason, at_step=self)
                return

        if ctx.pre_insert_capability == InsertCapability.SKIP_IDEMPOTENCE_RISK:
            # TODO - align with reader.ReaderStep.run()
            if ctx.status.content == ContentStatus.OK and allow_content_reflow(ctx):
                pass
            else:
                # Advisory-only pre-insert probe already ran in reader step.
                # Here we enforce the gate: if the advisory is not OK, skip updating.
                pre_insert_reason: str = (
                    ctx.pre_insert_reason or "pre-insert check refused insertion"
                )
                origin: str = ctx.pre_insert_origin or __name__
                logger.debug(
                    "pre-insert (advisory): %s - %s",
                    getattr(ctx.pre_insert_capability, "value", ctx.pre_insert_capability),
                    pre_insert_reason,
                )
                ctx.views.updated = UpdatedView(lines=original_lines)
                ctx.status.plan = PlanStatus.SKIPPED
                reason = f"{pre_insert_reason} (origin: {origin})"
                ctx.diagnostics.add_warning(reason)
                ctx.request_halt(reason=reason, at_step=self)
                return

        # --- Pre-insert capability gate (authoritative) ---------------------------
        elif ctx.pre_insert_capability != InsertCapability.OK:
            # Advisory-only pre-insert probe already ran in reader step.
            # Here we enforce the gate: if the advisory is not OK, skip updating.
            pre_insert_reason: str = (
                ctx.pre_insert_reason or "pre-insert check refused insertion"
            )
            origin: str = ctx.pre_insert_origin or __name__
            logger.debug(
                "pre-insert (advisory): %s - %s",
                getattr(ctx.pre_insert_capability, "value", ctx.pre_insert_capability),
                pre_insert_reason,
            )
            ctx.views.updated = UpdatedView(lines=original_lines)
            ctx.status.plan = PlanStatus.SKIPPED
            reason = f"{pre_insert_reason} (origin: {origin})"
            ctx.diagnostics.add_warning(reason)
            ctx.request_halt(reason=reason, at_step=self)
            return

    # --- Replace path (view-based) ---
    header_view: HeaderView | None = ctx.views.header
    existing_range: tuple[int, int] | None = header_view.range if header_view else None
    if existing_range is not None:
        # Replace existing header: remove old header lines and insert new header in place
        start: int
        end: int
        start, end = existing_range
        new_lines: list[str] = (
            original_lines[:start] + rendered_expected_header_lines + original_lines[end + 1 :]
        )
        # Prepend BOM if needed
        new_lines = _prepend_bom_to_lines_if_needed(new_lines, ctx)
        # If replacement is identical to the original, treat as a no-op.
        if new_lines == original_lines:
            ctx.status.plan = PlanStatus.SKIPPED
            ctx.views.updated = UpdatedView(lines=original_lines)
            logger.trace("Updater: replacement yields no changes for %s", ctx.path)
            return
        ctx.status.plan = PlanStatus.REPLACED if apply else PlanStatus.PREVIEWED
        ctx.views.updated = UpdatedView(lines=new_lines)
        logger.trace("Updated file (replace):\n%s", "".join(new_lines))
        return

    # --- Insert: text-based first ---
    try:
        logger.debug("upd.path: try=text; file=%s", ctx.path)
        original_text: str = "".join(original_lines)
        char_offset: int | None = None
        if hasattr(ctx.header_processor, "get_header_insertion_char_offset"):
            char_offset = ctx.header_processor.get_header_insertion_char_offset(original_text)
        logger.debug("upd.text: offset=%s; head[:40]=%r", char_offset, original_text[:40])

        if char_offset is not None:
            header_text: str = "".join(rendered_expected_header_lines)
            if hasattr(ctx.header_processor, "prepare_header_for_insertion_text"):
                try:
                    header_text = ctx.header_processor.prepare_header_for_insertion_text(
                        original_text=original_text,
                        insert_offset=char_offset,
                        rendered_header_text=header_text,
                        newline_style=ctx.newline_style or "\n",
                    )
                except (ValueError, TypeError, AttributeError) as e:
                    logger.warning(
                        "prepare_header_for_insertion_text failed for %s: %s", ctx.path, e
                    )
            logger.debug(
                "upd.text.pad: insert_offset=%d; header_head[:40]=%r",
                char_offset,
                header_text[:40],
            )
            logger.debug(
                "upd.text.splice: pre_tail[:10]=%r post_head[:10]=%r",
                original_text[max(0, char_offset - 10) : char_offset],
                original_text[char_offset : char_offset + 10],
            )
            new_text: str = (
                original_text[:char_offset] + header_text + original_text[char_offset:]
            )
            # If header text ends at EOF with trailing blank lines, drop them all.
            # The blank-after-header policy should only apply when body text follows.
            while new_text.endswith("\r\n\r\n"):
                new_text = new_text[:-2]  # drop one CRLF pair
            while new_text.endswith("\n\n") or new_text.endswith("\r\r"):
                new_text = new_text[:-1]  # drop one LF or CR
            logger.trace("Updater (text): trimmed EOF blanks; len=%d", len(new_text))

            # Prepend BOM if needed
            if getattr(ctx, "leading_bom", False) and not new_text.startswith("\ufeff"):
                new_text = "\ufeff" + new_text

            # Canonicalize logically-empty placeholder bodies (e.g. a file that was just "\n")
            # so insert→strip→insert round-trips are stable (avoid a double trailing newline).
            if getattr(ctx, "is_logically_empty", False):
                insert_index_lines: int = (
                    len(original_text[:char_offset].splitlines(keepends=True))
                    if char_offset
                    else 0
                )
                header_len_lines: int = len(header_text.splitlines(keepends=True))
                new_lines_tmp: list[str] = new_text.splitlines(keepends=True)
                new_lines_tmp = _canonicalize_logically_empty_body_after_insert(
                    lines=new_lines_tmp,
                    insert_index=insert_index_lines,
                    header_len=header_len_lines,
                    ctx=ctx,
                )
                new_text = "".join(new_lines_tmp)

            if new_text == original_text:
                ctx.views.updated = UpdatedView(lines=original_lines)
                # ctx.status.write = WriteStatus.SKIPPED
                ctx.status.plan = PlanStatus.SKIPPED
                logger.trace("Updater: text-based insertion yields no changes for %s", ctx.path)
                return
            ctx.views.updated = UpdatedView(lines=new_text.splitlines(keepends=True))
            ctx.status.plan = PlanStatus.INSERTED if apply else PlanStatus.PREVIEWED
            return
    except (ValueError, TypeError, AttributeError) as e:
        logger.warning("text-based insertion failed for %s: %s", ctx.path, e)

    # --- Insert: line-based fallback ---
    insert_index: int = ctx.header_processor.compute_insertion_anchor(original_lines)
    if insert_index == NO_LINE_ANCHOR:
        ctx.status.plan = PlanStatus.FAILED
        reason = f"No line-based insertion anchor for file: {ctx.path}"
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    # defensive clamp
    if insert_index < 0:
        insert_index = 0
    elif insert_index > len(original_lines):
        insert_index = len(original_lines)

    # optional whitespace adjustments
    try:
        header_lines: list[str] = ctx.header_processor.prepare_header_for_insertion(
            original_lines=original_lines,
            insert_index=insert_index,
            rendered_header_lines=rendered_expected_header_lines,
            newline_style=ctx.newline_style or "\n",
        )
    except (ValueError, TypeError, AttributeError) as e:
        logger.warning("prepare_header_for_insertion failed for %s: %s", ctx.path, e)
        header_lines = rendered_expected_header_lines

    # Splice header; if inserting at BOF and the first original line is a BOM-only blank,
    # consume it so we don't leave a dangling BOM+blank after the header.
    body_start: int = insert_index
    if insert_index == 0 and original_lines:
        first: str = original_lines[0]
        # Consume only a BOM-bearing blank at BOF (e.g., "\ufeff" or "\ufeff\n"),
        # but preserve a user-authored plain blank line.
        is_bom_blank: bool = (
            first.startswith("\ufeff") and first.replace("\ufeff", "").strip() == ""
        )
        if is_bom_blank:
            body_start = 1
            logger.trace(
                "Updater (line): consuming BOM-only/blank line at BOF after header insertion"
            )

    new_lines = original_lines[:insert_index] + header_lines + original_lines[body_start:]

    # If header occupies the tail and last line is blank, drop all trailing blanks.
    new_lines = _drop_trailing_blank_if_header_at_eof(
        new_lines, insert_index, len(header_lines)
    )
    # Canonicalize logically-empty placeholder bodies to keep round-trips stable.
    new_lines = _canonicalize_logically_empty_body_after_insert(
        lines=new_lines,
        insert_index=insert_index,
        header_len=len(header_lines),
        ctx=ctx,
    )
    # Prepend BOM if needed
    new_lines = _prepend_bom_to_lines_if_needed(new_lines, ctx)
    if new_lines == original_lines:
        ctx.views.updated = UpdatedView(lines=original_lines)
        ctx.status.plan = PlanStatus.SKIPPED
        logger.trace("Updater: line-based insertion yields no changes for %s", ctx.path)
        return
    ctx.views.updated = UpdatedView(lines=new_lines)
    ctx.status.plan = PlanStatus.INSERTED if apply else PlanStatus.PREVIEWED
    logger.trace("Updated file (line-based):\n%s", "".join(new_lines))
    return
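
The line-based fallback at the end of run() can be tried out on its own. The sketch below reproduces only the defensive clamp and the BOM-blank handling from the listing above; insert_lines is a hypothetical name, and the NO_LINE_ANCHOR check, whitespace preparation, trailing-blank trimming, and BOM reattachment are deliberately left out.

```python
BOM = "\ufeff"


def insert_lines(original: list[str], header: list[str], insert_index: int) -> list[str]:
    """Insert header lines at insert_index, consuming a BOM-only first line."""
    # Defensive clamp into the valid range 0..len(original).
    insert_index = max(0, min(insert_index, len(original)))

    body_start = insert_index
    if insert_index == 0 and original:
        first = original[0]
        # Consume a blank line that carries a BOM, but keep a plain blank line.
        if first.startswith(BOM) and first.replace(BOM, "").strip() == "":
            body_start = 1

    return original[:insert_index] + header + original[body_start:]


header = ["# header\n"]
print(insert_lines([BOM + "\n", "x = 1\n"], header, 0))  # BOM-only line consumed
print(insert_lines(["\n", "x = 1\n"], header, 0))  # plain blank line preserved
print(insert_lines(["x = 1\n"], header, 99))  # out-of-range index clamped to EOF
```

Dropping the BOM-bearing blank here is safe only because the BOM is prepended again afterwards; a variant without that later step would lose the BOM.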

hint

hint(ctx)

Attach update hints (non-binding).

Parameters:
  • ctx (ProcessingContext) - The processing context. Required.

Source code in src/topmark/pipeline/steps/planner.py
def hint(self, ctx: ProcessingContext) -> None:
    """Attach update hints (non-binding).

    Args:
        ctx: The processing context.
    """
    apply: bool = ctx.run_options.apply_changes is True
    ft: FileType | None = ctx.file_type
    checker: InsertChecker | None = ft.pre_insert_checker if ft else None
    st: PlanStatus = ctx.status.plan

    # May proceed to next step (always):
    if st == PlanStatus.INSERTED:
        ctx.hint(
            axis=Axis.PLAN,
            code=KnownCode.PLAN_INSERT,
            cluster=Cluster.CHANGED if apply else Cluster.WOULD_CHANGE,
            message="header will be inserted" if apply else "header would be inserted",
        )
    elif st == PlanStatus.REPLACED:
        ctx.hint(
            axis=Axis.PLAN,
            code=KnownCode.PLAN_UPDATE,
            cluster=Cluster.CHANGED if apply else Cluster.WOULD_CHANGE,
            message="header will be replaced" if apply else "header would be replaced",
        )
    elif st == PlanStatus.REMOVED:
        ctx.hint(
            axis=Axis.PLAN,
            code=KnownCode.PLAN_REMOVE,
            cluster=Cluster.CHANGED if apply else Cluster.WOULD_CHANGE,
            message="header will be removed" if apply else "header would be removed",
        )
    elif st == PlanStatus.SKIPPED:
        if ctx.status.content != ContentStatus.OK and not allow_insert_into_empty_like(ctx):
            msg: str = f"Could not update file (status: {ctx.status.content.value})."
        elif ctx.status.header in {
            HeaderStatus.MALFORMED_ALL_FIELDS,
            HeaderStatus.MALFORMED_SOME_FIELDS,
        }:
            # TODO: enable updating based on future policy:
            msg = "Existing header has malformed fields; TopMark will not update it."
        elif checker is not None and ctx.pre_insert_capability != InsertCapability.OK:
            pre_insert_reason: str = (
                ctx.pre_insert_reason or "pre-insert check refused insertion"
            )
            origin: str = ctx.pre_insert_origin or __name__
            msg = f"{pre_insert_reason} (origin: {origin})"
        else:
            msg = "no update needed"
        ctx.hint(
            axis=Axis.PLAN,
            code=KnownCode.PLAN_SKIP,
            cluster=Cluster.SKIPPED,
            message=msg,
            terminal=True,
        )
    # Stop processing:
    elif st == PlanStatus.PREVIEWED:
        # TODO: stop processing or proceed to next step?
        ctx.hint(
            axis=Axis.PLAN,
            code="previewed",
            # code=KnownCode.,
            # cluster=Cluster,
            message="previewed changes",
            terminal=True,
        )
    elif st == PlanStatus.FAILED:
        ctx.hint(
            axis=Axis.PLAN,
            code=KnownCode.PLAN_FAILED,
            cluster=Cluster.SKIPPED,
            message="failed to compute update",
            terminal=True,
        )
    elif st == PlanStatus.PENDING:
        # updater did not complete
        ctx.request_halt(reason=f"{self.__class__.__name__} did not set state.", at_step=self)
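
The three mutating branches of hint() differ only in the action word and share the apply-dependent cluster and wording. A compact way to see that pattern is sketched below; describe_change is a hypothetical helper, and the lowercase cluster strings are placeholders for the Cluster.CHANGED and Cluster.WOULD_CHANGE members, whose actual values are not shown in this listing.

```python
def describe_change(action: str, apply: bool) -> tuple[str, str]:
    """Return (cluster, message) for a mutating plan outcome.

    action is one of "inserted", "replaced", or "removed".
    """
    # Placeholder strings; the real code uses Cluster enum members.
    cluster = "changed" if apply else "would_change"
    verb = "will" if apply else "would"
    return cluster, f"header {verb} be {action}"


for action in ("inserted", "replaced", "removed"):
    print(describe_change(action, apply=True))
    print(describe_change(action, apply=False))
```

The skipped, previewed, and failed outcomes do not fit this pattern because they carry their own messages and are marked terminal.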