import asyncio
import io
import json
import os
import uuid
from datetime import datetime
from typing import Any

from bson import ObjectId
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import StreamingResponse, HTMLResponse
from minio import Minio
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
|
|
app = FastAPI(title="XroAPI - Generic Database API")


# Endpoints and credentials can be overridden through environment variables so
# that secrets do not have to live in source control. Every fallback is the
# original hard-coded local-development value, so an unconfigured deployment
# behaves exactly as before.
mongo = MongoClient(
    os.environ.get("MONGO_URI", "mongodb://127.0.0.1:27017"),
    serverSelectionTimeoutMS=5000,
    connectTimeoutMS=5000,
    socketTimeoutMS=10000,
    # NOTE(review): PyMongo documents None as "no pool size limit" -- confirm
    # this is intended for the installed driver version.
    maxPoolSize=None,
    retryWrites=True,
    retryReads=True,
)


minio_client = Minio(
    os.environ.get("MINIO_ENDPOINT", "127.0.0.1:9000"),
    access_key=os.environ.get("MINIO_ACCESS_KEY", "admin"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "password123"),
    # TLS stays off unless MINIO_SECURE is explicitly set to a truthy value.
    secure=os.environ.get("MINIO_SECURE", "false").lower() in ("1", "true", "yes"),
)
|
|
| |
|
|
def serialize(doc) -> dict:
    """Make a MongoDB document JSON-friendly by stringifying its ``_id``.

    The document is modified in place and the same object is returned.
    ``None`` is passed through unchanged so callers can forward the result
    of a lookup that found nothing.
    """
    if doc is not None:
        doc["_id"] = str(doc["_id"])
    return doc
|
|
def get_col(project: str, collection: str):
    """Return the collection handle ``mongo[project][collection]``.

    ``project`` selects the database and ``collection`` the collection
    inside it.
    """
    database = mongo[project]
    return database[collection]
|
|
def get_bucket(project: str) -> str:
    """Return the name of the project's file bucket, creating it if missing.

    The bucket is named ``<project>-files``. Note that the bucket is created
    as a side effect the first time any caller asks for it.
    """
    bucket = "{}-files".format(project)
    already_there = minio_client.bucket_exists(bucket)
    if not already_there:
        minio_client.make_bucket(bucket)
    return bucket
|
|
def valid_oid(id: str) -> ObjectId:
    """Convert *id* to an ``ObjectId`` or reject the request.

    Raises:
        HTTPException: 400 "Invalid ID" when *id* is not a valid ObjectId.
    """
    if ObjectId.is_valid(id):
        return ObjectId(id)
    raise HTTPException(status_code=400, detail="Invalid ID")
|
|
def retry(func, retries=3):
    """Call *func* and return its result, retrying on MongoDB connection errors.

    Only ``ConnectionFailure`` and ``ServerSelectionTimeoutError`` trigger
    another attempt; every other exception (including ``HTTPException``
    raised inside *func*) propagates to the caller untouched.

    Args:
        func: Zero-argument callable performing the database operation.
        retries: Maximum number of attempts.

    Raises:
        HTTPException: 503 once all attempts failed with a connection error.
            The last connection error is attached as the cause.
    """
    last_err = None
    for _ in range(retries):
        try:
            return func()
        except (ConnectionFailure, ServerSelectionTimeoutError) as err:
            # Remember the failure and fall through to the next attempt.
            last_err = err
    raise HTTPException(
        status_code=503, detail=f"DB unavailable: {str(last_err)}"
    ) from last_err
|
|
| |
|
|
@app.get("/health")
def health():
    """Report whether MongoDB answers a ``ping``.

    Always returns a JSON body; a failure is reported in the payload
    (``status: "error"`` with the exception text) rather than by raising.
    """
    try:
        mongo.admin.command('ping')
    except Exception as e:
        return {"status": "error", "db": str(e)}
    else:
        return {"status": "ok", "db": "connected"}
|
|
| |
|
|
@app.get("/{project}/{collection}")
def get_all(project: str, collection: str, filter: str = None):
    """Return every document of a collection, optionally filtered.

    Args:
        project: Database name.
        collection: Collection name.
        filter: Optional JSON-encoded MongoDB query object.

    Returns:
        ``{"count": <n>, "data": [<serialized documents>]}``.

    Raises:
        HTTPException: 400 when *filter* is not valid JSON or does not
            decode to a JSON object; 503 when the database is unreachable.

    NOTE(review): the decoded filter is handed to ``find`` verbatim, so
    clients can use any query operator. Confirm that is acceptable for the
    deployment.
    """
    col = get_col(project, collection)
    query = {}
    if filter:
        try:
            query = json.loads(filter)
        except (ValueError, RecursionError):
            # ValueError covers JSONDecodeError; RecursionError covers
            # pathologically nested input. Anything else is a real bug and
            # should not be disguised as a client error.
            raise HTTPException(status_code=400, detail="Invalid filter JSON") from None
        # A JSON ``null`` is still accepted (the driver treats it as "no
        # filter"); any other non-object value would make ``find`` raise.
        if query is not None and not isinstance(query, dict):
            raise HTTPException(status_code=400, detail="Invalid filter JSON")
    docs = retry(lambda: [serialize(d) for d in col.find(query)])
    return {"count": len(docs), "data": docs}
|
|
@app.get("/{project}/{collection}/_indexes")
def get_indexes(project: str, collection: str):
    """List the index names defined on a collection.

    Returns:
        ``{"indexes": [<index name>, ...]}``.

    Raises:
        HTTPException: 503 when the database is unreachable (the lookup goes
            through ``retry`` like the other database endpoints).
    """
    col = get_col(project, collection)
    # index_information() maps index name -> details; only names are exposed.
    indexes = retry(lambda: list(col.index_information()))
    return {"indexes": indexes}
|
|
@app.get("/{project}/{collection}/{id}")
def get_one(project: str, collection: str, id: str):
    """Fetch a single document by its ObjectId.

    Raises:
        HTTPException: 400 for a malformed id, 404 when no document
            matches, 503 when the database is unreachable.
    """
    col = get_col(project, collection)
    oid = valid_oid(id)
    found = retry(lambda: col.find_one({"_id": oid}))
    if found:
        return serialize(found)
    raise HTTPException(status_code=404, detail="Not found")
|
|
@app.post("/{project}/{collection}")
async def insert_one(project: str, collection: str, data: dict[str, Any]):
    """Insert one document, stamping ``_created_at`` and ``_updated_at``.

    Args:
        project: Database name.
        collection: Collection name.
        data: JSON object to store.

    Returns:
        ``{"id": <new id as string>, "message": "Inserted"}``.

    Raises:
        HTTPException: 503 when the database is unreachable.
    """
    col = get_col(project, collection)
    # One timestamp for both fields so a fresh document has identical
    # created/updated values (matches the bulk insert endpoint).
    now = datetime.utcnow().isoformat()
    data["_created_at"] = now
    data["_updated_at"] = now
    # PyMongo is blocking; run it in a worker thread so this coroutine does
    # not stall the event loop for every other request.
    result = await asyncio.to_thread(retry, lambda: col.insert_one(data))
    return {"id": str(result.inserted_id), "message": "Inserted"}
|
|
@app.put("/{project}/{collection}/{id}")
async def update_one(project: str, collection: str, id: str, data: dict[str, Any]):
    """Partially update a document: ``$set`` the given fields.

    Args:
        project: Database name.
        collection: Collection name.
        id: ObjectId of the document, as a string.
        data: Fields to set; ``_updated_at`` is refreshed automatically.

    Returns:
        ``{"message": "Updated"}``.

    Raises:
        HTTPException: 400 for a malformed id, 404 when no document
            matches, 503 when the database is unreachable.
    """
    col = get_col(project, collection)
    oid = valid_oid(id)
    # Documents returned by this API carry "_id" as a string. A client that
    # sends such a document back would otherwise try to $set the immutable
    # _id to a different value, which MongoDB rejects.
    data.pop("_id", None)
    data["_updated_at"] = datetime.utcnow().isoformat()
    # Blocking driver call: keep it off the event loop.
    result = await asyncio.to_thread(
        retry, lambda: col.update_one({"_id": oid}, {"$set": data})
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Not found")
    return {"message": "Updated"}
|
|
@app.put("/{project}/{collection}/{id}/replace")
async def replace_one(project: str, collection: str, id: str, data: dict[str, Any]):
    """Replace a whole document with the request body.

    Args:
        project: Database name.
        collection: Collection name.
        id: ObjectId of the document, as a string.
        data: The replacement document; ``_updated_at`` is set automatically.

    Returns:
        ``{"message": "Replaced"}``.

    Raises:
        HTTPException: 400 for a malformed id, 404 when no document
            matches, 503 when the database is unreachable.

    NOTE(review): only ``_updated_at`` is stamped here, so the stored
    ``_created_at`` survives only if the client includes it in *data*.
    Confirm whether that is intended.
    """
    col = get_col(project, collection)
    oid = valid_oid(id)
    # A string "_id" echoed back by the client would conflict with the
    # document's real ObjectId and make the replacement fail.
    data.pop("_id", None)
    data["_updated_at"] = datetime.utcnow().isoformat()
    # Blocking driver call: keep it off the event loop.
    result = await asyncio.to_thread(
        retry, lambda: col.replace_one({"_id": oid}, data)
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Not found")
    return {"message": "Replaced"}
|
|
@app.delete("/{project}/{collection}/{id}")
def delete_one(project: str, collection: str, id: str):
    """Delete a single document by its ObjectId.

    Raises:
        HTTPException: 400 for a malformed id, 404 when nothing was
            deleted, 503 when the database is unreachable.
    """
    col = get_col(project, collection)
    oid = valid_oid(id)
    outcome = retry(lambda: col.delete_one({"_id": oid}))
    if outcome.deleted_count != 0:
        return {"message": "Deleted"}
    raise HTTPException(status_code=404, detail="Not found")
|
|
@app.delete("/{project}/{collection}")
def drop_collection(project: str, collection: str):
    """Drop an entire collection from the project's database.

    Raises:
        HTTPException: 503 when the database is unreachable.
    """
    def _drop():
        return mongo[project].drop_collection(collection)

    retry(_drop)
    message = f"Collection '{collection}' dropped"
    return {"message": message}
|
|
| |
|
|
@app.post("/{project}/{collection}/bulk")
async def insert_many(project: str, collection: str, data: list[dict[str, Any]]):
    """Insert several documents at once, all stamped with the same timestamp.

    Args:
        project: Database name.
        collection: Collection name.
        data: Non-empty JSON array of objects to store.

    Returns:
        ``{"inserted": <n>, "ids": [<id as string>, ...]}``.

    Raises:
        HTTPException: 400 when *data* is empty, 503 when the database is
            unreachable.
    """
    # The driver refuses an empty batch; report that as a client error
    # instead of letting it surface as an internal server error.
    if not data:
        raise HTTPException(status_code=400, detail="No documents provided")
    col = get_col(project, collection)
    now = datetime.utcnow().isoformat()
    for d in data:
        d["_created_at"] = now
        d["_updated_at"] = now
    # Blocking driver call: keep it off the event loop.
    result = await asyncio.to_thread(retry, lambda: col.insert_many(data))
    return {"inserted": len(result.inserted_ids), "ids": [str(i) for i in result.inserted_ids]}
|
|
@app.post("/{project}/{collection}/count")
async def count_docs(project: str, collection: str, data: dict[str, Any] = {}):
    """Count the documents matching a MongoDB query.

    Args:
        project: Database name.
        collection: Collection name.
        data: Query object; the default (empty) matches every document.
            The default dict is only read, never modified, here.

    Returns:
        ``{"count": <n>}``.

    Raises:
        HTTPException: 503 when the database is unreachable.
    """
    col = get_col(project, collection)
    # Blocking driver call: run it in a worker thread so the event loop
    # stays free for other requests.
    count = await asyncio.to_thread(retry, lambda: col.count_documents(data))
    return {"count": count}
|
|
@app.post("/{project}/{collection}/find")
async def find_with_options(project: str, collection: str, data: dict[str, Any] = {}):
    """Query a collection with filter, sorting and paging options.

    The request body may contain:
        ``filter``: MongoDB query object (default: match everything).
        ``limit``:  integer, maximum number of documents (default 100).
        ``skip``:   non-negative integer, documents to skip (default 0).
        ``sort``:   field to sort on (default ``_created_at``).
        ``order``:  sort direction passed to the driver (default -1).

    Returns:
        ``{"count": <n>, "data": [<serialized documents>]}``.

    Raises:
        HTTPException: 400 when ``filter``, ``limit`` or ``skip`` has the
            wrong type, 503 when the database is unreachable.
    """
    col = get_col(project, collection)
    query = data.get("filter", {})
    limit = data.get("limit", 100)
    skip = data.get("skip", 0)
    sort_field = data.get("sort", "_created_at")
    sort_order = data.get("order", -1)

    # Reject malformed options up front; otherwise the driver raises and the
    # client sees an internal server error for what is really a bad request.
    if query is not None and not isinstance(query, dict):
        raise HTTPException(status_code=400, detail="'filter' must be an object")
    if not isinstance(limit, int):
        raise HTTPException(status_code=400, detail="'limit' must be an integer")
    if not isinstance(skip, int) or skip < 0:
        raise HTTPException(status_code=400, detail="'skip' must be a non-negative integer")

    def _run():
        cursor = col.find(query).sort(sort_field, sort_order).skip(skip).limit(limit)
        return [serialize(d) for d in cursor]

    # Blocking driver call: keep it off the event loop.
    docs = await asyncio.to_thread(retry, _run)
    return {"count": len(docs), "data": docs}
|
|
| |
|
|
@app.post("/{project}/files/upload")
async def upload_file(project: str, collection: str = "files", file: UploadFile = File(...)):
    """Store an uploaded file in MinIO and record its metadata in MongoDB.

    The object is saved under a fresh UUID that keeps the original file's
    extension (if it has one).

    Args:
        project: Database name; also selects the ``<project>-files`` bucket.
        collection: Collection receiving the metadata record.
        file: The uploaded file.

    Returns:
        ``{"id": <metadata id>, "filename": <stored name>, "url": <path>}``.

    Raises:
        HTTPException: 503 when the database is unreachable.

    NOTE(review): the whole upload is read into memory, and if the metadata
    insert fails the object already written to MinIO is left behind.
    """
    # MinIO and PyMongo calls are blocking; run them in worker threads so
    # the event loop is not stalled while they complete.
    bucket = await asyncio.to_thread(get_bucket, project)
    # splitext yields "" for names without an extension, so "README" becomes
    # "<uuid>" rather than "<uuid>.README"; a missing filename is tolerated.
    ext = os.path.splitext(file.filename or "")[1]
    filename = f"{uuid.uuid4()}{ext}"
    data = await file.read()
    # Fall back to a generic type when the client did not send one.
    content_type = file.content_type or "application/octet-stream"
    await asyncio.to_thread(
        minio_client.put_object,
        bucket,
        filename,
        io.BytesIO(data),
        len(data),
        content_type=content_type,
    )
    col = get_col(project, collection)
    url = f"/{project}/files/{filename}"
    record = {
        "filename": filename,
        "original_name": file.filename,
        "content_type": content_type,
        "size": len(data),
        "url": url,
        "_created_at": datetime.utcnow().isoformat(),
    }
    result = await asyncio.to_thread(retry, lambda: col.insert_one(record))
    return {"id": str(result.inserted_id), "filename": filename, "url": url}
|
|
@app.get("/{project}/files/{filename}")
def get_file(project: str, filename: str):
    """Stream a stored file back to the client.

    Args:
        project: Selects the ``<project>-files`` bucket.
        filename: Object name inside the bucket.

    Raises:
        HTTPException: 404 when the object cannot be fetched.

    NOTE(review): this route is registered after
    ``GET /{project}/{collection}/{id}``, which matches the same path shape.
    FastAPI evaluates routes in registration order, so requests for this URL
    appear to be handled by that earlier route instead. The file routes
    likely need to be declared before the generic collection routes.
    """
    bucket = get_bucket(project)
    try:
        obj = minio_client.get_object(bucket, filename)
    except Exception:
        # Only the fetch itself is mapped to 404; unrelated failures further
        # down are no longer disguised as a missing file.
        raise HTTPException(status_code=404, detail="File not found") from None
    content_type = obj.headers.get("Content-Type", "application/octet-stream")

    def _chunks():
        # Hand the connection back to the pool once the body has been sent
        # (or the client went away); otherwise every download leaks one.
        try:
            yield from obj.stream(32 * 1024)
        finally:
            obj.close()
            obj.release_conn()

    return StreamingResponse(_chunks(), media_type=content_type)
|
|
@app.delete("/{project}/files/{filename}")
def delete_file(project: str, filename: str):
    """Remove a stored file from the project's bucket.

    Args:
        project: Selects the ``<project>-files`` bucket.
        filename: Object name inside the bucket.

    Returns:
        ``{"message": "File deleted"}``.

    Raises:
        HTTPException: 404 when the removal fails.

    NOTE(review): this route is registered after
    ``DELETE /{project}/{collection}/{id}``, which matches the same path
    shape; with FastAPI's in-order matching that earlier route appears to
    take these requests. Also, only the stored object is removed here -- the
    metadata record written at upload time is left in MongoDB.
    """
    bucket = get_bucket(project)
    try:
        minio_client.remove_object(bucket, filename)
    except Exception as err:
        # ``except Exception`` (not a bare except) so KeyboardInterrupt and
        # SystemExit are never converted into an HTTP response.
        raise HTTPException(status_code=404, detail="File not found") from err
    return {"message": "File deleted"}
|
|
@app.get("/{project}/files")
def list_files(project: str):
    """List the objects stored in the project's bucket.

    Returns:
        ``{"count": <n>, "files": [{"filename", "url", "size"}, ...]}``.
        If listing fails, an empty result is returned (best effort).

    NOTE(review): this route is registered after
    ``GET /{project}/{collection}``, which matches the same path shape; with
    FastAPI's in-order matching that earlier route appears to take these
    requests (returning the ``files`` collection instead).
    """
    bucket = get_bucket(project)
    try:
        files = [
            {
                "filename": obj.object_name,
                "url": f"/{project}/files/{obj.object_name}",
                "size": obj.size,
            }
            for obj in minio_client.list_objects(bucket)
        ]
    except Exception:
        # Deliberate best effort, but limited to ordinary errors so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        return {"count": 0, "files": []}
    return {"count": len(files), "files": files}
|
|
| |
|
|
@app.get("/{project}")
def project_info(project: str):
    """Summarize a project: every collection with its document count.

    Returns:
        ``{"project": <name>, "collections": {<collection>: <count>, ...}}``.

    Raises:
        HTTPException: 503 when the database is unreachable (the lookup goes
            through ``retry`` like the other database endpoints).
    """
    def _collect():
        db = mongo[project]
        return {name: db[name].count_documents({}) for name in db.list_collection_names()}

    stats = retry(_collect)
    return {"project": project, "collections": stats}
|
|
@app.get("/")
def all_projects():
    """List all project databases, hiding MongoDB's internal ones.

    Returns:
        ``{"projects": [<database name>, ...]}`` without ``admin``,
        ``local`` and ``config``.

    Raises:
        HTTPException: 503 when the database is unreachable (the lookup goes
            through ``retry`` like the other database endpoints).
    """
    ignore = {"admin", "local", "config"}
    names = retry(mongo.list_database_names)
    projects = [p for p in names if p not in ignore]
    return {"projects": projects}
|
|
| |
|
|
@app.get("/uptime", response_class=HTMLResponse)
def uptime():
    """Serve the contents of ``index.html`` as an HTML page.

    The file is opened relative to the process's working directory.

    NOTE(review): this route is registered after ``GET /{project}``, which
    also matches ``/uptime``; with FastAPI's in-order matching that earlier
    route appears to take the request. Declaring this route earlier in the
    file would make it reachable.
    """
    with open("index.html", "r") as page:
        html = page.read()
    return html