| from datetime import datetime, timezone |
| import logging |
| from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query, Response |
| from typing import List, Dict, Any, Optional |
| from ...workers.celery_worker import celery_app |
|
|
| logger = logging.getLogger(__name__) |
|
|
| from ...core.neo4j_store import Neo4jStore |
| from ...retrieval.agent import AgentRetrievalSystem |
| from ...ingestion.pipeline import IngestionPipeline |
| from ...config import settings |
| from ...api.models import * |
| from ...api.auth import get_current_user, User |
| import redis |
| from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client |
| from pydantic import BaseModel |
| from pathlib import Path |
| import httpx |
|
|
| class SettingsUpdateRequest(BaseModel): |
| default_llm_provider: Optional[str] = None |
| embedding_provider: Optional[str] = None |
| openai_api_key: Optional[str] = None |
| openai_model: Optional[str] = None |
| anthropic_api_key: Optional[str] = None |
| anthropic_model: Optional[str] = None |
| google_api_key: Optional[str] = None |
| gemini_model: Optional[str] = None |
| ollama_base_url: Optional[str] = None |
| ollama_model: Optional[str] = None |
| ollama_embedding_model: Optional[str] = None |
| huggingface_api_key: Optional[str] = None |
| huggingface_model: Optional[str] = None |
| huggingface_embedding_model: Optional[str] = None |
|
|
| router = APIRouter() |
|
|
| from ...core.storage import get_storage |
| storage = get_storage() |
|
|
| @router.get("/api/system/health", response_model=SystemHealthResponse, tags=["System"]) |
| async def health_check(request: Request, response: Response): |
| """System health check""" |
| |
| neo4j_connected = False |
| redis_connected = False |
| workers_active = 0 |
| gds_version = None |
| |
| try: |
| |
| await request.app.state.graph_store.execute_query("RETURN 1") |
| neo4j_connected = True |
| |
| |
| try: |
| gds_res = await request.app.state.graph_store.execute_query("RETURN gds.version() as version") |
| if gds_res: |
| gds_version = gds_res[0]["version"] |
| except Exception: |
| pass |
| |
| except Exception as e: |
| logger.error(f"Neo4j health check failed: {e}") |
| |
| try: |
| |
| if hasattr(request.app.state, 'redis_client'): |
| await request.app.state.redis_client.ping() |
| redis_connected = True |
| else: |
| redis_connected = True |
| except Exception as e: |
| logger.error(f"Redis health check failed: {e}") |
| |
| try: |
| |
| inspect = celery_app.control.inspect() |
| active = inspect.active() |
| if active: |
| workers_active = len(active) |
| except Exception as e: |
| logger.warning(f"Celery health check failed: {e}") |
| |
| overall_status = "healthy" if (neo4j_connected and redis_connected) else "degraded" |
| |
| if overall_status == "degraded": |
| response.status_code = 503 |
| |
| return SystemHealthResponse( |
| status=overall_status, |
| version=settings.app_version, |
| neo4j_connected=neo4j_connected, |
| redis_connected=redis_connected, |
| workers_active=workers_active, |
| gds_version=gds_version, |
| timestamp=datetime.now(timezone.utc).replace(tzinfo=None) |
| ) |
|
|
|
|
|
|
| @router.get("/api/system/stats", response_model=SystemStatsResponse, tags=["System"]) |
| async def get_system_stats(request: Request, current_user: User = Depends(get_current_user)): |
| """Get system statistics""" |
| |
| tenant_id = current_user.tenant_id |
| params = {"tenant_id": tenant_id} if tenant_id else {} |
| |
| |
| doc_query = "MATCH (d:Document {tenant_id: $tenant_id}) RETURN count(d) as count" if tenant_id else "MATCH (d:Document) RETURN count(d) as count" |
| doc_result = await request.app.state.graph_store.execute_query(doc_query, params) |
| documents_count = doc_result[0]["count"] if doc_result else 0 |
| |
| |
| entity_query = "MATCH (e:Entity {tenant_id: $tenant_id}) RETURN count(e) as count" if tenant_id else "MATCH (e:Entity) RETURN count(e) as count" |
| entity_result = await request.app.state.graph_store.execute_query(entity_query, params) |
| entities_count = entity_result[0]["count"] if entity_result else 0 |
| |
| |
| rel_query = "MATCH ()-[r {tenant_id: $tenant_id}]->() RETURN count(r) as count" if tenant_id else "MATCH ()-[r]->() RETURN count(r) as count" |
| rel_result = await request.app.state.graph_store.execute_query(rel_query, params) |
| relationships_count = rel_result[0]["count"] if rel_result else 0 |
| |
| |
| chunk_query = "MATCH (c:Chunk {tenant_id: $tenant_id}) RETURN count(c) as count" if tenant_id else "MATCH (c:Chunk) RETURN count(c) as count" |
| chunk_result = await request.app.state.graph_store.execute_query(chunk_query, params) |
| chunks_count = chunk_result[0]["count"] if chunk_result else 0 |
| |
| ontology = request.app.state.ingestion_pipeline.get_ontology() |
| ontology_version = ontology.version if ontology else "none" |
| |
| return SystemStatsResponse( |
| documents_count=documents_count, |
| entities_count=entities_count, |
| relationships_count=relationships_count, |
| chunks_count=chunks_count, |
| ontology_version=ontology_version |
| ) |
|
|
|
|
|
|
| @router.get("/api/system/my-stats", tags=["System"]) |
| async def get_my_stats(request: Request, current_user: User = Depends(get_current_user)): |
| """Get activity stats for the currently authenticated user.""" |
| from fastapi.responses import JSONResponse |
|
|
| username = current_user.username |
|
|
| conv_q = """ |
| MATCH (u:User {username: $username})-[:HAS_CONVERSATION]->(c:Conversation) |
| RETURN count(DISTINCT c) as conversation_count |
| """ |
| msg_q = """ |
| MATCH (u:User {username: $username})-[:HAS_CONVERSATION]->(c:Conversation)-[:HAS_MESSAGE]->(m) |
| WHERE m.role = 'user' |
| RETURN count(m) as message_count, max(m.created_at) as last_active |
| """ |
| try: |
| conv_rows = await request.app.state.graph_store.execute_query(conv_q, {"username": username}) |
| msg_rows = await request.app.state.graph_store.execute_query(msg_q, {"username": username}) |
| conversation_count = conv_rows[0]["conversation_count"] if conv_rows else 0 |
| message_count = msg_rows[0]["message_count"] if msg_rows else 0 |
| last_active = msg_rows[0]["last_active"] if msg_rows else None |
| if hasattr(last_active, "isoformat"): |
| last_active = last_active.isoformat() |
| except Exception: |
| conversation_count = 0 |
| message_count = 0 |
| last_active = None |
|
|
| return JSONResponse({ |
| "username": username, |
| "conversation_count": conversation_count, |
| "message_count": message_count, |
| "last_active": last_active, |
| }) |
|
|
|
|
|
|
| @router.get("/api/system/formats", response_model=SupportedFormatsResponse, tags=["System"]) |
| async def get_supported_formats(request: Request): |
| """List supported ingestion file formats""" |
| return SupportedFormatsResponse( |
| formats=settings.allowed_file_types, |
| descriptions={ |
| ".pdf": "PDF documents (LlamaParse or pypdf)", |
| ".txt": "Plain text files", |
| ".md": "Markdown files", |
| ".docx": "Microsoft Word documents", |
| ".csv": "CSV spreadsheets (rows β entity facts)", |
| ".xlsx": "Excel spreadsheets (all sheets processed)", |
| ".pptx": "PowerPoint presentations (slides + notes)", |
| ".json": "JSON data files (nested structures flattened)", |
| } |
| ) |
|
|
|
|
| |
|
|
| @router.get("/api/system/settings", tags=["System"]) |
| async def get_settings(current_user: User = Depends(get_current_user)): |
| """Get current global settings""" |
| return { |
| "default_llm_provider": settings.default_llm_provider, |
| "embedding_provider": settings.embedding_provider, |
| "openai_api_key": settings.openai_api_key, |
| "openai_model": settings.openai_model, |
| "anthropic_api_key": settings.anthropic_api_key, |
| "anthropic_model": settings.anthropic_model, |
| "google_api_key": settings.google_api_key, |
| "gemini_model": settings.gemini_model, |
| "ollama_base_url": settings.ollama_base_url, |
| "ollama_model": settings.ollama_model, |
| "ollama_embedding_model": settings.ollama_embedding_model, |
| "huggingface_api_key": getattr(settings, 'huggingface_api_key', None), |
| "huggingface_model": getattr(settings, 'huggingface_model', None), |
| "huggingface_embedding_model": getattr(settings, 'huggingface_embedding_model', None), |
| } |
|
|
| @router.post("/api/system/settings", tags=["System"]) |
| async def update_settings(update_req: SettingsUpdateRequest, current_user: User = Depends(get_current_user)): |
| """Update global settings dynamically and persist to .env""" |
| update_data = update_req.model_dump(exclude_unset=True) |
| |
| |
| for key, value in update_data.items(): |
| if hasattr(settings, key): |
| setattr(settings, key, value) |
| |
| |
| env_path = Path(".env") |
| if env_path.exists(): |
| with open(env_path, "r") as f: |
| lines = f.readlines() |
| |
| new_lines = [] |
| updated_keys = set() |
| for line in lines: |
| stripped = line.strip() |
| if not stripped or stripped.startswith("#"): |
| new_lines.append(line) |
| continue |
| |
| if "=" in line: |
| k, v = line.split("=", 1) |
| k = k.strip() |
| if k.lower() in update_data: |
| new_val = update_data[k.lower()] |
| if new_val is not None: |
| new_lines.append(f"{k.upper()}={new_val}\n") |
| else: |
| new_lines.append(f"{k.upper()}=\n") |
| updated_keys.add(k.lower()) |
| else: |
| new_lines.append(line) |
| else: |
| new_lines.append(line) |
| |
| |
| for k, v in update_data.items(): |
| if k not in updated_keys and v is not None: |
| new_lines.append(f"{k.upper()}={v}\n") |
| |
| with open(env_path, "w") as f: |
| f.writelines(new_lines) |
| else: |
| |
| new_lines = [] |
| for k, v in update_data.items(): |
| if v is not None: |
| new_lines.append(f"{k.upper()}={v}\n") |
| with open(env_path, "w") as f: |
| f.writelines(new_lines) |
| |
| return {"message": "Settings updated successfully"} |
|
|
| @router.get("/api/system/ollama/models", tags=["System"]) |
| async def get_ollama_models(base_url: str = Query(..., description="Ollama Base URL")): |
| """Fetch available models from an Ollama instance""" |
| models = [] |
| try: |
| async with httpx.AsyncClient() as client: |
| resp = await client.get(f"{base_url.rstrip('/')}/api/tags", timeout=5.0) |
| if resp.status_code == 200: |
| data = resp.json() |
| models = [m["name"] for m in data.get("models", [])] |
| except Exception as e: |
| logger.error(f"Failed to fetch Ollama models: {e}") |
| |
| |
| popular = ["llama3:8b", "llama3.1:8b", "mistral:7b", "gemma:7b", "phi3:mini", "deepseek-coder:6.7b", "nomic-embed-text"] |
| for p in popular: |
| if p not in models: |
| models.append(p) |
| |
| return {"models": models} |
|
|
| @router.get("/api/system/huggingface/models", tags=["System"]) |
| async def get_hf_models(): |
| """Fetch trending HuggingFace models""" |
| models = [ |
| "meta-llama/Meta-Llama-3-8B-Instruct", |
| "mistralai/Mistral-7B-Instruct-v0.2", |
| "google/gemma-7b-it", |
| "BAAI/bge-large-en-v1.5", |
| "sentence-transformers/all-MiniLM-L6-v2" |
| ] |
| try: |
| async with httpx.AsyncClient() as client: |
| resp = await client.get("https://huggingface.co/api/models?sort=trending&limit=30", timeout=5.0) |
| if resp.status_code == 200: |
| data = resp.json() |
| for m in data: |
| if m.get("id") and m.get("id") not in models: |
| models.append(m["id"]) |
| except Exception as e: |
| logger.error(f"Failed to fetch HuggingFace models: {e}") |
| |
| return {"models": models} |
|
|
| |
|
|
|
|
|
|