README.md
Rendering markdown...
#!/usr/bin/env python3
import argparse, os, time, io, sys, shutil
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, unquote
from datetime import datetime
import cgi
def safe_join(base, *paths):
    """Join *paths* onto *base* and return the resulting absolute path.

    Guards against path traversal: the resolved path must be *base* itself
    or be located underneath it.

    Args:
        base: Directory that the result must stay inside.
        *paths: Path components to append to *base*.

    Returns:
        The absolute, normalised joined path.

    Raises:
        ValueError: If the joined path escapes *base*.
    """
    base = os.path.abspath(base)
    final = os.path.abspath(os.path.join(base, *paths))
    # Compare whole path components instead of string prefixes: a prefix test
    # against ``base + os.sep`` wrongly rejects everything when base is the
    # filesystem root ("/" + "/" == "//").
    try:
        inside = os.path.commonpath([base, final]) == base
    except ValueError:
        # commonpath refuses paths that cannot share a root (e.g. different
        # Windows drives); such a path is certainly outside base.
        inside = False
    if not inside:
        raise ValueError("Path traversal detectado")
    return final
def uniquify(path):
    """Return a variant of *path* that does not currently exist.

    If *path* is free it is returned unchanged. Otherwise a timestamp
    (``_YYYYmmdd-HHMMSS``) is inserted before the extension; if that name is
    taken as well (e.g. several uploads of the same file within one second),
    a numeric suffix is appended until a free name is found.

    Note: the existence check and the caller's later ``open`` are separate
    steps, so this does not protect against concurrent writers.
    """
    if not os.path.exists(path):
        return path
    root, ext = os.path.splitext(path)
    ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    candidate = f"{root}_{ts}{ext}"
    counter = 1
    # The timestamp only has one-second resolution, so keep going until the
    # candidate really is unused instead of handing back an existing file.
    while os.path.exists(candidate):
        candidate = f"{root}_{ts}_{counter}{ext}"
        counter += 1
    return candidate
class UploadHandler(BaseHTTPRequestHandler):
    """Request handler that lists and stores files in ``self.server.dest``.

    Routes:
        GET  / or /upload    -- plain-text status page with the newest files.
        POST /upload         -- multipart/form-data upload (any number of
                                file parts).
        PUT  /upload/<name>  -- the raw request body is stored as <name>.

    Uploaded names are reduced to their final path component and never
    overwrite an existing file (see ``uniquify``).
    """

    server_version = "MiniUpload/1.0"

    # Size of the reads used when streaming a PUT body to disk.
    CHUNK_SIZE = 65536

    def logx(self, msg):
        """Write a timestamped log line for the current client to stdout."""
        sys.stdout.write(f"[{datetime.now().isoformat(timespec='seconds')}] {self.client_address[0]} - {msg}\n")
        sys.stdout.flush()

    def _common_headers(self, code=200, ct="text/plain; charset=utf-8"):
        """Send the status line plus a Content-Type header and end the headers."""
        self.send_response(code)
        self.send_header("Content-Type", ct)
        self.end_headers()

    def _content_length(self):
        """Return the request's Content-Length as a non-negative int.

        A missing header counts as 0. Returns None when the header is not a
        valid non-negative integer, so callers can answer 400 instead of
        letting ``int()`` raise inside the handler.
        """
        try:
            length = int(self.headers.get('Content-Length', '0'))
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    def _resolve_destination(self, filename):
        """Map a client-supplied file name to a free path inside the upload dir.

        Returns None when the name is unusable. Names that resolve to the
        upload directory itself ("", ".") must be rejected here: safe_join
        accepts the base directory, and uniquify would then produce
        "<dest>_<timestamp>", i.e. a file *next to* the upload directory.
        """
        name = os.path.basename(filename)
        if name in ("", ".", "..") or "\x00" in name:
            return None
        try:
            path = safe_join(self.server.dest, name)
        except ValueError:
            return None
        return uniquify(path)

    def do_GET(self):
        """Serve a small status page listing up to 200 of the newest entries."""
        parsed = urlparse(self.path)
        if parsed.path not in ("/", "/upload"):
            self._common_headers(404)
            self.wfile.write(b"Not found")
            return
        entries = []
        try:
            with os.scandir(self.server.dest) as it:
                for entry in it:
                    try:
                        st = entry.stat()
                    except OSError:
                        # Entry vanished or is a broken symlink: skip it
                        # rather than failing the whole listing.
                        continue
                    entries.append((st.st_mtime, entry.name, st.st_size))
        except OSError:
            self._common_headers(500)
            self.wfile.write(b"Cannot read upload directory")
            return
        # Newest first.
        entries.sort(key=lambda item: item[0], reverse=True)
        out = io.StringIO()
        out.write(f"File upload available at /upload\nSaves in: {self.server.dest}\n\n")
        out.write("Recent files:\n")
        for mtime_raw, f, sz in entries[:200]:
            mtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime_raw))
            out.write(f"- {f} ({sz} bytes) [{mtime}]\n")
        # "ignore" drops characters that cannot be encoded (e.g. surrogates
        # from undecodable file names) instead of failing the response.
        data = out.getvalue().encode("utf-8", "ignore")
        self._common_headers(200, "text/plain; charset=utf-8")
        self.wfile.write(data)

    def do_POST(self):
        """Store every file part of a multipart/form-data POST to /upload."""
        parsed = urlparse(self.path)
        if parsed.path != "/upload":
            self._common_headers(404)
            self.wfile.write(b"Can only POST to /upload")
            return
        # self.headers is an email.message.Message subclass, so it can parse
        # the Content-Type header itself.
        if self.headers.get_content_type() != 'multipart/form-data':
            self._common_headers(400)
            self.wfile.write(b"Expected multipart/form-data")
            return
        if not self.headers.get_param('boundary'):
            self._common_headers(400)
            self.wfile.write(b"Missing multipart boundary")
            return
        if self._content_length() is None:
            self._common_headers(400)
            self.wfile.write(b"Invalid Content-Length")
            return
        ua = self.headers.get('User-Agent', '-')
        # NOTE: cgi is deprecated and was removed in Python 3.13 (PEP 594).
        # Importing it at its only use site lets this route fail with a clear
        # response when it is unavailable.
        # TODO: migrate to a maintained streaming multipart parser.
        try:
            import cgi
        except ImportError:
            self._common_headers(501)
            self.wfile.write(b"multipart uploads require the 'cgi' module")
            return
        try:
            fs = cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={
                    'REQUEST_METHOD': 'POST',
                    'CONTENT_TYPE': self.headers.get('Content-Type'),
                },
            )
        except ValueError:
            # Raised by cgi for e.g. an invalid multipart boundary.
            self._common_headers(400)
            self.wfile.write(b"Malformed multipart body")
            return
        saved_any = False
        # fs.list is None when the body contained no parts.
        for field in fs.list or []:
            if not (field.filename and field.file):
                continue
            dest_path = self._resolve_destination(field.filename)
            if dest_path is None:
                self._common_headers(400)
                self.wfile.write(b"Invalid filename")
                return
            with open(dest_path, "wb") as f:
                shutil.copyfileobj(field.file, f)
            sz = os.path.getsize(dest_path)
            self.logx(f'POST /upload -> saved "{os.path.basename(dest_path)}" ({sz} bytes), UA="{ua}"')
            saved_any = True
        if not saved_any:
            self._common_headers(400)
            self.wfile.write(b"No file parts found")
            return
        self._common_headers(200)
        self.wfile.write(b"OK")

    def do_PUT(self):
        """Handle PUT /upload/<filename>: the request body is the file data."""
        parsed = urlparse(self.path)
        if not parsed.path.startswith("/upload/") or parsed.path == "/upload/":
            self._common_headers(404)
            self.wfile.write(b"PUT to /upload/<filename>")
            return
        target_name = os.path.basename(unquote(parsed.path[len("/upload/"):]))
        if not target_name:
            self._common_headers(400)
            self.wfile.write(b"Missing filename")
            return
        clen = self._content_length()
        if clen is None:
            self._common_headers(400)
            self.wfile.write(b"Invalid Content-Length")
            return
        dest_path = self._resolve_destination(target_name)
        if dest_path is None:
            self._common_headers(400)
            self.wfile.write(b"Invalid filename")
            return
        ua = self.headers.get('User-Agent', '-')
        remaining = clen
        with open(dest_path, "wb") as f:
            # Read in chunks in case the body is large.
            while remaining > 0:
                chunk = self.rfile.read(min(self.CHUNK_SIZE, remaining))
                if not chunk:
                    break
                f.write(chunk)
                remaining -= len(chunk)
        if remaining > 0:
            # The client stopped sending before Content-Length bytes arrived;
            # do not keep (or acknowledge) a truncated file.
            try:
                os.remove(dest_path)
            except OSError:
                pass  # best-effort cleanup; the failure response still goes out
            self.logx(f'PUT {self.path} -> aborted, {remaining} bytes missing, UA="{ua}"')
            self._common_headers(400)
            self.wfile.write(b"Incomplete request body")
            return
        sz = os.path.getsize(dest_path)
        self.logx(f'PUT {self.path} -> saved "{os.path.basename(dest_path)}" ({sz} bytes), UA="{ua}"')
        self._common_headers(201)
        self.wfile.write(b"CREATED")
def main():
    """Parse the command line and run the upload server until interrupted.

    Options:
        --host  Interface to bind to (default 0.0.0.0, i.e. all interfaces).
        --port  TCP port to listen on (default 8000).
        --dir   Directory where uploads are stored; created if missing.
    """
    ap = argparse.ArgumentParser(description="Mini servidor para recibir archivos por POST/PUT")
    ap.add_argument("--host", default="0.0.0.0")
    ap.add_argument("--port", type=int, default=8000)
    ap.add_argument("--dir", dest="dest", default=".", help="Carpeta donde guardar")
    args = ap.parse_args()
    os.makedirs(args.dest, exist_ok=True)
    # The context manager calls server_close() on exit, so the listening
    # socket is released even when serve_forever() is interrupted.
    with HTTPServer((args.host, args.port), UploadHandler) as httpd:
        # Handlers read the target directory from the server object.
        httpd.dest = os.path.abspath(args.dest)
        print("File upload available at /upload")
        print(f"Serving HTTP on {args.host} port {args.port} (saving to {httpd.dest}) ...")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nShutting down...")
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()