615 lines
20 KiB
Python
615 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""Plan and apply guarded parser syncs from upstream subconverter.
|
|
|
|
This script intentionally does not trust upstream diffs by default. It plans
|
|
candidate commits, lets CI/Copilot classify them, applies only low-risk parser
|
|
updates, and records skipped commits so one unsafe upstream commit does not
|
|
block later safe ones.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
ROOT_RESOLVED = ROOT.resolve()
|
|
|
|
SEEN_FILE = ROOT / ".github" / "upstream-subconverter.seen"
|
|
APPLIED_FILE = ROOT / ".github" / "upstream-subconverter.applied.json"
|
|
SKIPPED_FILE = ROOT / ".github" / "upstream-subconverter.skipped.json"
|
|
|
|
ALLOWED_AUTO_PATHS = {
|
|
"src/parser/subparser.cpp",
|
|
"src/parser/subparser.h",
|
|
"src/parser/config/proxy.h",
|
|
}
|
|
|
|
REPORT_ONLY_PATHS = {
|
|
"src/generator/config/subexport.cpp",
|
|
}
|
|
|
|
PROTECTED_PATHS = {
|
|
"src/generator/config/nodemanip.cpp",
|
|
"src/generator/config/nodemanip.h",
|
|
"src/generator/config/subexport.h",
|
|
"src/parser/mihomo_bridge.cpp",
|
|
"src/parser/mihomo_bridge.h",
|
|
"src/parser/mihomo_schemes.h",
|
|
"src/parser/param_compat.h",
|
|
"bridge/converter.go",
|
|
"bridge/go.mod",
|
|
"bridge/go.sum",
|
|
}
|
|
|
|
PROTECTED_PREFIXES = (
|
|
"bridge/",
|
|
)
|
|
|
|
SAFE_DECISION = "safe_parser_only"
|
|
|
|
|
|
def git(*args: str, input_text: str | None = None, check: bool = True) -> str:
|
|
proc = subprocess.run(
|
|
["git", *args],
|
|
cwd=ROOT,
|
|
input=input_text,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
if check and proc.returncode != 0:
|
|
raise RuntimeError(
|
|
f"git {' '.join(args)} failed with {proc.returncode}\n{proc.stderr}"
|
|
)
|
|
return proc.stdout
|
|
|
|
|
|
def run(*args: str, check: bool = True) -> subprocess.CompletedProcess[str]:
|
|
return subprocess.run(
|
|
list(args),
|
|
cwd=ROOT,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
check=check,
|
|
)
|
|
|
|
|
|
def utc_now() -> str:
|
|
return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()
|
|
|
|
|
|
def read_json_array(path: Path) -> list[dict[str, Any]]:
|
|
if not path.exists():
|
|
return []
|
|
try:
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
except json.JSONDecodeError:
|
|
return []
|
|
return data if isinstance(data, list) else []
|
|
|
|
|
|
def write_json(path: Path, data: Any) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
|
|
|
|
|
|
def read_seen() -> str:
|
|
if not SEEN_FILE.exists():
|
|
return ""
|
|
return SEEN_FILE.read_text(encoding="utf-8").strip()
|
|
|
|
|
|
def resolve_cursor_file(path_text: str) -> Path:
|
|
path = Path(path_text)
|
|
full = path if path.is_absolute() else ROOT / path
|
|
resolved = full.resolve()
|
|
try:
|
|
resolved.relative_to(ROOT_RESOLVED)
|
|
except ValueError as exc:
|
|
raise ValueError(f"cursor file must stay inside repository: {path_text}") from exc
|
|
return resolved
|
|
|
|
|
|
def display_path(path: Path) -> str:
|
|
try:
|
|
return path.resolve().relative_to(ROOT_RESOLVED).as_posix()
|
|
except ValueError:
|
|
return str(path)
|
|
|
|
|
|
def path_is_protected(path: str) -> bool:
|
|
return path in PROTECTED_PATHS or any(path.startswith(prefix) for prefix in PROTECTED_PREFIXES)
|
|
|
|
|
|
def commit_files(sha: str) -> list[str]:
|
|
output = git("diff-tree", "--no-commit-id", "--name-only", "-r", sha)
|
|
return [line.strip() for line in output.splitlines() if line.strip()]
|
|
|
|
|
|
def commit_subject(sha: str) -> str:
|
|
return git("show", "-s", "--format=%s", sha).strip()
|
|
|
|
|
|
def commit_patch(sha: str, paths: list[str], max_chars: int = 12000) -> str:
|
|
if not paths:
|
|
return ""
|
|
patch = git("show", "--format=fuller", "--stat", "--patch", sha, "--", *paths)
|
|
if len(patch) <= max_chars:
|
|
return patch
|
|
return patch[:max_chars] + "\n\n[diff truncated]\n"
|
|
|
|
|
|
def classify_commit(sha: str) -> dict[str, Any]:
|
|
files = commit_files(sha)
|
|
allowed = [path for path in files if path in ALLOWED_AUTO_PATHS]
|
|
protected = [path for path in files if path_is_protected(path)]
|
|
report_only = [path for path in files if path in REPORT_ONLY_PATHS]
|
|
other = [
|
|
path
|
|
for path in files
|
|
if path not in ALLOWED_AUTO_PATHS
|
|
and path not in REPORT_ONLY_PATHS
|
|
and not path_is_protected(path)
|
|
]
|
|
|
|
if not allowed:
|
|
rule_decision = "ignore_no_parser_changes"
|
|
safe_by_rules = False
|
|
reviewable_by_ai = False
|
|
reason = "Commit does not change parser whitelist files."
|
|
elif protected:
|
|
rule_decision = "skip_protected_path"
|
|
safe_by_rules = False
|
|
reviewable_by_ai = False
|
|
reason = "Commit touches protected project-specific integration paths."
|
|
elif report_only or other:
|
|
rule_decision = "needs_human_or_ai_report"
|
|
safe_by_rules = False
|
|
reviewable_by_ai = True
|
|
reason = "Commit changes parser files plus non-whitelisted files."
|
|
else:
|
|
rule_decision = "candidate"
|
|
safe_by_rules = True
|
|
reviewable_by_ai = True
|
|
reason = "Commit changes only parser whitelist files."
|
|
|
|
return {
|
|
"sha": sha,
|
|
"short_sha": sha[:12],
|
|
"subject": commit_subject(sha),
|
|
"files": files,
|
|
"allowed_paths": allowed,
|
|
"protected_paths": protected,
|
|
"report_only_paths": report_only,
|
|
"other_paths": other,
|
|
"safe_by_rules": safe_by_rules,
|
|
"reviewable_by_ai": reviewable_by_ai,
|
|
"rule_decision": rule_decision,
|
|
"reason": reason,
|
|
"patch_excerpt": commit_patch(sha, allowed or files[:10]),
|
|
}
|
|
|
|
|
|
def plan(args: argparse.Namespace) -> int:
|
|
upstream_head = git("rev-parse", args.upstream_ref).strip()
|
|
cursor_file = resolve_cursor_file(args.cursor_file)
|
|
manual_since = (args.since or "").strip()
|
|
stored_seen = cursor_file.read_text(encoding="utf-8").strip() if cursor_file.exists() else ""
|
|
seen = manual_since or stored_seen
|
|
bootstrap = False
|
|
commits: list[str] = []
|
|
all_commits: list[str] = []
|
|
|
|
if not seen:
|
|
bootstrap = True
|
|
else:
|
|
exists = run("git", "cat-file", "-e", f"{seen}^{{commit}}", check=False)
|
|
if exists.returncode != 0:
|
|
if manual_since:
|
|
raise RuntimeError(f"manual --since commit does not exist: {manual_since}")
|
|
bootstrap = True
|
|
commits = []
|
|
else:
|
|
ancestor = run(
|
|
"git",
|
|
"merge-base",
|
|
"--is-ancestor",
|
|
seen,
|
|
args.upstream_ref,
|
|
check=False,
|
|
)
|
|
if ancestor.returncode != 0:
|
|
if manual_since:
|
|
raise RuntimeError(
|
|
f"manual --since commit is not an ancestor of {args.upstream_ref}: "
|
|
f"{manual_since}"
|
|
)
|
|
bootstrap = True
|
|
else:
|
|
commits_out = git(
|
|
"rev-list",
|
|
"--reverse",
|
|
"--no-merges",
|
|
f"{seen}..{args.upstream_ref}",
|
|
)
|
|
commits = [
|
|
line.strip()
|
|
for line in commits_out.splitlines()
|
|
if line.strip()
|
|
]
|
|
all_commits = commits
|
|
|
|
total_commit_count = len(all_commits)
|
|
if args.max_commits > 0:
|
|
commits = commits[: args.max_commits]
|
|
selected_commit_count = len(commits)
|
|
truncated = total_commit_count > selected_commit_count
|
|
batch_last_sha = commits[-1] if commits else ""
|
|
cursor_update_enabled = not manual_since and not bootstrap
|
|
advance_to = ""
|
|
if cursor_update_enabled:
|
|
if batch_last_sha:
|
|
advance_to = batch_last_sha if truncated else upstream_head
|
|
elif seen and not bootstrap:
|
|
advance_to = upstream_head
|
|
|
|
candidates = [classify_commit(sha) for sha in commits]
|
|
data = {
|
|
"generated_at": utc_now(),
|
|
"upstream_ref": args.upstream_ref,
|
|
"upstream_head": upstream_head,
|
|
"cursor_file": display_path(cursor_file),
|
|
"cursor_update_enabled": cursor_update_enabled,
|
|
"seen": seen,
|
|
"stored_seen": stored_seen,
|
|
"manual_since": manual_since,
|
|
"bootstrap": bootstrap,
|
|
"total_commit_count": total_commit_count,
|
|
"selected_commit_count": selected_commit_count,
|
|
"truncated": truncated,
|
|
"batch_last_sha": batch_last_sha,
|
|
"advance_to": advance_to,
|
|
"allowed_auto_paths": sorted(ALLOWED_AUTO_PATHS),
|
|
"protected_paths": sorted(PROTECTED_PATHS),
|
|
"protected_prefixes": list(PROTECTED_PREFIXES),
|
|
"safe_decision": SAFE_DECISION,
|
|
"candidates": candidates,
|
|
}
|
|
|
|
write_json(Path(args.output), data)
|
|
write_plan_report(Path(args.report), data)
|
|
|
|
safe_count = sum(1 for item in candidates if item["safe_by_rules"])
|
|
reviewable_count = sum(1 for item in candidates if item["reviewable_by_ai"])
|
|
print(
|
|
f"Planned {len(candidates)} upstream commits "
|
|
f"({safe_count} rule-safe, {reviewable_count} AI-reviewable)."
|
|
)
|
|
if bootstrap:
|
|
print("No seen marker was available; plan bootstrapped without candidates.")
|
|
return 0
|
|
|
|
|
|
def write_plan_report(path: Path, data: dict[str, Any]) -> None:
|
|
lines = [
|
|
"# Upstream Parser Sync Plan",
|
|
"",
|
|
f"- Generated: {data['generated_at']}",
|
|
f"- Upstream ref: `{data['upstream_ref']}`",
|
|
f"- Cursor file: `{data['cursor_file']}`",
|
|
f"- Seen: `{data['seen'] or 'none'}`",
|
|
f"- Stored seen: `{data['stored_seen'] or 'none'}`",
|
|
f"- Manual since override: `{data['manual_since'] or 'none'}`",
|
|
f"- Upstream head: `{data['upstream_head']}`",
|
|
f"- Bootstrap: `{data['bootstrap']}`",
|
|
f"- Total pending commits: `{data['total_commit_count']}`",
|
|
f"- Selected commits: `{data['selected_commit_count']}`",
|
|
f"- Truncated: `{data['truncated']}`",
|
|
f"- Advance to: `{data['advance_to'] or 'none'}`",
|
|
"",
|
|
"## Candidates",
|
|
"",
|
|
]
|
|
if not data["candidates"]:
|
|
lines.append("No candidate commits.")
|
|
for item in data["candidates"]:
|
|
lines.extend(
|
|
[
|
|
f"### `{item['short_sha']}` {item['subject']}",
|
|
"",
|
|
f"- Rule decision: `{item['rule_decision']}`",
|
|
f"- Safe by rules: `{item['safe_by_rules']}`",
|
|
f"- Reviewable by AI: `{item['reviewable_by_ai']}`",
|
|
f"- Reason: {item['reason']}",
|
|
f"- Files: {', '.join(f'`{path}`' for path in item['files']) or 'none'}",
|
|
"",
|
|
]
|
|
)
|
|
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
|
|
def load_decisions(path: Path) -> dict[str, dict[str, Any]]:
|
|
text = path.read_text(encoding="utf-8").strip()
|
|
data = json.loads(text)
|
|
decisions = data.get("decisions", data)
|
|
if not isinstance(decisions, list):
|
|
raise ValueError("Copilot decision file must contain a decisions array.")
|
|
result: dict[str, dict[str, Any]] = {}
|
|
for item in decisions:
|
|
if not isinstance(item, dict) or "sha" not in item:
|
|
continue
|
|
result[item["sha"]] = item
|
|
return result
|
|
|
|
|
|
def snapshot_paths(paths: list[str]) -> dict[str, bytes | None]:
|
|
snapshot: dict[str, bytes | None] = {}
|
|
for path in paths:
|
|
full = ROOT / path
|
|
snapshot[path] = full.read_bytes() if full.exists() else None
|
|
return snapshot
|
|
|
|
|
|
def restore_snapshot(snapshot: dict[str, bytes | None]) -> None:
|
|
for path, content in snapshot.items():
|
|
full = ROOT / path
|
|
if content is None:
|
|
if full.exists():
|
|
full.unlink()
|
|
else:
|
|
full.parent.mkdir(parents=True, exist_ok=True)
|
|
full.write_bytes(content)
|
|
run("git", "reset", "--", *snapshot.keys(), check=False)
|
|
|
|
|
|
def apply_patch_for_commit(sha: str, paths: list[str]) -> tuple[bool, str]:
|
|
patch = git("show", "--format=", "--binary", sha, "--", *paths)
|
|
if not patch.strip():
|
|
return False, "No patch content for allowed parser files."
|
|
|
|
check = subprocess.run(
|
|
["git", "apply", "-3", "--check", "--whitespace=nowarn", "-"],
|
|
cwd=ROOT,
|
|
input=patch,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
if check.returncode != 0:
|
|
return False, check.stderr.strip() or "git apply --check failed."
|
|
|
|
applied = subprocess.run(
|
|
["git", "apply", "-3", "--whitespace=nowarn", "-"],
|
|
cwd=ROOT,
|
|
input=patch,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
if applied.returncode != 0:
|
|
return False, applied.stderr.strip() or "git apply failed."
|
|
return True, "Applied."
|
|
|
|
|
|
def run_guards() -> tuple[bool, str]:
|
|
proc = subprocess.run(
|
|
[sys.executable, "scripts/check_sync_guards.py", "--json"],
|
|
cwd=ROOT,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
output = (proc.stdout or "") + (proc.stderr or "")
|
|
return proc.returncode == 0, output.strip()
|
|
|
|
|
|
def append_state(path: Path, entries: list[dict[str, Any]]) -> None:
|
|
if not entries:
|
|
return
|
|
current = read_json_array(path)
|
|
current.extend(entries)
|
|
write_json(path, current)
|
|
|
|
|
|
def apply(args: argparse.Namespace) -> int:
|
|
plan_data = json.loads(Path(args.plan).read_text(encoding="utf-8"))
|
|
decisions = load_decisions(Path(args.decisions))
|
|
applied_entries: list[dict[str, Any]] = []
|
|
skipped_entries: list[dict[str, Any]] = []
|
|
ignored_entries: list[dict[str, Any]] = []
|
|
cursor_update_enabled = bool(plan_data.get("cursor_update_enabled"))
|
|
cursor_file_text = plan_data.get("cursor_file") or display_path(SEEN_FILE)
|
|
cursor_file = resolve_cursor_file(cursor_file_text)
|
|
advance_to = plan_data.get("advance_to") or ""
|
|
|
|
for item in plan_data.get("candidates", []):
|
|
sha = item["sha"]
|
|
decision = decisions.get(sha)
|
|
record_base = {
|
|
"sha": sha,
|
|
"subject": item["subject"],
|
|
"time": utc_now(),
|
|
}
|
|
|
|
if item.get("rule_decision") == "ignore_no_parser_changes":
|
|
ignored_entries.append(
|
|
{**record_base, "reason": "No parser whitelist files changed."}
|
|
)
|
|
continue
|
|
|
|
if not item.get("reviewable_by_ai"):
|
|
skipped_entries.append(
|
|
{
|
|
**record_base,
|
|
"reason": item.get("reason", "Rejected by deterministic rules."),
|
|
"rule_decision": item.get("rule_decision"),
|
|
}
|
|
)
|
|
continue
|
|
|
|
if not decision:
|
|
skipped_entries.append(
|
|
{**record_base, "reason": "No Copilot decision was supplied."}
|
|
)
|
|
continue
|
|
|
|
if decision.get("decision") != SAFE_DECISION or decision.get("risk") != "low":
|
|
skipped_entries.append(
|
|
{
|
|
**record_base,
|
|
"reason": decision.get("reason", "Copilot did not approve automatic sync."),
|
|
"copilot_decision": decision,
|
|
}
|
|
)
|
|
continue
|
|
|
|
paths = item.get("allowed_paths", [])
|
|
if not paths:
|
|
skipped_entries.append(
|
|
{**record_base, "reason": "No parser whitelist paths were available."}
|
|
)
|
|
continue
|
|
backup = snapshot_paths(paths)
|
|
ok, message = apply_patch_for_commit(sha, paths)
|
|
if not ok:
|
|
restore_snapshot(backup)
|
|
skipped_entries.append({**record_base, "reason": message})
|
|
continue
|
|
|
|
guards_ok, guards_output = run_guards()
|
|
if not guards_ok:
|
|
restore_snapshot(backup)
|
|
skipped_entries.append(
|
|
{
|
|
**record_base,
|
|
"reason": "Guard checks failed after applying patch.",
|
|
"guard_output": guards_output,
|
|
}
|
|
)
|
|
continue
|
|
|
|
applied_entries.append(
|
|
{
|
|
**record_base,
|
|
"paths": paths,
|
|
"copilot_reason": decision.get("reason", ""),
|
|
}
|
|
)
|
|
|
|
append_state(APPLIED_FILE, applied_entries)
|
|
append_state(SKIPPED_FILE, skipped_entries)
|
|
|
|
advanced_to = ""
|
|
if cursor_update_enabled and advance_to:
|
|
cursor_file.parent.mkdir(parents=True, exist_ok=True)
|
|
cursor_file.write_text(advance_to + "\n", encoding="utf-8")
|
|
advanced_to = advance_to
|
|
|
|
result = {
|
|
"generated_at": utc_now(),
|
|
"upstream_head": plan_data.get("upstream_head"),
|
|
"cursor_file": display_path(cursor_file),
|
|
"advanced_to": advanced_to,
|
|
"truncated": plan_data.get("truncated", False),
|
|
"applied": applied_entries,
|
|
"skipped": skipped_entries,
|
|
"ignored": ignored_entries,
|
|
}
|
|
write_json(Path(args.result), result)
|
|
write_apply_report(Path(args.report), result)
|
|
|
|
print(
|
|
f"Applied {len(applied_entries)} commits; "
|
|
f"skipped {len(skipped_entries)} commits; "
|
|
f"ignored {len(ignored_entries)} commits."
|
|
)
|
|
return 0
|
|
|
|
|
|
def write_apply_report(path: Path, result: dict[str, Any]) -> None:
|
|
lines = [
|
|
"# Upstream Parser Sync Result",
|
|
"",
|
|
f"- Generated: {result['generated_at']}",
|
|
f"- Upstream head: `{result.get('upstream_head') or 'unknown'}`",
|
|
f"- Cursor file: `{result.get('cursor_file') or 'unknown'}`",
|
|
f"- Advanced to: `{result.get('advanced_to') or 'none'}`",
|
|
f"- Truncated: `{result.get('truncated', False)}`",
|
|
f"- Applied: {len(result['applied'])}",
|
|
f"- Skipped: {len(result['skipped'])}",
|
|
f"- Ignored: {len(result.get('ignored', []))}",
|
|
"",
|
|
"## Applied",
|
|
"",
|
|
]
|
|
if not result["applied"]:
|
|
lines.append("No commits were applied.")
|
|
for item in result["applied"]:
|
|
lines.append(f"- `{item['sha'][:12]}` {item['subject']}")
|
|
|
|
lines.extend(["", "## Skipped", ""])
|
|
if not result["skipped"]:
|
|
lines.append("No commits were skipped.")
|
|
for item in result["skipped"]:
|
|
lines.append(f"- `{item['sha'][:12]}` {item['subject']}: {item['reason']}")
|
|
|
|
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser()
|
|
sub = parser.add_subparsers(dest="command", required=True)
|
|
|
|
plan_parser = sub.add_parser("plan")
|
|
plan_parser.add_argument("--upstream-ref", required=True)
|
|
plan_parser.add_argument(
|
|
"--cursor-file",
|
|
default=".github/upstream-subconverter.seen",
|
|
help="state file that stores the last processed upstream commit",
|
|
)
|
|
plan_parser.add_argument(
|
|
"--since",
|
|
default="",
|
|
help="override the stored seen marker, primarily for dry-run testing",
|
|
)
|
|
plan_parser.add_argument("--max-commits", type=int, default=20)
|
|
plan_parser.add_argument("--output", default="upstream-sync-candidates.json")
|
|
plan_parser.add_argument("--report", default="upstream-sync-plan.md")
|
|
plan_parser.set_defaults(func=plan)
|
|
|
|
apply_parser = sub.add_parser("apply")
|
|
apply_parser.add_argument("--plan", default="upstream-sync-candidates.json")
|
|
apply_parser.add_argument("--decisions", default="upstream-sync-decisions.json")
|
|
apply_parser.add_argument("--result", default="upstream-sync-result.json")
|
|
apply_parser.add_argument("--report", default="upstream-sync-result.md")
|
|
apply_parser.set_defaults(func=apply)
|
|
|
|
args = parser.parse_args()
|
|
try:
|
|
return args.func(args)
|
|
except Exception as exc:
|
|
print(f"error: {exc}", file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|