Files
SubConverter-Extended/scripts/sync_upstream_parser.py
2026-05-21 10:18:08 +08:00

615 lines
20 KiB
Python

#!/usr/bin/env python3
"""Plan and apply guarded parser syncs from upstream subconverter.
This script intentionally does not trust upstream diffs by default. It plans
candidate commits, lets CI/Copilot classify them, applies only low-risk parser
updates, and records skipped commits so one unsafe upstream commit does not
block later safe ones.
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
ROOT_RESOLVED = ROOT.resolve()
SEEN_FILE = ROOT / ".github" / "upstream-subconverter.seen"
APPLIED_FILE = ROOT / ".github" / "upstream-subconverter.applied.json"
SKIPPED_FILE = ROOT / ".github" / "upstream-subconverter.skipped.json"
ALLOWED_AUTO_PATHS = {
"src/parser/subparser.cpp",
"src/parser/subparser.h",
"src/parser/config/proxy.h",
}
REPORT_ONLY_PATHS = {
"src/generator/config/subexport.cpp",
}
PROTECTED_PATHS = {
"src/generator/config/nodemanip.cpp",
"src/generator/config/nodemanip.h",
"src/generator/config/subexport.h",
"src/parser/mihomo_bridge.cpp",
"src/parser/mihomo_bridge.h",
"src/parser/mihomo_schemes.h",
"src/parser/param_compat.h",
"bridge/converter.go",
"bridge/go.mod",
"bridge/go.sum",
}
PROTECTED_PREFIXES = (
"bridge/",
)
SAFE_DECISION = "safe_parser_only"
def git(*args: str, input_text: str | None = None, check: bool = True) -> str:
proc = subprocess.run(
["git", *args],
cwd=ROOT,
input=input_text,
text=True,
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if check and proc.returncode != 0:
raise RuntimeError(
f"git {' '.join(args)} failed with {proc.returncode}\n{proc.stderr}"
)
return proc.stdout
def run(*args: str, check: bool = True) -> subprocess.CompletedProcess[str]:
return subprocess.run(
list(args),
cwd=ROOT,
text=True,
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=check,
)
def utc_now() -> str:
return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()
def read_json_array(path: Path) -> list[dict[str, Any]]:
if not path.exists():
return []
try:
data = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return []
return data if isinstance(data, list) else []
def write_json(path: Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def read_seen() -> str:
if not SEEN_FILE.exists():
return ""
return SEEN_FILE.read_text(encoding="utf-8").strip()
def resolve_cursor_file(path_text: str) -> Path:
path = Path(path_text)
full = path if path.is_absolute() else ROOT / path
resolved = full.resolve()
try:
resolved.relative_to(ROOT_RESOLVED)
except ValueError as exc:
raise ValueError(f"cursor file must stay inside repository: {path_text}") from exc
return resolved
def display_path(path: Path) -> str:
try:
return path.resolve().relative_to(ROOT_RESOLVED).as_posix()
except ValueError:
return str(path)
def path_is_protected(path: str) -> bool:
return path in PROTECTED_PATHS or any(path.startswith(prefix) for prefix in PROTECTED_PREFIXES)
def commit_files(sha: str) -> list[str]:
output = git("diff-tree", "--no-commit-id", "--name-only", "-r", sha)
return [line.strip() for line in output.splitlines() if line.strip()]
def commit_subject(sha: str) -> str:
return git("show", "-s", "--format=%s", sha).strip()
def commit_patch(sha: str, paths: list[str], max_chars: int = 12000) -> str:
if not paths:
return ""
patch = git("show", "--format=fuller", "--stat", "--patch", sha, "--", *paths)
if len(patch) <= max_chars:
return patch
return patch[:max_chars] + "\n\n[diff truncated]\n"
def classify_commit(sha: str) -> dict[str, Any]:
files = commit_files(sha)
allowed = [path for path in files if path in ALLOWED_AUTO_PATHS]
protected = [path for path in files if path_is_protected(path)]
report_only = [path for path in files if path in REPORT_ONLY_PATHS]
other = [
path
for path in files
if path not in ALLOWED_AUTO_PATHS
and path not in REPORT_ONLY_PATHS
and not path_is_protected(path)
]
if not allowed:
rule_decision = "ignore_no_parser_changes"
safe_by_rules = False
reviewable_by_ai = False
reason = "Commit does not change parser whitelist files."
elif protected:
rule_decision = "skip_protected_path"
safe_by_rules = False
reviewable_by_ai = False
reason = "Commit touches protected project-specific integration paths."
elif report_only or other:
rule_decision = "needs_human_or_ai_report"
safe_by_rules = False
reviewable_by_ai = True
reason = "Commit changes parser files plus non-whitelisted files."
else:
rule_decision = "candidate"
safe_by_rules = True
reviewable_by_ai = True
reason = "Commit changes only parser whitelist files."
return {
"sha": sha,
"short_sha": sha[:12],
"subject": commit_subject(sha),
"files": files,
"allowed_paths": allowed,
"protected_paths": protected,
"report_only_paths": report_only,
"other_paths": other,
"safe_by_rules": safe_by_rules,
"reviewable_by_ai": reviewable_by_ai,
"rule_decision": rule_decision,
"reason": reason,
"patch_excerpt": commit_patch(sha, allowed or files[:10]),
}
def plan(args: argparse.Namespace) -> int:
upstream_head = git("rev-parse", args.upstream_ref).strip()
cursor_file = resolve_cursor_file(args.cursor_file)
manual_since = (args.since or "").strip()
stored_seen = cursor_file.read_text(encoding="utf-8").strip() if cursor_file.exists() else ""
seen = manual_since or stored_seen
bootstrap = False
commits: list[str] = []
all_commits: list[str] = []
if not seen:
bootstrap = True
else:
exists = run("git", "cat-file", "-e", f"{seen}^{{commit}}", check=False)
if exists.returncode != 0:
if manual_since:
raise RuntimeError(f"manual --since commit does not exist: {manual_since}")
bootstrap = True
commits = []
else:
ancestor = run(
"git",
"merge-base",
"--is-ancestor",
seen,
args.upstream_ref,
check=False,
)
if ancestor.returncode != 0:
if manual_since:
raise RuntimeError(
f"manual --since commit is not an ancestor of {args.upstream_ref}: "
f"{manual_since}"
)
bootstrap = True
else:
commits_out = git(
"rev-list",
"--reverse",
"--no-merges",
f"{seen}..{args.upstream_ref}",
)
commits = [
line.strip()
for line in commits_out.splitlines()
if line.strip()
]
all_commits = commits
total_commit_count = len(all_commits)
if args.max_commits > 0:
commits = commits[: args.max_commits]
selected_commit_count = len(commits)
truncated = total_commit_count > selected_commit_count
batch_last_sha = commits[-1] if commits else ""
cursor_update_enabled = not manual_since and not bootstrap
advance_to = ""
if cursor_update_enabled:
if batch_last_sha:
advance_to = batch_last_sha if truncated else upstream_head
elif seen and not bootstrap:
advance_to = upstream_head
candidates = [classify_commit(sha) for sha in commits]
data = {
"generated_at": utc_now(),
"upstream_ref": args.upstream_ref,
"upstream_head": upstream_head,
"cursor_file": display_path(cursor_file),
"cursor_update_enabled": cursor_update_enabled,
"seen": seen,
"stored_seen": stored_seen,
"manual_since": manual_since,
"bootstrap": bootstrap,
"total_commit_count": total_commit_count,
"selected_commit_count": selected_commit_count,
"truncated": truncated,
"batch_last_sha": batch_last_sha,
"advance_to": advance_to,
"allowed_auto_paths": sorted(ALLOWED_AUTO_PATHS),
"protected_paths": sorted(PROTECTED_PATHS),
"protected_prefixes": list(PROTECTED_PREFIXES),
"safe_decision": SAFE_DECISION,
"candidates": candidates,
}
write_json(Path(args.output), data)
write_plan_report(Path(args.report), data)
safe_count = sum(1 for item in candidates if item["safe_by_rules"])
reviewable_count = sum(1 for item in candidates if item["reviewable_by_ai"])
print(
f"Planned {len(candidates)} upstream commits "
f"({safe_count} rule-safe, {reviewable_count} AI-reviewable)."
)
if bootstrap:
print("No seen marker was available; plan bootstrapped without candidates.")
return 0
def write_plan_report(path: Path, data: dict[str, Any]) -> None:
lines = [
"# Upstream Parser Sync Plan",
"",
f"- Generated: {data['generated_at']}",
f"- Upstream ref: `{data['upstream_ref']}`",
f"- Cursor file: `{data['cursor_file']}`",
f"- Seen: `{data['seen'] or 'none'}`",
f"- Stored seen: `{data['stored_seen'] or 'none'}`",
f"- Manual since override: `{data['manual_since'] or 'none'}`",
f"- Upstream head: `{data['upstream_head']}`",
f"- Bootstrap: `{data['bootstrap']}`",
f"- Total pending commits: `{data['total_commit_count']}`",
f"- Selected commits: `{data['selected_commit_count']}`",
f"- Truncated: `{data['truncated']}`",
f"- Advance to: `{data['advance_to'] or 'none'}`",
"",
"## Candidates",
"",
]
if not data["candidates"]:
lines.append("No candidate commits.")
for item in data["candidates"]:
lines.extend(
[
f"### `{item['short_sha']}` {item['subject']}",
"",
f"- Rule decision: `{item['rule_decision']}`",
f"- Safe by rules: `{item['safe_by_rules']}`",
f"- Reviewable by AI: `{item['reviewable_by_ai']}`",
f"- Reason: {item['reason']}",
f"- Files: {', '.join(f'`{path}`' for path in item['files']) or 'none'}",
"",
]
)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def load_decisions(path: Path) -> dict[str, dict[str, Any]]:
text = path.read_text(encoding="utf-8").strip()
data = json.loads(text)
decisions = data.get("decisions", data)
if not isinstance(decisions, list):
raise ValueError("Copilot decision file must contain a decisions array.")
result: dict[str, dict[str, Any]] = {}
for item in decisions:
if not isinstance(item, dict) or "sha" not in item:
continue
result[item["sha"]] = item
return result
def snapshot_paths(paths: list[str]) -> dict[str, bytes | None]:
snapshot: dict[str, bytes | None] = {}
for path in paths:
full = ROOT / path
snapshot[path] = full.read_bytes() if full.exists() else None
return snapshot
def restore_snapshot(snapshot: dict[str, bytes | None]) -> None:
for path, content in snapshot.items():
full = ROOT / path
if content is None:
if full.exists():
full.unlink()
else:
full.parent.mkdir(parents=True, exist_ok=True)
full.write_bytes(content)
run("git", "reset", "--", *snapshot.keys(), check=False)
def apply_patch_for_commit(sha: str, paths: list[str]) -> tuple[bool, str]:
patch = git("show", "--format=", "--binary", sha, "--", *paths)
if not patch.strip():
return False, "No patch content for allowed parser files."
check = subprocess.run(
["git", "apply", "-3", "--check", "--whitespace=nowarn", "-"],
cwd=ROOT,
input=patch,
text=True,
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if check.returncode != 0:
return False, check.stderr.strip() or "git apply --check failed."
applied = subprocess.run(
["git", "apply", "-3", "--whitespace=nowarn", "-"],
cwd=ROOT,
input=patch,
text=True,
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if applied.returncode != 0:
return False, applied.stderr.strip() or "git apply failed."
return True, "Applied."
def run_guards() -> tuple[bool, str]:
proc = subprocess.run(
[sys.executable, "scripts/check_sync_guards.py", "--json"],
cwd=ROOT,
text=True,
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
output = (proc.stdout or "") + (proc.stderr or "")
return proc.returncode == 0, output.strip()
def append_state(path: Path, entries: list[dict[str, Any]]) -> None:
if not entries:
return
current = read_json_array(path)
current.extend(entries)
write_json(path, current)
def apply(args: argparse.Namespace) -> int:
plan_data = json.loads(Path(args.plan).read_text(encoding="utf-8"))
decisions = load_decisions(Path(args.decisions))
applied_entries: list[dict[str, Any]] = []
skipped_entries: list[dict[str, Any]] = []
ignored_entries: list[dict[str, Any]] = []
cursor_update_enabled = bool(plan_data.get("cursor_update_enabled"))
cursor_file_text = plan_data.get("cursor_file") or display_path(SEEN_FILE)
cursor_file = resolve_cursor_file(cursor_file_text)
advance_to = plan_data.get("advance_to") or ""
for item in plan_data.get("candidates", []):
sha = item["sha"]
decision = decisions.get(sha)
record_base = {
"sha": sha,
"subject": item["subject"],
"time": utc_now(),
}
if item.get("rule_decision") == "ignore_no_parser_changes":
ignored_entries.append(
{**record_base, "reason": "No parser whitelist files changed."}
)
continue
if not item.get("reviewable_by_ai"):
skipped_entries.append(
{
**record_base,
"reason": item.get("reason", "Rejected by deterministic rules."),
"rule_decision": item.get("rule_decision"),
}
)
continue
if not decision:
skipped_entries.append(
{**record_base, "reason": "No Copilot decision was supplied."}
)
continue
if decision.get("decision") != SAFE_DECISION or decision.get("risk") != "low":
skipped_entries.append(
{
**record_base,
"reason": decision.get("reason", "Copilot did not approve automatic sync."),
"copilot_decision": decision,
}
)
continue
paths = item.get("allowed_paths", [])
if not paths:
skipped_entries.append(
{**record_base, "reason": "No parser whitelist paths were available."}
)
continue
backup = snapshot_paths(paths)
ok, message = apply_patch_for_commit(sha, paths)
if not ok:
restore_snapshot(backup)
skipped_entries.append({**record_base, "reason": message})
continue
guards_ok, guards_output = run_guards()
if not guards_ok:
restore_snapshot(backup)
skipped_entries.append(
{
**record_base,
"reason": "Guard checks failed after applying patch.",
"guard_output": guards_output,
}
)
continue
applied_entries.append(
{
**record_base,
"paths": paths,
"copilot_reason": decision.get("reason", ""),
}
)
append_state(APPLIED_FILE, applied_entries)
append_state(SKIPPED_FILE, skipped_entries)
advanced_to = ""
if cursor_update_enabled and advance_to:
cursor_file.parent.mkdir(parents=True, exist_ok=True)
cursor_file.write_text(advance_to + "\n", encoding="utf-8")
advanced_to = advance_to
result = {
"generated_at": utc_now(),
"upstream_head": plan_data.get("upstream_head"),
"cursor_file": display_path(cursor_file),
"advanced_to": advanced_to,
"truncated": plan_data.get("truncated", False),
"applied": applied_entries,
"skipped": skipped_entries,
"ignored": ignored_entries,
}
write_json(Path(args.result), result)
write_apply_report(Path(args.report), result)
print(
f"Applied {len(applied_entries)} commits; "
f"skipped {len(skipped_entries)} commits; "
f"ignored {len(ignored_entries)} commits."
)
return 0
def write_apply_report(path: Path, result: dict[str, Any]) -> None:
lines = [
"# Upstream Parser Sync Result",
"",
f"- Generated: {result['generated_at']}",
f"- Upstream head: `{result.get('upstream_head') or 'unknown'}`",
f"- Cursor file: `{result.get('cursor_file') or 'unknown'}`",
f"- Advanced to: `{result.get('advanced_to') or 'none'}`",
f"- Truncated: `{result.get('truncated', False)}`",
f"- Applied: {len(result['applied'])}",
f"- Skipped: {len(result['skipped'])}",
f"- Ignored: {len(result.get('ignored', []))}",
"",
"## Applied",
"",
]
if not result["applied"]:
lines.append("No commits were applied.")
for item in result["applied"]:
lines.append(f"- `{item['sha'][:12]}` {item['subject']}")
lines.extend(["", "## Skipped", ""])
if not result["skipped"]:
lines.append("No commits were skipped.")
for item in result["skipped"]:
lines.append(f"- `{item['sha'][:12]}` {item['subject']}: {item['reason']}")
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
parser = argparse.ArgumentParser()
sub = parser.add_subparsers(dest="command", required=True)
plan_parser = sub.add_parser("plan")
plan_parser.add_argument("--upstream-ref", required=True)
plan_parser.add_argument(
"--cursor-file",
default=".github/upstream-subconverter.seen",
help="state file that stores the last processed upstream commit",
)
plan_parser.add_argument(
"--since",
default="",
help="override the stored seen marker, primarily for dry-run testing",
)
plan_parser.add_argument("--max-commits", type=int, default=20)
plan_parser.add_argument("--output", default="upstream-sync-candidates.json")
plan_parser.add_argument("--report", default="upstream-sync-plan.md")
plan_parser.set_defaults(func=plan)
apply_parser = sub.add_parser("apply")
apply_parser.add_argument("--plan", default="upstream-sync-candidates.json")
apply_parser.add_argument("--decisions", default="upstream-sync-decisions.json")
apply_parser.add_argument("--result", default="upstream-sync-result.json")
apply_parser.add_argument("--report", default="upstream-sync-result.md")
apply_parser.set_defaults(func=apply)
args = parser.parse_args()
try:
return args.func(args)
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())