README.md
Rendering markdown...
#!/usr/bin/env python3
"""AAA Security Assessment Framework - CLI Entry Point"""
import asyncio
import os
import typer
import dotenv
dotenv.load_dotenv()
app = typer.Typer(
name="aaa-security",
help="AAA Security Assessment Framework for CVE Exploitation Testing",
)
@app.command()
def green(
host: str = typer.Option(
default=os.getenv("GREEN_AGENT_HOST", "localhost"),
help="Host to bind Green Agent",
),
port: int = typer.Option(
default=int(os.getenv("GREEN_AGENT_PORT", "9001")),
help="Port for Green Agent",
),
):
"""Start the Green Agent (Assessment Orchestrator)."""
from src.green_agent.agent import start_green_agent
print(f"Starting Green Agent on {host}:{port}")
start_green_agent(host=host, port=port)
@app.command()
def white(
host: str = typer.Option(
default=os.getenv("WHITE_AGENT_HOST", "localhost"),
help="Host to bind White Agent",
),
port: int = typer.Option(
default=int(os.getenv("WHITE_AGENT_PORT", "9002")),
help="Port for White Agent",
),
):
"""Start the White Agent (Task Executor)."""
from src.white_agent.agent import start_white_agent
print(f"Starting White Agent on {host}:{port}")
start_white_agent(host=host, port=port)
@app.command()
def launch(
task_id: str = typer.Argument(
default="task-cve-2024-32964-ssrf",
help="Task ID to evaluate",
),
green_host: str = typer.Option(
default=os.getenv("GREEN_AGENT_HOST", "localhost"),
help="Host for Green Agent",
),
green_port: int = typer.Option(
default=int(os.getenv("GREEN_AGENT_PORT", "9001")),
help="Port for Green Agent",
),
white_host: str = typer.Option(
default=os.getenv("WHITE_AGENT_HOST", "localhost"),
help="Host for White Agent",
),
white_port: int = typer.Option(
default=int(os.getenv("WHITE_AGENT_PORT", "9002")),
help="Port for White Agent",
),
):
"""Launch complete evaluation with both agents."""
from src.launcher import launch_evaluation
asyncio.run(
launch_evaluation(
task_id=task_id,
green_host=green_host,
green_port=green_port,
white_host=white_host,
white_port=white_port,
)
)
@app.command()
def tasks():
"""List all available tasks."""
from src.agentxploit.task_loader import TaskLoader
loader = TaskLoader()
task_list = loader.list_tasks()
print("Available Tasks:")
print("-" * 60)
for tid in task_list:
try:
summary = loader.get_task_summary(tid)
print(f" {tid}")
print(f" CVE: {summary['cve']}")
print(f" Type: {summary['type']}")
print(f" Severity: {summary['severity']}")
print(f" Runtime: {summary['runtime']}")
print()
except Exception as e:
print(f" {tid} (error loading: {e})")
print()
@app.command()
def info(
task_id: str = typer.Argument(..., help="Task ID to show info for"),
):
"""Show detailed information about a task."""
from src.agentxploit.task_loader import TaskLoader
loader = TaskLoader()
try:
config = loader.load_task(task_id)
print(f"Task: {config.get('task_name', 'Unknown')}")
print("=" * 60)
print()
vuln = config.get("vulnerability", {})
print("Vulnerability:")
print(f" CVE: {vuln.get('cve', 'Unknown')}")
print(f" Type: {vuln.get('type', 'Unknown')}")
print(f" Severity: {vuln.get('severity', 'Unknown')}")
print(f" Summary: {vuln.get('summary', 'Unknown')}")
print()
obj = config.get("objective", {})
print("Objective:")
print(f" Goal: {obj.get('goal', 'Unknown')}")
print(f" Target: {obj.get('target_endpoint', 'Unknown')}")
print()
print(f"Runtime: {config.get('_runtime', 'Unknown')}")
print(f"Timeout: {config.get('timeout', 300)}s")
except FileNotFoundError:
print(f"Error: Task '{task_id}' not found")
raise typer.Exit(1)
@app.command()
def docker_up(
task_id: str = typer.Argument(
default="task-cve-2024-32964-ssrf",
help="Task ID for Docker environment",
),
):
"""Start Docker environment manually (for debugging)."""
from src.agentxploit.task_loader import TaskLoader
from src.agentxploit.docker_manager import DockerManager
loader = TaskLoader()
try:
config = loader.load_task(task_id)
except FileNotFoundError:
print(f"Error: Task '{task_id}' not found")
raise typer.Exit(1)
manager = DockerManager(task_id, config)
if manager.start_environment():
print("Docker environment started successfully")
print(f"Target container: {manager.target_container}")
print(f"Attacker container: {manager.attacker_container}")
else:
print("Failed to start Docker environment")
@app.command()
def docker_down(
task_id: str = typer.Argument(
default="task-cve-2024-32964-ssrf",
help="Task ID for Docker environment",
),
):
"""Stop Docker environment manually (for debugging)."""
from src.agentxploit.task_loader import TaskLoader
from src.agentxploit.docker_manager import DockerManager
loader = TaskLoader()
try:
config = loader.load_task(task_id)
except FileNotFoundError:
print(f"Error: Task '{task_id}' not found")
raise typer.Exit(1)
manager = DockerManager(task_id, config)
if manager.stop_environment():
print("Docker environment stopped successfully")
else:
print("Failed to stop Docker environment")
if __name__ == "__main__":
app()