4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / analyze_triple_cve.py PY
#!/usr/bin/env python3
"""
analyze_triple_cve.py - Analyze Triple CVE Covert Channel Traffic

This script analyzes pcap files captured from the triple CVE covert channel
and generates detailed visualizations of:
- CVE-2023-1206: IPv6 hash collision trigger bursts
- CVE-2025-40040: KSM timing-based key agreement sync
- Phase 3 signals for CVE-2024-49882 hugepage transfer

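Usage:
    python3 analyze_triple_cve.py capture.pcap
    python3 analyze_triple_cve.py capture.pcap --output analysis.png
    python3 analyze_triple_cve.py capture.pcap --show
    python3 analyze_triple_cve.py capture.pcap --csv packets.csv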
"""

import sys
import os
import subprocess
import argparse
from pathlib import Path

try:
    import pandas as pd
    import matplotlib.pyplot as plt
    import matplotlib.patches as mpatches
    import numpy as np
except ImportError as e:
    print(f"Missing dependency: {e}")
    print("Install with: pip3 install pandas matplotlib numpy")
    sys.exit(1)


# Magic numbers (little-endian in packet data)
# Each list holds the hex spelling of one 32-bit marker in both byte orders:
# first the byte-reversed form, then the value as written in the trailing
# comment. classify_packet() searches the payload hex string for either form.
TRIGGER_MAGIC = ['0612feca', 'cafe1206']  # 0xCAFE1206
SYNC_MAGIC = ['4000adde', 'dead0040']      # 0xDEAD0040
MSG_MAGIC = ['3713dec0', 'c0de1337']       # 0xC0DE1337

# Ports
# Destination ports used by classify_packet() as a fallback when the payload
# contains none of the magic values above.
TRIGGER_PORT = 31336
SYNC_PORT = 31337


def extract_packets_from_pcap(pcap_file):
    """Extract per-packet fields from a pcap file using tshark.

    Runs ``tshark -T fields`` on *pcap_file* and parses its CSV output.

    Args:
        pcap_file: Path of the capture file handed to ``tshark -r``.

    Returns:
        A DataFrame with the columns ``frame_num``, ``time_rel``,
        ``time_delta``, ``src_ip``, ``dst_ip``, ``src_port``, ``dst_port``,
        ``length`` and ``data`` (one row per frame), or an empty DataFrame
        when tshark fails or produces no packet rows.

    Exits the process with status 1 when the ``tshark`` executable cannot
    be found.
    """
    import io  # not among the module-level imports, so imported locally

    columns = ['frame_num', 'time_rel', 'time_delta', 'src_ip', 'dst_ip',
               'src_port', 'dst_port', 'length', 'data']

    # NOTE(review): ip.src / ip.dst are IPv4 fields; frames without an IPv4
    # header will have empty address columns - confirm whether ipv6.src /
    # ipv6.dst should be captured as well.
    cmd = [
        'tshark', '-r', pcap_file, '-T', 'fields',
        '-e', 'frame.number',
        '-e', 'frame.time_relative',
        '-e', 'frame.time_delta',
        '-e', 'ip.src',
        '-e', 'ip.dst',
        '-e', 'udp.srcport',
        '-e', 'udp.dstport',
        '-e', 'udp.length',
        '-e', 'data.data',
        '-E', 'header=y',
        '-E', 'separator=,',
        # Quote every field and keep only the first occurrence of each one.
        # Without this, a field that occurs several times in one frame is
        # emitted as a comma-joined list, which adds columns to that CSV row
        # and breaks parsing.
        '-E', 'quote=d',
        '-E', 'occurrence=f',
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        print(f"tshark error: {e.stderr}")
        return pd.DataFrame()
    except FileNotFoundError:
        print("tshark not found. Install with: sudo apt install tshark")
        sys.exit(1)

    # The first line is the header; fewer than two lines means no packets.
    if len(result.stdout.strip().split('\n')) < 2:
        return pd.DataFrame()

    df = pd.read_csv(io.StringIO(result.stdout))
    # Replace tshark's dotted field names with the short names used elsewhere
    # in this script; a column-count mismatch raises here rather than passing
    # silently.
    df.columns = columns
    return df


def classify_packet(row):
    """Label one packet row as a ``(type, cve)`` pair.

    The payload (``row['data']``, lower-cased with ``:`` separators removed)
    is searched for the known magic values first; only when none is present
    does the destination port decide the label.

    Args:
        row: Mapping-like object offering ``.get`` for ``'data'`` and
            ``'dst_port'`` (a dict or a DataFrame row).

    Returns:
        One of ``('TRIGGER', 'CVE-2023-1206')``, ``('SYNC', 'Key Agreement')``,
        ``('MESSAGE', 'CVE-2024-49882')`` or ``('OTHER', 'Unknown')``.
    """
    payload_hex = str(row.get('data', '')).lower().replace(':', '')
    port = row.get('dst_port', 0)

    # Checked in this order; the first group with a hit wins. Every packet
    # carrying the sync magic gets the same label - the round byte is not
    # inspected here.
    signature_table = (
        (TRIGGER_MAGIC, ('TRIGGER', 'CVE-2023-1206')),
        (SYNC_MAGIC, ('SYNC', 'Key Agreement')),
        (MSG_MAGIC, ('MESSAGE', 'CVE-2024-49882')),
    )
    for signatures, label in signature_table:
        if any(signature in payload_hex for signature in signatures):
            return label

    # No magic in the payload: fall back to the destination port.
    if port == TRIGGER_PORT:
        return ('TRIGGER', 'CVE-2023-1206')
    if port == SYNC_PORT:
        return ('SYNC', 'Key Agreement')

    return ('OTHER', 'Unknown')


def extract_sync_round(row):
    """Return the round number carried by a sync packet.

    The payload (``row['data']``, lower-cased with ``:`` separators removed)
    is searched for each sync magic; the byte immediately after the magic is
    decoded as the round number.

    Args:
        row: Mapping-like object offering ``.get`` for ``'data'``.

    Returns:
        The round number (0-255), or ``-1`` when no sync magic is present or
        the round byte after it is missing or not valid hex.
    """
    payload_hex = str(row.get('data', '')).lower().replace(':', '')

    # Sync packet format: magic(4) + round(1) + value(1)
    # In hex: 8 chars magic + 2 chars round + 2 chars value
    for magic in SYNC_MAGIC:
        idx = payload_hex.find(magic)
        if idx < 0:
            continue

        round_start = idx + len(magic)
        round_hex = payload_hex[round_start:round_start + 2]
        if len(round_hex) != 2:
            # Payload ends before a complete round byte; try the other form.
            continue
        try:
            return int(round_hex, 16)
        except ValueError:
            # Characters after the magic are not hex digits.
            continue
    return -1


def analyze_phases(df):
    """Summarise the trigger and key-agreement phases of a classified capture.

    Args:
        df: DataFrame with a ``type`` column (``'TRIGGER'``, ``'SYNC'``, ...)
            and a ``time_rel`` column in seconds. An optional ``round``
            column holds the sync round per packet, with ``-1`` meaning
            "no round found".

    Returns:
        A dict that may contain:

        * ``'trigger'`` - ``start``, ``end``, ``count``, ``duration_ms`` and
          ``rate_pps`` of the TRIGGER packets;
        * ``'key_agreement'`` - ``start``, ``end``, ``count``, ``duration_s``
          and ``rounds_detected`` of the SYNC packets.

        A key is absent when the capture has no packets of that type.
    """
    phases = {}

    # Phase 1: Trigger detection
    trigger_times = df.loc[df['type'] == 'TRIGGER', 'time_rel']
    if len(trigger_times) > 0:
        start, end = trigger_times.min(), trigger_times.max()
        span = end - start
        phases['trigger'] = {
            'start': start,
            'end': end,
            'count': len(trigger_times),
            'duration_ms': span * 1000,
            # Divisor is clamped to 1 ms so a zero-length burst (for example
            # a single packet) cannot divide by zero.
            'rate_pps': len(trigger_times) / max(span, 0.001),
        }

    # Phase 2: Key agreement
    sync_df = df[df['type'] == 'SYNC']
    if len(sync_df) > 0:
        start, end = sync_df['time_rel'].min(), sync_df['time_rel'].max()
        if 'round' in sync_df:
            rounds = sync_df['round']
            # -1 is the "no round" sentinel, not a real round: exclude it.
            rounds_detected = int(rounds[rounds >= 0].nunique())
        else:
            rounds_detected = 0
        phases['key_agreement'] = {
            'start': start,
            'end': end,
            'count': len(sync_df),
            'duration_s': end - start,
            'rounds_detected': rounds_detected,
        }

    return phases


# Marker colour for each packet classification.
_TYPE_COLORS = {
    'TRIGGER': '#FF4444',
    'SYNC': '#4444FF',
    'MESSAGE': '#44AA44',
    'OTHER': '#888888',
}

# Row (y position) of each packet classification on the timeline panel.
_TYPE_ROWS = {'TRIGGER': 3, 'SYNC': 2, 'MESSAGE': 1, 'OTHER': 0}

# Number of characters between the vertical borders of the summary box.
_BOX_WIDTH = 58


def _plot_timeline(ax, df, phases):
    """Panel 1: every packet on a row per type, with detected phases shaded."""
    for ptype in df['type'].unique():
        times = df.loc[df['type'] == ptype, 'time_rel']
        ax.scatter(times, [_TYPE_ROWS.get(ptype, 0)] * len(times),
                   c=_TYPE_COLORS.get(ptype, '#888888'), label=ptype,
                   alpha=0.6, s=30)

    ax.set_xlabel('Time (seconds)')
    ax.set_ylabel('Packet Type')
    ax.set_yticks([0, 1, 2, 3])
    ax.set_yticklabels(['Other', 'Message', 'Sync', 'Trigger'])
    ax.set_title('Complete Packet Timeline')
    ax.grid(True, alpha=0.3)

    # Add phase markers
    if 'trigger' in phases:
        ax.axvspan(phases['trigger']['start'], phases['trigger']['end'],
                   alpha=0.2, color='red', label='Phase 1')
    if 'key_agreement' in phases:
        ax.axvspan(phases['key_agreement']['start'], phases['key_agreement']['end'],
                   alpha=0.2, color='blue', label='Phase 2')

    # The legend is built last so the phase spans are listed in it too.
    ax.legend(loc='upper right')


def _plot_rate_histogram(ax, df):
    """Panel 2: TRIGGER and SYNC packet counts per 50 ms bin."""
    bins = np.arange(0, df['time_rel'].max() + 0.05, 0.05)  # 50ms bins
    if len(bins) < 2:
        # All packets at time 0 would leave a single edge; use one 50 ms bin.
        bins = np.array([0.0, 0.05])

    plotted = False
    for ptype in ('TRIGGER', 'SYNC'):
        times = df.loc[df['type'] == ptype, 'time_rel']
        if len(times) > 0:
            ax.hist(times, bins=bins, alpha=0.7,
                    label=ptype, color=_TYPE_COLORS.get(ptype))
            plotted = True

    ax.set_xlabel('Time (seconds)')
    ax.set_ylabel('Packets per 50ms')
    ax.set_title('Packet Rate - CVE-2023-1206 Burst Detection')
    if plotted:
        # Skip the legend when nothing was drawn (avoids an empty legend).
        ax.legend()
    ax.grid(True, alpha=0.3)


def _plot_trigger_burst(ax, trigger_times):
    """Panel 3: arrival time of each trigger packet relative to the first."""
    if len(trigger_times) > 1:
        first = trigger_times.min()
        # Zoom to trigger period
        ax.scatter(range(len(trigger_times)), (trigger_times - first) * 1000,
                   c='red', s=10, alpha=0.6)
        ax.set_xlabel('Packet Number')
        ax.set_ylabel('Time from first trigger (ms)')
        ax.set_title(f'CVE-2023-1206 Trigger Burst Detail ({len(trigger_times)} packets)')

        # Add burst rate annotation
        duration = trigger_times.max() - first
        rate = len(trigger_times) / duration if duration > 0 else 0
        ax.annotate(f'Rate: {rate:.0f} pkt/s\nDuration: {duration*1000:.1f}ms',
                    xy=(0.95, 0.95), xycoords='axes fraction',
                    ha='right', va='top', fontsize=10,
                    bbox=dict(boxstyle='round', facecolor='wheat'))
    else:
        ax.text(0.5, 0.5, 'No trigger packets', ha='center', va='center')
    ax.grid(True, alpha=0.3)


def _plot_trigger_intervals(ax, trigger_times):
    """Panel 4: histogram of gaps between consecutive trigger packets."""
    if len(trigger_times) > 1:
        gaps_us = np.diff(trigger_times) * 1000000  # microseconds
        median_us = np.median(gaps_us)
        ax.hist(gaps_us, bins=50, color='red', alpha=0.7, edgecolor='darkred')
        ax.set_xlabel('Inter-arrival Time (μs)')
        ax.set_ylabel('Count')
        ax.set_title('Trigger Packet Inter-arrival Distribution')
        ax.axvline(median_us, color='black', linestyle='--',
                   label=f'Median: {median_us:.1f}μs')
        ax.legend()
    ax.grid(True, alpha=0.3)


def _plot_sync_rounds(ax, sync_df):
    """Panel 5: round number of each sync packet against capture time."""
    if len(sync_df) > 0:
        times = sync_df['time_rel'].values
        rounds = sync_df['round'].values if 'round' in sync_df else range(len(times))

        ax.scatter(times, rounds, c='blue', s=15, alpha=0.6)
        ax.set_xlabel('Time (seconds)')
        ax.set_ylabel('Round Number')
        ax.set_title(f'Key Agreement Sync Packets ({len(sync_df)} packets)')
        ax.set_ylim(-5, 260)
        ax.axhline(255, color='gray', linestyle='--', alpha=0.5, label='Round 255')
        # Without a legend the reference line's label is never displayed.
        ax.legend(loc='lower right')
    else:
        ax.text(0.5, 0.5, 'No sync packets', ha='center', va='center')
    ax.grid(True, alpha=0.3)


def _plot_sync_intervals(ax, sync_df):
    """Panel 6: gap between consecutive sync packets, in milliseconds."""
    if len(sync_df) > 1:
        gaps_ms = np.diff(sync_df['time_rel'].values) * 1000  # milliseconds
        positions = range(len(gaps_ms))
        ax.plot(positions, gaps_ms, 'b-', alpha=0.5, linewidth=0.5)
        ax.scatter(positions, gaps_ms, c='blue', s=5, alpha=0.5)
        ax.set_xlabel('Packet Number')
        ax.set_ylabel('Inter-arrival Time (ms)')
        ax.set_title('Key Agreement Timing (CVE-2025-40040 rounds)')
        ax.axhline(30, color='red', linestyle='--', alpha=0.5, label='KSM wait (30ms)')
        ax.legend()
    ax.grid(True, alpha=0.3)


def _plot_direction_flow(ax, df):
    """Panel 7: packets from the first two source addresses on separate rows."""
    # Rows with no source address are NaN; drop them so sorting the string
    # addresses cannot fail on a str/float comparison.
    sources = sorted(df['src_ip'].dropna().unique())
    if len(sources) >= 2:
        pair = sources[:2]
        for level, ip in enumerate(pair, start=1):
            from_ip = df['src_ip'] == ip
            ax.scatter(df.loc[from_ip, 'time_rel'], [level] * int(from_ip.sum()),
                       c=df.loc[from_ip, 'type'].map(_TYPE_COLORS),
                       s=20, alpha=0.6, label=f'From {ip}')

        ax.set_xlabel('Time (seconds)')
        ax.set_ylabel('Direction')
        ax.set_yticks([1, 2])
        ax.set_yticklabels(pair)
        ax.set_title('Bidirectional Communication Flow')
        ax.legend(loc='upper right')
    else:
        ax.text(0.5, 0.5, 'Single direction only', ha='center', va='center')
    ax.grid(True, alpha=0.3)


def _summary_text(df, phases):
    """Build the boxed text summary, padding every row to the same width."""
    def row(text):
        return '║' + text.ljust(_BOX_WIDTH) + '║'

    top = '╔' + '═' * _BOX_WIDTH + '╗'
    divider = '╠' + '═' * _BOX_WIDTH + '╣'
    bottom = '╚' + '═' * _BOX_WIDTH + '╝'
    rule = row('  ' + '─' * (_BOX_WIDTH - 4))

    lines = [
        top,
        row('ANALYSIS SUMMARY'.center(_BOX_WIDTH)),
        divider,
        row(f"  Total Packets: {len(df)}"),
        row(f"  Duration: {df['time_rel'].max():.2f} seconds"),
        divider,
        row('  PHASE 1: CVE-2023-1206 (IPv6 Hash Collision Trigger)'),
        rule,
    ]

    if 'trigger' in phases:
        p = phases['trigger']
        lines += [
            row(f"    Packets: {p['count']}"),
            row(f"    Duration: {p['duration_ms']:.1f}ms"),
            row(f"    Rate: {p['rate_pps']:.0f} packets/second"),
        ]
    else:
        lines.append(row('    No trigger packets detected'))

    lines += [
        divider,
        row('  PHASE 2: CVE-2025-40040 (KSM Timing Key Agreement)'),
        rule,
    ]

    if 'key_agreement' in phases:
        p = phases['key_agreement']
        lines += [
            row(f"    Sync Packets: {p['count']}"),
            row(f"    Duration: {p['duration_s']:.1f}s"),
            row('    Expected: ~512 (256 rounds × 2 parties)'),
        ]
    else:
        lines.append(row('    No sync packets detected'))

    lines += [
        divider,
        row('  PHASE 3: CVE-2024-49882 (Hugepage Data Transfer)'),
        rule,
        row('    (Data transfer via memory, minimal network traffic)'),
        bottom,
    ]

    return '\n' + '\n'.join(lines) + '\n'


def plot_analysis(df, output_file, phases):
    """Generate the eight-panel analysis figure and save it to disk.

    Panels (4 rows x 2 columns): full timeline, packet-rate histogram,
    trigger burst detail, trigger inter-arrival histogram, sync round
    timeline, sync inter-arrival times, per-source flow, and a text summary.

    Args:
        df: Classified packet DataFrame with ``time_rel``, ``type`` and
            ``src_ip`` columns, and optionally ``round``.
        output_file: Path the figure is written to (150 dpi).
        phases: Dict as returned by ``analyze_phases``; the ``'trigger'`` and
            ``'key_agreement'`` entries are used when present.

    Returns:
        The matplotlib Figure that was saved.
    """
    fig = plt.figure(figsize=(18, 14))
    fig.suptitle('Triple CVE Covert Channel Analysis\n' +
                 'CVE-2023-1206 (Trigger) → CVE-2025-40040 (Key Agreement) → CVE-2024-49882 (Data)',
                 fontsize=14, fontweight='bold')

    # Axes are created in reading order: index 0 is top-left, 7 bottom-right.
    axes = [fig.add_subplot(4, 2, position) for position in range(1, 9)]

    trigger_times = df.loc[df['type'] == 'TRIGGER', 'time_rel'].values
    sync_df = df[df['type'] == 'SYNC']

    _plot_timeline(axes[0], df, phases)
    _plot_rate_histogram(axes[1], df)
    _plot_trigger_burst(axes[2], trigger_times)
    _plot_trigger_intervals(axes[3], trigger_times)
    _plot_sync_rounds(axes[4], sync_df)
    _plot_sync_intervals(axes[5], sync_df)
    _plot_direction_flow(axes[6], df)

    # 8. Statistics
    summary_ax = axes[7]
    summary_ax.axis('off')
    summary_ax.text(0.02, 0.98, _summary_text(df, phases),
                    transform=summary_ax.transAxes,
                    fontfamily='monospace', fontsize=9,
                    verticalalignment='top',
                    bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8))

    fig.tight_layout()
    fig.savefig(output_file, dpi=150, bbox_inches='tight')
    print(f"\nPlot saved to: {output_file}")

    return fig


def main():
    """Command-line entry point: parse arguments, analyse the capture, plot.

    Steps: check the pcap exists, extract packets with tshark, classify each
    packet and extract its sync round, print a per-type breakdown, compute
    the phase summary, optionally export the table as CSV, write the plot,
    and optionally show it interactively. Exits with status 1 when the input
    file is missing or no packets were extracted.
    """
    parser = argparse.ArgumentParser(
        description='Analyze Triple CVE Covert Channel Traffic',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python3 analyze_triple_cve.py capture.pcap
    python3 analyze_triple_cve.py capture.pcap --output report.png
    python3 analyze_triple_cve.py capture.pcap --show
        """
    )
    parser.add_argument('pcap', help='Input pcap file')
    parser.add_argument('--output', '-o', help='Output plot file (default: analysis_<pcap>.png)')
    parser.add_argument('--show', '-s', action='store_true', help='Show plot interactively')
    parser.add_argument('--csv', '-c', help='Export packet data to CSV')
    options = parser.parse_args()

    pcap_path = options.pcap
    if not os.path.exists(pcap_path):
        print(f"Error: File not found: {pcap_path}")
        sys.exit(1)

    print(f"Analyzing: {pcap_path}")
    print("=" * 60)

    # Extract packets
    print("Extracting packets with tshark...")
    packets = extract_packets_from_pcap(pcap_path)
    if not len(packets):
        print("No packets found in capture!")
        sys.exit(1)
    print(f"Found {len(packets)} packets")

    # Classify packets: each row yields a (type, cve) pair that is expanded
    # into two columns.
    print("Classifying packets...")
    packets[['type', 'cve']] = packets.apply(classify_packet, axis=1, result_type='expand')
    packets['round'] = packets.apply(extract_sync_round, axis=1)

    # Print summary
    print("\nPacket breakdown:")
    for ptype in packets['type'].unique():
        print(f"  {ptype}: {(packets['type'] == ptype).sum()}")

    # Analyze phases
    print("\nAnalyzing phases...")
    phases = analyze_phases(packets)

    # Export CSV if requested
    if options.csv:
        packets.to_csv(options.csv, index=False)
        print(f"\nCSV exported to: {options.csv}")

    # Generate plot; the default name is derived from the pcap's file stem.
    if options.output:
        plot_path = options.output
    else:
        plot_path = f"analysis_{Path(pcap_path).stem}.png"
    plot_analysis(packets, plot_path, phases)

    if options.show:
        plt.show()


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()