def run(self, ctx: ProcessingContext) -> None:
"""Plan insert/replace/remove of the TopMark header for the current file (view-based).
Behavior by case:
- **Strip fast-path**: When ``status.strip == READY``, keep the precomputed
``ctx.views.updated.lines``, reattach a BOM if the reader saw one, and mark
``write=REMOVED`` (or ``PREVIEWED`` when not applying).
- **Already up-to-date**: When ``status.comparison == UNCHANGED``, do nothing,
set ``write=SKIPPED``, and mirror the original image into ``ctx.views.updated``.
- **Replace**: If a header range is known (``ctx.views.header.range``), splice
the rendered header lines (``ctx.views.render.lines``) over that range; reattach BOM
if needed; if the result equals the original, set ``write=SKIPPED``; otherwise
``write=REPLACED`` (or ``PREVIEWED``).
- **Insert (text-based)**: If the processor provides a character offset, insert
the rendered header text there (after optional ``prepare_header_for_insertion_text``),
reattach BOM if needed; if identical to original, ``SKIPPED``; else ``INSERTED``.
- **Insert (line-based fallback)**: Use ``compute_insertion_anchor`` (line index),
optionally adjust whitespace via ``prepare_header_for_insertion``, reattach BOM
if needed; set ``INSERTED`` unless the result is identical, in which case ``SKIPPED``.
Args:
ctx: The processing context.
Notes:
This function performs no I/O; it only populates ``ctx.views.updated`` and
``ctx.status.write`` for downstream patch/apply steps.
"""
logger.debug("ctx: %s", ctx)
logger.debug("ctx.run_options.apply_changes = %s", ctx.run_options.apply_changes)
apply: bool = (
False if ctx.run_options.apply_changes is None else ctx.run_options.apply_changes
)
# Materialize original image once (list[str]) for splice operations.
original_lines: list[str] = list(ctx.iter_image_lines())
if ctx.status.content != ContentStatus.OK and not allow_insert_into_empty_like(ctx):
ctx.status.plan = PlanStatus.SKIPPED
reason: str = f"Could not update file (status: {ctx.status.content.value})."
ctx.diagnostics.add_info(reason)
ctx.request_halt(reason=reason, at_step=self)
return
# TODO: enable updating based on future allow_XXX_by_policy() policy:
if ctx.status.header in {
HeaderStatus.MALFORMED_ALL_FIELDS,
HeaderStatus.MALFORMED_SOME_FIELDS,
}:
ctx.status.plan = PlanStatus.SKIPPED
ctx.views.updated = UpdatedView(lines=original_lines)
reason = "Existing header has malformed fields; TopMark will not update it."
ctx.diagnostics.add_warning(reason)
ctx.request_halt(reason=reason, at_step=self)
return
logger.debug("ctx: %s", ctx.to_dict())
# --- strip fast-path (must run before any add/replace logic) ---
if ctx.status.strip == StripStatus.READY:
# Previous step computed updated_file_lines for a removal.
updated_view: UpdatedView | None = ctx.views.updated
if not updated_view or updated_view.lines is None:
# TODO: check whether this can be the case when the file to be stripped
# would become empty after stripping!
logger.error(
"Stripper might have omitted to set ctx.views.updated, "
"or the resulting updated file might become empty after stripping. "
"ctx.views.updated is %s, ctx.views.updated.file_lines is %s",
"defined" if updated_view else "not set",
"defined" if updated_view and updated_view.lines else "not set",
)
ctx.status.plan = PlanStatus.FAILED # TODO FIXME
reason = "No updated file lines available for stripping."
ctx.request_halt(reason=reason, at_step=self)
return
# âś… Preserve empty list as a valid updated image
seq: Sequence[str] | Iterable[str] = updated_view.lines
stripped_lines: list[str] = seq if isinstance(seq, list) else list(seq)
# Re-attach BOM only if needed (no-op for empty)
stripped_lines = _prepend_bom_to_lines_if_needed(stripped_lines, ctx)
ctx.views.updated = UpdatedView(lines=stripped_lines)
ctx.status.plan = PlanStatus.REMOVED if apply else PlanStatus.PREVIEWED
return
# If the comparer determined the file is already compliant, do nothing.
if ctx.status.comparison == ComparisonStatus.UNCHANGED:
ctx.status.plan = PlanStatus.SKIPPED
# Preserve the original image as the "updated" content for downstream steps.
ctx.views.updated = UpdatedView(lines=original_lines)
logger.trace("Updater: no-op (comparison=UNCHANGED) for %s", ctx.path)
return
# Non-strip processing
render_view: RenderView | None = ctx.views.render
if render_view is None or render_view.lines is None:
ctx.status.plan = PlanStatus.FAILED
reason = "Cannot update header: no rendered header available"
ctx.diagnostics.add_error(reason)
ctx.request_halt(reason=reason, at_step=self)
return
rendered_expected_header_lines: list[str] = list(render_view.lines)
if ctx.header_processor is None:
ctx.status.plan = PlanStatus.FAILED
reason = "Cannot update header: no header processor assigned"
ctx.diagnostics.add_error(reason)
ctx.request_halt(reason=reason, at_step=self)
return
ft: FileType | None = ctx.file_type
checker: InsertChecker | None = ft.pre_insert_checker if ft else None
# --- Pre-insert capability check (authoritative) --------------------------
# Only evaluate here if the advisory checker did not run (UNEVALUATED).
if checker is not None:
if ctx.pre_insert_capability == InsertCapability.UNEVALUATED:
try:
view = PreInsertViewAdapter(ctx)
result: InsertCheckResult = checker(view) or {}
except Exception as exc:
logger.exception(
"pre-insert checker failed for %s: %s", getattr(ft, "name", ft), exc
)
# Obtain the name of the checker
checker_name = getattr(checker, "__qualname__", None) or getattr(
checker, "__name__", None
)
if checker_name is None:
checker_name = type(checker).__name__
result = InsertCheckResult(
capability=InsertCapability.SKIP_OTHER,
reason=f"checker error: {checker_name}, {exc}",
origin=__name__,
)
# Persist the authoritative view for downstream bucketing/rendering
cap: InsertCapability = result.get("capability", InsertCapability.OK)
ctx.pre_insert_capability = cap
ctx.pre_insert_reason = result.get("reason", ctx.pre_insert_reason)
ctx.pre_insert_origin = result.get("origin", __name__)
if cap != InsertCapability.OK:
pre_insert_reason = ctx.pre_insert_reason or "pre-insert checker skipped update"
origin = ctx.pre_insert_origin or __name__
logger.debug(
"pre-insert: %s - %s", getattr(cap, "value", cap), pre_insert_reason
)
# Preserve original image; mark as skipped
ctx.views.updated = UpdatedView(lines=original_lines)
ctx.status.plan = PlanStatus.SKIPPED
reason = f"{pre_insert_reason} (origin: {origin})"
ctx.diagnostics.add_warning(reason)
ctx.request_halt(reason=reason, at_step=self)
return
if ctx.pre_insert_capability == InsertCapability.SKIP_IDEMPOTENCE_RISK:
# TODO - align with reader.ReaderStep.run()
if ctx.status.content == ContentStatus.OK and allow_content_reflow(ctx):
pass
else:
# Advisory-only pre-insert probe already ran in reader step.
# Here we enforce the gate: if the advisory is not OK, skip updating.
pre_insert_reason: str = (
ctx.pre_insert_reason or "pre-insert check refused insertion"
)
origin: str = ctx.pre_insert_origin or __name__
logger.debug(
"pre-insert (advisory): %s - %s",
getattr(ctx.pre_insert_capability, "value", ctx.pre_insert_capability),
pre_insert_reason,
)
ctx.views.updated = UpdatedView(lines=original_lines)
ctx.status.plan = PlanStatus.SKIPPED
reason = f"{pre_insert_reason} (origin: {origin})"
ctx.diagnostics.add_warning(reason)
ctx.request_halt(reason=reason, at_step=self)
return
# --- Pre-insert capability gate (authoritative) ---------------------------
elif ctx.pre_insert_capability != InsertCapability.OK:
# Advisory-only pre-insert probe already ran in reader step.
# Here we enforce the gate: if the advisory is not OK, skip updating.
pre_insert_reason: str = (
ctx.pre_insert_reason or "pre-insert check refused insertion"
)
origin: str = ctx.pre_insert_origin or __name__
logger.debug(
"pre-insert (advisory): %s - %s",
getattr(ctx.pre_insert_capability, "value", ctx.pre_insert_capability),
pre_insert_reason,
)
ctx.views.updated = UpdatedView(lines=original_lines)
ctx.status.plan = PlanStatus.SKIPPED
reason = f"{pre_insert_reason} (origin: {origin})"
ctx.diagnostics.add_warning(reason)
ctx.request_halt(reason=reason, at_step=self)
return
# --- Replace path (view-based) ---
header_view: HeaderView | None = ctx.views.header
existing_range: tuple[int, int] | None = header_view.range if header_view else None
if existing_range is not None:
# Replace existing header: remove old header lines and insert new header in place
start: int
end: int
start, end = existing_range
new_lines: list[str] = (
original_lines[:start] + rendered_expected_header_lines + original_lines[end + 1 :]
)
# Prepend BOM if needed
new_lines = _prepend_bom_to_lines_if_needed(new_lines, ctx)
# If replacement is identical to the original, treat as a no-op.
if new_lines == original_lines:
ctx.status.plan = PlanStatus.SKIPPED
ctx.views.updated = UpdatedView(lines=original_lines)
logger.trace("Updater: replacement yields no changes for %s", ctx.path)
return
ctx.status.plan = PlanStatus.REPLACED if apply else PlanStatus.PREVIEWED
ctx.views.updated = UpdatedView(lines=new_lines)
logger.trace("Updated file (replace):\n%s", "".join(new_lines))
return
# --- Insert: text-based first ---
try:
logger.debug("upd.path: try=text; file=%s", ctx.path)
original_text: str = "".join(original_lines)
char_offset: int | None = None
if hasattr(ctx.header_processor, "get_header_insertion_char_offset"):
char_offset = ctx.header_processor.get_header_insertion_char_offset(original_text)
logger.debug("upd.text: offset=%s; head[:40]=%r", char_offset, original_text[:40])
if char_offset is not None:
header_text: str = "".join(rendered_expected_header_lines)
if hasattr(ctx.header_processor, "prepare_header_for_insertion_text"):
try:
header_text = ctx.header_processor.prepare_header_for_insertion_text(
original_text=original_text,
insert_offset=char_offset,
rendered_header_text=header_text,
newline_style=ctx.newline_style or "\n",
)
except (ValueError, TypeError, AttributeError) as e:
logger.warning(
"prepare_header_for_insertion_text failed for %s: %s", ctx.path, e
)
logger.debug(
"upd.text.pad: insert_offset=%d; header_head[:40]=%r",
char_offset,
header_text[:40],
)
logger.debug(
"upd.text.splice: pre_tail[:10]=%r post_head[:10]=%r",
original_text[max(0, char_offset - 10) : char_offset],
original_text[char_offset : char_offset + 10],
)
new_text: str = (
original_text[:char_offset] + header_text + original_text[char_offset:]
)
# If header text ends at EOF with trailing blank lines, drop them all.
# The blank-after-header policy should only apply when body text follows.
while new_text.endswith("\r\n\r\n"):
new_text = new_text[:-2] # drop one CRLF pair
while new_text.endswith("\n\n") or new_text.endswith("\r\r"):
new_text = new_text[:-1] # drop one LF or CR
logger.trace("Updater (text): trimmed EOF blanks; len=%d", len(new_text))
# Prepend BOM if needed
if getattr(ctx, "leading_bom", False) and not new_text.startswith("\ufeff"):
new_text = "\ufeff" + new_text
# Canonicalize logically-empty placeholder bodies (e.g. a file that was just "\n")
# so insert→strip→insert round-trips are stable (avoid a double trailing newline).
if getattr(ctx, "is_logically_empty", False):
insert_index_lines: int = (
len(original_text[:char_offset].splitlines(keepends=True))
if char_offset
else 0
)
header_len_lines: int = len(header_text.splitlines(keepends=True))
new_lines_tmp: list[str] = new_text.splitlines(keepends=True)
new_lines_tmp = _canonicalize_logically_empty_body_after_insert(
lines=new_lines_tmp,
insert_index=insert_index_lines,
header_len=header_len_lines,
ctx=ctx,
)
new_text = "".join(new_lines_tmp)
if new_text == original_text:
ctx.views.updated = UpdatedView(lines=original_lines)
# ctx.status.write = WriteStatus.SKIPPED
ctx.status.plan = PlanStatus.SKIPPED
logger.trace("Updater: text-based insertion yields no changes for %s", ctx.path)
return
ctx.views.updated = UpdatedView(lines=new_text.splitlines(keepends=True))
ctx.status.plan = PlanStatus.INSERTED if apply else PlanStatus.PREVIEWED
return
except (ValueError, TypeError, AttributeError) as e:
logger.warning("text-based insertion failed for %s: %s", ctx.path, e)
# --- Insert: line-based fallback ---
insert_index: int = ctx.header_processor.compute_insertion_anchor(original_lines)
if insert_index == NO_LINE_ANCHOR:
ctx.status.plan = PlanStatus.FAILED
reason = f"No line-based insertion anchor for file: {ctx.path}"
ctx.diagnostics.add_error(reason)
ctx.request_halt(reason=reason, at_step=self)
return
# defensive clamp
if insert_index < 0:
insert_index = 0
elif insert_index > len(original_lines):
insert_index = len(original_lines)
# optional whitespace adjustments
try:
header_lines: list[str] = ctx.header_processor.prepare_header_for_insertion(
original_lines=original_lines,
insert_index=insert_index,
rendered_header_lines=rendered_expected_header_lines,
newline_style=ctx.newline_style or "\n",
)
except (ValueError, TypeError, AttributeError) as e:
logger.warning("prepare_header_for_insertion failed for %s: %s", ctx.path, e)
header_lines = rendered_expected_header_lines
# Splice header; if inserting at BOF and the first original line is a BOM-only blank,
# consume it so we don't leave a dangling BOM+blank after the header.
body_start: int = insert_index
if insert_index == 0 and original_lines:
first: str = original_lines[0]
# Consume only a BOM-bearing blank at BOF (e.g., "\ufeff" or "\ufeff\n"),
# but preserve a user-authored plain blank line.
is_bom_blank: bool = (
first.startswith("\ufeff") and first.replace("\ufeff", "").strip() == ""
)
if is_bom_blank:
body_start = 1
logger.trace(
"Updater (line): consuming BOM-only/blank line at BOF after header insertion"
)
new_lines = original_lines[:insert_index] + header_lines + original_lines[body_start:]
# If header occupies the tail and last line is blank, drop all trailing blanks.
new_lines = _drop_trailing_blank_if_header_at_eof(
new_lines, insert_index, len(header_lines)
)
# Canonicalize logically-empty placeholder bodies to keep round-trips stable.
new_lines = _canonicalize_logically_empty_body_after_insert(
lines=new_lines,
insert_index=insert_index,
header_len=len(header_lines),
ctx=ctx,
)
# Prepend BOM if needed
new_lines = _prepend_bom_to_lines_if_needed(new_lines, ctx)
if new_lines == original_lines:
ctx.views.updated = UpdatedView(lines=original_lines)
ctx.status.plan = PlanStatus.SKIPPED
logger.trace("Updater: line-based insertion yields no changes for %s", ctx.path)
return
ctx.views.updated = UpdatedView(lines=new_lines)
ctx.status.plan = PlanStatus.INSERTED if apply else PlanStatus.PREVIEWED
logger.trace("Updated file (line-based):\n%s", "".join(new_lines))
return