# README.md
# Rendering markdown...
# NOTE(review): the two lines above are stray tool output (a paste artifact);
# commented out so this module parses as Python.
#!/usr/bin/env python3
import json
import time
import threading
import hashlib
import os
import sys
from datetime import datetime
class ExploitLogger:
    """Thread-safe in-memory event log with leveled console echo.

    Entries are accumulated in ``self.logs`` as dicts and can be dumped to a
    JSON file named after the random per-session id.  ERROR/CRITICAL/SUCCESS
    messages are always echoed to stdout; other levels only when the debug
    flag is enabled.
    """

    def __init__(self, debug=False):
        # Stored under a distinct name on purpose: an instance attribute
        # called ``debug`` would shadow the ``debug()`` method below, making
        # ``logger.debug(...)`` raise "'bool' object is not callable".
        self.debug_enabled = debug
        # md5 here is only a cheap unique-id generator, not a security use.
        self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
        self.log_file = f"exploit_log_{self.session_id}.json"
        self.logs = []
        self.lock = threading.Lock()

    def _log(self, level, message, **kwargs):
        """Append one entry; echo it when debugging or the level is notable."""
        timestamp = datetime.now().isoformat()
        log_entry = {
            "timestamp": timestamp,
            "session_id": self.session_id,
            "level": level,
            "message": message,
            "data": kwargs,
        }
        with self.lock:
            self.logs.append(log_entry)
        # Printing happens outside the lock so console I/O never blocks
        # other logging threads.
        if self.debug_enabled or level in ("ERROR", "CRITICAL", "SUCCESS"):
            print(f"[{timestamp}] [{level}] {message}")
            if kwargs:
                for key, value in kwargs.items():
                    print(f" {key}: {value}")

    def info(self, message, **kwargs):
        self._log("INFO", message, **kwargs)

    def warning(self, message, **kwargs):
        self._log("WARNING", message, **kwargs)

    def error(self, message, **kwargs):
        self._log("ERROR", message, **kwargs)

    def critical(self, message, **kwargs):
        self._log("CRITICAL", message, **kwargs)

    def success(self, message, **kwargs):
        self._log("SUCCESS", message, **kwargs)

    def debug(self, message, **kwargs):
        # No-op unless the logger was constructed with debug=True.
        if self.debug_enabled:
            self._log("DEBUG", message, **kwargs)

    def save_logs(self):
        """Write all entries to ``self.log_file``; return its name, or None on failure."""
        with self.lock:
            snapshot = list(self.logs)
        try:
            with open(self.log_file, 'w') as f:
                json.dump(snapshot, f, indent=2)
            return self.log_file
        except Exception as e:
            print(f"Failed to save logs: {e}")
            return None

    def get_logs(self, level=None):
        """Return a copy of the entries, optionally filtered to one level."""
        with self.lock:
            if level:
                return [log for log in self.logs if log["level"] == level]
            return self.logs.copy()
class PerformanceMonitor:
    """Tracks wall-clock durations of named operations.

    A timer is started and stopped by operation name; each completed
    duration is accumulated per operation so aggregate statistics can be
    reported afterwards.
    """

    def __init__(self):
        self.timings = {}        # operation -> list of completed durations
        self.active_timers = {}  # operation -> start timestamp (epoch secs)
        self.lock = threading.Lock()

    def start_monitoring(self, operation):
        """Record the start time for *operation* (overwrites any running timer)."""
        with self.lock:
            self.active_timers[operation] = time.time()

    def end_monitoring(self, operation):
        """Stop the timer for *operation* and return its duration.

        Returns None when no timer is running for that operation.
        """
        with self.lock:
            started = self.active_timers.pop(operation, None)
            if started is None:
                return None
            elapsed = time.time() - started
            self.timings.setdefault(operation, []).append(elapsed)
            return elapsed

    def get_average_time(self, operation):
        """Mean duration for *operation*, or None when no samples exist."""
        samples = self.timings.get(operation)
        if not samples:
            return None
        return sum(samples) / len(samples)

    def get_total_time(self, operation):
        """Total recorded time for *operation*, or None if never timed."""
        try:
            return sum(self.timings[operation])
        except KeyError:
            return None

    def get_operation_count(self, operation):
        """Number of completed timings for *operation* (0 if none)."""
        return len(self.timings.get(operation, ()))

    def get_all_stats(self):
        """Return {operation: {count, total_time, average_time, min_time, max_time}}."""
        return {
            op: {
                "count": len(samples),
                "total_time": sum(samples),
                "average_time": sum(samples) / len(samples),
                "min_time": min(samples),
                "max_time": max(samples),
            }
            for op, samples in self.timings.items()
            if samples
        }
class MetricsCollector:
    """Thread-safe counters for exploit-run events plus derived success rates."""

    def __init__(self):
        self.metrics = {
            "connection_attempts": 0,
            "connection_successes": 0,
            "kernel_leaks": 0,
            "privilege_escalations": 0,
            "payload_injections": 0,
            "persistence_attempts": 0,
            "persistence_successes": 0,
            "msr_read_attempts": 0,
            "msr_read_successes": 0,
            "msr_write_attempts": 0,
            "msr_write_successes": 0,
            "errors": 0,
            "crashes": 0,
        }
        self.lock = threading.Lock()

    def increment(self, metric):
        """Add 1 to *metric*; unknown metric names are silently ignored."""
        with self.lock:
            if metric not in self.metrics:
                return
            self.metrics[metric] += 1

    def set_metric(self, metric, value):
        """Set *metric* to *value* (creates the key if it does not exist)."""
        with self.lock:
            self.metrics[metric] = value

    def get_metric(self, metric):
        """Return the current value of *metric*, defaulting to 0."""
        with self.lock:
            return self.metrics.get(metric, 0)

    def get_all_metrics(self):
        """Return a snapshot copy of every counter."""
        with self.lock:
            return self.metrics.copy()

    def calculate_success_rates(self):
        """Return percentage success rates for each attempt/success pair.

        Rates are only emitted for operations that were actually attempted;
        an overall rate is derived from total operations minus errors.
        """
        rate_specs = (
            ("connection_success_rate", "connection_successes", "connection_attempts"),
            ("persistence_success_rate", "persistence_successes", "persistence_attempts"),
            ("msr_read_success_rate", "msr_read_successes", "msr_read_attempts"),
            ("msr_write_success_rate", "msr_write_successes", "msr_write_attempts"),
        )
        with self.lock:
            rates = {}
            for rate_name, success_key, attempt_key in rate_specs:
                attempts = self.metrics[attempt_key]
                if attempts > 0:
                    rates[rate_name] = self.metrics[success_key] / attempts * 100
            total_operations = (
                self.metrics["connection_attempts"]
                + self.metrics["kernel_leaks"]
                + self.metrics["privilege_escalations"]
                + self.metrics["payload_injections"]
                + self.metrics["persistence_attempts"]
            )
            if total_operations > 0:
                rates["overall_success_rate"] = (
                    (total_operations - self.metrics["errors"]) / total_operations * 100
                )
            return rates
class SystemHealthMonitor:
def __init__(self):
self.health_checks = []
def check_memory_usage(self):
try:
import psutil
memory = psutil.virtual_memory()
return {
"status": "healthy" if memory.percent < 90 else "warning",
"usage_percent": memory.percent,
"available_gb": memory.available / (1024**3)
}
except:
return {"status": "unknown", "error": "psutil not available"}
def check_cpu_usage(self):
try:
import psutil
cpu_percent = psutil.cpu_percent(interval=1)
return {
"status": "healthy" if cpu_percent < 80 else "warning",
"usage_percent": cpu_percent
}
except:
return {"status": "unknown", "error": "psutil not available"}
def check_disk_space(self):
try:
import psutil
disk = psutil.disk_usage('/')
free_percent = (disk.free / disk.total) * 100
return {
"status": "healthy" if free_percent > 10 else "warning",
"free_percent": free_percent,
"free_gb": disk.free / (1024**3)
}
except:
return {"status": "unknown", "error": "psutil not available"}
def check_process_count(self):
try:
import psutil
process_count = len(psutil.pids())
return {
"status": "healthy" if process_count < 500 else "warning",
"process_count": process_count
}
except:
return {"status": "unknown", "error": "psutil not available"}
def run_all_checks(self):
checks = {
"memory": self.check_memory_usage(),
"cpu": self.check_cpu_usage(),
"disk": self.check_disk_space(),
"processes": self.check_process_count()
}
overall_status = "healthy"
for check_name, check_result in checks.items():
if check_result.get("status") == "warning":
overall_status = "warning"
elif check_result.get("status") == "unknown":
if overall_status == "healthy":
overall_status = "unknown"
checks["overall_status"] = overall_status
return checks
class ExploitMonitor:
    """Facade tying together the logger, performance, metrics and health monitors.

    Provides session timing, composite reliability reporting, and export of
    the collected logs to JSON/CSV.
    """

    def __init__(self, debug=False):
        self.logger = ExploitLogger(debug)
        self.perf_monitor = PerformanceMonitor()
        self.metrics = MetricsCollector()
        self.health_monitor = SystemHealthMonitor()
        self.start_time = None  # epoch seconds; set by start_monitoring()
        self.end_time = None    # epoch seconds; set by stop_monitoring()

    def start_monitoring(self):
        """Mark the session start and log it."""
        self.start_time = time.time()
        self.logger.info("Exploit monitoring started", session_id=self.logger.session_id)

    def stop_monitoring(self):
        """Mark the session end and log it."""
        self.end_time = time.time()
        self.logger.info("Exploit monitoring stopped")

    def get_session_duration(self):
        """Seconds from start to stop (or to now if still running); None if never started."""
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        elif self.start_time:
            return time.time() - self.start_time
        return None

    def run_health_checks(self):
        """Run all system health checks; return False only on outright failure."""
        self.logger.info("Running system health checks")
        health_status = self.health_monitor.run_all_checks()
        if health_status["overall_status"] == "healthy":
            self.logger.success("System health checks passed")
            return True
        elif health_status["overall_status"] == "warning":
            # Warnings are logged but do not block the run.
            self.logger.warning("System health checks show warnings", health=health_status)
            return True
        else:
            self.logger.error("System health checks failed", health=health_status)
            return False

    def generate_reliability_report(self):
        """Compute a 0-100 weighted reliability score from the success rates.

        Only rates that actually exist contribute; the score is normalised by
        the total weight present so missing categories do not drag it down.
        """
        success_rates = self.metrics.calculate_success_rates()
        performance_stats = self.perf_monitor.get_all_stats()
        reliability_score = 0
        total_weight = 0
        weights = {
            "connection_success_rate": 0.2,
            "msr_read_success_rate": 0.2,
            "msr_write_success_rate": 0.2,
            "persistence_success_rate": 0.15,
            "overall_success_rate": 0.25,
        }
        for metric, weight in weights.items():
            if metric in success_rates:
                reliability_score += success_rates[metric] * weight
                total_weight += weight
        if total_weight > 0:
            reliability_score = reliability_score / total_weight
        else:
            reliability_score = 0
        return {
            "reliability_score": reliability_score,
            "success_rates": success_rates,
            "performance_stats": performance_stats,
            "session_duration": self.get_session_duration(),
        }

    def generate_final_report(self):
        """Assemble the full session report, save it and the logs to disk.

        The report is returned even when saving fails (the failure itself is
        logged).
        """
        reliability_report = self.generate_reliability_report()
        report = {
            "session_info": {
                "session_id": self.logger.session_id,
                "start_time": datetime.fromtimestamp(self.start_time).isoformat() if self.start_time else None,
                "end_time": datetime.fromtimestamp(self.end_time).isoformat() if self.end_time else None,
                "duration_seconds": self.get_session_duration(),
            },
            "metrics": {
                "raw_metrics": self.metrics.get_all_metrics(),
                "success_rates": reliability_report["success_rates"],
                "reliability_score": reliability_report["reliability_score"],
            },
            "performance": reliability_report["performance_stats"],
            "logs": {
                "total_logs": len(self.logger.logs),
                "errors": len(self.logger.get_logs("ERROR")),
                "warnings": len(self.logger.get_logs("WARNING")),
                "successes": len(self.logger.get_logs("SUCCESS")),
            },
            "system_health": self.health_monitor.run_all_checks(),
        }
        report_file = f"exploit_report_{self.logger.session_id}.json"
        try:
            with open(report_file, 'w') as f:
                json.dump(report, f, indent=2)
        except Exception as e:
            self.logger.error("Failed to save final report", error=str(e))
        self.logger.save_logs()
        return report

    def log_exploit_attempt(self, operation, success, details=None):
        """Log one attempt; failures also bump the 'errors' metric."""
        if success:
            self.logger.success(f"{operation} completed successfully", details=details)
        else:
            self.logger.error(f"{operation} failed", details=details)
            self.metrics.increment("errors")

    def log_security_event(self, event_type, description, severity="medium"):
        """Record a security-relevant observation at INFO level."""
        self.logger.info(f"Security event: {event_type}",
                         description=description,
                         severity=severity,
                         event_type=event_type)

    def log_evasion_result(self, technique, detected, details=None):
        """Log whether an evasion technique was detected (warning) or not (success)."""
        if detected:
            self.logger.warning(f"Evasion technique detected: {technique}", details=details)
        else:
            self.logger.success(f"Evasion technique successful: {technique}", details=details)

    def get_real_time_stats(self):
        """Return a live snapshot: duration, metrics, running timers, last 10 logs."""
        return {
            "session_duration": self.get_session_duration(),
            "current_metrics": self.metrics.get_all_metrics(),
            "active_operations": list(self.perf_monitor.active_timers.keys()),
            # A [-10:] slice already handles lists shorter than 10.
            "recent_logs": self.logger.logs[-10:],
        }

    def export_logs_csv(self):
        """Export every log entry to a CSV file; return its name, or None on failure.

        Extra keyword data logged per entry becomes additional columns.  The
        union of data keys across ALL entries is used — the original only
        inspected the first entry, so later entries carrying keys the first
        one lacked made DictWriter raise ValueError.
        """
        import csv
        csv_file = f"exploit_logs_{self.logger.session_id}.csv"
        try:
            with open(csv_file, 'w', newline='') as f:
                if self.logger.logs:
                    fieldnames = ["timestamp", "session_id", "level", "message"]
                    data_keys = set()
                    for log in self.logger.logs:
                        if log.get("data"):
                            data_keys.update(log["data"].keys())
                    fieldnames.extend(sorted(data_keys))
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    for log in self.logger.logs:
                        row = {
                            "timestamp": log["timestamp"],
                            "session_id": log["session_id"],
                            "level": log["level"],
                            "message": log["message"],
                        }
                        if log.get("data"):
                            row.update(log["data"])
                        writer.writerow(row)
            return csv_file
        except Exception as e:
            self.logger.error("Failed to export CSV", error=str(e))
            return None