4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / main.py PY
#!/usr/bin/env python3
"""AAA Security Assessment Framework - CLI Entry Point"""

import asyncio
import os
import typer
import dotenv

# Load settings from a .env file (if present) into the process environment.
# This runs at import time so that the GREEN_AGENT_* / WHITE_AGENT_* variables
# are already in os.environ when the commands below resolve their defaults.
dotenv.load_dotenv()

# Root Typer application; every @app.command() function below registers a
# subcommand on it.
app = typer.Typer(
    name="aaa-security",
    help="AAA Security Assessment Framework for CVE Exploitation Testing",
)


@app.command()
def green(
    # envvar= lets Typer read and validate the variable when the command line
    # is parsed. A malformed GREEN_AGENT_PORT then produces a normal usage
    # error for this command instead of a ValueError raised while the module
    # is being imported (which would break every subcommand).
    host: str = typer.Option(
        default="localhost",
        envvar="GREEN_AGENT_HOST",
        help="Host to bind Green Agent",
    ),
    port: int = typer.Option(
        default=9001,
        envvar="GREEN_AGENT_PORT",
        help="Port for Green Agent",
    ),
):
    """Start the Green Agent (Assessment Orchestrator)."""
    # Imported inside the command so the agent package is only loaded when
    # this subcommand actually runs.
    from src.green_agent.agent import start_green_agent

    print(f"Starting Green Agent on {host}:{port}")
    start_green_agent(host=host, port=port)


@app.command()
def white(
    # envvar= lets Typer read and validate the variable when the command line
    # is parsed. A malformed WHITE_AGENT_PORT then produces a normal usage
    # error for this command instead of a ValueError raised while the module
    # is being imported (which would break every subcommand).
    host: str = typer.Option(
        default="localhost",
        envvar="WHITE_AGENT_HOST",
        help="Host to bind White Agent",
    ),
    port: int = typer.Option(
        default=9002,
        envvar="WHITE_AGENT_PORT",
        help="Port for White Agent",
    ),
):
    """Start the White Agent (Task Executor)."""
    # Imported inside the command so the agent package is only loaded when
    # this subcommand actually runs.
    from src.white_agent.agent import start_white_agent

    print(f"Starting White Agent on {host}:{port}")
    start_white_agent(host=host, port=port)


@app.command()
def launch(
    task_id: str = typer.Argument(
        default="task-cve-2024-32964-ssrf",
        help="Task ID to evaluate",
    ),
    # The agent endpoints use envvar= so Typer reads and validates the
    # variables at parse time. A malformed *_PORT value becomes a usage error
    # for this command rather than a ValueError during module import.
    green_host: str = typer.Option(
        default="localhost",
        envvar="GREEN_AGENT_HOST",
        help="Host for Green Agent",
    ),
    green_port: int = typer.Option(
        default=9001,
        envvar="GREEN_AGENT_PORT",
        help="Port for Green Agent",
    ),
    white_host: str = typer.Option(
        default="localhost",
        envvar="WHITE_AGENT_HOST",
        help="Host for White Agent",
    ),
    white_port: int = typer.Option(
        default=9002,
        envvar="WHITE_AGENT_PORT",
        help="Port for White Agent",
    ),
):
    """Launch complete evaluation with both agents."""
    # Imported inside the command so the launcher is only loaded when this
    # subcommand actually runs.
    from src.launcher import launch_evaluation

    # launch_evaluation is a coroutine function; drive it to completion on a
    # fresh event loop.
    asyncio.run(
        launch_evaluation(
            task_id=task_id,
            green_host=green_host,
            green_port=green_port,
            white_host=white_host,
            white_port=white_port,
        )
    )


@app.command()
def tasks():
    """List all available tasks."""
    from src.agentxploit.task_loader import TaskLoader

    # (label shown to the user, key looked up in the task summary), in the
    # order the lines are printed.
    summary_fields = (
        ("CVE", "cve"),
        ("Type", "type"),
        ("Severity", "severity"),
        ("Runtime", "runtime"),
    )

    loader = TaskLoader()
    available = loader.list_tasks()

    print("Available Tasks:")
    print("-" * 60)
    for task_name in available:
        # Best effort per task: a summary that fails to load is reported on
        # its own line and the listing carries on with the next task.
        try:
            details = loader.get_task_summary(task_name)
            print(f"  {task_name}")
            for label, key in summary_fields:
                print(f"    {label}: {details[key]}")
            print()
        except Exception as exc:
            print(f"  {task_name} (error loading: {exc})")
            print()


@app.command()
def info(
    task_id: str = typer.Argument(..., help="Task ID to show info for"),
):
    """Show detailed information about a task."""
    from src.agentxploit.task_loader import TaskLoader

    # (label, key) pairs for each printed section, in display order.
    vulnerability_fields = (
        ("CVE", "cve"),
        ("Type", "type"),
        ("Severity", "severity"),
        ("Summary", "summary"),
    )
    objective_fields = (
        ("Goal", "goal"),
        ("Target", "target_endpoint"),
    )

    loader = TaskLoader()
    try:
        task = loader.load_task(task_id)

        print(f"Task: {task.get('task_name', 'Unknown')}")
        print("=" * 60)
        print()

        vulnerability = task.get("vulnerability", {})
        print("Vulnerability:")
        for label, key in vulnerability_fields:
            print(f"  {label}: {vulnerability.get(key, 'Unknown')}")
        print()

        objective = task.get("objective", {})
        print("Objective:")
        for label, key in objective_fields:
            print(f"  {label}: {objective.get(key, 'Unknown')}")
        print()

        print(f"Runtime: {task.get('_runtime', 'Unknown')}")
        print(f"Timeout: {task.get('timeout', 300)}s")

    except FileNotFoundError:
        # Unknown task: report it and exit with a non-zero status.
        print(f"Error: Task '{task_id}' not found")
        raise typer.Exit(1)


@app.command()
def docker_up(
    task_id: str = typer.Argument(
        default="task-cve-2024-32964-ssrf",
        help="Task ID for Docker environment",
    ),
):
    """Start Docker environment manually (for debugging)."""
    from src.agentxploit.task_loader import TaskLoader
    from src.agentxploit.docker_manager import DockerManager

    loader = TaskLoader()
    try:
        config = loader.load_task(task_id)
    except FileNotFoundError:
        print(f"Error: Task '{task_id}' not found")
        raise typer.Exit(1)

    manager = DockerManager(task_id, config)
    if not manager.start_environment():
        print("Failed to start Docker environment")
        # Exit non-zero so scripts and CI can detect the failure, matching
        # the not-found path above.
        raise typer.Exit(1)

    print("Docker environment started successfully")
    print(f"Target container: {manager.target_container}")
    print(f"Attacker container: {manager.attacker_container}")


@app.command()
def docker_down(
    task_id: str = typer.Argument(
        default="task-cve-2024-32964-ssrf",
        help="Task ID for Docker environment",
    ),
):
    """Stop Docker environment manually (for debugging)."""
    from src.agentxploit.task_loader import TaskLoader
    from src.agentxploit.docker_manager import DockerManager

    loader = TaskLoader()
    try:
        config = loader.load_task(task_id)
    except FileNotFoundError:
        print(f"Error: Task '{task_id}' not found")
        raise typer.Exit(1)

    manager = DockerManager(task_id, config)
    if not manager.stop_environment():
        print("Failed to stop Docker environment")
        # Exit non-zero so scripts and CI can detect the failure, matching
        # the not-found path above.
        raise typer.Exit(1)

    print("Docker environment stopped successfully")


# Run the Typer CLI only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    app()