Mgidb / app.py
proti0070's picture
Update app.py
fe32493 verified
import asyncio
import io
import json
import os
import uuid
from datetime import datetime
from typing import Any

from bson import ObjectId
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import StreamingResponse, HTMLResponse
from minio import Minio
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
app = FastAPI(title="XroAPI - Generic Database API")
# ─── Setup ──────────────────────────────────────────────
# Connection settings may be overridden through environment variables so
# endpoints and credentials do not have to live in source control. Every
# default below is the value that was previously hard-coded, so an
# unconfigured deployment behaves exactly as before.
#   MONGO_URI, MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, MINIO_SECURE
mongo = MongoClient(
    os.environ.get("MONGO_URI", "mongodb://127.0.0.1:27017"),
    serverSelectionTimeoutMS=5000,  # give up finding a server after 5 s
    connectTimeoutMS=5000,
    socketTimeoutMS=10000,
    maxPoolSize=None,  # ← unlimited
    retryWrites=True,
    retryReads=True,
)
minio_client = Minio(
    os.environ.get("MINIO_ENDPOINT", "127.0.0.1:9000"),
    access_key=os.environ.get("MINIO_ACCESS_KEY", "admin"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "password123"),
    # Plain HTTP unless MINIO_SECURE is set to a truthy word.
    secure=os.environ.get("MINIO_SECURE", "false").strip().lower() in ("1", "true", "yes"),
)
# ─── Helpers ────────────────────────────────────────────
def serialize(doc: "dict[str, Any] | None") -> "dict[str, Any] | None":
    """Make a MongoDB document JSON-friendly by stringifying its ``_id``.

    The document is modified in place and the same object is returned.
    Only the top-level ``_id`` is converted; nested values are left as-is.

    Args:
        doc: A document mapping, or ``None``.

    Returns:
        ``None`` when ``doc`` is ``None``; otherwise ``doc`` itself, with
        ``_id`` replaced by ``str(_id)`` when that key is present.
    """
    if doc is None:
        return None
    # A document fetched with a projection may not carry "_id" at all;
    # tolerate that instead of raising KeyError.
    if "_id" in doc:
        doc["_id"] = str(doc["_id"])
    return doc
def get_col(project: str, collection: str):
    """Return the handle for ``collection`` inside the ``project`` database.

    Both lookups go through the module-level ``mongo`` client.
    """
    database = mongo[project]
    return database[collection]
def get_bucket(project: str) -> str:
    """Return the bucket name for ``project``, creating the bucket if needed.

    The name is ``"<project>-files"``. The bucket is created through
    ``minio_client`` whenever ``bucket_exists`` reports it is missing, so
    calling this has a side effect even on read paths.
    """
    name = f"{project}-files"
    already_present = minio_client.bucket_exists(name)
    if not already_present:
        minio_client.make_bucket(name)
    return name
def valid_oid(id: str) -> ObjectId:
    """Convert ``id`` to an ``ObjectId`` or reject the request.

    Raises:
        HTTPException: 400 ``"Invalid ID"`` when ``ObjectId.is_valid``
            rejects the value.
    """
    if ObjectId.is_valid(id):
        return ObjectId(id)
    raise HTTPException(status_code=400, detail="Invalid ID")
def retry(func, retries=3):
    """Call ``func()`` up to ``retries`` times, retrying on connection errors.

    Only ``ConnectionFailure`` / ``ServerSelectionTimeoutError`` trigger
    another attempt; any other exception propagates immediately. Attempts
    are made back to back, with no delay between them.

    Returns:
        Whatever ``func()`` returns on the first successful attempt.

    Raises:
        HTTPException: 503 carrying the last connection error once every
            attempt has failed.
    """
    failure = None
    for _ in range(retries):
        try:
            return func()
        except (ConnectionFailure, ServerSelectionTimeoutError) as err:
            # Remember the error and fall through to the next attempt.
            failure = err
    raise HTTPException(status_code=503, detail=f"DB unavailable: {str(failure)}")
# ─── Health ──────────────────────────────────────────────
@app.get("/health")
def health():
    # Health probe: pings MongoDB and reports the outcome in the body.
    # The response is an ordinary 200 in both cases; callers have to look
    # at the "status" field to tell success from failure.
    try:
        mongo.admin.command('ping')
    except Exception as e:
        return {"status": "error", "db": str(e)}
    return {"status": "ok", "db": "connected"}
# ─── Generic Database Endpoints ─────────────────────────
@app.get("/{project}/{collection}")
def get_all(project: str, collection: str, filter: str = None):
    # List every document of a collection, optionally narrowed by a
    # MongoDB query passed as a JSON object in the ``filter`` query
    # parameter. Responds with {"count": <n>, "data": [<docs>]}.
    # Raises 400 when ``filter`` is not valid JSON or is not a JSON object.
    col = get_col(project, collection)
    query = {}
    if filter:
        try:
            query = json.loads(filter)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            raise HTTPException(status_code=400, detail="Invalid filter JSON") from None
        # Valid JSON that is not an object (e.g. "[1]" or "5") cannot be a
        # query document; reject it here instead of failing inside find().
        if not isinstance(query, dict):
            raise HTTPException(status_code=400, detail="Invalid filter JSON")
    docs = retry(lambda: [serialize(d) for d in col.find(query)])
    return {"count": len(docs), "data": docs}
@app.get("/{project}/{collection}/_indexes")
def get_indexes(project: str, collection: str):
    # Return the names of the indexes defined on a collection as
    # {"indexes": [<name>, ...]}.
    col = get_col(project, collection)
    # Go through retry() like the other read endpoints so a connection
    # failure is retried and, if it persists, reported as a 503.
    indexes = retry(lambda: list(col.index_information()))
    return {"indexes": indexes}
@app.get("/{project}/{collection}/{id}")
def get_one(project: str, collection: str, id: str):
    # Fetch a single document by its ObjectId.
    # 400 for a malformed id, 404 when nothing matches.
    col = get_col(project, collection)
    oid = valid_oid(id)
    found = retry(lambda: col.find_one({"_id": oid}))
    if found:
        return serialize(found)
    raise HTTPException(status_code=404, detail="Not found")
@app.post("/{project}/{collection}")
async def insert_one(project: str, collection: str, data: dict[str, Any]):
    # Insert the JSON body as a new document, stamping it with
    # "_created_at" / "_updated_at" (UTC, ISO-8601).
    # Responds with the new id: {"id": <str>, "message": "Inserted"}.
    col = get_col(project, collection)
    # Take one timestamp so both fields are guaranteed to be equal.
    now = datetime.utcnow().isoformat()
    data["_created_at"] = now
    data["_updated_at"] = now
    # The driver call is blocking; run it in a worker thread so this
    # coroutine does not stall the event loop while waiting on the DB.
    result = await asyncio.to_thread(retry, lambda: col.insert_one(data))
    return {"id": str(result.inserted_id), "message": "Inserted"}
@app.put("/{project}/{collection}/{id}")
async def update_one(project: str, collection: str, id: str, data: dict[str, Any]):
    # Partially update a document: every key of the JSON body is applied
    # with $set, and "_updated_at" is refreshed.
    # 400 for a malformed id, 404 when no document matches.
    col = get_col(project, collection)
    oid = valid_oid(id)
    # Documents returned by the GET endpoints carry "_id" as a string; a
    # client echoing one back must not end up trying to $set the immutable
    # "_id" field, so drop it from the update.
    changes = {key: value for key, value in data.items() if key != "_id"}
    changes["_updated_at"] = datetime.utcnow().isoformat()
    # Blocking driver call: run it off the event loop.
    result = await asyncio.to_thread(
        retry, lambda: col.update_one({"_id": oid}, {"$set": changes})
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Not found")
    return {"message": "Updated"}
@app.put("/{project}/{collection}/{id}/replace")
async def replace_one(project: str, collection: str, id: str, data: dict[str, Any]):
    # Replace a whole document with the JSON body (fields absent from the
    # body are removed) and stamp "_updated_at".
    # 400 for a malformed id, 404 when no document matches.
    col = get_col(project, collection)
    oid = valid_oid(id)
    # A string "_id" echoed back from a GET would differ from the stored
    # ObjectId and make the replacement fail; the id comes from the path.
    replacement = {key: value for key, value in data.items() if key != "_id"}
    replacement["_updated_at"] = datetime.utcnow().isoformat()
    # Blocking driver call: run it off the event loop.
    result = await asyncio.to_thread(
        retry, lambda: col.replace_one({"_id": oid}, replacement)
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Not found")
    return {"message": "Replaced"}
@app.delete("/{project}/{collection}/{id}")
def delete_one(project: str, collection: str, id: str):
    # Delete a single document by its ObjectId.
    # 400 for a malformed id, 404 when nothing was deleted.
    col = get_col(project, collection)
    oid = valid_oid(id)
    outcome = retry(lambda: col.delete_one({"_id": oid}))
    if outcome.deleted_count != 0:
        return {"message": "Deleted"}
    raise HTTPException(status_code=404, detail="Not found")
@app.delete("/{project}/{collection}")
def drop_collection(project: str, collection: str):
    # Drop an entire collection from the project's database and confirm
    # with a message naming the collection.
    database = mongo[project]
    retry(lambda: database.drop_collection(collection))
    return {"message": f"Collection '{collection}' dropped"}
# ─── Bulk Operations ─────────────────────────────────────
@app.post("/{project}/{collection}/bulk")
async def insert_many(project: str, collection: str, data: list[dict[str, Any]]):
    # Insert a list of documents in one call; all of them share the same
    # "_created_at" / "_updated_at" timestamp.
    # Responds with {"inserted": <n>, "ids": [<str>, ...]}.
    # The driver rejects an empty batch, so answer that case directly
    # instead of letting it surface as a server error.
    if not data:
        return {"inserted": 0, "ids": []}
    col = get_col(project, collection)
    now = datetime.utcnow().isoformat()
    for doc in data:
        doc["_created_at"] = now
        doc["_updated_at"] = now
    # Blocking driver call: run it off the event loop.
    result = await asyncio.to_thread(retry, lambda: col.insert_many(data))
    ids = [str(i) for i in result.inserted_ids]
    return {"inserted": len(ids), "ids": ids}
@app.post("/{project}/{collection}/count")
async def count_docs(project: str, collection: str, data: dict[str, Any] = {}):
    # Count the documents matching the JSON body, which is used directly as
    # the MongoDB query; an empty body counts everything.
    # Responds with {"count": <n>}.
    # The {} default is kept for interface compatibility; it is only read
    # here, never mutated.
    col = get_col(project, collection)
    # Blocking driver call: run it off the event loop.
    count = await asyncio.to_thread(retry, lambda: col.count_documents(data))
    return {"count": count}
@app.post("/{project}/{collection}/find")
async def find_with_options(project: str, collection: str, data: dict[str, Any] = {}):
    # Query a collection with paging and sorting. Body keys (all optional):
    #   filter: query document            (default {})
    #   limit:  maximum number of results (default 100)
    #   skip:   number of results to skip (default 0)
    #   sort:   field name to sort on     (default "_created_at")
    #   order:  1 ascending / -1 descending (default -1)
    # Responds with {"count": <n>, "data": [<docs>]}.
    # Raises 400 when an option has the wrong type or range.
    col = get_col(project, collection)
    query = data.get("filter", {})
    limit = data.get("limit", 100)
    skip = data.get("skip", 0)
    sort_field = data.get("sort", "_created_at")
    sort_order = data.get("order", -1)

    # Validate up front so malformed options produce a 400 rather than an
    # unhandled driver error. bool is excluded explicitly because it is a
    # subclass of int.
    if not isinstance(query, dict):
        raise HTTPException(status_code=400, detail="'filter' must be an object")
    if isinstance(limit, bool) or not isinstance(limit, int):
        raise HTTPException(status_code=400, detail="'limit' must be an integer")
    if isinstance(skip, bool) or not isinstance(skip, int) or skip < 0:
        raise HTTPException(status_code=400, detail="'skip' must be a non-negative integer")
    if not isinstance(sort_field, str) or not sort_field:
        raise HTTPException(status_code=400, detail="'sort' must be a field name")
    if isinstance(sort_order, dict):
        pass  # mapping-style directions are handed to the driver untouched
    elif sort_order in (1, -1):
        sort_order = int(sort_order)  # normalises 1.0 / -1.0 / True
    else:
        raise HTTPException(status_code=400, detail="'order' must be 1 or -1")

    def run_query():
        cursor = col.find(query).sort(sort_field, sort_order).skip(skip).limit(limit)
        return [serialize(d) for d in cursor]

    # Blocking driver call: run it off the event loop.
    docs = await asyncio.to_thread(retry, run_query)
    return {"count": len(docs), "data": docs}
# ─── File Upload ─────────────────────────────────────────
@app.post("/{project}/files/upload")
async def upload_file(project: str, collection: str = "files", file: UploadFile = File(...)):
    # Store an uploaded file in the project's bucket under a fresh UUID
    # name and record its metadata in ``collection`` (default "files").
    # Responds with {"id": <metadata id>, "filename": <stored name>, "url": <path>}.
    original_name = file.filename or ""
    # Keep only a genuine trailing extension (".png"). splitext yields ""
    # for names without one and never returns a "/" inside the extension,
    # so the stored object key stays flat.
    ext = os.path.splitext(original_name)[1]
    filename = f"{uuid.uuid4()}{ext}"
    url = f"/{project}/files/{filename}"
    content_type = file.content_type or "application/octet-stream"
    data = await file.read()

    def store():
        # Both the object-store and the database calls are blocking.
        bucket = get_bucket(project)
        minio_client.put_object(
            bucket, filename, io.BytesIO(data), len(data), content_type=content_type
        )
        col = get_col(project, collection)
        return retry(lambda: col.insert_one({
            "filename": filename,
            "original_name": file.filename,
            "content_type": content_type,
            "size": len(data),
            "url": url,
            "_created_at": datetime.utcnow().isoformat(),
        }))

    # Run the blocking work in a worker thread to keep the event loop free.
    result = await asyncio.to_thread(store)
    return {"id": str(result.inserted_id), "filename": filename, "url": url}
@app.get("/{project}/files/{filename}")
def get_file(project: str, filename: str):
    # Stream a stored file back to the client, using the Content-Type the
    # object store reports for it. 404 when the object cannot be fetched.
    #
    # NOTE(review): this route is declared after the generic
    # "/{project}/{collection}/{id}" GET route. Path operations are matched
    # in declaration order, so requests for this path appear to be captured
    # by that handler first — confirm, and move the file routes above the
    # generic ones if so.
    bucket = get_bucket(project)
    try:
        obj = minio_client.get_object(bucket, filename)
    except Exception as err:
        raise HTTPException(status_code=404, detail="File not found") from err
    content_type = obj.headers.get("Content-Type", "application/octet-stream")

    def body():
        # Read fixed-size chunks, and always hand the connection back to
        # the pool once the download finishes or is abandoned.
        try:
            yield from obj.stream(32 * 1024)
        finally:
            obj.close()
            obj.release_conn()

    return StreamingResponse(body(), media_type=content_type)
@app.delete("/{project}/files/{filename}")
def delete_file(project: str, filename: str):
    # Remove a stored file from the project's bucket.
    # 404 when the object does not exist or cannot be removed.
    #
    # NOTE(review): this route is declared after the generic
    # "/{project}/{collection}/{id}" DELETE route. Path operations are
    # matched in declaration order, so requests for this path appear to be
    # captured by that handler first — confirm, and move the file routes
    # above the generic ones if so.
    # NOTE(review): the metadata document written at upload time is not
    # removed here — confirm whether that is intended.
    bucket = get_bucket(project)
    try:
        # Removing an absent key is presumably not reported as an error by
        # the object store, so check existence first to make the 404 real.
        minio_client.stat_object(bucket, filename)
        minio_client.remove_object(bucket, filename)
    except Exception as err:
        raise HTTPException(status_code=404, detail="File not found") from err
    return {"message": "File deleted"}
@app.get("/{project}/files")
def list_files(project: str):
    # List the objects in the project's bucket as
    # {"count": <n>, "files": [{"filename", "url", "size"}, ...]}.
    # Best effort: a listing failure yields an empty result, not an error.
    #
    # NOTE(review): this route is declared after the generic
    # "/{project}/{collection}" GET route. Path operations are matched in
    # declaration order, so requests for this path appear to be captured by
    # that handler first — confirm, and move the file routes above the
    # generic ones if so.
    bucket = get_bucket(project)
    try:
        files = [
            {
                "filename": obj.object_name,
                "url": f"/{project}/files/{obj.object_name}",
                "size": obj.size,
            }
            for obj in minio_client.list_objects(bucket)
        ]
    except Exception:
        # Exception rather than a bare except, so interpreter-level signals
        # (KeyboardInterrupt, SystemExit) are no longer swallowed.
        return {"count": 0, "files": []}
    return {"count": len(files), "files": files}
# ─── Uptime ──────────────────────────────────────────────
# Declared before "/{project}" on purpose: path operations are matched in
# declaration order, and the catch-all project route would otherwise
# answer "/uptime" as if "uptime" were a project name.
@app.get("/uptime", response_class=HTMLResponse)
def uptime():
    # Serve the static status page. The path is relative, so the file is
    # looked up in the process's current working directory.
    with open("index.html", "r", encoding="utf-8") as f:
        return f.read()


# ─── Project Info ────────────────────────────────────────
@app.get("/{project}")
def project_info(project: str):
    # Summarise a project as {"project": <name>, "collections": {<name>: <count>}}.
    db = mongo[project]

    def collect():
        return {name: db[name].count_documents({}) for name in db.list_collection_names()}

    # Same retry / 503 behaviour as the other database endpoints.
    stats = retry(collect)
    return {"project": project, "collections": stats}


@app.get("/")
def all_projects():
    # List every database on the server except MongoDB's built-in ones.
    ignore = ("admin", "local", "config")
    names = retry(mongo.list_database_names)
    projects = [p for p in names if p not in ignore]
    return {"projects": projects}