diff options
Diffstat (limited to 'archived/projt-launcher/scripts/patch_maintainer_emails.py')
| -rwxr-xr-x | archived/projt-launcher/scripts/patch_maintainer_emails.py | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/archived/projt-launcher/scripts/patch_maintainer_emails.py b/archived/projt-launcher/scripts/patch_maintainer_emails.py new file mode 100755 index 0000000000..a86f2e9e7b --- /dev/null +++ b/archived/projt-launcher/scripts/patch_maintainer_emails.py @@ -0,0 +1,486 @@ +#!/usr/bin/env python3 +import argparse +import re +import shlex +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path + +EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") +DIFF_GIT_RE = re.compile(r"^diff --git\s+(\S+)\s+(\S+)$") +PLUS_RE = re.compile(r"^\+\+\+\s+b/(.+)$") + + +@dataclass +class CodeownersRule: + pattern: str + owners: list[str] + matcher_kind: str + matcher: re.Pattern[str] + + +@dataclass +class MaintainerRecord: + aliases: set[str] + emails: set[str] + paths: list[str] + + +ALIAS_KEYS = {"github", "user", "username", "nick", "alias", "handle", "name"} + + +def normalize_path(path: str) -> str: + p = path.strip() + if p.startswith("\"") and p.endswith("\"") and len(p) >= 2: + p = p[1:-1] + if p.startswith("./"): + p = p[2:] + return p + + +def normalize_alias(token: str) -> str: + out = token.strip().lower() + if out.startswith("@"): + out = out[1:] + if "/" in out: + out = out.split("/", 1)[1] + out = re.sub(r"[^a-z0-9._-]+", "", out) + return out + + +def glob_to_regex(glob: str) -> str: + parts: list[str] = [] + i = 0 + while i < len(glob): + ch = glob[i] + if ch == "*": + if i + 1 < len(glob) and glob[i + 1] == "*": + while i + 1 < len(glob) and glob[i + 1] == "*": + i += 1 + parts.append(".*") + else: + parts.append("[^/]*") + elif ch == "?": + parts.append("[^/]") + else: + parts.append(re.escape(ch)) + i += 1 + return "".join(parts) + + +def parse_codeowners_line(line: str) -> tuple[str, list[str]] | None: + payload = line.split("#", 1)[0].strip() + if not payload: + return None + + try: + tokens = shlex.split(payload) + except ValueError: + tokens = payload.split() + + if len(tokens) < 2: + return None + + return tokens[0], tokens[1:] + + +def compile_rule(pattern: str, owners: list[str]) -> CodeownersRule: + pat = pattern + if pat.endswith("/"): + pat += "**" + + if pat.startswith("/"): + pat = pat[1:] + + if "/" in pat: + regex = re.compile(r"^" + glob_to_regex(pat) + r"$") + return CodeownersRule(pattern=pattern, owners=owners, matcher_kind="path", matcher=regex) + + regex = re.compile(r"^" + glob_to_regex(pat) + r"$") + return CodeownersRule(pattern=pattern, owners=owners, matcher_kind="component", matcher=regex) + + +def parse_codeowners(path: Path) -> list[CodeownersRule]: + rules: list[CodeownersRule] = [] + for raw_line in path.read_text(encoding="utf-8", errors="ignore").splitlines(): + parsed = parse_codeowners_line(raw_line) + if not parsed: + continue + pattern, owners = parsed + rules.append(compile_rule(pattern, owners)) + return rules + + +def match_rule(path: str, rule: CodeownersRule) -> bool: + if rule.matcher_kind == "path": + return bool(rule.matcher.fullmatch(path)) + + parts = [p for p in path.split("/") if p] + return any(rule.matcher.fullmatch(part) for part in parts) + + +def owners_for_file(path: str, rules: list[CodeownersRule]) -> list[str]: + matched: list[str] = [] + for rule in rules: + if match_rule(path, rule): + matched = rule.owners + return matched + + +def changed_files_from_patch_text(text: str) -> list[str]: + files: dict[str, None] = {} + + for line in text.splitlines(): + m = DIFF_GIT_RE.match(line) + if m: + left = m.group(1) + right = m.group(2) + if left.startswith("a/"): + left = left[2:] + if right.startswith("b/"): + right = right[2:] + picked = right if right != "/dev/null" else left + if picked and picked != "/dev/null": + files[normalize_path(picked)] = None + continue + + m = PLUS_RE.match(line) + if m: + picked = normalize_path(m.group(1)) + if picked and picked != "/dev/null": + files[picked] = None + + return list(files.keys()) + + +def changed_files_from_patch_file(patch_path: str) -> list[str]: + if patch_path == "-": + return changed_files_from_patch_text(sys.stdin.read()) + + text = Path(patch_path).read_text(encoding="utf-8", errors="ignore") + return changed_files_from_patch_text(text) + + +def changed_files_from_patch_files(patch_paths: list[str]) -> list[str]: + if not patch_paths: + return [] + + if len(patch_paths) > 1 and "-" in patch_paths: + raise RuntimeError("'-' stdin patch cannot be combined with other patch files") + + files: dict[str, None] = {} + for patch_path in patch_paths: + for path in changed_files_from_patch_file(patch_path): + files[path] = None + return list(files.keys()) + + +def changed_files_from_git_range(git_range: str) -> list[str]: + result = subprocess.run( + ["git", "diff", "--name-only", git_range], + check=False, + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(result.stderr.strip() or "git diff failed") + + files = [normalize_path(line) for line in result.stdout.splitlines() if line.strip()] + return files + + +def aliases_from_maintainer_file(content: str, stem: str) -> set[str]: + aliases: set[str] = set() + base = normalize_alias(stem) + if base: + aliases.add(base) + + for line in content.splitlines(): + if ":" not in line: + continue + key, value = line.split(":", 1) + key_norm = key.strip().lower() + if key_norm not in ALIAS_KEYS: + continue + for token in re.split(r"[\s,]+", value.strip()): + norm = normalize_alias(token) + if norm: + aliases.add(norm) + for mention in re.findall(r"@[A-Za-z0-9._-]+", value): + norm = normalize_alias(mention) + if norm: + aliases.add(norm) + + return aliases + + +def parse_maintainers_records(path: Path) -> list[MaintainerRecord]: + records: list[MaintainerRecord] = [] + current_aliases: set[str] | None = None + current_emails: set[str] | None = None + current_paths: list[str] | None = None + + def flush_current() -> None: + nonlocal current_aliases, current_emails, current_paths + if current_aliases is None or current_emails is None or current_paths is None: + return + if not current_emails: + current_aliases = None + current_emails = None + current_paths = None + return + if not current_paths: + current_paths = ["**"] + records.append( + MaintainerRecord(aliases=set(current_aliases), emails=set(current_emails), paths=list(current_paths)) + ) + current_aliases = None + current_emails = None + current_paths = None + + for raw_line in path.read_text(encoding="utf-8", errors="ignore").splitlines(): + line = raw_line.strip() + if not line: + continue + + if line.startswith("[") and line.endswith("]"): + flush_current() + current_aliases = set() + current_emails = set() + current_paths = [] + section_name = line[1:-1].strip() + section_alias = normalize_alias(section_name) + if section_alias: + current_aliases.add(section_alias) + continue + + if line.startswith("#"): + continue + + if current_aliases is None or current_emails is None or current_paths is None: + continue + + payload = line.split("#", 1)[0].strip() + if ":" not in payload: + continue + + key, value = payload.split(":", 1) + key_norm = key.strip().lower() + value = value.strip() + if not value: + continue + + for email in EMAIL_RE.findall(value): + current_emails.add(email.lower()) + + if key_norm in {"path", "paths"}: + for token in re.split(r"[\s,]+", value): + candidate = token.strip() + if candidate: + current_paths.append(candidate) + + if key_norm in ALIAS_KEYS: + for token in re.split(r"[\s,]+", value): + alias = normalize_alias(token) + if alias: + current_aliases.add(alias) + for mention in re.findall(r"@[A-Za-z0-9._-]+", value): + alias = normalize_alias(mention) + if alias: + current_aliases.add(alias) + + flush_current() + return records + + +def rules_from_maintainers_file(path: Path) -> list[CodeownersRule]: + if not path.exists() or not path.is_file(): + return [] + + rules: list[CodeownersRule] = [] + for record in parse_maintainers_records(path): + owners = [f"@{alias}" for alias in sorted(record.aliases)] + if not owners: + owners = sorted(record.emails) + for pattern in record.paths: + rules.append(compile_rule(pattern, owners)) + + return rules + + +def build_alias_email_index(maintainers_source: Path) -> dict[str, set[str]]: + index: dict[str, set[str]] = {} + + if not maintainers_source.exists(): + return index + + if maintainers_source.is_file(): + for record in parse_maintainers_records(maintainers_source): + for alias in record.aliases: + index.setdefault(alias, set()).update(record.emails) + return index + + if not maintainers_source.is_dir(): + return index + + for entry in sorted(maintainers_source.rglob("*.txt")): + text = entry.read_text(encoding="utf-8", errors="ignore") + emails = {email.lower() for email in EMAIL_RE.findall(text)} + if emails: + aliases = aliases_from_maintainer_file(text, entry.stem) + for alias in aliases: + index.setdefault(alias, set()).update(emails) + + return index + + +def resolve_owner_emails(owner: str, alias_index: dict[str, set[str]]) -> set[str]: + direct = {email.lower() for email in EMAIL_RE.findall(owner)} + if direct: + return direct + + emails: set[str] = set() + norm = normalize_alias(owner) + if norm in alias_index: + emails.update(alias_index[norm]) + + if owner.startswith("@") and "/" in owner: + tail_norm = normalize_alias(owner.split("/", 1)[1]) + if tail_norm in alias_index: + emails.update(alias_index[tail_norm]) + + return emails + + +def choose_codeowners(explicit: str | None) -> Path | None: + if explicit: + return Path(explicit) + + for candidate in (Path(".github/CODEOWNERS"), Path("CODEOWNERS")): + if candidate.exists(): + return candidate + + return None + + +def main(argv: list[str]) -> int: + parser = argparse.ArgumentParser( + description="Find maintainer emails impacted by a patch using CODEOWNERS-like matching." + ) + source_group = parser.add_mutually_exclusive_group(required=True) + source_group.add_argument( + "--patch", + nargs="+", + default=None, + help="One or more patch file paths. Use '-' to read patch from stdin.", + ) + source_group.add_argument("--git-range", default=None, help="Git diff range, e.g. HEAD~1..HEAD") + parser.add_argument("--codeowners", default=None, help="Path to CODEOWNERS file") + parser.add_argument( + "--maintainers-dir", + default="MAINTAINERS", + help="Maintainer source path (directory with *.txt files or a MAINTAINERS file)", + ) + parser.add_argument("--show-details", action="store_true", help="Print per-file owner and email mapping") + parser.add_argument( + "--strict-unresolved", + action="store_true", + help="Exit with code 2 if at least one owner cannot be resolved to an email", + ) + args = parser.parse_args(argv) + + maintainers_source = Path(args.maintainers_dir) + codeowners_path = choose_codeowners(args.codeowners) + if codeowners_path is not None: + if not codeowners_path.exists(): + print(f"error: CODEOWNERS file not found: {codeowners_path}", file=sys.stderr) + return 1 + rules = parse_codeowners(codeowners_path) + if not rules: + print(f"error: no usable rules found in {codeowners_path}", file=sys.stderr) + return 1 + else: + rules = rules_from_maintainers_file(maintainers_source) + if not rules: + print( + f"error: no ownership rules found. Provide --codeowners or a valid MAINTAINERS file: {maintainers_source}", + file=sys.stderr, + ) + return 1 + + try: + if args.git_range: + changed_files = changed_files_from_git_range(args.git_range) + else: + changed_files = changed_files_from_patch_files(args.patch) + except Exception as exc: + print(f"error: {exc}", file=sys.stderr) + return 1 + + if not changed_files: + return 0 + + alias_index = build_alias_email_index(maintainers_source) + + all_emails: set[str] = set() + unresolved: set[str] = set() + unowned_files: list[str] = [] + per_file: list[tuple[str, list[str], list[str]]] = [] + + for changed in changed_files: + owners = owners_for_file(changed, rules) + file_emails: set[str] = set() + if not owners: + unowned_files.append(changed) + + for owner in owners: + resolved = resolve_owner_emails(owner, alias_index) + if resolved: + file_emails.update(resolved) + elif not EMAIL_RE.search(owner): + unresolved.add(owner) + + sorted_file_emails = sorted(file_emails) + sorted_owners = sorted(set(owners)) + per_file.append((changed, sorted_owners, sorted_file_emails)) + all_emails.update(file_emails) + + if args.show_details: + for changed, owners, emails in per_file: + owners_text = ", ".join(owners) if owners else "-" + emails_text = ", ".join(emails) if emails else "-" + print(f"{changed}\towners={owners_text}\temails={emails_text}") + + for email in sorted(all_emails): + print(email) + + if unowned_files: + preview = ", ".join(unowned_files[:5]) + if len(unowned_files) > 5: + preview += ", ..." + print( + f"warning: no ownership rule match for {len(unowned_files)} changed file(s): {preview}", + file=sys.stderr, + ) + + if not all_emails: + print( + "warning: no maintainer emails resolved. Check ownership rules and MAINTAINERS aliases.", + file=sys.stderr, + ) + + if unresolved: + print( + "warning: unresolved owners: " + ", ".join(sorted(unresolved)), + file=sys.stderr, + ) + + if args.strict_unresolved and (unresolved or unowned_files or not all_emails): + return 2 + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) |
