| from fastapi import FastAPI, Request, UploadFile, File |
| from fastapi.responses import JSONResponse |
| import os |
| from datetime import datetime |
|
|
# Directory that uploaded files are written to. It is created at import
# time so the request handlers can assume it already exists.
SAVE_DIR = "files"
os.makedirs(SAVE_DIR, exist_ok=True)

# Application object that the route decorators in this module register on.
app = FastAPI()
|
|
|
|
@app.get("/")
def home():
    """Health check: log the probe and report that the service is running."""
    print("✅ Health check")
    payload = dict(status="running", service="Telegram File Storage")
    return payload
|
|
|
|
def _safe_filename(raw_name):
    """Reduce a client-supplied name to a bare file name, or return None.

    Any directory part is dropped (both forward and backward slashes are
    treated as separators), so the result can never point outside the
    storage directory. None is returned when nothing usable remains: a
    missing or empty value, ".", "..", or a name containing a NUL byte.
    """
    if not raw_name:
        return None
    # Normalise backslashes first so Windows-style paths are stripped even
    # when the server itself runs on a POSIX host.
    candidate = os.path.basename(raw_name.replace("\\", "/"))
    if candidate in ("", ".", "..") or "\x00" in candidate:
        return None
    return candidate


@app.post("/upload")
async def upload(request: Request):
    """Store the raw request body as a file inside ``SAVE_DIR``.

    Per the original note, this receives files forwarded by a Telegram bot
    (via Koyeb). The target name is taken from the ``X-Filename`` request
    header after stripping any directory components; when the header is
    missing or unusable, a timestamp-based ``file_YYYYMMDD_HHMMSS.bin`` name
    is generated instead. An existing file with the same name is overwritten.

    Args:
        request: Incoming request whose body is the file content.

    Returns:
        On success, a dict with ``status``, ``saved`` (the name actually
        used), ``size_kb`` and ``path``. A 400 JSON response when the body
        is empty, or a 500 JSON response carrying the error message when
        anything else fails.
    """
    try:
        data = await request.body()

        if not data:
            return JSONResponse(
                {"error": "No file data received"},
                status_code=400,
            )

        # The header is client-controlled: never join it onto SAVE_DIR as-is,
        # or "../" and absolute paths would let a caller write anywhere.
        filename = _safe_filename(request.headers.get("X-Filename"))
        if filename is None:
            filename = f"file_{datetime.now().strftime('%Y%m%d_%H%M%S')}.bin"

        size_kb = round(len(data) / 1024, 2)
        print(f"📥 File received: {filename} | {size_kb} KB")

        file_path = os.path.join(SAVE_DIR, filename)

        with open(file_path, "wb") as f:
            f.write(data)

        print(f"💾 Saved to: {file_path}")

        return {
            "status": "success",
            "saved": filename,
            "size_kb": size_kb,
            "path": file_path,
        }

    except Exception as e:
        # Request-handler boundary: report the failure instead of crashing.
        print(f"❌ Error: {e}")
        return JSONResponse(
            {"error": str(e)},
            status_code=500,
        )
|
|
|
|
@app.get("/files")
def list_files():
    """List the entries stored in ``SAVE_DIR`` together with their sizes.

    Returns:
        On success, a dict with ``total`` (number of directory entries) and
        ``files`` (a list of ``{"name", "size_kb"}`` dicts, sizes in KB
        rounded to two decimals). On failure, a 500 JSON response of the
        form ``{"error": <message>}``, matching the upload endpoint.
    """
    try:
        names = os.listdir(SAVE_DIR)
        file_info = [
            {
                "name": name,
                "size_kb": round(
                    os.path.getsize(os.path.join(SAVE_DIR, name)) / 1024, 2
                ),
            }
            for name in names
        ]
        return {
            "total": len(names),
            "files": file_info,
        }
    except Exception as e:
        # Log and signal the failure with a real error status rather than
        # an HTTP 200 that merely carries an "error" key.
        print(f"❌ Error: {e}")
        return JSONResponse(
            {"error": str(e)},
            status_code=500,
        )
|
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a
    # script, not when the module is imported.
    import uvicorn

    # Listen on all interfaces, port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")
|
|