topmark.pipeline.steps.writer

Writer step for committing updated content to a sink.

This step is the canonical place where TopMark writes results to a destination (filesystem, stdout, or dry-run). It avoids implementation drift between the CLI and public API.

This step is responsible for the final I/O after all other steps have computed ctx.views.updated and selected the intended PlanStatus (INSERTED, REPLACED, REMOVED, or PREVIEWED for stdout output). It also applies policy gates (e.g., header mutation mode) so that command-line intent and config policies are centralized here.

Sinks

  • InplaceFileSink: writes in-place to the file path.
  • AtomicFileSink: writes through a same-directory temporary file and atomically replaces the target. POSIX-only durability and permission helpers such as os.fchmod() and os.O_DIRECTORY are used only when available; Windows falls back to best-effort path-based permission handling and skips directory fsync().
  • StdoutSink: writes the updated content to stdout (stdin-content mode).
  • NullSink: no-op (dry-run).
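run() gets its sink from a private _select_sink(ctx) helper, and that helper's body is not shown on this page. The sketch below is only a guess at a plausible mapping from run options to the four sinks listed above. The `atomic` flag, the `OutputTarget.FILE` member, and `select_sink_name` are all hypothetical. Only `OutputTarget.STDOUT` appears in the source.

```python
from enum import Enum


class OutputTarget(Enum):
    FILE = "file"  # hypothetical member; only STDOUT appears in the source
    STDOUT = "stdout"


def select_sink_name(*, output_target: OutputTarget, apply_changes: bool, atomic: bool) -> str:
    """Hypothetical mapping from run options to one of the four documented sinks."""
    if output_target is OutputTarget.STDOUT:
        return "StdoutSink"  # stdin-content mode
    if not apply_changes:
        return "NullSink"  # dry-run: never touch the filesystem
    # `atomic` is a hypothetical switch between the two file sinks.
    return "AtomicFileSink" if atomic else "InplaceFileSink"
```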

The step respects WriterStep.may_proceed() and the tri-state intent/feasibility via ProcessingContext.would_change and ProcessingContext.can_change.

WriteSink

Bases: Protocol

Behavioral protocol for writer-step sinks.

Sink implementations encapsulate the final destination-specific I/O for a processing context, such as writing to disk, emitting to stdout, or performing a dry-run no-op. The protocol intentionally exposes only behavior and does not require shared sink state.

write

write(*, ctx)

Write the updated content for ctx to the target sink.

Implementations perform the final write operation for a processed file, such as writing to disk, emitting to stdout, or performing no operation in dry-run mode.

Parameters:

  • ctx (ProcessingContext, required): Context that holds updated content and write status.

Returns:

  • WriteResult: Structured result indicating the write status and byte count when the sink can report one.

Source code in src/topmark/pipeline/steps/writer.py
def write(self, *, ctx: ProcessingContext) -> WriteResult:
    """Write the updated content for ``ctx`` to the target sink.

    Implementations perform the final write operation for a processed file,
    such as writing to disk, emitting to stdout, or performing no operation
    in dry-run mode.

    Args:
        ctx: Context that holds updated content and write status.

    Returns:
        Structured result indicating the write status and byte count when
        the sink can report one.
    """
    ...

WriteResult dataclass

WriteResult(*, status, bytes_written=0)

Structured result of a writer sink operation.

Attributes:

  • status (WriteStatus): Final status reported by the sink.
  • bytes_written (int): Number of bytes written when the sink can report one. It is zero for dry-run, skipped, and failed writes. It is also zero for file writes whose sink does not report a count; both file sinks currently return the default.

NullSink

Dry-run sink: does not write anything.

write

write(*, ctx)

No-op write for dry-run mode.

Parameters:

  • ctx (ProcessingContext, required): Processing context for the current file.

Returns:

  • WriteResult: The current ctx.status.write echoed back with zero bytes written.

Source code in src/topmark/pipeline/steps/writer.py
def write(self, *, ctx: ProcessingContext) -> WriteResult:
    """No-op write for dry-run mode.

    Args:
        ctx: Processing context for the current file.

    Returns:
        The current ``ctx.status.write`` echoed back with zero bytes written.
    """
    return WriteResult(status=ctx.status.write, bytes_written=0)

StdoutSink

Standard-output sink (stdin-content mode).

write

write(*, ctx)

Emit updated content to standard output.

This sink is used when the CLI/API is configured to read a single file's content from STDIN and emit the updated result to STDOUT.
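The byte count that StdoutSink reports is the size of the UTF-8 encoding, not the character count. This matters as soon as a header contains non-ASCII text. Below is a minimal standalone sketch of that logic. It takes a plain list of lines instead of a ProcessingContext and returns a (status name, size) tuple instead of a WriteResult. `emit_updated` and `capture` are hypothetical helpers.

```python
from __future__ import annotations

import io
from contextlib import redirect_stdout


def emit_updated(lines: list[str] | None) -> tuple[str, int]:
    """Print joined lines as-is and return (status name, UTF-8 byte count)."""
    if lines is None:
        return ("SKIPPED", 0)
    text = "".join(lines)
    size = len(text.encode("utf-8"))  # bytes, not characters
    print(text, end="")  # lines already carry their own newlines
    return ("WRITTEN", size)


def capture(lines: list[str] | None) -> tuple[str, str, int]:
    """Run emit_updated() with stdout redirected to an in-memory buffer."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        status, size = emit_updated(lines)
    return buf.getvalue(), status, size
```

For ["# © TopMark\n", "x = 1\n"] the output is 18 characters but 19 bytes, because "©" encodes to two bytes.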

Parameters:

  • ctx (ProcessingContext, required): Processing context containing the updated lines.

Returns:

  • WriteResult: WRITTEN with the number of UTF-8 bytes printed when content is available; otherwise SKIPPED with zero bytes written.

Source code in src/topmark/pipeline/steps/writer.py
def write(self, *, ctx: ProcessingContext) -> WriteResult:
    """Emit updated content to standard output.

    This sink is used when the CLI/API is configured to read a single file's
    content from STDIN and emit the updated result to STDOUT.

    Args:
        ctx: Processing context containing the updated lines.

    Returns:
        ``WRITTEN`` with the number of UTF-8 bytes printed when content is available;
        otherwise ``SKIPPED`` with zero bytes written.
    """
    lines: list[str] | None = _updated_lines(ctx)
    if lines is None:
        logger.debug(
            "StdoutSink: ctx.views.updated not defined or ctx.views.updated.lines not defined: "
            "nothing to do"
        )
        return WriteResult(status=WriteStatus.SKIPPED, bytes_written=0)

    text: str = "".join(lines)
    size: int = len(text.encode("utf-8"))
    print(text, end="")  # noqa: T201 (intentional: pipeline handles stdout here)
    return WriteResult(status=WriteStatus.WRITTEN, bytes_written=size)

InplaceFileSink

Bases: WriteSink

Write updated content by truncating the original file and writing in place.

Pros
  • Keeps inode identity stable.
  • Minimal I/O.

Cons
  • Risk of partial/truncated files on crash.
  • Live readers may observe mid-write changes.

write

write(*, ctx)

Write updated content directly into the original file (in-place).

Opens the file in binary write mode, truncates its contents, and writes ctx.views.updated.lines directly. This operation preserves the inode identity but may leave a truncated file if the process is interrupted mid-write.
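Here is a minimal standalone sketch of the truncate-and-rewrite approach just described. It assumes a plain list of lines instead of a ProcessingContext and returns a bool instead of a WriteResult. `write_in_place`, `demo_success`, and `demo_failure` are hypothetical names. `demo_failure` passes a lone surrogate, which cannot be encoded as UTF-8, to show the documented truncation risk.

```python
from __future__ import annotations

import contextlib
import os
import stat
import tempfile
from pathlib import Path


def write_in_place(path: Path, lines: list[str]) -> bool:
    """Truncate ``path`` and rewrite it with ``lines``; return True on success."""
    try:
        try:
            mode: int | None = stat.S_IMODE(path.stat().st_mode)
        except OSError:
            mode = None  # e.g. the file does not exist yet
        # "wb" truncates the existing file; the inode stays the same.
        with path.open("wb") as f:
            for line in lines:
                # An encoding error here leaves the file already truncated.
                f.write(line.encode("utf-8"))
            f.flush()
            os.fsync(f.fileno())
        if mode is not None:
            with contextlib.suppress(OSError):
                path.chmod(mode)  # best-effort permission restore
        return True
    except (OSError, UnicodeError):
        return False


def demo_success() -> tuple[int, int, str, bool]:
    """Rewrite a scratch file; report inode before/after, final text, and success."""
    with tempfile.TemporaryDirectory() as d:
        target = Path(d) / "example.py"
        target.write_text("old\n", encoding="utf-8")
        ino_before = target.stat().st_ino
        ok = write_in_place(target, ["# header\n", "new\n"])
        return ino_before, target.stat().st_ino, target.read_text(encoding="utf-8"), ok


def demo_failure() -> tuple[bool, str]:
    """A lone surrogate cannot be UTF-8 encoded, so the write fails after truncation."""
    with tempfile.TemporaryDirectory() as d:
        target = Path(d) / "example.py"
        target.write_text("old\n", encoding="utf-8")
        ok = write_in_place(target, ["\ud800"])
        return ok, target.read_text(encoding="utf-8")
```

The file is truncated before the first line is encoded. An error partway through therefore leaves an empty or partial file, which is the gap AtomicFileSink closes.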

Parameters:

  • ctx (ProcessingContext, required): The active processing context, expected to contain ctx.views.updated.lines; each line is encoded as UTF-8 when written.

Returns:

  • WriteResult: WriteStatus.WRITTEN on success, or WriteStatus.FAILED after recording diagnostic information on error.

Source code in src/topmark/pipeline/steps/writer.py
def write(self, *, ctx: ProcessingContext) -> WriteResult:
    """Write updated content directly into the original file (in-place).

    Opens the file in binary write mode, truncates its contents, and writes
    `ctx.views.updated.lines` directly. This operation preserves the inode identity
    but may leave a truncated file if the process is interrupted mid-write.

    Args:
        ctx: The active processing context, expected to contain `ctx.views.updated.lines`
            with UTF-8-encoded text to write.

    Returns:
        Structured write result containing `WriteStatus.WRITTEN` on success,
        or `WriteStatus.FAILED` after recording diagnostic information on error.
    """
    path: Path = ctx.path
    try:
        # Preserve mode; other metadata handled best-effort.
        try:
            st_mode: int = path.stat().st_mode
            mode: int | None = stat.S_IMODE(st_mode)
        except OSError:
            mode = None

        with path.open("wb") as f:
            # Prefer streaming via the iterator (good for memory)
            for line in ctx.iter_updated_lines():
                f.write(line.encode("utf-8"))
            f.flush()
            os.fsync(f.fileno())
        if mode is not None:
            # Best-effort: preserve original permissions.
            with contextlib.suppress(OSError):
                path.chmod(mode)
        return WriteResult(status=WriteStatus.WRITTEN)
    except (OSError, UnicodeError) as e:
        ctx.diagnostics.add_error(f"In-place write failed: {e}")
        return WriteResult(status=WriteStatus.FAILED)

AtomicFileSink

Bases: WriteSink

Write updated content to a temp file and atomically replace the target.

This sink writes to a temporary file in the same directory as the target, fsync()s it, then calls os.replace() to atomically swap it in.

Permission preservation and directory durability are best-effort and platform-aware: POSIX uses os.fchmod() and directory fsync() when available, while Windows falls back to path-based chmod() and skips directory fsync() because os.fchmod and os.O_DIRECTORY are not exposed there.
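The sequence described above is: write a same-directory temp file, fsync() it, call os.replace(), then fsync() the directory where O_DIRECTORY exists. It can be sketched as a standalone function. This is a simplified sketch, not AtomicFileSink itself. It takes a plain list of lines, reports success as a bool, and reuses the source's temp-name pattern. `write_atomic`, `demo_success`, and `demo_failure` are hypothetical names.

```python
from __future__ import annotations

import contextlib
import os
import secrets
import stat
import tempfile
from pathlib import Path


def write_atomic(path: Path, lines: list[str]) -> bool:
    """Write ``lines`` to a same-directory temp file, then swap it in with os.replace()."""
    dirpath = path.parent
    # Same naming scheme as the source: hidden, per-process, randomized.
    tmp = dirpath / f".{path.name}.topmark.tmp-{os.getpid()}-{secrets.token_hex(4)}"
    try:
        try:
            mode: int | None = stat.S_IMODE(path.stat().st_mode)
        except OSError:
            mode = None
        with tmp.open("wb") as f:
            if mode is not None:
                fchmod = getattr(os, "fchmod", None)  # not exposed on Windows
                with contextlib.suppress(OSError):
                    if fchmod is not None:
                        fchmod(f.fileno(), mode)
                    else:
                        tmp.chmod(mode)
            for line in lines:
                f.write(line.encode("utf-8"))
            f.flush()
            os.fsync(f.fileno())  # temp contents are durable before the swap
        os.replace(tmp, path)  # readers see the old or the new file, never a mix
        o_directory = getattr(os, "O_DIRECTORY", None)  # POSIX only
        if o_directory is not None:
            with contextlib.suppress(OSError):
                dir_fd = os.open(dirpath, os.O_RDONLY | o_directory)
                try:
                    os.fsync(dir_fd)  # persist the rename itself
                finally:
                    os.close(dir_fd)
        return True
    except (OSError, UnicodeError):
        with contextlib.suppress(OSError):
            tmp.unlink()  # best-effort cleanup; the target was never touched
        return False


def demo_success() -> tuple[bool, str, list[str], int]:
    """Atomically rewrite a scratch file that has 0o640 permissions."""
    with tempfile.TemporaryDirectory() as d:
        target = Path(d) / "example.py"
        target.write_text("old\n", encoding="utf-8")
        target.chmod(0o640)
        ok = write_atomic(target, ["# header\n", "new\n"])
        names = sorted(p.name for p in Path(d).iterdir())
        mode = stat.S_IMODE(target.stat().st_mode)
        return ok, target.read_text(encoding="utf-8"), names, mode


def demo_failure() -> tuple[bool, str, list[str]]:
    """An unencodable line aborts the write; the original file survives intact."""
    with tempfile.TemporaryDirectory() as d:
        target = Path(d) / "example.py"
        target.write_text("old\n", encoding="utf-8")
        ok = write_atomic(target, ["\ud800"])
        return ok, target.read_text(encoding="utf-8"), sorted(p.name for p in Path(d).iterdir())
```

Unlike truncate-and-rewrite, a failure here leaves the original file untouched. Nothing touches the target until os.replace() runs.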

Pros
  • Atomic visibility; crash-safe (old file remains until replace).

Cons
  • New inode/ID on POSIX; slightly more I/O.
  • Directory fsync() is POSIX-only and therefore skipped on Windows.

write

write(*, ctx)

Atomically replace the target file by writing to a temp file first.

Writes ctx.views.updated.lines to a temporary file in the same directory, calls os.fsync() to ensure durability, and performs os.replace() to atomically swap it in place. The operation guarantees that readers will either see the old file or the complete new file, never a partial write.

Parameters:

  • ctx (ProcessingContext, required): The active processing context, expected to contain ctx.views.updated.lines; each line is encoded as UTF-8 when written.

Returns:

  • WriteResult: WriteStatus.WRITTEN on success, or WriteStatus.FAILED after recording diagnostic information on error.

Source code in src/topmark/pipeline/steps/writer.py
def write(self, *, ctx: ProcessingContext) -> WriteResult:
    """Atomically replace the target file by writing to a temp file first.

    Writes `ctx.views.updated.lines` to a temporary file in the same directory,
    calls `os.fsync()` to ensure durability, and performs `os.replace()` to
    atomically swap it in place. The operation guarantees that readers will
    either see the old file or the complete new file, never a partial write.

    Args:
        ctx: The active processing context, expected to
            contain `ctx.views.updated.lines` with UTF-8-encoded text to write.

    Returns:
        Structured write result with `WriteStatus.WRITTEN` on success, or
        `WriteStatus.FAILED` after recording diagnostic information on error.
    """
    path: Path = ctx.path
    dirpath: Path = path.parent
    # Generate a hidden, per-process, per-file temp name.
    tmp: Path = dirpath / f".{path.name}.topmark.tmp-{os.getpid()}-{secrets.token_hex(4)}"
    try:
        # Read original metadata for later re-apply (best-effort)
        try:
            st: os.stat_result | None = path.stat()
            mode: int | None = stat.S_IMODE(st.st_mode) if st else None
        except OSError:
            st = None
            mode = None

        with tmp.open("wb") as f:
            # Apply permissions early to reduce race windows.
            if mode is not None:
                fchmod = getattr(os, "fchmod", None)
                if fchmod is not None:
                    try:
                        # Try to apply permissions to the open file descriptor (fchmod).
                        # This is ideal: no race window, applies before rename.
                        fchmod(f.fileno(), mode)
                    except OSError:
                        # Fallback behavior: if fchmod fails, try chmod(tmp).
                        with contextlib.suppress(OSError):
                            tmp.chmod(mode)
                else:
                    # Windows does not expose os.fchmod. Apply permissions to the temporary path
                    # directly as a best-effort fallback.
                    with contextlib.suppress(OSError):
                        tmp.chmod(mode)
            # Prefer streaming via the iterator (good for memory)
            for line in ctx.iter_updated_lines():
                f.write(line.encode("utf-8"))
            f.flush()
            os.fsync(f.fileno())

        tmp.replace(path)

        # Try to fsync the directory for durability (POSIX only).
        o_directory: int | None = getattr(os, "O_DIRECTORY", None)
        if o_directory is not None:
            try:
                dir_fd: int = os.open(str(dirpath), o_directory)
                try:
                    os.fsync(dir_fd)
                finally:
                    os.close(dir_fd)
            except OSError:
                # Best-effort durability; ignore on platforms/filesystems that don't support it.
                logger.debug("AtomicFileSink: directory fsync not supported", exc_info=True)
        else:
            logger.debug("AtomicFileSink: directory fsync not available on this platform")

        return WriteResult(status=WriteStatus.WRITTEN)
    except (OSError, UnicodeError) as e:
        # Best-effort cleanup of the temp file
        try:
            if tmp.exists():
                tmp.unlink()
        except OSError:
            logger.debug("AtomicFileSink: failed to clean up temp file %s", tmp, exc_info=True)
        ctx.diagnostics.add_error(f"Atomic write failed: {e}")
        return WriteResult(status=WriteStatus.FAILED)

WriterStep

WriterStep()

Bases: BaseStep

Commit updated content to a sink (filesystem/stdout/null).

Applies policy gates and the intended write action (insert/replace/remove) to produce a final write result. This is the only pipeline step that writes output.

Axes written
  • write

Sets
  • WriteStatus: {PENDING, WRITTEN, SKIPPED, FAILED}

Source code in src/topmark/pipeline/steps/writer.py
def __init__(self) -> None:
    super().__init__(
        name=self.__class__.__name__,
        primary_axis=Axis.WRITE,
        axes_written=(Axis.WRITE,),
    )

may_proceed

may_proceed(ctx)

Return True if the writer is allowed to commit changes.

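The writer should only run when:
  • The pipeline has not requested an early halt (ctx.flow.halt is False);
  • An updated image is present (ctx.views.updated.lines is not None);
  • For the STDOUT target: the plan is PREVIEWED, INSERTED, REPLACED, or REMOVED. Neither run_options.apply_changes nor can_change(ctx) is consulted, because emitting to stdout does not mutate files;
  • For file targets: the caller explicitly enabled applying changes (run_options.apply_changes is True), the updater selected a concrete write action (INSERTED/REPLACED/REMOVED), and the engine deemed the change safe (can_change(ctx) is True).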

Policy and intent have already been enforced by the updater. Re-checking header/comparison/strip intent here can drift from the authoritative UpdateStatus and cause double-gating, so we avoid it.
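The gate order that may_proceed() implements in the source below can be expressed as a pure decision function. This is an illustrative sketch only. Plain values replace the ProcessingContext lookups, `may_write` is a hypothetical name, and `PlanStatus.UNCHANGED` is an assumed member standing in for any plan without a write action.

```python
from __future__ import annotations

from enum import Enum


class PlanStatus(Enum):
    PREVIEWED = "previewed"
    INSERTED = "inserted"
    REPLACED = "replaced"
    REMOVED = "removed"
    UNCHANGED = "unchanged"  # hypothetical "no write action" member


WRITE_ACTIONS = frozenset({PlanStatus.INSERTED, PlanStatus.REPLACED, PlanStatus.REMOVED})


def may_write(
    *,
    halted: bool,
    has_updated: bool,
    to_stdout: bool,
    apply_changes: bool,
    plan: PlanStatus,
    can_change: bool | None,
) -> bool:
    """Decision table mirroring WriterStep.may_proceed() with plain values."""
    if halted or not has_updated:
        return False
    if to_stdout:
        # Non-mutating target: previews qualify; apply mode and feasibility are ignored.
        return plan is PlanStatus.PREVIEWED or plan in WRITE_ACTIONS
    if not apply_changes or plan not in WRITE_ACTIONS:
        return False
    # Tri-state feasibility: None ("unknown") blocks the write just like False.
    return can_change is True
```

The `is True` check is what makes the tri-state can_change safe. An undetermined feasibility never authorizes a file write.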

Parameters:

Name Type Description Default
ctx ProcessingContext

The processing context for the current file.

required

Returns:

Type Description
bool

True if processing can proceed to the write step, False otherwise.

Source code in src/topmark/pipeline/steps/writer.py
def may_proceed(self, ctx: ProcessingContext) -> bool:
    """Return True if the writer is allowed to commit changes.

    The writer should only run when:
      * The pipeline has not requested an early halt (``ctx.flow.halt`` is False);
      * An updated image is present (``ctx.views.updated.lines`` is not None);
      * For the STDOUT target, the plan is ``PREVIEWED``/``INSERTED``/``REPLACED``/``REMOVED``;
        ``apply_changes`` and filesystem feasibility are not consulted because
        emitting to stdout does not mutate files;
      * For file targets, the caller explicitly enabled applying changes
        (``run_options.apply_changes`` is True), the updater selected a concrete
        write action (``INSERTED``/``REPLACED``/``REMOVED``), and the engine deemed
        the change safe (``can_change(ctx) is True``).

    Policy and intent have already been enforced by the updater. Re-checking
    header/comparison/strip intent here can drift from the authoritative
    ``UpdateStatus`` and cause double-gating, so we avoid it.

    Args:
        ctx: The processing context for the current file.

    Returns:
        True if processing can proceed to the write step, False otherwise.
    """
    if ctx.is_halted:
        return False

    # Require an updated image.
    updated_view: UpdatedView | None = ctx.views.updated
    updated_view_exists: bool = updated_view is not None and updated_view.lines is not None
    if not updated_view_exists:
        return False

    # STDOUT target is non-mutating. Allow emitting updated content even when
    # `apply_changes` is False (preview) and even when filesystem feasibility
    # would block an on-disk write.
    if ctx.run_options.output_target == OutputTarget.STDOUT:
        return ctx.status.plan in {
            PlanStatus.PREVIEWED,
            PlanStatus.INSERTED,
            PlanStatus.REPLACED,
            PlanStatus.REMOVED,
        }

    # File target: only write when the caller explicitly enabled apply mode.
    if not ctx.run_options.apply_changes:
        return False

    # Only execute when updater produced a concrete write operation.
    if ctx.status.plan not in {
        PlanStatus.INSERTED,
        PlanStatus.REPLACED,
        PlanStatus.REMOVED,
    }:
        return False

    return can_change(ctx) is True

run

run(ctx)

Writer step: commit updates to the selected sink.

This step executes only when may_proceed() returns True. A PREVIEWED plan for a non-STDOUT target is recorded as WriteStatus.SKIPPED without writing.
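The checks that run() performs before it selects a sink can be condensed into a pure function. This is a sketch under stated assumptions. `skip_reason` is a hypothetical name, the enum string values are invented, and `HeaderMutationMode.ALL` is an assumed permissive member. Only ADD_ONLY and UPDATE_ONLY appear in the source.

```python
from __future__ import annotations

from enum import Enum


class PlanStatus(Enum):
    PREVIEWED = "previewed"
    INSERTED = "inserted"
    REPLACED = "replaced"
    REMOVED = "removed"


class HeaderMutationMode(Enum):
    ALL = "all"  # hypothetical permissive default
    ADD_ONLY = "add_only"
    UPDATE_ONLY = "update_only"


def skip_reason(plan: PlanStatus, mode: HeaderMutationMode, *, to_stdout: bool) -> str | None:
    """Return why run() would skip the sink, or None when it proceeds to write."""
    if plan is PlanStatus.PREVIEWED and not to_stdout:
        return "preview"  # recorded as WriteStatus.SKIPPED without a diagnostic
    if plan is PlanStatus.INSERTED and mode is HeaderMutationMode.UPDATE_ONLY:
        return "Skipped by policy: header_mutation_mode=update_only"
    if plan is PlanStatus.REPLACED and mode is HeaderMutationMode.ADD_ONLY:
        return "Skipped by policy: header_mutation_mode=add_only"
    # REMOVED (strip) is never gated by header_mutation_mode.
    return None
```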

Parameters:

  • ctx (ProcessingContext, required): The processing context with update intent.

Mutations:

Updates ctx.status.write and may append diagnostics when a policy gate or a sink failure prevents writing.

Source code in src/topmark/pipeline/steps/writer.py
def run(self, ctx: ProcessingContext) -> None:
    """Writer step: commit updates to the selected sink.

    This step executes only when `may_proceed()` returns `True`.
    A `PREVIEWED` plan for a non-STDOUT target is recorded as
    `WriteStatus.SKIPPED` without writing.

    Args:
        ctx: The processing context with update intent.

    Mutations:
        Updates `ctx.status.write` and may append diagnostics when policy or
        sink failures prevent writing.
    """
    logger.debug("ctx: %s", ctx)

    # In preview mode we normally skip writes. However, when the configured
    # destination is STDOUT, previewing means "emit the updated content".
    if (
        ctx.status.plan == PlanStatus.PREVIEWED
        and ctx.run_options.output_target != OutputTarget.STDOUT
    ):
        ctx.status.write = WriteStatus.SKIPPED
        return

    # --- Policy enforcement (centralized + file-type-specific when configured) ---
    pol: FrozenPolicy = ctx.get_effective_policy()

    # Only gate insert/replace (check mode) - strip/removal is not governed by add/update.
    if (
        ctx.status.plan == PlanStatus.INSERTED
        and pol.header_mutation_mode == HeaderMutationMode.UPDATE_ONLY
    ):
        ctx.status.write = WriteStatus.SKIPPED
        ctx.diagnostics.add_info("Skipped by policy: header_mutation_mode=update_only")
        return

    if (
        ctx.status.plan == PlanStatus.REPLACED
        and pol.header_mutation_mode == HeaderMutationMode.ADD_ONLY
    ):
        ctx.status.write = WriteStatus.SKIPPED
        ctx.diagnostics.add_info("Skipped by policy: header_mutation_mode=add_only")
        return

    # Defensive: nothing to write if updater did not produce an updated image
    updated_view: UpdatedView | None = ctx.views.updated
    if updated_view is None or updated_view.lines is None:
        ctx.diagnostics.add_info("File unchanged - nothing to write.")
        return

    sink: WriteSink = _select_sink(ctx)
    result: WriteResult = sink.write(ctx=ctx)

    # Update write status:
    ctx.status.write = result.status

hint

hint(ctx)

Attach write hints (non-binding).

Parameters:

  • ctx (ProcessingContext, required): The processing context.

Source code in src/topmark/pipeline/steps/writer.py
def hint(self, ctx: ProcessingContext) -> None:
    """Attach write hints (non-binding).

    Args:
        ctx: The processing context.
    """
    st: WriteStatus = ctx.status.write
    # May proceed to next step (always):
    if st == WriteStatus.WRITTEN:
        ctx.hint(
            axis=Axis.WRITE,
            code=KnownCode.WRITE_WRITTEN,
            cluster=Cluster.CHANGED,
            message="changes written",
        )
    elif st == WriteStatus.SKIPPED:
        if ctx.status.plan in {PlanStatus.INSERTED, PlanStatus.REPLACED}:
            msg: str = "write skipped (policy)"
        else:
            msg = "write skipped"
        ctx.hint(
            axis=Axis.WRITE,
            code=KnownCode.WRITE_SKIPPED,
            cluster=Cluster.SKIPPED,
            message=msg,
        )
    # Stop processing:
    elif st == WriteStatus.FAILED:
        ctx.hint(
            axis=Axis.WRITE,
            code=KnownCode.WRITE_FAILED,
            cluster=Cluster.ERROR,
            message="write failed",
            terminal=True,
        )
    elif st == WriteStatus.PENDING:
        # writer did not complete
        ctx.request_halt(reason=f"{self.__class__.__name__} did not set state.", at_step=self)
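The status-to-hint mapping in WriterStep.hint() above can be summarized as a small lookup function. In this sketch, the `Hint` record and `write_hint` are hypothetical. The code and cluster strings stand in for the real KnownCode and Cluster members, and a None result stands for "no hint attached".

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class WriteStatus(Enum):
    PENDING = "pending"
    WRITTEN = "written"
    SKIPPED = "skipped"
    FAILED = "failed"


class PlanStatus(Enum):
    PREVIEWED = "previewed"
    INSERTED = "inserted"
    REPLACED = "replaced"
    REMOVED = "removed"


@dataclass(frozen=True)
class Hint:
    """Hypothetical record standing in for the arguments passed to ctx.hint()."""

    code: str
    cluster: str
    message: str
    terminal: bool = False


def write_hint(status: WriteStatus, plan: PlanStatus) -> Hint | None:
    """Mirror the branch structure of WriterStep.hint()."""
    if status is WriteStatus.WRITTEN:
        return Hint("WRITE_WRITTEN", "CHANGED", "changes written")
    if status is WriteStatus.SKIPPED:
        # The "(policy)" suffix is chosen from the plan alone, not from the skip reason.
        by_policy = plan in {PlanStatus.INSERTED, PlanStatus.REPLACED}
        return Hint("WRITE_SKIPPED", "SKIPPED", "write skipped (policy)" if by_policy else "write skipped")
    if status is WriteStatus.FAILED:
        return Hint("WRITE_FAILED", "ERROR", "write failed", terminal=True)
    # PENDING: the real method requests a halt instead of attaching a hint.
    return None
```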