Spaces:
Sleeping
Sleeping
| import logging | |
| import json | |
| import traceback | |
| from datetime import datetime, timedelta, timezone | |
| import time | |
| from functools import lru_cache | |
| from pathlib import Path as pathlib_Path # Import Path from pathlib with a different name | |
| from fastapi import APIRouter, HTTPException, Depends, Query, Body, Response, File, UploadFile, Form, BackgroundTasks | |
| from fastapi.params import Path # Import Path explicitly from fastapi.params instead | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.exc import SQLAlchemyError | |
| from typing import List, Optional, Dict, Any | |
| import logging | |
| import traceback | |
| from datetime import datetime | |
| from sqlalchemy import text, inspect, func | |
| from sqlalchemy.exc import SQLAlchemyError | |
| from sqlalchemy import desc, func | |
| from cachetools import TTLCache | |
| import uuid | |
| from app.database.postgresql import get_db | |
| from app.database.models import FAQItem, EmergencyItem, EventItem, AboutPixity, SolanaSummit, DaNangBucketList, ApiKey, VectorDatabase, Document, VectorStatus, TelegramBot, ChatEngine, BotEngine, EngineVectorDb, DocumentContent | |
| from pydantic import BaseModel, Field, ConfigDict | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
| # Create router | |
| router = APIRouter( | |
| prefix="/postgres", | |
| tags=["PostgreSQL"], | |
| ) | |
| # Initialize caches for frequently used data | |
| # Cache for 5 minutes (300 seconds) | |
| faqs_cache = TTLCache(maxsize=1, ttl=300) | |
| emergencies_cache = TTLCache(maxsize=1, ttl=300) | |
| events_cache = TTLCache(maxsize=10, ttl=300) # Cache for different page sizes | |
| about_pixity_cache = TTLCache(maxsize=1, ttl=300) | |
| solana_summit_cache = TTLCache(maxsize=1, ttl=300) | |
| danang_bucket_list_cache = TTLCache(maxsize=1, ttl=300) | |
| # --- Pydantic models for request/response --- | |
| # Information models | |
| class InfoContentBase(BaseModel): | |
| content: str | |
| class InfoContentCreate(InfoContentBase): | |
| pass | |
| class InfoContentUpdate(InfoContentBase): | |
| pass | |
| class InfoContentResponse(InfoContentBase): | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| # FAQ models | |
| class FAQBase(BaseModel): | |
| question: str | |
| answer: str | |
| is_active: bool = True | |
| class FAQCreate(FAQBase): | |
| pass | |
| class FAQUpdate(BaseModel): | |
| question: Optional[str] = None | |
| answer: Optional[str] = None | |
| is_active: Optional[bool] = None | |
| class FAQResponse(FAQBase): | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| # Emergency contact models | |
| class EmergencyBase(BaseModel): | |
| name: str | |
| phone_number: str | |
| description: Optional[str] = None | |
| address: Optional[str] = None | |
| location: Optional[str] = None | |
| priority: int = 0 | |
| is_active: bool = True | |
| section: Optional[str] = None | |
| section_id: Optional[int] = None | |
| class EmergencyCreate(EmergencyBase): | |
| pass | |
| class EmergencyUpdate(BaseModel): | |
| name: Optional[str] = None | |
| phone_number: Optional[str] = None | |
| description: Optional[str] = None | |
| address: Optional[str] = None | |
| location: Optional[str] = None | |
| priority: Optional[int] = None | |
| is_active: Optional[bool] = None | |
| section: Optional[str] = None | |
| section_id: Optional[int] = None | |
| class EmergencyResponse(EmergencyBase): | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| # Event models | |
| class EventBase(BaseModel): | |
| name: str | |
| description: str | |
| address: str | |
| location: Optional[str] = None | |
| date_start: datetime | |
| date_end: Optional[datetime] = None | |
| price: Optional[List[dict]] = None | |
| url: Optional[str] = None | |
| is_active: bool = True | |
| featured: bool = False | |
| class EventCreate(EventBase): | |
| pass | |
| class EventUpdate(BaseModel): | |
| name: Optional[str] = None | |
| description: Optional[str] = None | |
| address: Optional[str] = None | |
| location: Optional[str] = None | |
| date_start: Optional[datetime] = None | |
| date_end: Optional[datetime] = None | |
| price: Optional[List[dict]] = None | |
| url: Optional[str] = None | |
| is_active: Optional[bool] = None | |
| featured: Optional[bool] = None | |
| class EventResponse(EventBase): | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| # --- Batch operations for better performance --- | |
| class BatchEventCreate(BaseModel): | |
| events: List[EventCreate] | |
| class BatchUpdateResult(BaseModel): | |
| success_count: int | |
| failed_ids: List[int] = [] | |
| message: str | |
| # --- FAQ endpoints --- | |
| async def get_faqs( | |
| skip: int = 0, | |
| limit: int = 100, | |
| active_only: bool = False, | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all FAQ items. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **active_only**: If true, only return active items | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key based on query parameters | |
| cache_key = f"faqs_{skip}_{limit}_{active_only}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = faqs_cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| # Build query directly without excessive logging or inspection | |
| query = db.query(FAQItem) | |
| # Add filter if needed | |
| if active_only: | |
| query = query.filter(FAQItem.is_active == True) | |
| # Get total count for pagination | |
| count_query = query.with_entities(func.count(FAQItem.id)) | |
| total_count = count_query.scalar() | |
| # Execute query with pagination | |
| faqs = query.offset(skip).limit(limit).all() | |
| # Convert to Pydantic models | |
| result = [FAQResponse.model_validate(faq, from_attributes=True) for faq in faqs] | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| faqs_cache[cache_key] = result | |
| return result | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_faqs: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Unexpected error in get_faqs: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") | |
| async def create_faq( | |
| faq: FAQCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new FAQ item. | |
| - **question**: Question text | |
| - **answer**: Answer text | |
| - **is_active**: Whether the FAQ is active (default: True) | |
| """ | |
| try: | |
| # Create new FAQ item | |
| db_faq = FAQItem(**faq.model_dump()) | |
| db.add(db_faq) | |
| db.commit() | |
| db.refresh(db_faq) | |
| # Invalidate FAQ cache after creating a new item | |
| faqs_cache.clear() | |
| # Convert to Pydantic model | |
| return FAQResponse.model_validate(db_faq, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in create_faq: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def get_faq( | |
| faq_id: int = Path(..., gt=0), | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get a single FAQ item by ID. | |
| - **faq_id**: ID of the FAQ item | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key | |
| cache_key = f"faq_{faq_id}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = faqs_cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| # Use direct SQL query for better performance on single item lookup | |
| stmt = text("SELECT * FROM faq_item WHERE id = :id") | |
| result = db.execute(stmt, {"id": faq_id}).fetchone() | |
| if not result: | |
| raise HTTPException(status_code=404, detail="FAQ item not found") | |
| # Create a FAQItem model instance manually | |
| faq = FAQItem() | |
| for key, value in result._mapping.items(): | |
| if hasattr(faq, key): | |
| setattr(faq, key, value) | |
| # Convert to Pydantic model | |
| response = FAQResponse.model_validate(faq, from_attributes=True) | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| faqs_cache[cache_key] = response | |
| return response | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_faq: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def update_faq( | |
| faq_id: int = Path(..., gt=0), | |
| faq_update: FAQUpdate = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update an existing FAQ item. | |
| - **faq_id**: ID of the FAQ item to update | |
| - **question**: Updated question text (optional) | |
| - **answer**: Updated answer text (optional) | |
| - **is_active**: Updated active status (optional) | |
| """ | |
| try: | |
| # Check if FAQ exists | |
| faq = db.query(FAQItem).filter(FAQItem.id == faq_id).first() | |
| if faq is None: | |
| raise HTTPException(status_code=404, detail=f"FAQ with ID {faq_id} not found") | |
| # Update fields with optimized dict handling | |
| update_data = faq_update.model_dump(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| setattr(faq, key, value) | |
| # Commit changes | |
| db.commit() | |
| db.refresh(faq) | |
| # Invalidate specific cache entries | |
| faqs_cache.delete(f"faq_{faq_id}") | |
| faqs_cache.clear() # Clear all list caches | |
| # Convert to Pydantic model | |
| return FAQResponse.model_validate(faq, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in update_faq: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def delete_faq( | |
| faq_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete an FAQ item. | |
| - **faq_id**: ID of the FAQ item to delete | |
| """ | |
| try: | |
| # Use optimized query with proper error handling | |
| result = db.execute( | |
| text("DELETE FROM faq_item WHERE id = :id RETURNING id"), | |
| {"id": faq_id} | |
| ).fetchone() | |
| if not result: | |
| raise HTTPException(status_code=404, detail="FAQ item not found") | |
| db.commit() | |
| # Invalidate cache entries | |
| faqs_cache.delete(f"faq_{faq_id}") | |
| faqs_cache.clear() # Clear all list caches | |
| return {"status": "success", "message": f"FAQ item {faq_id} deleted"} | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in delete_faq: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| # --- Emergency contact endpoints --- | |
| async def get_emergency_contacts( | |
| skip: int = 0, | |
| limit: int = 100, | |
| active_only: bool = False, | |
| section: Optional[str] = None, | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all emergency contacts. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **active_only**: If true, only return active items | |
| - **section**: Filter by section (16.1, 16.2.1, 16.2.2, 16.3) | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key based on query parameters | |
| cache_key = f"emergency_{skip}_{limit}_{active_only}_{section}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = emergencies_cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| # Build query directly without excessive inspection and logging | |
| query = db.query(EmergencyItem) | |
| # Add filters if needed | |
| if active_only: | |
| query = query.filter(EmergencyItem.is_active == True) | |
| if section: | |
| query = query.filter(EmergencyItem.section == section) | |
| # Get total count for pagination info | |
| count_query = query.with_entities(func.count(EmergencyItem.id)) | |
| total_count = count_query.scalar() | |
| # Order by priority for proper sorting | |
| emergency_contacts = query.order_by(EmergencyItem.priority.desc()).offset(skip).limit(limit).all() | |
| # Convert to Pydantic models efficiently | |
| result = [EmergencyResponse.model_validate(contact, from_attributes=True) for contact in emergency_contacts] | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| emergencies_cache[cache_key] = result | |
| return result | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_emergency_contacts: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Unexpected error in get_emergency_contacts: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") | |
| async def get_emergency_sections( | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all available emergency sections. | |
| Returns a list of section information including ID and name. | |
| """ | |
| try: | |
| # Generate cache key | |
| cache_key = "emergency_sections" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = emergencies_cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| # Query distinct sections with their IDs | |
| stmt = text(""" | |
| SELECT DISTINCT section_id, section | |
| FROM emergency_item | |
| WHERE section IS NOT NULL | |
| ORDER BY section_id | |
| """) | |
| result = db.execute(stmt) | |
| # Extract section info | |
| sections = [{"id": row[0], "name": row[1]} for row in result] | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| emergencies_cache[cache_key] = sections | |
| return sections | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_emergency_sections: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Unexpected error in get_emergency_sections: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") | |
| async def get_emergency_contacts_by_section_id( | |
| section_id: int = Path(..., description="Section ID (1, 2, 3, or 4)"), | |
| active_only: bool = True, | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get emergency contacts for a specific section ID. | |
| - **section_id**: Section ID (1: Tourist support, 2: Emergency numbers, 3: Emergency situations, 4: Tourist scams) | |
| - **active_only**: If true, only return active items | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key based on query parameters | |
| cache_key = f"emergency_section_id_{section_id}_{active_only}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = emergencies_cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| # Build query | |
| query = db.query(EmergencyItem).filter(EmergencyItem.section_id == section_id) | |
| # Add active filter if needed | |
| if active_only: | |
| query = query.filter(EmergencyItem.is_active == True) | |
| # Order by priority for proper sorting | |
| emergency_contacts = query.order_by(EmergencyItem.priority.desc()).all() | |
| # Convert to Pydantic models efficiently | |
| result = [EmergencyResponse.model_validate(contact, from_attributes=True) for contact in emergency_contacts] | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| emergencies_cache[cache_key] = result | |
| return result | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_emergency_contacts_by_section_id: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Unexpected error in get_emergency_contacts_by_section_id: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") | |
| async def create_emergency_contact( | |
| emergency: EmergencyCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new emergency contact. | |
| - **name**: Contact name | |
| - **phone_number**: Phone number | |
| - **description**: Description (optional) | |
| - **address**: Address (optional) | |
| - **location**: Location coordinates (optional) | |
| - **priority**: Priority order (default: 0) | |
| - **is_active**: Whether the contact is active (default: True) | |
| """ | |
| try: | |
| db_emergency = EmergencyItem(**emergency.model_dump()) | |
| db.add(db_emergency) | |
| db.commit() | |
| db.refresh(db_emergency) | |
| # Invalidate emergency cache after creating a new item | |
| emergencies_cache.clear() | |
| # Convert SQLAlchemy model to Pydantic model before returning | |
| result = EmergencyResponse.model_validate(db_emergency, from_attributes=True) | |
| return result | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in create_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def get_emergency_contact( | |
| emergency_id: int = Path(..., gt=0), | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get a single emergency contact by ID. | |
| - **emergency_id**: ID of the emergency contact | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key | |
| cache_key = f"emergency_{emergency_id}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = emergencies_cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| # Use direct SQL query for better performance on single item lookup | |
| stmt = text("SELECT * FROM emergency_item WHERE id = :id") | |
| result = db.execute(stmt, {"id": emergency_id}).fetchone() | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Emergency contact not found") | |
| # Create an EmergencyItem model instance manually | |
| emergency = EmergencyItem() | |
| for key, value in result._mapping.items(): | |
| if hasattr(emergency, key): | |
| setattr(emergency, key, value) | |
| # Convert to Pydantic model | |
| response = EmergencyResponse.model_validate(emergency, from_attributes=True) | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| emergencies_cache[cache_key] = response | |
| return response | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def update_emergency_contact( | |
| emergency_id: int = Path(..., gt=0), | |
| emergency_update: EmergencyUpdate = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update a specific emergency contact. | |
| - **emergency_id**: ID of the emergency contact to update | |
| - **name**: New name (optional) | |
| - **phone_number**: New phone number (optional) | |
| - **description**: New description (optional) | |
| - **address**: New address (optional) | |
| - **location**: New location coordinates (optional) | |
| - **priority**: New priority order (optional) | |
| - **is_active**: New active status (optional) | |
| """ | |
| try: | |
| emergency = db.query(EmergencyItem).filter(EmergencyItem.id == emergency_id).first() | |
| if not emergency: | |
| raise HTTPException(status_code=404, detail="Emergency contact not found") | |
| # Update fields if provided | |
| update_data = emergency_update.model_dump(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| setattr(emergency, key, value) | |
| db.commit() | |
| db.refresh(emergency) | |
| # Invalidate specific cache entries | |
| emergencies_cache.delete(f"emergency_{emergency_id}") | |
| emergencies_cache.clear() # Clear all list caches | |
| # Convert to Pydantic model | |
| return EmergencyResponse.model_validate(emergency, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in update_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def delete_emergency_contact( | |
| emergency_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete a specific emergency contact. | |
| - **emergency_id**: ID of the emergency contact to delete | |
| """ | |
| try: | |
| # Use optimized direct SQL with RETURNING for better performance | |
| result = db.execute( | |
| text("DELETE FROM emergency_item WHERE id = :id RETURNING id"), | |
| {"id": emergency_id} | |
| ).fetchone() | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Emergency contact not found") | |
| db.commit() | |
| # Invalidate cache entries | |
| emergencies_cache.delete(f"emergency_{emergency_id}") | |
| emergencies_cache.clear() # Clear all list caches | |
| return {"status": "success", "message": f"Emergency contact {emergency_id} deleted"} | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in delete_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_update_emergency_status( | |
| emergency_ids: List[int] = Body(..., embed=True), | |
| is_active: bool = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the active status of multiple emergency contacts at once. | |
| This is much more efficient than updating emergency contacts one at a time. | |
| """ | |
| try: | |
| if not emergency_ids: | |
| raise HTTPException(status_code=400, detail="No emergency contact IDs provided") | |
| # Prepare the update statement | |
| stmt = text(""" | |
| UPDATE emergency_item | |
| SET is_active = :is_active, updated_at = NOW() | |
| WHERE id = ANY(:emergency_ids) | |
| RETURNING id | |
| """) | |
| # Execute the update in a single query | |
| result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids}) | |
| updated_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in emergency_ids if id not in updated_ids] | |
| # Invalidate emergency cache | |
| emergencies_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(updated_ids), | |
| failed_ids=failed_ids, | |
| message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_update_emergency_status: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_delete_emergency_contacts( | |
| emergency_ids: List[int] = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete multiple emergency contacts at once. | |
| This is much more efficient than deleting emergency contacts one at a time with separate API calls. | |
| """ | |
| try: | |
| if not emergency_ids: | |
| raise HTTPException(status_code=400, detail="No emergency contact IDs provided") | |
| # Prepare and execute the delete statement with RETURNING to get deleted IDs | |
| stmt = text(""" | |
| DELETE FROM emergency_item | |
| WHERE id = ANY(:emergency_ids) | |
| RETURNING id | |
| """) | |
| result = db.execute(stmt, {"emergency_ids": emergency_ids}) | |
| deleted_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in emergency_ids if id not in deleted_ids] | |
| # Invalidate emergency cache | |
| emergencies_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(deleted_ids), | |
| failed_ids=failed_ids, | |
| message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_delete_emergency_contacts: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| # --- Event endpoints --- | |
| async def get_events( | |
| skip: int = 0, | |
| limit: int = 100, | |
| active_only: bool = False, | |
| featured_only: bool = False, | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all events. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **active_only**: If true, only return active items | |
| - **featured_only**: If true, only return featured items | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key based on query parameters | |
| cache_key = f"events_{skip}_{limit}_{active_only}_{featured_only}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = events_cache.get(cache_key) | |
| if cached_result: | |
| return cached_result | |
| # Build query directly without excessive inspection and logging | |
| query = db.query(EventItem) | |
| # Add filters if needed | |
| if active_only: | |
| query = query.filter(EventItem.is_active == True) | |
| if featured_only: | |
| query = query.filter(EventItem.featured == True) | |
| # To improve performance, first fetch just IDs with COUNT | |
| count_query = query.with_entities(func.count(EventItem.id)) | |
| total_count = count_query.scalar() | |
| # Now get the actual data with pagination | |
| events = query.order_by(EventItem.date_start.desc()).offset(skip).limit(limit).all() | |
| # Convert to Pydantic models efficiently | |
| result = [EventResponse.model_validate(event, from_attributes=True) for event in events] | |
| # Store in cache if caching is enabled (30 seconds TTL for events list) | |
| if use_cache: | |
| events_cache[cache_key] = result | |
| return result | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_events: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Unexpected error in get_events: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") | |
| async def create_event( | |
| event: EventCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new event. | |
| - **name**: Event name | |
| - **description**: Event description | |
| - **address**: Event address | |
| - **location**: Location coordinates (optional) | |
| - **date_start**: Start date and time | |
| - **date_end**: End date and time (optional) | |
| - **price**: Price information (optional JSON object) | |
| - **is_active**: Whether the event is active (default: True) | |
| - **featured**: Whether the event is featured (default: False) | |
| """ | |
| try: | |
| db_event = EventItem(**event.model_dump()) | |
| db.add(db_event) | |
| db.commit() | |
| db.refresh(db_event) | |
| # Invalidate relevant caches on create | |
| events_cache.clear() | |
| # Convert SQLAlchemy model to Pydantic model before returning | |
| result = EventResponse.model_validate(db_event, from_attributes=True) | |
| return result | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in create_event: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def get_event( | |
| event_id: int = Path(..., gt=0), | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get a single event by ID. | |
| - **event_id**: ID of the event | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key | |
| cache_key = f"event_{event_id}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = events_cache.get(cache_key) | |
| if cached_result: | |
| return cached_result | |
| # Use direct SQL query for better performance on single item lookup | |
| # This avoids SQLAlchemy overhead and takes advantage of primary key lookup | |
| stmt = text("SELECT * FROM event_item WHERE id = :id") | |
| result = db.execute(stmt, {"id": event_id}).fetchone() | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Event not found") | |
| # Create an EventItem model instance manually from the result | |
| event = EventItem() | |
| for key, value in result._mapping.items(): | |
| if hasattr(event, key): | |
| setattr(event, key, value) | |
| # Convert SQLAlchemy model to Pydantic model | |
| response = EventResponse.model_validate(event, from_attributes=True) | |
| # Store in cache if caching is enabled (60 seconds TTL for single event) | |
| if use_cache: | |
| events_cache[cache_key] = response | |
| return response | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_event: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| def update_event( | |
| event_id: int, | |
| event: EventUpdate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update an existing event.""" | |
| try: | |
| db_event = db.query(EventItem).filter(EventItem.id == event_id).first() | |
| if not db_event: | |
| raise HTTPException(status_code=404, detail="Event not found") | |
| # Update event fields | |
| for key, value in event.model_dump(exclude_unset=True).items(): | |
| setattr(db_event, key, value) | |
| db.commit() | |
| db.refresh(db_event) | |
| # Invalidate specific cache entries | |
| events_cache.delete(f"event_{event_id}") | |
| events_cache.clear() # Clear all list caches | |
| # Convert SQLAlchemy model to Pydantic model before returning | |
| result = EventResponse.model_validate(db_event, from_attributes=True) | |
| return result | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in update_event: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def delete_event( | |
| event_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Delete a specific event.""" | |
| try: | |
| event = db.query(EventItem).filter(EventItem.id == event_id).first() | |
| if event is None: | |
| raise HTTPException(status_code=404, detail=f"Event with ID {event_id} not found") | |
| db.delete(event) | |
| db.commit() | |
| # Invalidate cache entries | |
| events_cache.delete(f"event_{event_id}") | |
| events_cache.clear() # Clear all list caches | |
| return {"status": "success", "message": f"Event {event_id} deleted"} | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| # --- Batch operations for better performance --- | |
| async def batch_create_events( | |
| batch: BatchEventCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create multiple events in a single database transaction. | |
| This is much more efficient than creating events one at a time with separate API calls. | |
| """ | |
| try: | |
| db_events = [] | |
| for event_data in batch.events: | |
| db_event = EventItem(**event_data.model_dump()) | |
| db.add(db_event) | |
| db_events.append(db_event) | |
| # Commit all events in a single transaction | |
| db.commit() | |
| # Refresh all events to get their IDs and other generated fields | |
| for db_event in db_events: | |
| db.refresh(db_event) | |
| # Convert SQLAlchemy models to Pydantic models | |
| result = [EventResponse.model_validate(event, from_attributes=True) for event in db_events] | |
| return result | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_create_events: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_update_event_status( | |
| event_ids: List[int] = Body(..., embed=True), | |
| is_active: bool = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the active status of multiple events at once. | |
| This is much more efficient than updating events one at a time. | |
| """ | |
| try: | |
| if not event_ids: | |
| raise HTTPException(status_code=400, detail="No event IDs provided") | |
| # Prepare the update statement | |
| stmt = text(""" | |
| UPDATE event_item | |
| SET is_active = :is_active, updated_at = NOW() | |
| WHERE id = ANY(:event_ids) | |
| RETURNING id | |
| """) | |
| # Execute the update in a single query | |
| result = db.execute(stmt, {"is_active": is_active, "event_ids": event_ids}) | |
| updated_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in event_ids if id not in updated_ids] | |
| return BatchUpdateResult( | |
| success_count=len(updated_ids), | |
| failed_ids=failed_ids, | |
| message=f"Updated {len(updated_ids)} events" if updated_ids else "No events were updated" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_update_event_status: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_delete_events( | |
| event_ids: List[int] = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete multiple events at once. | |
| This is much more efficient than deleting events one at a time with separate API calls. | |
| """ | |
| try: | |
| if not event_ids: | |
| raise HTTPException(status_code=400, detail="No event IDs provided") | |
| # Prepare and execute the delete statement with RETURNING to get deleted IDs | |
| stmt = text(""" | |
| DELETE FROM event_item | |
| WHERE id = ANY(:event_ids) | |
| RETURNING id | |
| """) | |
| result = db.execute(stmt, {"event_ids": event_ids}) | |
| deleted_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in event_ids if id not in deleted_ids] | |
| return BatchUpdateResult( | |
| success_count=len(deleted_ids), | |
| failed_ids=failed_ids, | |
| message=f"Deleted {len(deleted_ids)} events" if deleted_ids else "No events were deleted" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_delete_events: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| # Health check endpoint | |
| async def health_check(db: Session = Depends(get_db)): | |
| """ | |
| Check health of PostgreSQL connection. | |
| """ | |
| try: | |
| # Perform a simple database query to check health | |
| # Use text() to wrap the SQL query for SQLAlchemy 2.0 compatibility | |
| db.execute(text("SELECT 1")).first() | |
| return {"status": "healthy", "message": "PostgreSQL connection is working", "timestamp": datetime.now().isoformat()} | |
| except Exception as e: | |
| logger.error(f"PostgreSQL health check failed: {e}") | |
| raise HTTPException(status_code=503, detail=f"PostgreSQL connection failed: {str(e)}") | |
| # Add BatchFAQCreate class to model definitions | |
| class BatchFAQCreate(BaseModel): | |
| faqs: List[FAQCreate] | |
| # Add after delete_faq endpoint | |
| async def batch_create_faqs( | |
| batch: BatchFAQCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create multiple FAQ items in a single database transaction. | |
| This is much more efficient than creating FAQ items one at a time with separate API calls. | |
| """ | |
| try: | |
| db_faqs = [] | |
| for faq_data in batch.faqs: | |
| db_faq = FAQItem(**faq_data.model_dump()) | |
| db.add(db_faq) | |
| db_faqs.append(db_faq) | |
| # Commit all FAQ items in a single transaction | |
| db.commit() | |
| # Refresh all FAQ items to get their IDs and other generated fields | |
| for db_faq in db_faqs: | |
| db.refresh(db_faq) | |
| # Invalidate FAQ cache | |
| faqs_cache.clear() | |
| # Convert SQLAlchemy models to Pydantic models | |
| result = [FAQResponse.model_validate(faq, from_attributes=True) for faq in db_faqs] | |
| return result | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_create_faqs: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_update_faq_status( | |
| faq_ids: List[int] = Body(..., embed=True), | |
| is_active: bool = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the active status of multiple FAQ items at once. | |
| This is much more efficient than updating FAQ items one at a time. | |
| """ | |
| try: | |
| if not faq_ids: | |
| raise HTTPException(status_code=400, detail="No FAQ IDs provided") | |
| # Prepare the update statement | |
| stmt = text(""" | |
| UPDATE faq_item | |
| SET is_active = :is_active, updated_at = NOW() | |
| WHERE id = ANY(:faq_ids) | |
| RETURNING id | |
| """) | |
| # Execute the update in a single query | |
| result = db.execute(stmt, {"is_active": is_active, "faq_ids": faq_ids}) | |
| updated_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in faq_ids if id not in updated_ids] | |
| # Invalidate FAQ cache | |
| faqs_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(updated_ids), | |
| failed_ids=failed_ids, | |
| message=f"Updated {len(updated_ids)} FAQ items" if updated_ids else "No FAQ items were updated" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_update_faq_status: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_delete_faqs( | |
| faq_ids: List[int] = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete multiple FAQ items at once. | |
| This is much more efficient than deleting FAQ items one at a time with separate API calls. | |
| """ | |
| try: | |
| if not faq_ids: | |
| raise HTTPException(status_code=400, detail="No FAQ IDs provided") | |
| # Prepare and execute the delete statement with RETURNING to get deleted IDs | |
| stmt = text(""" | |
| DELETE FROM faq_item | |
| WHERE id = ANY(:faq_ids) | |
| RETURNING id | |
| """) | |
| result = db.execute(stmt, {"faq_ids": faq_ids}) | |
| deleted_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in faq_ids if id not in deleted_ids] | |
| # Invalidate FAQ cache | |
| faqs_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(deleted_ids), | |
| failed_ids=failed_ids, | |
| message=f"Deleted {len(deleted_ids)} FAQ items" if deleted_ids else "No FAQ items were deleted" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_delete_faqs: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| # Add BatchEmergencyCreate class to the Pydantic models section | |
| class BatchEmergencyCreate(BaseModel): | |
| emergency_contacts: List[EmergencyCreate] | |
| async def batch_create_emergency_contacts( | |
| batch: BatchEmergencyCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create multiple emergency contacts in a single database transaction. | |
| This is much more efficient than creating emergency contacts one at a time with separate API calls. | |
| """ | |
| try: | |
| db_emergency_contacts = [] | |
| for emergency_data in batch.emergency_contacts: | |
| db_emergency = EmergencyItem(**emergency_data.model_dump()) | |
| db.add(db_emergency) | |
| db_emergency_contacts.append(db_emergency) | |
| # Commit all emergency contacts in a single transaction | |
| db.commit() | |
| # Refresh all items to get their IDs and other generated fields | |
| for db_emergency in db_emergency_contacts: | |
| db.refresh(db_emergency) | |
| # Invalidate emergency cache | |
| emergencies_cache.clear() | |
| # Convert SQLAlchemy models to Pydantic models | |
| result = [EmergencyResponse.model_validate(emergency, from_attributes=True) for emergency in db_emergency_contacts] | |
| return result | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_create_emergency_contacts: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_update_emergency_status( | |
| emergency_ids: List[int] = Body(..., embed=True), | |
| is_active: bool = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the active status of multiple emergency contacts at once. | |
| This is much more efficient than updating emergency contacts one at a time. | |
| """ | |
| try: | |
| if not emergency_ids: | |
| raise HTTPException(status_code=400, detail="No emergency contact IDs provided") | |
| # Prepare the update statement | |
| stmt = text(""" | |
| UPDATE emergency_item | |
| SET is_active = :is_active, updated_at = NOW() | |
| WHERE id = ANY(:emergency_ids) | |
| RETURNING id | |
| """) | |
| # Execute the update in a single query | |
| result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids}) | |
| updated_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in emergency_ids if id not in updated_ids] | |
| # Invalidate emergency cache | |
| emergencies_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(updated_ids), | |
| failed_ids=failed_ids, | |
| message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_update_emergency_status: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_delete_emergency_contacts( | |
| emergency_ids: List[int] = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete multiple emergency contacts at once. | |
| This is much more efficient than deleting emergency contacts one at a time with separate API calls. | |
| """ | |
| try: | |
| if not emergency_ids: | |
| raise HTTPException(status_code=400, detail="No emergency contact IDs provided") | |
| # Prepare and execute the delete statement with RETURNING to get deleted IDs | |
| stmt = text(""" | |
| DELETE FROM emergency_item | |
| WHERE id = ANY(:emergency_ids) | |
| RETURNING id | |
| """) | |
| result = db.execute(stmt, {"emergency_ids": emergency_ids}) | |
| deleted_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in emergency_ids if id not in deleted_ids] | |
| # Invalidate emergency cache | |
| emergencies_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(deleted_ids), | |
| failed_ids=failed_ids, | |
| message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_delete_emergency_contacts: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| # --- About Pixity endpoints --- | |
| async def get_about_pixity( | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get the About Pixity information. | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = about_pixity_cache.get("about_pixity") | |
| if cached_result: | |
| logger.info("Cache hit for about_pixity") | |
| return cached_result | |
| # Get the first record (or create if none exists) | |
| about = db.query(AboutPixity).first() | |
| if not about: | |
| # Create default content if none exists | |
| about = AboutPixity( | |
| content="""PiXity is your smart, AI-powered local companion designed to help foreigners navigate life in any city of Vietnam with ease, starting with Da Nang. From finding late-night eats to handling visas, housing, and healthcare, PiXity bridges the gap in language, culture, and local know-how — so you can explore the city like a true insider. | |
| PiXity is proudly built by PiX.teq, the tech team behind PiX — a multidisciplinary collective based in Da Nang. | |
| X: x.com/pixity_bot | |
| Instagram: instagram.com/pixity.aibot/ | |
| Tiktok: tiktok.com/@pixity.aibot""" | |
| ) | |
| db.add(about) | |
| db.commit() | |
| db.refresh(about) | |
| # Convert to Pydantic model | |
| response = InfoContentResponse( | |
| id=about.id, | |
| content=about.content, | |
| created_at=about.created_at, | |
| updated_at=about.updated_at | |
| ) | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| about_pixity_cache["about_pixity"] = response | |
| return response | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_about_pixity: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def update_about_pixity( | |
| data: InfoContentUpdate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the About Pixity information. | |
| - **content**: New content text | |
| """ | |
| try: | |
| # Get the first record (or create if none exists) | |
| about = db.query(AboutPixity).first() | |
| if not about: | |
| # Create new record if none exists | |
| about = AboutPixity(content=data.content) | |
| db.add(about) | |
| else: | |
| # Update existing record | |
| about.content = data.content | |
| db.commit() | |
| db.refresh(about) | |
| # Invalidate cache | |
| about_pixity_cache.clear() | |
| # Convert to Pydantic model | |
| response = InfoContentResponse( | |
| id=about.id, | |
| content=about.content, | |
| created_at=about.created_at, | |
| updated_at=about.updated_at | |
| ) | |
| return response | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in update_about_pixity: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| # --- Da Nang Bucket List Pydantic models --- | |
| class DaNangBucketListBase(BaseModel): | |
| content: str | |
| class DaNangBucketListResponse(DaNangBucketListBase): | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| class DaNangBucketListCreate(DaNangBucketListBase): | |
| pass | |
| class DaNangBucketListUpdate(BaseModel): | |
| content: str | |
| # --- Da Nang Bucket List Endpoints --- | |
| async def get_danang_bucket_list( | |
| db: Session = Depends(get_db), | |
| use_cache: bool = True | |
| ): | |
| """ | |
| Retrieve the Da Nang Bucket List information. | |
| If none exists, creates a default entry. | |
| """ | |
| cache_key = "danang_bucket_list" | |
| # Try to get from cache if caching is enabled | |
| if use_cache and cache_key in danang_bucket_list_cache: | |
| cached_result = danang_bucket_list_cache[cache_key] | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| try: | |
| # Try to get the first bucket list entry | |
| db_bucket_list = db.query(DaNangBucketList).first() | |
| # If no entry exists, create a default one | |
| if not db_bucket_list: | |
| default_content = json.dumps({ | |
| "title": "Da Nang Bucket List", | |
| "description": "Must-visit places and experiences in Da Nang", | |
| "items": [ | |
| {"name": "Ba Na Hills", "description": "Visit the famous Golden Bridge"}, | |
| {"name": "Marble Mountains", "description": "Explore caves and temples"}, | |
| {"name": "My Khe Beach", "description": "Relax at one of the most beautiful beaches in Vietnam"}, | |
| {"name": "Dragon Bridge", "description": "Watch the fire-breathing show on weekends"}, | |
| {"name": "Son Tra Peninsula", "description": "See the Lady Buddha statue and lookout point"} | |
| ] | |
| }) | |
| new_bucket_list = DaNangBucketList(content=default_content) | |
| db.add(new_bucket_list) | |
| db.commit() | |
| db.refresh(new_bucket_list) | |
| db_bucket_list = new_bucket_list | |
| # Convert to Pydantic model | |
| response = DaNangBucketListResponse.model_validate(db_bucket_list, from_attributes=True) | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| danang_bucket_list_cache[cache_key] = response | |
| return response | |
| except SQLAlchemyError as e: | |
| error_msg = f"Database error in get_danang_bucket_list: {str(e)}" | |
| logger.error(error_msg) | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=error_msg) | |
| async def update_danang_bucket_list( | |
| bucket_list_data: DaNangBucketListUpdate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the Da Nang Bucket List information. | |
| If none exists, creates a new entry. | |
| """ | |
| try: | |
| # Try to get the first bucket list entry | |
| db_bucket_list = db.query(DaNangBucketList).first() | |
| # If no entry exists, create a new one | |
| if not db_bucket_list: | |
| db_bucket_list = DaNangBucketList(content=bucket_list_data.content) | |
| db.add(db_bucket_list) | |
| else: | |
| # Update existing entry | |
| db_bucket_list.content = bucket_list_data.content | |
| db_bucket_list.updated_at = datetime.utcnow() | |
| db.commit() | |
| db.refresh(db_bucket_list) | |
| # Clear cache | |
| if "danang_bucket_list" in danang_bucket_list_cache: | |
| del danang_bucket_list_cache["danang_bucket_list"] | |
| # Convert to Pydantic model | |
| return DaNangBucketListResponse.model_validate(db_bucket_list, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| error_msg = f"Database error in update_danang_bucket_list: {str(e)}" | |
| logger.error(error_msg) | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=error_msg) | |
| # --- Solana Summit Pydantic models --- | |
| class SolanaSummitBase(BaseModel): | |
| content: str | |
| class SolanaSummitResponse(SolanaSummitBase): | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| class SolanaSummitCreate(SolanaSummitBase): | |
| pass | |
| class SolanaSummitUpdate(BaseModel): | |
| content: str | |
| # --- Solana Summit Endpoints --- | |
| async def get_solana_summit( | |
| db: Session = Depends(get_db), | |
| use_cache: bool = True | |
| ): | |
| """ | |
| Retrieve the Solana Summit information. | |
| If none exists, creates a default entry. | |
| """ | |
| cache_key = "solana_summit" | |
| # Try to get from cache if caching is enabled | |
| if use_cache and cache_key in solana_summit_cache: | |
| cached_result = solana_summit_cache[cache_key] | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| try: | |
| # Try to get the first solana summit entry | |
| db_solana_summit = db.query(SolanaSummit).first() | |
| # If no entry exists, create a default one | |
| if not db_solana_summit: | |
| default_content = json.dumps({ | |
| "title": "Solana Summit Vietnam", | |
| "description": "Information about Solana Summit Vietnam event in Da Nang", | |
| "date": "2023-11-04T09:00:00+07:00", | |
| "location": "Hyatt Regency, Da Nang", | |
| "details": "The Solana Summit is a gathering of developers, entrepreneurs, and enthusiasts in the Solana ecosystem.", | |
| "agenda": [ | |
| {"time": "09:00", "activity": "Registration & Networking"}, | |
| {"time": "10:00", "activity": "Opening Keynote"}, | |
| {"time": "12:00", "activity": "Lunch Break"}, | |
| {"time": "13:30", "activity": "Developer Workshops"}, | |
| {"time": "17:00", "activity": "Closing Remarks & Networking"} | |
| ], | |
| "registration_url": "https://example.com/solana-summit-registration" | |
| }) | |
| new_solana_summit = SolanaSummit(content=default_content) | |
| db.add(new_solana_summit) | |
| db.commit() | |
| db.refresh(new_solana_summit) | |
| db_solana_summit = new_solana_summit | |
| # Convert to Pydantic model | |
| response = SolanaSummitResponse.model_validate(db_solana_summit, from_attributes=True) | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| solana_summit_cache[cache_key] = response | |
| return response | |
| except SQLAlchemyError as e: | |
| error_msg = f"Database error in get_solana_summit: {str(e)}" | |
| logger.error(error_msg) | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=error_msg) | |
| async def update_solana_summit( | |
| summit_data: SolanaSummitUpdate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the Solana Summit information. | |
| If none exists, creates a new entry. | |
| """ | |
| try: | |
| # Try to get the first solana summit entry | |
| db_solana_summit = db.query(SolanaSummit).first() | |
| # If no entry exists, create a new one | |
| if not db_solana_summit: | |
| db_solana_summit = SolanaSummit(content=summit_data.content) | |
| db.add(db_solana_summit) | |
| else: | |
| # Update existing entry | |
| db_solana_summit.content = summit_data.content | |
| db_solana_summit.updated_at = datetime.utcnow() | |
| db.commit() | |
| db.refresh(db_solana_summit) | |
| # Clear cache | |
| if "solana_summit" in solana_summit_cache: | |
| del solana_summit_cache["solana_summit"] | |
| # Convert to Pydantic model | |
| return SolanaSummitResponse.model_validate(db_solana_summit, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| error_msg = f"Database error in update_solana_summit: {str(e)}" | |
| logger.error(error_msg) | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=error_msg) | |
| # --- API Key models and endpoints --- | |
| class ApiKeyBase(BaseModel): | |
| key_type: str | |
| key_value: str | |
| description: Optional[str] = None | |
| is_active: bool = True | |
| class ApiKeyCreate(ApiKeyBase): | |
| pass | |
| class ApiKeyUpdate(BaseModel): | |
| key_type: Optional[str] = None | |
| key_value: Optional[str] = None | |
| description: Optional[str] = None | |
| is_active: Optional[bool] = None | |
| class ApiKeyResponse(ApiKeyBase): | |
| id: int | |
| created_at: datetime | |
| last_used: Optional[datetime] = None | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_api_keys( | |
| skip: int = 0, | |
| limit: int = 100, | |
| active_only: bool = False, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all API keys. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **active_only**: If true, only return active keys | |
| """ | |
| try: | |
| query = db.query(ApiKey) | |
| if active_only: | |
| query = query.filter(ApiKey.is_active == True) | |
| api_keys = query.offset(skip).limit(limit).all() | |
| return [ApiKeyResponse.model_validate(key, from_attributes=True) for key in api_keys] | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error retrieving API keys: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error retrieving API keys: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving API keys: {str(e)}") | |
| async def create_api_key( | |
| api_key: ApiKeyCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new API key. | |
| """ | |
| try: | |
| # Create API key object | |
| db_api_key = ApiKey( | |
| key_type=api_key.key_type, | |
| key_value=api_key.key_value, | |
| description=api_key.description, | |
| is_active=api_key.is_active | |
| ) | |
| db.add(db_api_key) | |
| db.commit() | |
| db.refresh(db_api_key) | |
| return ApiKeyResponse.model_validate(db_api_key, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error creating API key: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error creating API key: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error creating API key: {str(e)}") | |
| async def get_api_key( | |
| api_key_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get API key by ID. | |
| """ | |
| try: | |
| api_key = db.query(ApiKey).filter(ApiKey.id == api_key_id).first() | |
| if not api_key: | |
| raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found") | |
| return ApiKeyResponse.model_validate(api_key, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving API key: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving API key: {str(e)}") | |
| async def update_api_key( | |
| api_key_id: int = Path(..., gt=0), | |
| api_key_update: ApiKeyUpdate = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update API key details. | |
| """ | |
| try: | |
| db_api_key = db.query(ApiKey).filter(ApiKey.id == api_key_id).first() | |
| if not db_api_key: | |
| raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found") | |
| # Update fields if provided | |
| if api_key_update.key_type is not None: | |
| db_api_key.key_type = api_key_update.key_type | |
| if api_key_update.key_value is not None: | |
| db_api_key.key_value = api_key_update.key_value | |
| if api_key_update.description is not None: | |
| db_api_key.description = api_key_update.description | |
| if api_key_update.is_active is not None: | |
| db_api_key.is_active = api_key_update.is_active | |
| db.commit() | |
| db.refresh(db_api_key) | |
| return ApiKeyResponse.model_validate(db_api_key, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error updating API key: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error updating API key: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error updating API key: {str(e)}") | |
| async def delete_api_key( | |
| api_key_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete API key. | |
| """ | |
| try: | |
| db_api_key = db.query(ApiKey).filter(ApiKey.id == api_key_id).first() | |
| if not db_api_key: | |
| raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found") | |
| db.delete(db_api_key) | |
| db.commit() | |
| return {"message": f"API key with ID {api_key_id} deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error deleting API key: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error deleting API key: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error deleting API key: {str(e)}") | |
| async def validate_api_key( | |
| key: str, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Validate an API key and update its last_used timestamp. | |
| """ | |
| try: | |
| db_api_key = db.query(ApiKey).filter(ApiKey.key_value == key, ApiKey.is_active == True).first() | |
| if not db_api_key: | |
| return {"valid": False, "message": "Invalid or inactive API key"} | |
| # Update last used timestamp | |
| db_api_key.last_used = datetime.now() | |
| db.commit() | |
| return { | |
| "valid": True, | |
| "key_type": db_api_key.key_type, | |
| "id": db_api_key.id, | |
| "message": "API key is valid" | |
| } | |
| except Exception as e: | |
| logger.error(f"Error validating API key: {e}") | |
| logger.error(traceback.format_exc()) | |
| return {"valid": False, "message": f"Error validating API key: {str(e)}"} | |
| # --- Vector Database models and endpoints --- | |
| class VectorDatabaseBase(BaseModel): | |
| name: str | |
| description: Optional[str] = None | |
| pinecone_index: str | |
| api_key_id: Optional[int] = None # Make api_key_id optional to handle NULL values | |
| status: str = "active" | |
| class VectorDatabaseCreate(VectorDatabaseBase): | |
| api_key_id: int # Keep this required for new databases | |
| pass | |
| class VectorDatabaseUpdate(BaseModel): | |
| name: Optional[str] = None | |
| description: Optional[str] = None | |
| pinecone_index: Optional[str] = None | |
| api_key_id: Optional[int] = None | |
| status: Optional[str] = None | |
| class VectorDatabaseResponse(BaseModel): | |
| name: str | |
| description: Optional[str] = None | |
| pinecone_index: str | |
| api_key_id: Optional[int] = None # Make api_key_id optional to handle NULL values | |
| status: str | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| message: Optional[str] = None # Add message field for notifications | |
| model_config = ConfigDict(from_attributes=True) | |
| class VectorDatabaseDetailResponse(BaseModel): | |
| id: int | |
| name: str | |
| description: Optional[str] = None | |
| pinecone_index: str | |
| status: str | |
| created_at: datetime | |
| updated_at: datetime | |
| document_count: int | |
| embedded_count: int | |
| pending_count: int | |
| message: Optional[str] = None # Add message field for notifications | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_vector_databases( | |
| skip: int = 0, | |
| limit: int = 100, | |
| status: Optional[str] = None, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all vector databases. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **status**: Filter by status (e.g., 'active', 'inactive') | |
| """ | |
| try: | |
| query = db.query(VectorDatabase) | |
| if status: | |
| query = query.filter(VectorDatabase.status == status) | |
| vector_dbs = query.offset(skip).limit(limit).all() | |
| return [VectorDatabaseResponse.model_validate(db_item, from_attributes=True) for db_item in vector_dbs] | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error retrieving vector databases: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error retrieving vector databases: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving vector databases: {str(e)}") | |
| async def create_vector_database( | |
| vector_db: VectorDatabaseCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new vector database. If the specified Pinecone index doesn't exist, it will be created automatically. | |
| """ | |
| try: | |
| # Check if a database with the same name already exists | |
| existing_db = db.query(VectorDatabase).filter(VectorDatabase.name == vector_db.name).first() | |
| if existing_db: | |
| raise HTTPException(status_code=400, detail=f"Vector database with name '{vector_db.name}' already exists") | |
| # Check if the API key exists | |
| api_key = db.query(ApiKey).filter(ApiKey.id == vector_db.api_key_id).first() | |
| if not api_key: | |
| raise HTTPException(status_code=400, detail=f"API key with ID {vector_db.api_key_id} not found") | |
| # Initialize Pinecone client with the API key | |
| from pinecone import Pinecone, ServerlessSpec | |
| pc_client = Pinecone(api_key=api_key.key_value) | |
| # Check if the index exists | |
| index_list = pc_client.list_indexes() | |
| index_names = index_list.names() if hasattr(index_list, 'names') else [] | |
| index_exists = vector_db.pinecone_index in index_names | |
| index_created = False | |
| if not index_exists: | |
| # Index doesn't exist - try to create it | |
| try: | |
| logger.info(f"Pinecone index '{vector_db.pinecone_index}' does not exist. Attempting to create it automatically.") | |
| # Create the index with standard parameters | |
| pc_client.create_index( | |
| name=vector_db.pinecone_index, | |
| dimension=1536, # Standard OpenAI embedding dimension | |
| metric="cosine", # Most common similarity metric | |
| spec=ServerlessSpec( | |
| cloud="aws", | |
| region="us-east-1" # Use a standard region that works with the free tier | |
| ) | |
| ) | |
| logger.info(f"Successfully created Pinecone index '{vector_db.pinecone_index}'") | |
| index_created = True | |
| # Allow some time for the index to initialize | |
| import time | |
| time.sleep(5) | |
| except Exception as create_error: | |
| logger.error(f"Failed to create Pinecone index '{vector_db.pinecone_index}': {create_error}") | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Failed to create Pinecone index '{vector_db.pinecone_index}': {str(create_error)}" | |
| ) | |
| # Verify we can connect to the index (whether existing or newly created) | |
| try: | |
| index = pc_client.Index(vector_db.pinecone_index) | |
| # Try to get stats to verify connection | |
| stats = index.describe_index_stats() | |
| # Create success message based on whether we created the index or used an existing one | |
| if index_created: | |
| success_message = f"Successfully created and connected to new Pinecone index '{vector_db.pinecone_index}'" | |
| else: | |
| success_message = f"Successfully connected to existing Pinecone index '{vector_db.pinecone_index}'" | |
| logger.info(f"{success_message}: {stats}") | |
| except Exception as e: | |
| error_message = f"Error connecting to Pinecone index '{vector_db.pinecone_index}': {str(e)}" | |
| logger.error(error_message) | |
| raise HTTPException(status_code=400, detail=error_message) | |
| # Create new vector database | |
| db_vector_db = VectorDatabase(**vector_db.model_dump()) | |
| db.add(db_vector_db) | |
| db.commit() | |
| db.refresh(db_vector_db) | |
| # Return response with additional info about index creation | |
| response_data = VectorDatabaseResponse.model_validate(db_vector_db, from_attributes=True).model_dump() | |
| # Add a message to the response indicating whether the index was created or existed | |
| if index_created: | |
| response_data["message"] = f"Created new Pinecone index '{vector_db.pinecone_index}' automatically" | |
| else: | |
| response_data["message"] = f"Using existing Pinecone index '{vector_db.pinecone_index}'" | |
| return VectorDatabaseResponse.model_validate(response_data) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error creating vector database: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error creating vector database: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error creating vector database: {str(e)}") | |
| async def get_vector_database( | |
| vector_db_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get vector database by ID. | |
| """ | |
| try: | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() | |
| if not vector_db: | |
| raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found") | |
| return VectorDatabaseResponse.model_validate(vector_db, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving vector database: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving vector database: {str(e)}") | |
| async def update_vector_database( | |
| vector_db_id: int = Path(..., gt=0), | |
| vector_db_update: VectorDatabaseUpdate = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update vector database details. | |
| """ | |
| try: | |
| db_vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() | |
| if not db_vector_db: | |
| raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found") | |
| # Check name uniqueness if updating name | |
| if vector_db_update.name and vector_db_update.name != db_vector_db.name: | |
| existing_db = db.query(VectorDatabase).filter(VectorDatabase.name == vector_db_update.name).first() | |
| if existing_db: | |
| raise HTTPException(status_code=400, detail=f"Vector database with name '{vector_db_update.name}' already exists") | |
| # Check if API key exists if updating API key ID | |
| if vector_db_update.api_key_id: | |
| api_key = db.query(ApiKey).filter(ApiKey.id == vector_db_update.api_key_id).first() | |
| if not api_key: | |
| raise HTTPException(status_code=400, detail=f"API key with ID {vector_db_update.api_key_id} not found") | |
| # Update fields if provided | |
| update_data = vector_db_update.model_dump(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| if value is not None: | |
| setattr(db_vector_db, key, value) | |
| db.commit() | |
| db.refresh(db_vector_db) | |
| return VectorDatabaseResponse.model_validate(db_vector_db, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error updating vector database: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error updating vector database: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error updating vector database: {str(e)}") | |
| async def delete_vector_database( | |
| vector_db_id: int = Path(..., gt=0), | |
| force: bool = Query(False, description="Force deletion even if documents exist"), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete vector database. | |
| - **force**: If true, will delete all associated documents first | |
| """ | |
| try: | |
| db_vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() | |
| if not db_vector_db: | |
| raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found") | |
| # Check if there are documents associated with this database | |
| doc_count = db.query(func.count(Document.id)).filter(Document.vector_database_id == vector_db_id).scalar() | |
| if doc_count > 0 and not force: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Cannot delete vector database with {doc_count} documents. Use force=true to delete anyway." | |
| ) | |
| # If force=true, delete all associated documents first | |
| if force and doc_count > 0: | |
| # Delete all documents associated with this database | |
| db.query(Document).filter(Document.vector_database_id == vector_db_id).delete() | |
| # Delete all vector statuses associated with this database | |
| db.query(VectorStatus).filter(VectorStatus.vector_database_id == vector_db_id).delete() | |
| # Delete all engine-vector-db associations | |
| db.query(EngineVectorDb).filter(EngineVectorDb.vector_database_id == vector_db_id).delete() | |
| # Delete the vector database | |
| db.delete(db_vector_db) | |
| db.commit() | |
| return {"message": f"Vector database with ID {vector_db_id} deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error deleting vector database: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error deleting vector database: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error deleting vector database: {str(e)}") | |
| async def get_vector_database_info( | |
| vector_db_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get detailed information about a vector database including document counts. | |
| Also verifies connectivity to the Pinecone index. | |
| """ | |
| try: | |
| # Get the vector database | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() | |
| if not vector_db: | |
| raise HTTPException(status_code=404, detail="Vector database not found") | |
| # Count total documents | |
| total_docs = db.query(func.count(Document.id)).filter( | |
| Document.vector_database_id == vector_db_id | |
| ).scalar() | |
| # Count embedded documents | |
| embedded_docs = db.query(func.count(Document.id)).filter( | |
| Document.vector_database_id == vector_db_id, | |
| Document.is_embedded == True | |
| ).scalar() | |
| # Count pending documents (not embedded) | |
| pending_docs = db.query(func.count(Document.id)).filter( | |
| Document.vector_database_id == vector_db_id, | |
| Document.is_embedded == False | |
| ).scalar() | |
| # Verify Pinecone index connectivity if API key is available | |
| message = None | |
| if vector_db.api_key_id: | |
| try: | |
| # Get the API key | |
| api_key = db.query(ApiKey).filter(ApiKey.id == vector_db.api_key_id).first() | |
| if api_key: | |
| # Initialize Pinecone client with the API key | |
| from pinecone import Pinecone | |
| pc_client = Pinecone(api_key=api_key.key_value) | |
| # Check if the index exists | |
| index_list = pc_client.list_indexes() | |
| index_names = index_list.names() if hasattr(index_list, 'names') else [] | |
| if vector_db.pinecone_index in index_names: | |
| # Try to connect to the index | |
| index = pc_client.Index(vector_db.pinecone_index) | |
| stats = index.describe_index_stats() | |
| message = f"Pinecone index '{vector_db.pinecone_index}' is operational with {stats.get('total_vector_count', 0)} vectors" | |
| logger.info(f"Successfully connected to Pinecone index '{vector_db.pinecone_index}': {stats}") | |
| else: | |
| message = f"Pinecone index '{vector_db.pinecone_index}' does not exist. Available indexes: {', '.join(index_names)}" | |
| logger.warning(message) | |
| else: | |
| message = f"API key with ID {vector_db.api_key_id} not found" | |
| logger.warning(message) | |
| except Exception as e: | |
| message = f"Error connecting to Pinecone: {str(e)}" | |
| logger.error(message) | |
| else: | |
| message = "No API key associated with this vector database" | |
| logger.warning(message) | |
| # Create response with added counts | |
| result = VectorDatabaseDetailResponse( | |
| id=vector_db.id, | |
| name=vector_db.name, | |
| description=vector_db.description, | |
| pinecone_index=vector_db.pinecone_index, | |
| status=vector_db.status, | |
| created_at=vector_db.created_at, | |
| updated_at=vector_db.updated_at, | |
| document_count=total_docs or 0, | |
| embedded_count=embedded_docs or 0, | |
| pending_count=pending_docs or 0, | |
| message=message | |
| ) | |
| return result | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error getting vector database info: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error getting vector database info: {str(e)}") | |
| # --- Document models and endpoints --- | |
| class DocumentBase(BaseModel): | |
| name: str | |
| vector_database_id: int | |
| class DocumentCreate(BaseModel): | |
| name: str | |
| vector_database_id: int | |
| class DocumentUpdate(BaseModel): | |
| name: Optional[str] = None | |
| class DocumentResponse(BaseModel): | |
| id: int | |
| name: str | |
| file_type: str | |
| content_type: Optional[str] = None | |
| size: int | |
| created_at: datetime | |
| updated_at: datetime | |
| vector_database_id: int | |
| vector_database_name: Optional[str] = None | |
| is_embedded: bool | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_documents( | |
| skip: int = 0, | |
| limit: int = 100, | |
| vector_database_id: Optional[int] = None, | |
| is_embedded: Optional[bool] = None, | |
| file_type: Optional[str] = None, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all documents with optional filtering. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **vector_database_id**: Filter by vector database ID | |
| - **is_embedded**: Filter by embedding status | |
| - **file_type**: Filter by file type | |
| """ | |
| try: | |
| query = db.query(Document) | |
| # Apply filters if provided | |
| if vector_database_id is not None: | |
| query = query.filter(Document.vector_database_id == vector_database_id) | |
| if is_embedded is not None: | |
| query = query.filter(Document.is_embedded == is_embedded) | |
| if file_type is not None: | |
| query = query.filter(Document.file_type == file_type) | |
| # Execute query with pagination | |
| documents = query.offset(skip).limit(limit).all() | |
| # Add vector database name | |
| result = [] | |
| for doc in documents: | |
| # Create a dictionary from the document for easier manipulation | |
| doc_dict = { | |
| "id": doc.id, | |
| "name": doc.name, | |
| "file_type": doc.file_type, | |
| "content_type": doc.content_type, | |
| "size": doc.size, | |
| "created_at": doc.created_at, | |
| "updated_at": doc.updated_at, | |
| "vector_database_id": doc.vector_database_id or 0, # Handle NULL values | |
| "is_embedded": doc.is_embedded | |
| } | |
| # Get vector database name if not already populated | |
| vector_db_name = None | |
| if doc.vector_database_id is not None: | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == doc.vector_database_id).first() | |
| vector_db_name = vector_db.name if vector_db else f"db_{doc.vector_database_id}" | |
| else: | |
| vector_db_name = "No Database" | |
| doc_dict["vector_database_name"] = vector_db_name | |
| # Create Pydantic model from dictionary | |
| doc_response = DocumentResponse(**doc_dict) | |
| result.append(doc_response) | |
| return result | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error retrieving documents: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error retrieving documents: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving documents: {str(e)}") | |
| async def get_document( | |
| document_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get document by ID. | |
| """ | |
| try: | |
| document = db.query(Document).filter(Document.id == document_id).first() | |
| if not document: | |
| raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") | |
| # Get vector database name | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == document.vector_database_id).first() | |
| vector_db_name = vector_db.name if vector_db else f"db_{document.vector_database_id}" | |
| # Create response with vector database name | |
| result = DocumentResponse.model_validate(document, from_attributes=True) | |
| result.vector_database_name = vector_db_name | |
| return result | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving document: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving document: {str(e)}") | |
| async def get_document_content( | |
| document_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get document content (file) by document ID. | |
| Returns the binary content with the appropriate Content-Type header. | |
| """ | |
| try: | |
| # Get document to check if it exists and get metadata | |
| document = db.query(Document).filter(Document.id == document_id).first() | |
| if not document: | |
| raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") | |
| # Get document content from document_content table | |
| document_content = db.query(DocumentContent).filter(DocumentContent.document_id == document_id).first() | |
| if not document_content or not document_content.file_content: | |
| raise HTTPException(status_code=404, detail=f"Content for document with ID {document_id} not found") | |
| # Determine content type | |
| content_type = document.content_type if hasattr(document, 'content_type') and document.content_type else "application/octet-stream" | |
| # Return binary content with correct content type | |
| return Response( | |
| content=document_content.file_content, | |
| media_type=content_type, | |
| headers={"Content-Disposition": f"attachment; filename=\"{document.name}\""} | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving document content: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving document content: {str(e)}") | |
| # --- Telegram Bot models and endpoints --- | |
| class TelegramBotBase(BaseModel): | |
| name: str | |
| username: str | |
| token: str | |
| status: str = "inactive" | |
| class TelegramBotCreate(TelegramBotBase): | |
| pass | |
| class TelegramBotUpdate(BaseModel): | |
| name: Optional[str] = None | |
| username: Optional[str] = None | |
| token: Optional[str] = None | |
| status: Optional[str] = None | |
| class TelegramBotResponse(TelegramBotBase): | |
| id: int | |
| created_at: datetime | |
| updated_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_telegram_bot( | |
| bot_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get Telegram bot by ID. | |
| """ | |
| try: | |
| bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() | |
| if not bot: | |
| raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") | |
| return TelegramBotResponse.model_validate(bot, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving Telegram bot: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving Telegram bot: {str(e)}") | |
| async def update_telegram_bot( | |
| bot_id: int = Path(..., gt=0), | |
| bot_update: TelegramBotUpdate = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update Telegram bot details. | |
| """ | |
| try: | |
| db_bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() | |
| if not db_bot: | |
| raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") | |
| # Check if new username conflicts with existing bots | |
| if bot_update.username and bot_update.username != db_bot.username: | |
| existing_bot = db.query(TelegramBot).filter(TelegramBot.username == bot_update.username).first() | |
| if existing_bot: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Telegram bot with username '{bot_update.username}' already exists" | |
| ) | |
| # Update fields if provided | |
| update_data = bot_update.model_dump(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| if value is not None: | |
| setattr(db_bot, key, value) | |
| db.commit() | |
| db.refresh(db_bot) | |
| return TelegramBotResponse.model_validate(db_bot, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error updating Telegram bot: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error updating Telegram bot: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error updating Telegram bot: {str(e)}") | |
| async def delete_telegram_bot( | |
| bot_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete Telegram bot. | |
| """ | |
| try: | |
| db_bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() | |
| if not db_bot: | |
| raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") | |
| # Check if bot is associated with any engines | |
| bot_engines = db.query(BotEngine).filter(BotEngine.bot_id == bot_id).all() | |
| if bot_engines: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Cannot delete bot as it is associated with chat engines. Remove associations first." | |
| ) | |
| # Delete bot | |
| db.delete(db_bot) | |
| db.commit() | |
| return {"message": f"Telegram bot with ID {bot_id} deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error deleting Telegram bot: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error deleting Telegram bot: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error deleting Telegram bot: {str(e)}") | |
| async def get_bot_engines_info( | |
| bot_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all chat engines associated with a Telegram bot. | |
| """ | |
| try: | |
| # Verify bot exists | |
| bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() | |
| if not bot: | |
| raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") | |
| # Get associated engines through BotEngine | |
| bot_engines = db.query(BotEngine).filter(BotEngine.bot_id == bot_id).all() | |
| result = [] | |
| for association in bot_engines: | |
| engine = db.query(ChatEngine).filter(ChatEngine.id == association.engine_id).first() | |
| if engine: | |
| result.append({ | |
| "association_id": association.id, | |
| "engine_id": engine.id, | |
| "engine_name": engine.name, | |
| "answer_model": engine.answer_model, | |
| "status": engine.status, | |
| "created_at": association.created_at | |
| }) | |
| return result | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving bot engines: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving bot engines: {str(e)}") | |
| # --- Chat Engine models and endpoints --- | |
| class ChatEngineBase(BaseModel): | |
| name: str | |
| answer_model: str | |
| system_prompt: Optional[str] = None | |
| empty_response: Optional[str] = None | |
| similarity_top_k: int = 3 | |
| vector_distance_threshold: float = 0.75 | |
| grounding_threshold: float = 0.2 | |
| use_public_information: bool = False | |
| status: str = "active" | |
| class ChatEngineCreate(ChatEngineBase): | |
| pass | |
| class ChatEngineUpdate(BaseModel): | |
| name: Optional[str] = None | |
| answer_model: Optional[str] = None | |
| system_prompt: Optional[str] = None | |
| empty_response: Optional[str] = None | |
| similarity_top_k: Optional[int] = None | |
| vector_distance_threshold: Optional[float] = None | |
| grounding_threshold: Optional[float] = None | |
| use_public_information: Optional[bool] = None | |
| status: Optional[str] = None | |
| class ChatEngineResponse(ChatEngineBase): | |
| id: int | |
| created_at: datetime | |
| last_modified: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_chat_engines( | |
| skip: int = 0, | |
| limit: int = 100, | |
| status: Optional[str] = None, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all chat engines. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **status**: Filter by status (e.g., 'active', 'inactive') | |
| """ | |
| try: | |
| query = db.query(ChatEngine) | |
| if status: | |
| query = query.filter(ChatEngine.status == status) | |
| engines = query.offset(skip).limit(limit).all() | |
| return [ChatEngineResponse.model_validate(engine, from_attributes=True) for engine in engines] | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error retrieving chat engines: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error retrieving chat engines: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving chat engines: {str(e)}") | |
| async def create_chat_engine( | |
| engine: ChatEngineCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new chat engine. | |
| """ | |
| try: | |
| # Create chat engine | |
| db_engine = ChatEngine(**engine.model_dump()) | |
| db.add(db_engine) | |
| db.commit() | |
| db.refresh(db_engine) | |
| return ChatEngineResponse.model_validate(db_engine, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error creating chat engine: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error creating chat engine: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error creating chat engine: {str(e)}") | |
| async def get_chat_engine( | |
| engine_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get chat engine by ID. | |
| """ | |
| try: | |
| engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() | |
| if not engine: | |
| raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") | |
| return ChatEngineResponse.model_validate(engine, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving chat engine: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving chat engine: {str(e)}") | |
| async def update_chat_engine( | |
| engine_id: int = Path(..., gt=0), | |
| engine_update: ChatEngineUpdate = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update chat engine details. | |
| """ | |
| try: | |
| db_engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() | |
| if not db_engine: | |
| raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") | |
| # Update fields if provided | |
| update_data = engine_update.model_dump(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| if value is not None: | |
| setattr(db_engine, key, value) | |
| # Update last_modified timestamp | |
| db_engine.last_modified = datetime.utcnow() | |
| db.commit() | |
| db.refresh(db_engine) | |
| return ChatEngineResponse.model_validate(db_engine, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error updating chat engine: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error updating chat engine: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error updating chat engine: {str(e)}") | |
| async def delete_chat_engine( | |
| engine_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete chat engine. | |
| """ | |
| try: | |
| db_engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() | |
| if not db_engine: | |
| raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") | |
| # Check if engine has associated bots or vector databases | |
| bot_engine_count = db.query(func.count(BotEngine.id)).filter(BotEngine.engine_id == engine_id).scalar() | |
| vector_db_count = db.query(func.count(EngineVectorDb.id)).filter(EngineVectorDb.engine_id == engine_id).scalar() | |
| if bot_engine_count > 0 or vector_db_count > 0: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Cannot delete chat engine as it has associated bots or vector databases. Remove associations first." | |
| ) | |
| # Delete engine | |
| db.delete(db_engine) | |
| db.commit() | |
| return {"message": f"Chat engine with ID {engine_id} deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error deleting chat engine: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error deleting chat engine: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error deleting chat engine: {str(e)}") | |
| async def get_engine_vector_databases( | |
| engine_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all vector databases associated with a chat engine. | |
| """ | |
| try: | |
| # Verify engine exists | |
| engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() | |
| if not engine: | |
| raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") | |
| # Get associated vector databases through EngineVectorDb | |
| engine_vector_dbs = db.query(EngineVectorDb).filter(EngineVectorDb.engine_id == engine_id).all() | |
| result = [] | |
| for association in engine_vector_dbs: | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == association.vector_database_id).first() | |
| if vector_db: | |
| result.append({ | |
| "association_id": association.id, | |
| "vector_database_id": vector_db.id, | |
| "name": vector_db.name, | |
| "pinecone_index": vector_db.pinecone_index, | |
| "priority": association.priority, | |
| "status": vector_db.status | |
| }) | |
| return result | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving engine vector databases: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving engine vector databases: {str(e)}") | |
| # --- Bot Engine Association models and endpoints --- | |
| class BotEngineCreate(BaseModel): | |
| bot_id: int | |
| engine_id: int | |
| class BotEngineResponse(BotEngineCreate): | |
| id: int | |
| created_at: datetime | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_bot_engines( | |
| skip: int = 0, | |
| limit: int = 100, | |
| bot_id: Optional[int] = None, | |
| engine_id: Optional[int] = None, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all bot-engine associations. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **bot_id**: Filter by bot ID | |
| - **engine_id**: Filter by engine ID | |
| """ | |
| try: | |
| query = db.query(BotEngine) | |
| if bot_id is not None: | |
| query = query.filter(BotEngine.bot_id == bot_id) | |
| if engine_id is not None: | |
| query = query.filter(BotEngine.engine_id == engine_id) | |
| bot_engines = query.offset(skip).limit(limit).all() | |
| return [BotEngineResponse.model_validate(association, from_attributes=True) for association in bot_engines] | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error retrieving bot-engine associations: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error retrieving bot-engine associations: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving bot-engine associations: {str(e)}") | |
| async def create_bot_engine( | |
| bot_engine: BotEngineCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new bot-engine association. | |
| """ | |
| try: | |
| # Check if bot exists | |
| bot = db.query(TelegramBot).filter(TelegramBot.id == bot_engine.bot_id).first() | |
| if not bot: | |
| raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_engine.bot_id} not found") | |
| # Check if engine exists | |
| engine = db.query(ChatEngine).filter(ChatEngine.id == bot_engine.engine_id).first() | |
| if not engine: | |
| raise HTTPException(status_code=404, detail=f"Chat engine with ID {bot_engine.engine_id} not found") | |
| # Check if association already exists | |
| existing_association = db.query(BotEngine).filter( | |
| BotEngine.bot_id == bot_engine.bot_id, | |
| BotEngine.engine_id == bot_engine.engine_id | |
| ).first() | |
| if existing_association: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Association between bot ID {bot_engine.bot_id} and engine ID {bot_engine.engine_id} already exists" | |
| ) | |
| # Create association | |
| db_bot_engine = BotEngine(**bot_engine.model_dump()) | |
| db.add(db_bot_engine) | |
| db.commit() | |
| db.refresh(db_bot_engine) | |
| return BotEngineResponse.model_validate(db_bot_engine, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error creating bot-engine association: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error creating bot-engine association: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error creating bot-engine association: {str(e)}") | |
| async def get_bot_engine( | |
| association_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get bot-engine association by ID. | |
| """ | |
| try: | |
| association = db.query(BotEngine).filter(BotEngine.id == association_id).first() | |
| if not association: | |
| raise HTTPException(status_code=404, detail=f"Bot-engine association with ID {association_id} not found") | |
| return BotEngineResponse.model_validate(association, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving bot-engine association: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving bot-engine association: {str(e)}") | |
| async def delete_bot_engine( | |
| association_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete bot-engine association. | |
| """ | |
| try: | |
| association = db.query(BotEngine).filter(BotEngine.id == association_id).first() | |
| if not association: | |
| raise HTTPException(status_code=404, detail=f"Bot-engine association with ID {association_id} not found") | |
| db.delete(association) | |
| db.commit() | |
| return {"message": f"Bot-engine association with ID {association_id} deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error deleting bot-engine association: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error deleting bot-engine association: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error deleting bot-engine association: {str(e)}") | |
| # --- Engine-Vector DB Association models and endpoints --- | |
| class EngineVectorDbCreate(BaseModel): | |
| engine_id: int | |
| vector_database_id: int | |
| priority: int = 0 | |
| class EngineVectorDbResponse(EngineVectorDbCreate): | |
| id: int | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_engine_vector_dbs( | |
| skip: int = 0, | |
| limit: int = 100, | |
| engine_id: Optional[int] = None, | |
| vector_database_id: Optional[int] = None, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all engine-vector-db associations. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **engine_id**: Filter by engine ID | |
| - **vector_database_id**: Filter by vector database ID | |
| """ | |
| try: | |
| query = db.query(EngineVectorDb) | |
| if engine_id is not None: | |
| query = query.filter(EngineVectorDb.engine_id == engine_id) | |
| if vector_database_id is not None: | |
| query = query.filter(EngineVectorDb.vector_database_id == vector_database_id) | |
| associations = query.offset(skip).limit(limit).all() | |
| return [EngineVectorDbResponse.model_validate(association, from_attributes=True) for association in associations] | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error retrieving engine-vector-db associations: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error retrieving engine-vector-db associations: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving engine-vector-db associations: {str(e)}") | |
| async def create_engine_vector_db( | |
| engine_vector_db: EngineVectorDbCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new engine-vector-db association. | |
| """ | |
| try: | |
| # Check if engine exists | |
| engine = db.query(ChatEngine).filter(ChatEngine.id == engine_vector_db.engine_id).first() | |
| if not engine: | |
| raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_vector_db.engine_id} not found") | |
| # Check if vector database exists | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == engine_vector_db.vector_database_id).first() | |
| if not vector_db: | |
| raise HTTPException(status_code=404, detail=f"Vector database with ID {engine_vector_db.vector_database_id} not found") | |
| # Check if association already exists | |
| existing_association = db.query(EngineVectorDb).filter( | |
| EngineVectorDb.engine_id == engine_vector_db.engine_id, | |
| EngineVectorDb.vector_database_id == engine_vector_db.vector_database_id | |
| ).first() | |
| if existing_association: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Association between engine ID {engine_vector_db.engine_id} and vector database ID {engine_vector_db.vector_database_id} already exists" | |
| ) | |
| # Create association | |
| db_engine_vector_db = EngineVectorDb(**engine_vector_db.model_dump()) | |
| db.add(db_engine_vector_db) | |
| db.commit() | |
| db.refresh(db_engine_vector_db) | |
| return EngineVectorDbResponse.model_validate(db_engine_vector_db, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error creating engine-vector-db association: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error creating engine-vector-db association: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error creating engine-vector-db association: {str(e)}") | |
| async def get_engine_vector_db( | |
| association_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get engine-vector-db association by ID. | |
| """ | |
| try: | |
| association = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first() | |
| if not association: | |
| raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found") | |
| return EngineVectorDbResponse.model_validate(association, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error retrieving engine-vector-db association: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving engine-vector-db association: {str(e)}") | |
| async def update_engine_vector_db( | |
| association_id: int = Path(..., gt=0), | |
| update_data: dict = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update engine-vector-db association details (only priority can be updated). | |
| """ | |
| try: | |
| association = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first() | |
| if not association: | |
| raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found") | |
| # Only priority can be updated | |
| if "priority" in update_data: | |
| association.priority = update_data["priority"] | |
| db.commit() | |
| db.refresh(association) | |
| return EngineVectorDbResponse.model_validate(association, from_attributes=True) | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error updating engine-vector-db association: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error updating engine-vector-db association: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error updating engine-vector-db association: {str(e)}") | |
| async def delete_engine_vector_db( | |
| association_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete engine-vector-db association. | |
| """ | |
| try: | |
| association = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first() | |
| if not association: | |
| raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found") | |
| db.delete(association) | |
| db.commit() | |
| return {"message": f"Engine-vector-db association with ID {association_id} deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error deleting engine-vector-db association: {e}") | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error deleting engine-vector-db association: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error deleting engine-vector-db association: {str(e)}") | |
| # --- VectorStatus models and endpoints --- | |
| class VectorStatusBase(BaseModel): | |
| document_id: int | |
| vector_database_id: int | |
| vector_id: Optional[str] = None | |
| status: str = "pending" | |
| error_message: Optional[str] = None | |
| class VectorStatusCreate(VectorStatusBase): | |
| pass | |
| class VectorStatusResponse(VectorStatusBase): | |
| id: int | |
| embedded_at: Optional[datetime] = None | |
| model_config = ConfigDict(from_attributes=True) | |
| async def get_vector_statuses( | |
| skip: int = 0, | |
| limit: int = 100, | |
| status: Optional[str] = None, | |
| document_id: Optional[int] = None, | |
| vector_database_id: Optional[int] = None, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all vector statuses with optional filtering. | |
| - **skip**: Number of items to skip | |
| - **limit**: Maximum number of items to return | |
| - **status**: Filter by status (e.g., 'pending', 'completed', 'error') | |
| - **document_id**: Filter by document ID | |
| - **vector_database_id**: Filter by vector database ID | |
| """ | |
| try: | |
| query = db.query(VectorStatus) | |
| # Apply filters if provided | |
| if status is not None: | |
| query = query.filter(VectorStatus.status == status) | |
| if document_id is not None: | |
| query = query.filter(VectorStatus.document_id == document_id) | |
| if vector_database_id is not None: | |
| query = query.filter(VectorStatus.vector_database_id == vector_database_id) | |
| # Execute query with pagination | |
| vector_statuses = query.offset(skip).limit(limit).all() | |
| # Convert to Pydantic models | |
| return [VectorStatusResponse.model_validate(status, from_attributes=True) for status in vector_statuses] | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_vector_statuses: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error retrieving vector statuses: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error retrieving vector statuses: {str(e)}") | |
| async def create_emergency_contact( | |
| emergency: EmergencyCreate, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Create a new emergency contact. | |
| - **name**: Contact name | |
| - **phone_number**: Phone number | |
| - **description**: Description (optional) | |
| - **address**: Address (optional) | |
| - **location**: Location coordinates (optional) | |
| - **priority**: Priority order (default: 0) | |
| - **is_active**: Whether the contact is active (default: True) | |
| """ | |
| try: | |
| db_emergency = EmergencyItem(**emergency.model_dump()) | |
| db.add(db_emergency) | |
| db.commit() | |
| db.refresh(db_emergency) | |
| # Invalidate emergency cache after creating a new item | |
| emergencies_cache.clear() | |
| # Convert SQLAlchemy model to Pydantic model before returning | |
| result = EmergencyResponse.model_validate(db_emergency, from_attributes=True) | |
| return result | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in create_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def get_emergency_contact( | |
| emergency_id: int = Path(..., gt=0), | |
| use_cache: bool = True, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get a single emergency contact by ID. | |
| - **emergency_id**: ID of the emergency contact | |
| - **use_cache**: If true, use cached results when available | |
| """ | |
| try: | |
| # Generate cache key | |
| cache_key = f"emergency_{emergency_id}" | |
| # Try to get from cache if caching is enabled | |
| if use_cache: | |
| cached_result = emergencies_cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| # Use direct SQL query for better performance on single item lookup | |
| stmt = text("SELECT * FROM emergency_item WHERE id = :id") | |
| result = db.execute(stmt, {"id": emergency_id}).fetchone() | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Emergency contact not found") | |
| # Create an EmergencyItem model instance manually | |
| emergency = EmergencyItem() | |
| for key, value in result._mapping.items(): | |
| if hasattr(emergency, key): | |
| setattr(emergency, key, value) | |
| # Convert to Pydantic model | |
| response = EmergencyResponse.model_validate(emergency, from_attributes=True) | |
| # Store in cache if caching is enabled | |
| if use_cache: | |
| emergencies_cache[cache_key] = response | |
| return response | |
| except SQLAlchemyError as e: | |
| logger.error(f"Database error in get_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def update_emergency_contact( | |
| emergency_id: int = Path(..., gt=0), | |
| emergency_update: EmergencyUpdate = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update a specific emergency contact. | |
| - **emergency_id**: ID of the emergency contact to update | |
| - **name**: New name (optional) | |
| - **phone_number**: New phone number (optional) | |
| - **description**: New description (optional) | |
| - **address**: New address (optional) | |
| - **location**: New location coordinates (optional) | |
| - **priority**: New priority order (optional) | |
| - **is_active**: New active status (optional) | |
| """ | |
| try: | |
| emergency = db.query(EmergencyItem).filter(EmergencyItem.id == emergency_id).first() | |
| if not emergency: | |
| raise HTTPException(status_code=404, detail="Emergency contact not found") | |
| # Update fields if provided | |
| update_data = emergency_update.model_dump(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| setattr(emergency, key, value) | |
| db.commit() | |
| db.refresh(emergency) | |
| # Invalidate specific cache entries | |
| emergencies_cache.delete(f"emergency_{emergency_id}") | |
| emergencies_cache.clear() # Clear all list caches | |
| # Convert to Pydantic model | |
| return EmergencyResponse.model_validate(emergency, from_attributes=True) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in update_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def delete_emergency_contact( | |
| emergency_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete a specific emergency contact. | |
| - **emergency_id**: ID of the emergency contact to delete | |
| """ | |
| try: | |
| # Use optimized direct SQL with RETURNING for better performance | |
| result = db.execute( | |
| text("DELETE FROM emergency_item WHERE id = :id RETURNING id"), | |
| {"id": emergency_id} | |
| ).fetchone() | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Emergency contact not found") | |
| db.commit() | |
| # Invalidate cache entries | |
| emergencies_cache.delete(f"emergency_{emergency_id}") | |
| emergencies_cache.clear() # Clear all list caches | |
| return {"status": "success", "message": f"Emergency contact {emergency_id} deleted"} | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in delete_emergency_contact: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_update_emergency_status( | |
| emergency_ids: List[int] = Body(..., embed=True), | |
| is_active: bool = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the active status of multiple emergency contacts at once. | |
| This is much more efficient than updating emergency contacts one at a time. | |
| """ | |
| try: | |
| if not emergency_ids: | |
| raise HTTPException(status_code=400, detail="No emergency contact IDs provided") | |
| # Prepare the update statement | |
| stmt = text(""" | |
| UPDATE emergency_item | |
| SET is_active = :is_active, updated_at = NOW() | |
| WHERE id = ANY(:emergency_ids) | |
| RETURNING id | |
| """) | |
| # Execute the update in a single query | |
| result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids}) | |
| updated_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in emergency_ids if id not in updated_ids] | |
| # Invalidate emergency cache | |
| emergencies_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(updated_ids), | |
| failed_ids=failed_ids, | |
| message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_update_emergency_status: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def batch_delete_emergency_contacts( | |
| emergency_ids: List[int] = Body(..., embed=True), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete multiple emergency contacts at once. | |
| This is much more efficient than deleting emergency contacts one at a time with separate API calls. | |
| """ | |
| try: | |
| if not emergency_ids: | |
| raise HTTPException(status_code=400, detail="No emergency contact IDs provided") | |
| # Prepare and execute the delete statement with RETURNING to get deleted IDs | |
| stmt = text(""" | |
| DELETE FROM emergency_item | |
| WHERE id = ANY(:emergency_ids) | |
| RETURNING id | |
| """) | |
| result = db.execute(stmt, {"emergency_ids": emergency_ids}) | |
| deleted_ids = [row[0] for row in result] | |
| # Commit the transaction | |
| db.commit() | |
| # Determine which IDs weren't found | |
| failed_ids = [id for id in emergency_ids if id not in deleted_ids] | |
| # Invalidate emergency cache | |
| emergencies_cache.clear() | |
| return BatchUpdateResult( | |
| success_count=len(deleted_ids), | |
| failed_ids=failed_ids, | |
| message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted" | |
| ) | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error in batch_delete_emergency_contacts: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| async def upload_document( | |
| name: str = Form(...), | |
| vector_database_id: int = Form(...), | |
| file: UploadFile = File(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Upload a new document and associate it with a vector database. | |
| - **name**: Document name | |
| - **vector_database_id**: ID of the vector database to associate with | |
| - **file**: The file to upload | |
| """ | |
| try: | |
| # Check if vector database exists | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_database_id).first() | |
| if not vector_db: | |
| raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_database_id} not found") | |
| # Read file content | |
| file_content = await file.read() | |
| file_size = len(file_content) | |
| # Determine file type from extension | |
| filename = file.filename | |
| file_extension = pathlib_Path(filename).suffix.lower()[1:] if filename else "" | |
| # Create document record | |
| document = Document( | |
| name=name, | |
| vector_database_id=vector_database_id, | |
| file_type=file_extension, | |
| content_type=file.content_type, | |
| size=file_size, | |
| is_embedded=False | |
| ) | |
| db.add(document) | |
| db.flush() # Get ID without committing | |
| # Create document content record | |
| document_content = DocumentContent( | |
| document_id=document.id, | |
| file_content=file_content | |
| ) | |
| db.add(document_content) | |
| db.commit() | |
| db.refresh(document) | |
| # Create vector status record for tracking embedding | |
| vector_status = VectorStatus( | |
| document_id=document.id, | |
| vector_database_id=vector_database_id, | |
| status="pending" | |
| ) | |
| db.add(vector_status) | |
| db.commit() | |
| # Get vector database name for response | |
| vector_db_name = vector_db.name if vector_db else f"db_{vector_database_id}" | |
| # Create response | |
| result = DocumentResponse( | |
| id=document.id, | |
| name=document.name, | |
| file_type=document.file_type, | |
| content_type=document.content_type, | |
| size=document.size, | |
| created_at=document.created_at, | |
| updated_at=document.updated_at, | |
| vector_database_id=document.vector_database_id, | |
| vector_database_name=vector_db_name, | |
| is_embedded=document.is_embedded | |
| ) | |
| return result | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error uploading document: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error uploading document: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error uploading document: {str(e)}") | |
| async def update_document( | |
| document_id: int, | |
| name: Optional[str] = Form(None), | |
| file: Optional[UploadFile] = File(None), | |
| background_tasks: BackgroundTasks = None, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update an existing document. Can update name, file content, or both. | |
| - **document_id**: ID of the document to update | |
| - **name**: New document name (optional) | |
| - **file**: New file content (optional) | |
| """ | |
| try: | |
| # Validate document_id | |
| if document_id <= 0: | |
| raise HTTPException(status_code=400, detail="document_id must be greater than 0") | |
| # Check if document exists | |
| document = db.query(Document).filter(Document.id == document_id).first() | |
| if not document: | |
| raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") | |
| # Get vector database information for later use | |
| vector_db = None | |
| if document.vector_database_id: | |
| vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == document.vector_database_id).first() | |
| # Update name if provided | |
| if name: | |
| document.name = name | |
| # Update file if provided | |
| if file: | |
| # Read new file content | |
| file_content = await file.read() | |
| file_size = len(file_content) | |
| # Determine file type from extension | |
| filename = file.filename | |
| file_extension = pathlib_Path(filename).suffix.lower()[1:] if filename else "" | |
| # Update document record | |
| document.file_type = file_extension | |
| document.content_type = file.content_type | |
| document.size = file_size | |
| document.is_embedded = False # Reset embedding status | |
| document.updated_at = datetime.now() | |
| # Update document content | |
| document_content = db.query(DocumentContent).filter(DocumentContent.document_id == document_id).first() | |
| if document_content: | |
| document_content.file_content = file_content | |
| else: | |
| # Create new document content if it doesn't exist | |
| document_content = DocumentContent( | |
| document_id=document_id, | |
| file_content=file_content | |
| ) | |
| db.add(document_content) | |
| # Get vector status for Pinecone cleanup | |
| vector_status = db.query(VectorStatus).filter(VectorStatus.document_id == document_id).first() | |
| # Store old vector_id for cleanup | |
| old_vector_id = None | |
| if vector_status and vector_status.vector_id: | |
| old_vector_id = vector_status.vector_id | |
| # Update vector status to pending | |
| if vector_status: | |
| vector_status.status = "pending" | |
| vector_status.vector_id = None | |
| vector_status.embedded_at = None | |
| vector_status.error_message = None | |
| else: | |
| # Create new vector status if it doesn't exist | |
| vector_status = VectorStatus( | |
| document_id=document_id, | |
| vector_database_id=document.vector_database_id, | |
| status="pending" | |
| ) | |
| db.add(vector_status) | |
| # Schedule deletion of old vectors in Pinecone if we have all needed info | |
| if old_vector_id and vector_db and document.vector_database_id and background_tasks: | |
| try: | |
| # Initialize PDFProcessor for vector deletion | |
| from app.pdf.processor import PDFProcessor | |
| processor = PDFProcessor( | |
| index_name=vector_db.pinecone_index, | |
| namespace=f"vdb-{document.vector_database_id}", | |
| vector_db_id=document.vector_database_id | |
| ) | |
| # Add deletion task to background tasks | |
| background_tasks.add_task( | |
| processor.delete_document_vectors, | |
| old_vector_id | |
| ) | |
| logger.info(f"Scheduled deletion of old vectors for document {document_id}") | |
| except Exception as e: | |
| logger.error(f"Error scheduling vector deletion: {str(e)}") | |
| # Continue with the update even if vector deletion scheduling fails | |
| # Schedule document for re-embedding if possible | |
| if background_tasks and document.vector_database_id: | |
| try: | |
| # Import here to avoid circular imports | |
| from app.pdf.tasks import process_document_for_embedding | |
| # Schedule embedding | |
| background_tasks.add_task( | |
| process_document_for_embedding, | |
| document_id=document_id, | |
| vector_db_id=document.vector_database_id | |
| ) | |
| logger.info(f"Scheduled re-embedding for document {document_id}") | |
| except Exception as e: | |
| logger.error(f"Error scheduling document embedding: {str(e)}") | |
| # Continue with the update even if embedding scheduling fails | |
| db.commit() | |
| db.refresh(document) | |
| # Get vector database name for response | |
| vector_db_name = "No Database" | |
| if vector_db: | |
| vector_db_name = vector_db.name | |
| elif document.vector_database_id: | |
| vector_db_name = f"db_{document.vector_database_id}" | |
| # Create response | |
| result = DocumentResponse( | |
| id=document.id, | |
| name=document.name, | |
| file_type=document.file_type, | |
| content_type=document.content_type, | |
| size=document.size, | |
| created_at=document.created_at, | |
| updated_at=document.updated_at, | |
| vector_database_id=document.vector_database_id or 0, | |
| vector_database_name=vector_db_name, | |
| is_embedded=document.is_embedded | |
| ) | |
| return result | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error updating document: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error updating document: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error updating document: {str(e)}") | |
| async def delete_document( | |
| document_id: int = Path(..., gt=0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete a document and its associated content. | |
| - **document_id**: ID of the document to delete | |
| """ | |
| try: | |
| # Check if document exists | |
| document = db.query(Document).filter(Document.id == document_id).first() | |
| if not document: | |
| raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") | |
| # Delete vector status | |
| db.query(VectorStatus).filter(VectorStatus.document_id == document_id).delete() | |
| # Delete document content | |
| db.query(DocumentContent).filter(DocumentContent.document_id == document_id).delete() | |
| # Delete document | |
| db.delete(document) | |
| db.commit() | |
| return {"status": "success", "message": f"Document with ID {document_id} deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except SQLAlchemyError as e: | |
| db.rollback() | |
| logger.error(f"Database error deleting document: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error deleting document: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"Error deleting document: {str(e)}") |