README.md
Rendering markdown...
#!/usr/bin/env python3
"""
analyze_triple_cve.py - Analyze Triple CVE Covert Channel Traffic
This script analyzes pcap files captured from the triple CVE covert channel
and generates detailed visualizations of:
- CVE-2023-1206: IPv6 hash collision trigger bursts
- CVE-2025-40040: KSM timing-based key agreement sync
- Phase 3 signals for CVE-2024-49882 hugepage transfer
Usage:
python3 analyze_triple_cve.py capture.pcap
python3 analyze_triple_cve.py capture.pcap --output analysis.png
"""
import sys
import os
import subprocess
import argparse
from pathlib import Path
try:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
except ImportError as e:
print(f"Missing dependency: {e}")
print("Install with: pip3 install pandas matplotlib numpy")
sys.exit(1)
# Magic numbers (little-endian in packet data)
TRIGGER_MAGIC = ['0612feca', 'cafe1206'] # 0xCAFE1206
SYNC_MAGIC = ['4000adde', 'dead0040'] # 0xDEAD0040
MSG_MAGIC = ['3713dec0', 'c0de1337'] # 0xC0DE1337
# Ports
TRIGGER_PORT = 31336
SYNC_PORT = 31337
def extract_packets_from_pcap(pcap_file):
"""Extract packet data from pcap using tshark."""
cmd = [
'tshark', '-r', pcap_file, '-T', 'fields',
'-e', 'frame.number',
'-e', 'frame.time_relative',
'-e', 'frame.time_delta',
'-e', 'ip.src',
'-e', 'ip.dst',
'-e', 'udp.srcport',
'-e', 'udp.dstport',
'-e', 'udp.length',
'-e', 'data.data',
'-E', 'header=y',
'-E', 'separator=,'
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Parse CSV output
lines = result.stdout.strip().split('\n')
if len(lines) < 2:
return pd.DataFrame()
# Create DataFrame
import io
df = pd.read_csv(io.StringIO(result.stdout))
df.columns = ['frame_num', 'time_rel', 'time_delta', 'src_ip', 'dst_ip',
'src_port', 'dst_port', 'length', 'data']
return df
except subprocess.CalledProcessError as e:
print(f"tshark error: {e.stderr}")
return pd.DataFrame()
except FileNotFoundError:
print("tshark not found. Install with: sudo apt install wireshark-cli")
sys.exit(1)
def classify_packet(row):
"""Classify packet based on magic numbers and ports."""
data = str(row.get('data', '')).lower().replace(':', '')
dst_port = row.get('dst_port', 0)
# Check magic numbers
for magic in TRIGGER_MAGIC:
if magic in data:
return 'TRIGGER', 'CVE-2023-1206'
for magic in SYNC_MAGIC:
if magic in data:
# Check if it's key agreement (rounds 0-255) or phase3 signal (0xFF)
return 'SYNC', 'Key Agreement'
for magic in MSG_MAGIC:
if magic in data:
return 'MESSAGE', 'CVE-2024-49882'
# Fallback to port-based classification
if dst_port == TRIGGER_PORT:
return 'TRIGGER', 'CVE-2023-1206'
elif dst_port == SYNC_PORT:
return 'SYNC', 'Key Agreement'
return 'OTHER', 'Unknown'
def extract_sync_round(row):
"""Extract round number from sync packet data."""
data = str(row.get('data', '')).lower().replace(':', '')
# Sync packet format: magic(4) + round(1) + value(1)
# In hex: 8 chars magic + 2 chars round + 2 chars value
for magic in SYNC_MAGIC:
idx = data.find(magic)
if idx >= 0:
try:
round_hex = data[idx + 8:idx + 10]
return int(round_hex, 16)
except:
pass
return -1
def analyze_phases(df):
"""Analyze the three phases of communication."""
phases = {}
# Phase 1: Trigger detection
trigger_df = df[df['type'] == 'TRIGGER']
if len(trigger_df) > 0:
phases['trigger'] = {
'start': trigger_df['time_rel'].min(),
'end': trigger_df['time_rel'].max(),
'count': len(trigger_df),
'duration_ms': (trigger_df['time_rel'].max() - trigger_df['time_rel'].min()) * 1000,
'rate_pps': len(trigger_df) / max(trigger_df['time_rel'].max() - trigger_df['time_rel'].min(), 0.001)
}
# Phase 2: Key agreement
sync_df = df[df['type'] == 'SYNC']
if len(sync_df) > 0:
phases['key_agreement'] = {
'start': sync_df['time_rel'].min(),
'end': sync_df['time_rel'].max(),
'count': len(sync_df),
'duration_s': sync_df['time_rel'].max() - sync_df['time_rel'].min(),
'rounds_detected': sync_df['round'].nunique() if 'round' in sync_df else 0
}
return phases
def plot_analysis(df, output_file, phases):
"""Generate comprehensive visualization."""
fig = plt.figure(figsize=(18, 14))
fig.suptitle('Triple CVE Covert Channel Analysis\n' +
'CVE-2023-1206 (Trigger) → CVE-2025-40040 (Key Agreement) → CVE-2024-49882 (Data)',
fontsize=14, fontweight='bold')
colors = {
'TRIGGER': '#FF4444',
'SYNC': '#4444FF',
'MESSAGE': '#44AA44',
'OTHER': '#888888'
}
# 1. Full timeline
ax1 = fig.add_subplot(4, 2, 1)
for ptype in df['type'].unique():
mask = df['type'] == ptype
y_val = {'TRIGGER': 3, 'SYNC': 2, 'MESSAGE': 1, 'OTHER': 0}.get(ptype, 0)
ax1.scatter(df[mask]['time_rel'], [y_val] * sum(mask),
c=colors.get(ptype, '#888888'), label=ptype, alpha=0.6, s=30)
ax1.set_xlabel('Time (seconds)')
ax1.set_ylabel('Packet Type')
ax1.set_yticks([0, 1, 2, 3])
ax1.set_yticklabels(['Other', 'Message', 'Sync', 'Trigger'])
ax1.set_title('Complete Packet Timeline')
ax1.legend(loc='upper right')
ax1.grid(True, alpha=0.3)
# Add phase markers
if 'trigger' in phases:
ax1.axvspan(phases['trigger']['start'], phases['trigger']['end'],
alpha=0.2, color='red', label='Phase 1')
if 'key_agreement' in phases:
ax1.axvspan(phases['key_agreement']['start'], phases['key_agreement']['end'],
alpha=0.2, color='blue', label='Phase 2')
# 2. Packet rate histogram
ax2 = fig.add_subplot(4, 2, 2)
max_time = df['time_rel'].max()
bins = np.arange(0, max_time + 0.05, 0.05) # 50ms bins
for ptype in ['TRIGGER', 'SYNC']:
mask = df['type'] == ptype
if sum(mask) > 0:
ax2.hist(df[mask]['time_rel'], bins=bins, alpha=0.7,
label=ptype, color=colors.get(ptype))
ax2.set_xlabel('Time (seconds)')
ax2.set_ylabel('Packets per 50ms')
ax2.set_title('Packet Rate - CVE-2023-1206 Burst Detection')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. Trigger burst detail
ax3 = fig.add_subplot(4, 2, 3)
trigger_df = df[df['type'] == 'TRIGGER']
if len(trigger_df) > 1:
times = trigger_df['time_rel'].values
# Zoom to trigger period
ax3.scatter(range(len(times)), (times - times.min()) * 1000,
c='red', s=10, alpha=0.6)
ax3.set_xlabel('Packet Number')
ax3.set_ylabel('Time from first trigger (ms)')
ax3.set_title(f'CVE-2023-1206 Trigger Burst Detail ({len(trigger_df)} packets)')
# Add burst rate annotation
if len(times) > 1:
duration = times.max() - times.min()
rate = len(times) / duration if duration > 0 else 0
ax3.annotate(f'Rate: {rate:.0f} pkt/s\nDuration: {duration*1000:.1f}ms',
xy=(0.95, 0.95), xycoords='axes fraction',
ha='right', va='top', fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat'))
else:
ax3.text(0.5, 0.5, 'No trigger packets', ha='center', va='center')
ax3.grid(True, alpha=0.3)
# 4. Trigger inter-arrival times
ax4 = fig.add_subplot(4, 2, 4)
if len(trigger_df) > 1:
times = trigger_df['time_rel'].values
inter_arrival = np.diff(times) * 1000000 # microseconds
ax4.hist(inter_arrival, bins=50, color='red', alpha=0.7, edgecolor='darkred')
ax4.set_xlabel('Inter-arrival Time (μs)')
ax4.set_ylabel('Count')
ax4.set_title('Trigger Packet Inter-arrival Distribution')
ax4.axvline(np.median(inter_arrival), color='black', linestyle='--',
label=f'Median: {np.median(inter_arrival):.1f}μs')
ax4.legend()
ax4.grid(True, alpha=0.3)
# 5. Key agreement sync timeline
ax5 = fig.add_subplot(4, 2, 5)
sync_df = df[df['type'] == 'SYNC']
if len(sync_df) > 0:
times = sync_df['time_rel'].values
rounds = sync_df['round'].values if 'round' in sync_df else range(len(times))
ax5.scatter(times, rounds, c='blue', s=15, alpha=0.6)
ax5.set_xlabel('Time (seconds)')
ax5.set_ylabel('Round Number')
ax5.set_title(f'Key Agreement Sync Packets ({len(sync_df)} packets)')
ax5.set_ylim(-5, 260)
ax5.axhline(255, color='gray', linestyle='--', alpha=0.5, label='Round 255')
else:
ax5.text(0.5, 0.5, 'No sync packets', ha='center', va='center')
ax5.grid(True, alpha=0.3)
# 6. Sync inter-arrival times
ax6 = fig.add_subplot(4, 2, 6)
if len(sync_df) > 1:
times = sync_df['time_rel'].values
inter_arrival = np.diff(times) * 1000 # milliseconds
ax6.plot(range(len(inter_arrival)), inter_arrival, 'b-', alpha=0.5, linewidth=0.5)
ax6.scatter(range(len(inter_arrival)), inter_arrival, c='blue', s=5, alpha=0.5)
ax6.set_xlabel('Packet Number')
ax6.set_ylabel('Inter-arrival Time (ms)')
ax6.set_title('Key Agreement Timing (CVE-2025-40040 rounds)')
ax6.axhline(30, color='red', linestyle='--', alpha=0.5, label='KSM wait (30ms)')
ax6.legend()
ax6.grid(True, alpha=0.3)
# 7. Bidirectional flow
ax7 = fig.add_subplot(4, 2, 7)
# Determine direction
src_ips = df['src_ip'].unique()
if len(src_ips) >= 2:
ip1, ip2 = sorted(src_ips)[:2]
mask1 = df['src_ip'] == ip1
mask2 = df['src_ip'] == ip2
ax7.scatter(df[mask1]['time_rel'], [1] * sum(mask1),
c=df[mask1]['type'].map(colors), s=20, alpha=0.6, label=f'From {ip1}')
ax7.scatter(df[mask2]['time_rel'], [2] * sum(mask2),
c=df[mask2]['type'].map(colors), s=20, alpha=0.6, label=f'From {ip2}')
ax7.set_xlabel('Time (seconds)')
ax7.set_ylabel('Direction')
ax7.set_yticks([1, 2])
ax7.set_yticklabels([ip1, ip2])
ax7.set_title('Bidirectional Communication Flow')
ax7.legend(loc='upper right')
else:
ax7.text(0.5, 0.5, 'Single direction only', ha='center', va='center')
ax7.grid(True, alpha=0.3)
# 8. Statistics
ax8 = fig.add_subplot(4, 2, 8)
ax8.axis('off')
stats = f"""
╔══════════════════════════════════════════════════════════╗
║ ANALYSIS SUMMARY ║
╠══════════════════════════════════════════════════════════╣
║ Total Packets: {len(df):<43}║
║ Duration: {df['time_rel'].max():.2f} seconds{' '*35}║
╠══════════════════════════════════════════════════════════╣
║ PHASE 1: CVE-2023-1206 (IPv6 Hash Collision Trigger) ║
║ ────────────────────────────────────────────────────── ║"""
if 'trigger' in phases:
p = phases['trigger']
stats += f"""
║ Packets: {p['count']:<46}║
║ Duration: {p['duration_ms']:.1f}ms{' '*40}║
║ Rate: {p['rate_pps']:.0f} packets/second{' '*33}║"""
else:
stats += f"""
║ No trigger packets detected{' '*27}║"""
stats += f"""
╠══════════════════════════════════════════════════════════╣
║ PHASE 2: CVE-2025-40040 (KSM Timing Key Agreement) ║
║ ────────────────────────────────────────────────────── ║"""
if 'key_agreement' in phases:
p = phases['key_agreement']
stats += f"""
║ Sync Packets: {p['count']:<41}║
║ Duration: {p['duration_s']:.1f}s{' '*41}║
║ Expected: ~512 (256 rounds × 2 parties){' '*16}║"""
else:
stats += f"""
║ No sync packets detected{' '*30}║"""
stats += f"""
╠══════════════════════════════════════════════════════════╣
║ PHASE 3: CVE-2024-49882 (Hugepage Data Transfer) ║
║ ────────────────────────────────────────────────────── ║
║ (Data transfer via memory, minimal network traffic) ║
╚══════════════════════════════════════════════════════════╝
"""
ax8.text(0.02, 0.98, stats, transform=ax8.transAxes,
fontfamily='monospace', fontsize=9,
verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8))
plt.tight_layout()
plt.savefig(output_file, dpi=150, bbox_inches='tight')
print(f"\nPlot saved to: {output_file}")
return fig
def main():
parser = argparse.ArgumentParser(
description='Analyze Triple CVE Covert Channel Traffic',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 analyze_triple_cve.py capture.pcap
python3 analyze_triple_cve.py capture.pcap --output report.png
python3 analyze_triple_cve.py capture.pcap --show
"""
)
parser.add_argument('pcap', help='Input pcap file')
parser.add_argument('--output', '-o', help='Output plot file (default: analysis_<pcap>.png)')
parser.add_argument('--show', '-s', action='store_true', help='Show plot interactively')
parser.add_argument('--csv', '-c', help='Export packet data to CSV')
args = parser.parse_args()
if not os.path.exists(args.pcap):
print(f"Error: File not found: {args.pcap}")
sys.exit(1)
print(f"Analyzing: {args.pcap}")
print("=" * 60)
# Extract packets
print("Extracting packets with tshark...")
df = extract_packets_from_pcap(args.pcap)
if len(df) == 0:
print("No packets found in capture!")
sys.exit(1)
print(f"Found {len(df)} packets")
# Classify packets
print("Classifying packets...")
df[['type', 'cve']] = df.apply(lambda row: pd.Series(classify_packet(row)), axis=1)
df['round'] = df.apply(extract_sync_round, axis=1)
# Print summary
print("\nPacket breakdown:")
for ptype in df['type'].unique():
count = sum(df['type'] == ptype)
print(f" {ptype}: {count}")
# Analyze phases
print("\nAnalyzing phases...")
phases = analyze_phases(df)
# Export CSV if requested
if args.csv:
df.to_csv(args.csv, index=False)
print(f"\nCSV exported to: {args.csv}")
# Generate plot
output_file = args.output or f"analysis_{Path(args.pcap).stem}.png"
plot_analysis(df, output_file, phases)
if args.show:
plt.show()
if __name__ == '__main__':
main()