4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / upload_server.py PY
#!/usr/bin/env python3
import argparse, os, time, io, sys, shutil
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, unquote
from datetime import datetime
import cgi

def safe_join(base, *paths):
    """Join *paths* onto *base* and refuse results that leave *base*.

    Args:
        base: Directory the result must stay inside.
        *paths: Path components appended to ``base``.

    Returns:
        The normalised absolute path, which is ``base`` itself or one of
        its descendants.

    Raises:
        ValueError: If the joined path resolves outside ``base``.
    """
    base = os.path.abspath(base)
    final = os.path.abspath(os.path.join(base, *paths))
    # Compare whole path components instead of a string prefix: a prefix
    # built as ``base + os.sep`` becomes "//" when base is the filesystem
    # root and would then reject every legitimate child path.
    try:
        inside = os.path.commonpath([base, final]) == base
    except ValueError:
        # commonpath() refuses paths that share no common root
        # (e.g. different drives on Windows) -- that is an escape too.
        inside = False
    if not inside:
        raise ValueError("Path traversal detectado")
    return final

def uniquify(path):
    """Return *path* if nothing exists there, otherwise an unused variant.

    The variant inserts ``_YYYYmmdd-HHMMSS`` before the extension. If that
    name is taken as well (two collisions within the same second), a
    numeric suffix ``_1``, ``_2``, ... is appended until a free name is
    found, so an earlier file is not silently overwritten.

    Args:
        path: Desired file path.

    Returns:
        A path that did not exist at the time of the check.
    """
    if not os.path.exists(path):
        return path
    root, ext = os.path.splitext(path)
    ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    candidate = f"{root}_{ts}{ext}"
    counter = 1
    # NOTE: this is still check-then-use; a concurrent writer could claim
    # the name between this test and the caller's open().
    while os.path.exists(candidate):
        candidate = f"{root}_{ts}_{counter}{ext}"
        counter += 1
    return candidate

class UploadHandler(BaseHTTPRequestHandler):
    """HTTP request handler that stores uploaded files in ``self.server.dest``.

    Routes:
        GET  / or /upload        plain-text listing of the newest files.
        POST /upload             multipart/form-data; each file part is saved.
        PUT  /upload/<filename>  the request body is saved as ``<filename>``.

    The server object must carry a ``dest`` attribute naming the directory
    that receives the files. Client-supplied names are reduced to their
    base name, confined to ``dest`` with ``safe_join`` and de-duplicated
    with ``uniquify``.
    """

    server_version = "MiniUpload/1.0"

    # PUT bodies are copied in pieces of this many bytes.
    _CHUNK_SIZE = 65536
    # Maximum number of entries shown by the GET listing.
    _LIST_LIMIT = 200

    def logx(self, msg):
        """Write *msg* to stdout, prefixed with a timestamp and the client IP."""
        stamp = datetime.now().isoformat(timespec='seconds')
        sys.stdout.write(f"[{stamp}] {self.client_address[0]} - {msg}\n")
        sys.stdout.flush()

    def _common_headers(self, code=200, ct="text/plain; charset=utf-8"):
        """Send the status line plus a Content-Type header and end the headers."""
        self.send_response(code)
        self.send_header("Content-Type", ct)
        self.end_headers()

    def _reply(self, code, body, ct="text/plain; charset=utf-8"):
        """Send a complete response: status, headers (with length) and *body*."""
        self.send_response(code)
        self.send_header("Content-Type", ct)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _content_length(self):
        """Return the request's Content-Length as a non-negative int.

        A missing header counts as 0.

        Raises:
            ValueError: If the header is present but not a non-negative
                integer.
        """
        raw = str(self.headers.get('Content-Length', '0')).strip()
        if not raw.isdigit():
            raise ValueError(f"invalid Content-Length: {raw!r}")
        return int(raw)

    def _target_path(self, name):
        """Turn a client-supplied base name into a free path inside ``dest``.

        Raises:
            ValueError: If *name* is empty, ``.`` or ``..``, or does not
                resolve to a location strictly inside the upload directory.
        """
        # "." resolves to the upload directory itself, which safe_join
        # accepts; uniquify would then suffix the *directory* name and the
        # file would be created next to the upload directory, not in it.
        if name in ("", ".", ".."):
            raise ValueError(f"invalid file name: {name!r}")
        path = safe_join(self.server.dest, name)
        if path == os.path.abspath(self.server.dest):
            raise ValueError(f"invalid file name: {name!r}")
        return uniquify(path)

    def do_GET(self):
        """Serve a small status page listing the most recently modified files."""
        if urlparse(self.path).path not in ("/", "/upload"):
            self._reply(404, b"Not found")
            return

        dest = self.server.dest
        entries = []
        for name in os.listdir(dest):
            try:
                st = os.stat(os.path.join(dest, name))
            except OSError:
                # Entry vanished after listdir() or is a dangling symlink;
                # leave it out rather than failing the whole request.
                continue
            entries.append((name, st.st_size, st.st_mtime))
        entries.sort(key=lambda entry: entry[2], reverse=True)

        out = io.StringIO()
        out.write(f"File upload available at /upload\nSaves in: {dest}\n\n")
        out.write("Recent files:\n")
        for name, size, mtime in entries[:self._LIST_LIMIT]:
            when = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
            out.write(f"- {name}  ({size} bytes)  [{when}]\n")
        self._reply(200, out.getvalue().encode("utf-8", "ignore"))

    def do_POST(self):
        """Save every file part of a multipart/form-data POST to /upload.

        Replies 404 for other paths, 400 for a non-multipart request, a
        missing boundary, a malformed Content-Length, an invalid file name
        or a request without file parts, and 200 ``OK`` otherwise.
        """
        if urlparse(self.path).path != "/upload":
            self._reply(404, b"Can only POST to /upload")
            return

        # NOTE: the cgi module is deprecated since Python 3.11 and removed
        # in 3.13 (PEP 594); this method needs another multipart parser to
        # run on those versions.
        ctype, pdict = cgi.parse_header(self.headers.get('Content-Type', ''))
        if ctype != 'multipart/form-data':
            self._reply(400, b"Expected multipart/form-data")
            return
        if not pdict.get('boundary'):
            self._reply(400, b"Missing multipart boundary")
            return
        try:
            # Only validated here; FieldStorage reads the header itself.
            self._content_length()
        except ValueError:
            self._reply(400, b"Invalid Content-Length")
            return

        ua = self.headers.get('User-Agent', '-')
        fs = cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={'REQUEST_METHOD': 'POST',
                     'CONTENT_TYPE': self.headers.get('Content-Type'),
                     }
        )

        saved_any = False
        parts = fs.list if isinstance(fs.list, list) else []
        for field in parts:
            # Plain form fields carry no filename; only file parts are stored.
            if not (field.filename and field.file):
                continue
            try:
                dest_path = self._target_path(os.path.basename(field.filename))
            except ValueError:
                self._reply(400, b"Invalid filename")
                return
            with open(dest_path, "wb") as f:
                shutil.copyfileobj(field.file, f)
            sz = os.path.getsize(dest_path)
            self.logx(f'POST /upload -> saved "{os.path.basename(dest_path)}" ({sz} bytes), UA="{ua}"')
            saved_any = True

        if not saved_any:
            self._reply(400, b"No file parts found")
            return

        self._reply(200, b"OK")

    def do_PUT(self):
        """Save the request body of ``PUT /upload/<filename>``.

        Replies 404 for other paths, 400 for a missing or invalid file
        name, a malformed Content-Length or a body shorter than announced,
        and 201 ``CREATED`` once the whole body has been written.
        """
        parsed = urlparse(self.path)
        if not parsed.path.startswith("/upload/") or parsed.path == "/upload/":
            self._reply(404, b"PUT to /upload/<filename>")
            return

        # Decode percent-escapes first so an encoded separator cannot
        # survive the basename() reduction.
        target_name = os.path.basename(unquote(parsed.path[len("/upload/"):]))
        if not target_name:
            self._reply(400, b"Missing filename")
            return

        try:
            clen = self._content_length()
        except ValueError:
            self._reply(400, b"Invalid Content-Length")
            return

        try:
            dest_path = self._target_path(target_name)
        except ValueError:
            self._reply(400, b"Invalid filename")
            return

        ua = self.headers.get('User-Agent', '-')

        remaining = clen
        with open(dest_path, "wb") as f:
            # Read in chunks so large bodies are never held in memory whole.
            while remaining > 0:
                chunk = self.rfile.read(min(self._CHUNK_SIZE, remaining))
                if not chunk:
                    break
                f.write(chunk)
                remaining -= len(chunk)

        if remaining:
            # The client stopped sending before Content-Length was reached:
            # do not keep or acknowledge a truncated file.
            try:
                os.remove(dest_path)
            except OSError:
                pass  # best-effort cleanup; the error reply below still goes out
            self.logx(f'PUT {self.path} -> aborted, body ended {remaining} bytes early, UA="{ua}"')
            self._reply(400, b"Incomplete request body")
            return

        sz = os.path.getsize(dest_path)
        self.logx(f'PUT {self.path} -> saved "{os.path.basename(dest_path)}" ({sz} bytes), UA="{ua}"')
        self._reply(201, b"CREATED")

def main(argv=None):
    """Parse the command line and serve uploads until interrupted.

    Creates the destination directory if needed, attaches its absolute
    path to the server as ``dest`` (read by ``UploadHandler``) and runs the
    server until Ctrl-C.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            use ``sys.argv[1:]``.
    """
    ap = argparse.ArgumentParser(description="Mini servidor para recibir archivos por POST/PUT")
    ap.add_argument("--host", default="0.0.0.0")
    ap.add_argument("--port", type=int, default=8000)
    ap.add_argument("--dir", dest="dest", default=".", help="Carpeta donde guardar")
    args = ap.parse_args(argv)

    os.makedirs(args.dest, exist_ok=True)
    # Using the server as a context manager calls server_close() on exit,
    # so the listening socket is released on Ctrl-C or on any error.
    with HTTPServer((args.host, args.port), UploadHandler) as httpd:
        httpd.dest = os.path.abspath(args.dest)

        print("File upload available at /upload")
        print(f"Serving HTTP on {args.host} port {args.port} (saving to {httpd.dest}) ...")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nShutting down...")

# Start the server only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()