| import os |
| import io |
|
|
| import sqlite3 |
| import zipfile |
|
|
| from pathlib import Path |
|
|
| from fastapi import APIRouter, UploadFile, File, Query, HTTPException |
| from fastapi.responses import JSONResponse, StreamingResponse |
|
|
| from storage.common import validate_token |
|
|
| router = APIRouter(prefix="/db", tags=["Database Manager"]) |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| DB_PATH = Path("/data/db") |
|
|
| @router.get("/download_all_db_files", tags=["Database Manager"]) |
| async def download_all_db_files( |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Download all .db files from /data/db as a ZIP archive. |
| |
| Steps: |
| - Validate the token. |
| - Verify that /data/db exists and contains .db files. |
| - Create an in-memory ZIP containing all .db files. |
| - Return the ZIP as a downloadable file. |
| """ |
| validate_token(token) |
|
|
| if not DB_PATH.exists(): |
| raise HTTPException(status_code=404, detail="DB folder does not exist") |
|
|
| db_files = list(DB_PATH.glob("*.db")) |
| if not db_files: |
| raise HTTPException(status_code=404, detail="No .db files found") |
|
|
| |
| zip_buffer = io.BytesIO() |
|
|
| with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf: |
| for file_path in db_files: |
| zipf.write(file_path, arcname=file_path.name) |
|
|
| zip_buffer.seek(0) |
|
|
| return StreamingResponse( |
| zip_buffer, |
| media_type="application/zip", |
| headers={ |
| "Content-Disposition": 'attachment; filename="db_files.zip"' |
| } |
| ) |
|
|
|
|
| @router.post("/upload_db_file", tags=["Database Manager"]) |
| async def upload_db_file( |
| file: UploadFile = File(...), |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Upload a file to the /data/db folder. |
| |
| Steps: |
| - Validate the token. |
| - Ensure /data/db exists (create it if missing). |
| - Save the uploaded file inside /data/db using its original filename. |
| - Return a JSON response confirming the upload. |
| """ |
| validate_token(token) |
|
|
| |
| DB_PATH.mkdir(parents=True, exist_ok=True) |
|
|
| final_path = DB_PATH / file.filename |
|
|
| try: |
| |
| file_bytes = await file.read() |
| with open(final_path, "wb") as f: |
| f.write(file_bytes) |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=f"Failed to save file: {exc}") |
|
|
| return JSONResponse( |
| status_code=200, |
| content={ |
| "status": "ok", |
| "saved_to": str(final_path) |
| } |
| ) |
|
|
| @router.post("/execute_query", tags=["Database Manager"]) |
| async def execute_query( |
| db_filename: str = Query(..., description="SQLite .db file name"), |
| query: str = Query(..., description="SQL query to execute"), |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Execute a SQL query against a specified SQLite database file in /data/db. |
| |
| Steps: |
| - Validate the token. |
| - Ensure the requested .db file exists inside /data/db. |
| - Connect to the database using sqlite3. |
| - Execute the provided query. |
| - Return the results as a list of dictionaries (column: value). |
| - Capture and return errors if query execution fails. |
| """ |
| validate_token(token) |
|
|
| db_file_path = DB_PATH / db_filename |
|
|
| if not db_file_path.exists() or not db_file_path.is_file(): |
| raise HTTPException(status_code=404, detail=f"Database file {db_filename} not found") |
|
|
| try: |
| conn = sqlite3.connect(db_file_path) |
| conn.row_factory = sqlite3.Row |
| cur = conn.cursor() |
|
|
| cur.execute(query) |
| rows = cur.fetchall() |
|
|
| |
| results = [dict(row) for row in rows] |
|
|
| conn.commit() |
| conn.close() |
| except sqlite3.Error as e: |
| raise HTTPException(status_code=400, detail=f"SQL error: {e}") |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {e}") |
|
|
| return JSONResponse(content={"results": results}) |