def run(self, ctx: ProcessingContext) -> None:
    """Remove the TopMark header using the processor and known span if available (view-based).

    The step first handles the cases where nothing can or should be stripped (refused
    content, missing or non-strippable header, empty image). Otherwise it delegates the
    removal to ``ctx.header_processor.strip_header_block()`` and normalizes the returned
    lines (blank separator, BOM, final-newline semantics) before publishing them as
    ``ctx.views.updated``.

    Args:
        ctx: Pipeline context. Must contain a file image, the active header processor, and
            (optionally) the scanner-detected header span.

    Raises:
        RuntimeError: If header processor is not defined.

    Mutations:
        ProcessingContext: The same context, with ``ctx.views.updated`` populated when a
            removal occurs and ``StripStatus`` updated to reflect the outcome:

            - ``READY`` when a header was removed and an updated image was stored.
            - ``NOT_NEEDED`` when there was nothing to strip.
            - ``FAILED`` (together with a halt request) when the header state does not
              allow stripping, or when the processor refused the removal or reported
              an error.

    Notes:
        - Leaves ``HeaderStatus`` untouched (owned by the scanner).
        - Trims a single leading blank line when the header starts at the top of the file
          (handled inside the processor).
    """
    if ctx.header_processor is None:
        # For static code analysis
        raise RuntimeError("ctx.header_processor not defined")

    if ctx.status.content != ContentStatus.OK:
        # Respect content policy: do not attempt to strip when content was refused
        ctx.status.strip = StripStatus.NOT_NEEDED
        if ctx.status.fs == FsStatus.EMPTY:
            reason: str = "Could not strip header from empty file."
        else:
            reason = f"Could not strip header from file (status: {ctx.status.content.value})."
        ctx.diagnostics.add_info(reason)
        return

    if ctx.status.header is HeaderStatus.MISSING:
        ctx.status.strip = StripStatus.NOT_NEEDED
        reason = "No header to be stripped."
        ctx.diagnostics.add_info(reason)
        return

    if ctx.status.header not in (HeaderStatus.EMPTY, HeaderStatus.DETECTED):
        # Any other header state (including MALFORMED_ALL_FIELDS and
        # MALFORMED_SOME_FIELDS) is not strippable: record the failure and halt.
        # TODO: enable stripping of malformed headers based on future policy
        ctx.status.strip = StripStatus.FAILED
        reason = f"No header to be stripped: {ctx.status.header}"
        ctx.diagnostics.add_info(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    original_lines: list[str] = list(ctx.iter_image_lines())
    if not original_lines:
        # Empty file
        ctx.status.strip = StripStatus.NOT_NEEDED
        reason = "Empty file, stripping not needed."
        ctx.diagnostics.add_info(reason)
        return

    # Prefer the span detected by the scanner; fall back to processor logic otherwise.
    header_view: HeaderView | None = ctx.views.header
    span: tuple[int, int] | None = header_view.range if header_view else None

    strip_result: StripHeaderResult = ctx.header_processor.strip_header_block(
        lines=original_lines,
        span=span,
        newline_style=ctx.newline_style,
        ends_with_newline=ctx.ends_with_newline,
    )
    new_lines: list[str] = strip_result.lines
    removed: tuple[int, int] | None = strip_result.removed_span
    diag: StripDiagnostic = strip_result.diagnostic

    # Surface any additional diagnostic notes from the processor
    for note in getattr(diag, "notes", []) or []:
        ctx.diagnostics.add_info(note)

    # Handle diagnostic outcome explicitly before continuing.
    if diag.kind is StripDiagKind.NOT_FOUND:
        ctx.status.strip = StripStatus.NOT_NEEDED
        reason = diag.reason or "No header detected."
        ctx.diagnostics.add_info(reason)
        return

    if diag.kind is StripDiagKind.NOOP_EMPTY:
        ctx.status.strip = StripStatus.NOT_NEEDED
        reason = diag.reason or "Empty file, nothing to strip."
        ctx.diagnostics.add_info(reason)
        return

    if diag.kind is StripDiagKind.MALFORMED_REFUSED:
        # A refused removal is a failure, not a no-op: report it as FAILED so the
        # recorded status agrees with the error diagnostic and the halt request.
        ctx.status.strip = StripStatus.FAILED
        reason = diag.reason or "Malformed header detected; removal refused by policy."
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    if diag.kind is StripDiagKind.ERROR:
        # Same reasoning as above: an analysis error must not be recorded as NOT_NEEDED.
        ctx.status.strip = StripStatus.FAILED
        reason = diag.reason or "Error while analyzing header for stripping."
        ctx.diagnostics.add_error(reason)
        ctx.request_halt(reason=reason, at_step=self)
        return

    # For REMOVED or MALFORMED_REMOVED, proceed with post-removal normalization.
    # Guard: if processor reported removal but we somehow have no span/changes, treat as no-op.
    if removed is None or new_lines == original_lines:
        ctx.status.strip = StripStatus.NOT_NEEDED
        reason = diag.reason or "Nothing to strip."
        ctx.diagnostics.add_info(reason)
        return

    # Optionally remove a single trailing blank line that TopMark inserted after the header.
    # This restores the pre-insert image and makes insert→strip→insert idempotent.
    policy: FileTypeHeaderPolicy | None = ctx.file_type.header_policy if ctx.file_type else None
    if policy and policy.ensure_blank_after_header:
        # After removal, the original header start index is where our *own* blank
        # separator would remain (if we previously inserted one). Only drop an
        # *exact* blank line that matches the file's newline style (e.g., "\n" or "\r\n"),
        # and DO NOT drop whitespace-only lines like " \n" - those belong to the user's body.
        start: int = removed[0]
        if 0 <= start < len(new_lines) and new_lines[start] == ctx.newline_style:
            logger.debug("stripper: dropped exact blank separator after removed header")
            new_lines.pop(start)

    # If the body after header removal consists only of *exact* blank lines that
    # match the file's newline style (e.g., "\n" or "\r\n"), collapse them.
    #
    # IMPORTANT: preserve final-newline (FNL) semantics. If the original file ended
    # with a newline, keep exactly one newline-style blank line. Otherwise collapse
    # to truly empty.
    #
    # Do NOT collapse whitespace-only lines like " \n" - those belong to the user's body.
    if new_lines and all(ln == ctx.newline_style for ln in new_lines):
        if ctx.ends_with_newline is True:
            logger.debug(
                "stripper: body is only exact blank lines; preserving one blank line for FNL."
            )
            new_lines = [ctx.newline_style]
        else:
            logger.debug("stripper: body is only exact blank lines; collapsing to empty.")
            new_lines = []

    logger.info("Updated file lines: %s", new_lines[:15])

    # If stripping yields an empty image but the original file was a logically-empty
    # placeholder (e.g. just a newline), restore that placeholder so round-trips are
    # stable (insert→strip→insert).
    new_lines = _restore_logical_empty_placeholder_after_strip(new_lines, ctx)
    updated_lines: list[str] = _reapply_bom_after_strip(new_lines, ctx)

    # Preserve original final-newline (FNL) semantics: if the original file did not
    # end with a newline, strip a single trailing newline sequence from the final line
    # of the stripped image. This keeps single-line XML round-trips stable.
    # Only trim when we *know* the original had no final newline
    # (ctx.ends_with_newline is neither None nor True).
    if ctx.ends_with_newline is False and updated_lines:
        last: str = updated_lines[-1]
        if last.endswith("\r\n"):
            updated_lines[-1] = last[:-2]
        elif last.endswith(("\n", "\r")):
            updated_lines[-1] = last[:-1]

    # Normalize trailing blanks conservatively.
    # IMPORTANT: Never *add* a newline to a BOM-only image here.
    # `_reapply_bom_after_strip()` is the single place that may represent
    # a BOM-only file as BOM+NL when (and only when) the original file ended
    # with a newline. Keeping this invariant is required for insert→strip→insert
    # idempotence on BOM-only inputs.
    #
    # Case 1: an empty image or a BOM-only image (exactly ["\ufeff"]) is kept as-is
    # for round-trip fidelity, so it is excluded by the condition below.
    if updated_lines and updated_lines != ["\ufeff"]:
        first_no_bom: str = updated_lines[0].lstrip("\ufeff")
        if (
            updated_lines[0].startswith("\ufeff")
            and len(updated_lines) > 1
            # Only collapse when the first line is truly BOM-only (possibly with a newline)
            and first_no_bom in ("", ctx.newline_style)
            # and everything after is an exact blank line
            # TODO - dedicated strip WS policy: use is_pure_spacer(s, policy) instead
            and all(s == ctx.newline_style for s in updated_lines[1:])
        ):
            # Case 2: first line is BOM-only, there is at least one trailing line,
            # and everything after is blank: collapse to BOM-only.
            # Preserve original final-newline semantics.
            if ctx.ends_with_newline is True:
                updated_lines = ["\ufeff" + ctx.newline_style]
            else:
                updated_lines = ["\ufeff"]
        # TODO - dedicated strip WS policy: use is_pure_spacer(s, policy) instead
        elif all(s == ctx.newline_style for s in updated_lines):
            # Case 3: the image contains only *exact* blank lines and no BOM.
            # Keep a single blank line when the original ended with a newline;
            # otherwise collapse to truly empty.
            if ctx.ends_with_newline is True:  # noqa: SIM108
                updated_lines = [ctx.newline_style]
            else:
                updated_lines = []
        # Case 4: otherwise, leave as-is (body has non-blank content).

    # A header was present and removed
    ctx.views.updated = UpdatedView(lines=updated_lines)
    ctx.status.strip = StripStatus.READY
    logger.debug("stripper: removed header lines at span %d..%d", removed[0], removed[1])
    return