5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / main.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Symfony Generic Risk Scanner (safe PoC)
Autor: m10sec ([email protected])
      Se enfocan en detección no invasiva y evidencia segura.
"""

import argparse
import json
import socket
import ssl
from urllib.parse import urlparse, urljoin

import requests

try:
    from colorama import init as colorama_init, Fore, Style
    colorama_init()
except Exception:
    # colorama could not be imported or initialised: provide stand-ins whose
    # colour attributes are empty strings, so output is simply uncoloured.
    class _F:
        RESET = ""
        RED = ""
        GREEN = ""
        YELLOW = ""
        CYAN = ""
        MAGENTA = ""

    class _S:
        BRIGHT = ""
        RESET_ALL = ""

    Fore = _F()
    Style = _S()

def banner():
    """Print the ASCII-art logo, the tool title and the menu of available tests."""
    # The art is whitespace-sensitive; it is wrapped in cyan/bright codes and
    # reset at the end so the colour does not leak into later output.
    ascii_art = f"""
{Fore.CYAN}{Style.BRIGHT}
 ▗▄▄▖▄   ▄ ▄▄▄▄  ▗▞▀▀▘▄▄▄  ▄▄▄▄  ▄   ▄  ▗▄▄▖▗▞▀▘▗▞▀▜▌▄▄▄▄  
▐▌   █   █ █ █ █ ▐▌  █   █ █   █ █   █ ▐▌   ▝▚▄▖▝▚▄▟▌█   █ 
 ▝▀▚▖ ▀▀▀█ █   █ ▐▛▀▘▀▄▄▄▀ █   █  ▀▀▀█  ▝▀▚▖         █   █ 
▗▄▄▞▘▄   █       ▐▌              ▄   █ ▗▄▄▞▘               
      ▀▀▀                         ▀▀▀                      
{Style.RESET_ALL}
    """
    print(ascii_art)
    # Title box: the opening rule turns on bright cyan, the closing rule resets it.
    print(Style.BRIGHT + Fore.CYAN + "="*54)
    print("        Symfony Generic Risk Scanner (safe PoC)")
    print("                 [email protected]")
    print("="*54 + Style.RESET_ALL)
    # Menu entries; the bracketed numbers match the choices of --option in main().
    print("[1] Inyección de cabeceras (CRLF, genérico)")
    print("[2] Host Header Injection (genérico)")
    print("[3] Symfony Profiler expuesto")
    print("[4] Exposición de /_fragment (firma/errores)")
    print("[9] Ejecutar todas las pruebas")
    print("="*54 + "\n")

def norm_url(u: str) -> str:
    """Return *u* as a base URL with an explicit scheme and no trailing slash.

    Surrounding whitespace is removed. When *u* does not start with
    ``http://`` or ``https://`` (compared case-insensitively), ``http://`` is
    prepended. Trailing slashes are then removed from the part after the
    scheme separator in every case, so ``example.com/`` and
    ``http://example.com/`` normalise to the same value.

    Args:
        u: URL or bare host as typed by the user.

    Returns:
        The normalised URL string.
    """
    u = u.strip()
    if not u.lower().startswith(("http://", "https://")):
        u = "http://" + u
    # Strip slashes only after the "://" so the scheme separator itself is
    # never eaten (e.g. a bare "http://" stays intact).
    scheme, sep, rest = u.partition("://")
    return scheme + sep + rest.rstrip("/")

def result(ok: bool, msg: str):
    """Print *msg* as a finding line: green ``[+]`` when *ok* is true, red ``[-]`` otherwise."""
    if ok:
        color, tag = Fore.GREEN, "[+]"
    else:
        color, tag = Fore.RED, "[-]"
    print("".join((color, tag, " ", msg, Style.RESET_ALL)))

def info(msg: str):
    """Print *msg* as a cyan informational line prefixed with ``[*]``."""
    line = "".join((Fore.CYAN, "[*] ", msg, Style.RESET_ALL))
    print(line)

def warn(msg: str):
    """Print *msg* as a yellow warning line prefixed with ``[!]``."""
    line = "".join((Fore.YELLOW, "[!] ", msg, Style.RESET_ALL))
    print(line)

def build_session(insecure: bool=False, proxy: str=None, timeout: int=8) -> requests.Session:
    """Create the ``requests.Session`` shared by the HTTP-based checks.

    Args:
        insecure: When true, TLS certificate verification is disabled.
        proxy: Optional proxy URL applied to both http and https traffic.
        timeout: Default timeout in seconds for every request made through
            the returned session.

    Returns:
        A session with the scanner User-Agent, the TLS/proxy settings above
        and a default per-request timeout.
    """
    import functools

    s = requests.Session()
    s.verify = not insecure
    s.headers.update({"User-Agent": "SymfonySafeScanner/1.0"})
    if proxy:
        s.proxies = {"http": proxy, "https": proxy}
    # Kept so other code can read the configured value; requests itself does
    # not consult a ``timeout`` attribute on the session.
    s.timeout = timeout
    # requests only accepts timeouts per request. Binding a default onto the
    # instance's ``request`` makes get()/post() use it, while an explicit
    # ``timeout=`` passed by a caller still overrides it.
    s.request = functools.partial(s.request, timeout=timeout)
    return s

def test_crlf_raw(url: str, timeout: int=6):
    """
    Probe for header injection by sending a crafted header over a raw socket.

    A hand-built ``GET`` request carries an ``X-Forwarded-Host`` value made of
    ``example.com``, a percent-encoded CRLF and ``X-CVE-Test: injected``. The
    target is flagged only when the response head contains ``X-CVE-Test`` as a
    header line of its own whose value starts with ``injected``; an echo of
    the still-encoded payload inside another header does not count.

    Args:
        url: Target URL; scheme, host, port, path and query are taken from it.
        timeout: Socket timeout in seconds for connecting and reading.

    Returns:
        ``{"vulnerable": True, "evidence": ...}`` when the header is found,
        ``{"vulnerable": False}`` when it is not, or ``{"error": str}`` when
        the URL is unusable or the connection fails.
    """
    info("Probando inyección de cabeceras (CRLF, genérico)...")
    # Only the response head is inspected, so reading is capped at this size.
    max_head = 64 * 1024
    try:
        parsed = urlparse(url)
        host = parsed.hostname
        if not host:
            # Without this guard a None host would make the socket connect to loopback.
            raise ValueError(f"URL sin host: {url}")
        is_tls = parsed.scheme == "https"
        default_port = 443 if is_tls else 80
        # Accessing .port raises ValueError for a malformed port; it is handled below.
        port = parsed.port or default_port
        target = parsed.path or "/"
        if parsed.query:
            target = f"{target}?{parsed.query}"
        # Host header: bracket IPv6 literals and include a non-default port.
        host_value = f"[{host}]" if ":" in host else host
        if port != default_port:
            host_value = f"{host_value}:{port}"

        # Header value with an encoded CRLF meant to break the line and add a new header
        evil_header = "example.com%0d%0aX-CVE-Test: injected"
        req_lines = [
            f"GET {target} HTTP/1.1",
            f"Host: {host_value}",
            f"X-Forwarded-Host: {evil_header}",
            "Connection: close",
            "", ""
        ]
        payload = "\r\n".join(req_lines).encode()

        sock = socket.create_connection((host, port), timeout=timeout)
        try:
            if is_tls:
                sock = ssl.create_default_context().wrap_socket(sock, server_hostname=host)
            sock.sendall(payload)
            buf = bytearray()
            while len(buf) < max_head:
                chunk = sock.recv(4096)
                if not chunk:
                    break
                # Re-scan only the tail so a terminator split across two
                # chunks is still detected.
                scan_from = max(len(buf) - 3, 0)
                buf += chunk
                if buf.find(b"\r\n\r\n", scan_from) != -1:
                    break
        finally:
            # Always release the socket, including on send/recv/TLS errors.
            sock.close()

        head = bytes(buf).split(b"\r\n\r\n", 1)[0].decode("iso-8859-1", errors="ignore")
        injected = False
        # Skip the status line; look for X-CVE-Test as a real header name.
        for line in head.replace("\r\n", "\n").split("\n")[1:]:
            name, sep, value = line.partition(":")
            if sep and name.strip().lower() == "x-cve-test" and value.strip().startswith("injected"):
                injected = True
                break

        if injected:
            result(True, "Posible CRLF/resp smuggling: header inyectado reflejado.")
            return {"vulnerable": True, "evidence": "X-CVE-Test header found in response"}
        result(False, "No se observó inyección de header en respuesta.")
        return {"vulnerable": False}
    except Exception as e:
        warn(f"Error en prueba CRLF: {e}")
        return {"error": str(e)}

def test_host_header_injection(url: str, session: requests.Session):
    """
    Send a request with a forged Host and report whether the response echoes it.

    ``Host`` and ``X-Forwarded-Host`` are both set to ``evil.com`` and
    redirects are not followed. The forged name is searched for in the
    ``Location`` header and in the first 5000 characters of the body.

    Returns:
        ``{"vulnerable": True, "indicators": [...], "status": code}`` when a
        reflection is seen, ``{"vulnerable": False, "status": code}`` when
        not, or ``{"error": str}`` if the request fails.
    """
    info("Probando Host Header Injection (genérico)...")
    evil = "evil.com"
    redirect_codes = (301, 302, 303, 307, 308)
    try:
        forged = {"Host": evil, "X-Forwarded-Host": evil}
        resp = session.get(url, headers=forged, allow_redirects=False)

        indicators = []
        if evil in resp.headers.get("Location", ""):
            indicators.append("Location refleja Host malicioso")
        if evil in resp.text[:5000]:
            indicators.append("Body refleja Host malicioso (primeros 5KB)")

        code = resp.status_code
        if not indicators:
            result(False, "No se observaron reflejos/redirect influenciados por Host.")
            return {"vulnerable": False, "status": code}

        # Same finding either way; only the wording differs for redirects.
        if code in redirect_codes:
            result(True, f"Posible HHI: {', '.join(indicators)} (HTTP {code}).")
        else:
            result(True, f"Posible HHI (sin redirección): {', '.join(indicators)}.")
        return {"vulnerable": True, "indicators": indicators, "status": code}
    except Exception as e:
        warn(f"Error en HHI: {e}")
        return {"error": str(e)}

def test_profiler(url: str, session: requests.Session):
    """
    Check whether the Symfony Profiler or Web Debug Toolbar pages are served.

    ``/_profiler`` and ``/_wdt`` are requested, and an endpoint counts as
    exposed when its body contains one of the known marker strings.

    Returns:
        ``{"exposed": True, "endpoints": [...]}``, ``{"exposed": False}``, or
        ``{"error": str}`` if any request fails.
    """
    info("Probando exposición de Symfony Profiler...")
    markers = ("Symfony Profiler", "Web Debug Toolbar", "_profiler_search")
    try:
        base = url + "/"
        hits = []
        for path in ("/_profiler", "/_wdt"):
            body = session.get(urljoin(base, path)).text
            if any(marker in body for marker in markers):
                hits.append(path)

        if not hits:
            result(False, "No se detectó Profiler/WDT.")
            return {"exposed": False}
        result(True, f"Profiler expuesto en: {', '.join(hits)}")
        return {"exposed": True, "endpoints": hits}
    except Exception as e:
        warn(f"Error en Profiler: {e}")
        return {"error": str(e)}

def test_fragment_exposure(url: str, session: requests.Session):
    """
    Safe probe of ``/_fragment``.

    - Does not attempt to read files or perform SSRF.
    - Sends one POST with a harmless ``_path`` value (redirects not followed)
      and checks whether the endpoint answers and whether the first 1000
      characters of the body contain signature/error wording that indicates
      attack surface (e.g. a missing signature).

    Returns:
        ``{"exposed": True, "status": code, "clues": [...]}`` for a
        400/403/404/500 answer containing diagnostic wording,
        ``{"exposed": True, "status": 200}`` for a 200 answer,
        ``{"exposed": False, "status": code}`` otherwise, or
        ``{"error": str}`` if the request fails. ``clues`` keeps the order of
        the needle list so output is stable between runs.
    """
    info("Probando exposición de /_fragment (seguro, sin exfiltración)...")
    target = urljoin(url+"/", "/_fragment")
    needles = ["Invalid signature", "An error occurred", "Fragment", "signature", "URI must be absolute", "Invalid _path"]
    try:
        # Minimal request with innocuous data
        r = session.post(target, data={"_path": "controller:index"}, allow_redirects=False)
        # Lower-case the inspected prefix once instead of once per needle.
        lowered = (r.text or "")[:1000].lower()
        # The needles are distinct, so the matches are already unique; a list
        # keeps them in a deterministic order (a set would not).
        clues = [needle for needle in needles if needle.lower() in lowered]
        if r.status_code in (400, 403, 404, 500) and clues:
            result(True, f"/_fragment accesible con mensajes diagnósticos: {', '.join(clues)}")
            return {"exposed": True, "status": r.status_code, "clues": clues}
        elif r.status_code == 200:
            # A 200 here may indicate a very lax configuration
            result(True, "Respuesta 200 en /_fragment (revisar firma/config).")
            return {"exposed": True, "status": r.status_code}
        else:
            result(False, f"/_fragment no parece accesible (HTTP {r.status_code}).")
            return {"exposed": False, "status": r.status_code}
    except Exception as e:
        warn(f"Error en /_fragment: {e}")
        return {"error": str(e)}

def ejecutar_todo(url: str, session: requests.Session):
    """Run every check against *url* and return the results keyed by check name.

    The checks run in this order: CRLF header injection, Host header
    injection, Profiler exposure, ``/_fragment`` exposure.

    Args:
        url: Normalised base URL of the target.
        session: Session used by the HTTP-based checks. Its ``timeout``
            attribute, when set, is also used for the raw-socket CRLF check
            so that this path applies the same timeout as the other checks.

    Returns:
        A dict with the keys ``crlf``, ``host_header``, ``profiler`` and
        ``fragment``, each holding that check's result dict.
    """
    print(Fore.MAGENTA + "\n[+] Ejecutando todas las pruebas disponibles...\n" + Style.RESET_ALL)
    # Fall back to test_crlf_raw's own default (6 s) when the session carries
    # no usable timeout value.
    crlf_timeout = getattr(session, "timeout", None) or 6
    summary = {}
    summary["crlf"] = test_crlf_raw(url, timeout=crlf_timeout)
    summary["host_header"] = test_host_header_injection(url, session)
    summary["profiler"] = test_profiler(url, session)
    summary["fragment"] = test_fragment_exposure(url, session)
    print(Fore.MAGENTA + "\n[✓] Escaneo completo.\n" + Style.RESET_ALL)
    return summary

def main():
    """Command-line entry point: show the banner, parse arguments, run the chosen check(s).

    The selected option maps to a single check, or to the full run for "9".
    With ``--json`` the collected results are also printed as indented JSON.
    """
    banner()

    cli = argparse.ArgumentParser(description="Symfony Generic Risk Scanner (safe PoC)")
    cli.add_argument("url", help="URL completa del sitio (http(s)://...)")
    cli.add_argument(
        "-o", "--option",
        choices=["1", "2", "3", "4", "9"],
        default="9",
        help="Prueba a ejecutar (1..4) o 9 para todas (default).",
    )
    cli.add_argument("--timeout", type=int, default=8, help="Timeout en segundos (default 8)")
    cli.add_argument("--insecure", action="store_true", help="No verificar TLS")
    cli.add_argument("--proxy", help="Proxy (ej. http://127.0.0.1:8080)")
    cli.add_argument("--json", action="store_true", help="Salida en JSON (además de texto)")
    args = cli.parse_args()

    base = norm_url(args.url)
    session = build_session(insecure=args.insecure, proxy=args.proxy, timeout=args.timeout)

    # One entry per single-check option; anything else (i.e. "9") runs them all.
    single_checks = {
        "1": lambda: {"crlf": test_crlf_raw(base, timeout=args.timeout)},
        "2": lambda: {"host_header": test_host_header_injection(base, session)},
        "3": lambda: {"profiler": test_profiler(base, session)},
        "4": lambda: {"fragment": test_fragment_exposure(base, session)},
    }
    run = single_checks.get(args.option)
    out = run() if run is not None else ejecutar_todo(base, session)

    if args.json:
        print(json.dumps(out, indent=2, ensure_ascii=False))

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()