Spaces:
Paused
Paused
| import json | |
| from typing import List, Optional | |
| from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form, BackgroundTasks | |
| from pydantic import BaseModel, Field, HttpUrl | |
| from utils.auth_utils import get_current_user_id_from_jwt, verify_agent_access | |
| from services.supabase import DBConnection | |
| from knowledge_base.file_processor import FileProcessor | |
| from utils.logger import logger | |
| from flags.flags import is_enabled | |
| router = APIRouter(prefix="/knowledge-base", tags=["knowledge-base"]) | |
| class KnowledgeBaseEntry(BaseModel): | |
| entry_id: Optional[str] = None | |
| name: str = Field(..., min_length=1, max_length=255) | |
| description: Optional[str] = None | |
| content: str = Field(..., min_length=1) | |
| usage_context: str = Field(default="always", pattern="^(always|on_request|contextual)$") | |
| is_active: bool = True | |
| class KnowledgeBaseEntryResponse(BaseModel): | |
| entry_id: str | |
| name: str | |
| description: Optional[str] | |
| content: str | |
| usage_context: str | |
| is_active: bool | |
| content_tokens: Optional[int] | |
| created_at: str | |
| updated_at: str | |
| source_type: Optional[str] = None | |
| source_metadata: Optional[dict] = None | |
| file_size: Optional[int] = None | |
| file_mime_type: Optional[str] = None | |
| class KnowledgeBaseListResponse(BaseModel): | |
| entries: List[KnowledgeBaseEntryResponse] | |
| total_count: int | |
| total_tokens: int | |
| class CreateKnowledgeBaseEntryRequest(BaseModel): | |
| name: str = Field(..., min_length=1, max_length=255) | |
| description: Optional[str] = None | |
| content: str = Field(..., min_length=1) | |
| usage_context: str = Field(default="always", pattern="^(always|on_request|contextual)$") | |
| class UpdateKnowledgeBaseEntryRequest(BaseModel): | |
| name: Optional[str] = Field(None, min_length=1, max_length=255) | |
| description: Optional[str] = None | |
| content: Optional[str] = Field(None, min_length=1) | |
| usage_context: Optional[str] = Field(None, pattern="^(always|on_request|contextual)$") | |
| is_active: Optional[bool] = None | |
| class ProcessingJobResponse(BaseModel): | |
| job_id: str | |
| job_type: str | |
| status: str | |
| source_info: dict | |
| result_info: dict | |
| entries_created: int | |
| total_files: int | |
| created_at: str | |
| completed_at: Optional[str] | |
| error_message: Optional[str] | |
| db = DBConnection() | |
| async def get_agent_knowledge_base( | |
| agent_id: str, | |
| include_inactive: bool = False, | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Get all knowledge base entries for an agent""" | |
| try: | |
| client = await db.client | |
| # Verify agent access | |
| await verify_agent_access(client, agent_id, user_id) | |
| result = await client.rpc('get_agent_knowledge_base', { | |
| 'p_agent_id': agent_id, | |
| 'p_include_inactive': include_inactive | |
| }).execute() | |
| entries = [] | |
| total_tokens = 0 | |
| for entry_data in result.data or []: | |
| entry = KnowledgeBaseEntryResponse( | |
| entry_id=entry_data['entry_id'], | |
| name=entry_data['name'], | |
| description=entry_data['description'], | |
| content=entry_data['content'], | |
| usage_context=entry_data['usage_context'], | |
| is_active=entry_data['is_active'], | |
| content_tokens=entry_data.get('content_tokens'), | |
| created_at=entry_data['created_at'], | |
| updated_at=entry_data.get('updated_at', entry_data['created_at']), | |
| source_type=entry_data.get('source_type'), | |
| source_metadata=entry_data.get('source_metadata'), | |
| file_size=entry_data.get('file_size'), | |
| file_mime_type=entry_data.get('file_mime_type') | |
| ) | |
| entries.append(entry) | |
| total_tokens += entry_data.get('content_tokens', 0) or 0 | |
| return KnowledgeBaseListResponse( | |
| entries=entries, | |
| total_count=len(entries), | |
| total_tokens=total_tokens | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error getting knowledge base for agent {agent_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to retrieve agent knowledge base") | |
| async def create_agent_knowledge_base_entry( | |
| agent_id: str, | |
| entry_data: CreateKnowledgeBaseEntryRequest, | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Create a new knowledge base entry for an agent""" | |
| try: | |
| client = await db.client | |
| # Verify agent access and get agent data | |
| agent_data = await verify_agent_access(client, agent_id, user_id) | |
| account_id = agent_data['account_id'] | |
| insert_data = { | |
| 'agent_id': agent_id, | |
| 'account_id': account_id, | |
| 'name': entry_data.name, | |
| 'description': entry_data.description, | |
| 'content': entry_data.content, | |
| 'usage_context': entry_data.usage_context | |
| } | |
| result = await client.table('agent_knowledge_base_entries').insert(insert_data).execute() | |
| if not result.data: | |
| raise HTTPException(status_code=500, detail="Failed to create agent knowledge base entry") | |
| created_entry = result.data[0] | |
| return KnowledgeBaseEntryResponse( | |
| entry_id=created_entry['entry_id'], | |
| name=created_entry['name'], | |
| description=created_entry['description'], | |
| content=created_entry['content'], | |
| usage_context=created_entry['usage_context'], | |
| is_active=created_entry['is_active'], | |
| content_tokens=created_entry.get('content_tokens'), | |
| created_at=created_entry['created_at'], | |
| updated_at=created_entry['updated_at'] | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error creating knowledge base entry for agent {agent_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to create agent knowledge base entry") | |
| async def upload_file_to_agent_kb( | |
| agent_id: str, | |
| background_tasks: BackgroundTasks, | |
| file: UploadFile = File(...), | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Upload and process a file for agent knowledge base""" | |
| try: | |
| client = await db.client | |
| # Verify agent access and get agent data | |
| agent_data = await verify_agent_access(client, agent_id, user_id) | |
| account_id = agent_data['account_id'] | |
| file_content = await file.read() | |
| job_id = await client.rpc('create_agent_kb_processing_job', { | |
| 'p_agent_id': agent_id, | |
| 'p_account_id': account_id, | |
| 'p_job_type': 'file_upload', | |
| 'p_source_info': { | |
| 'filename': file.filename, | |
| 'mime_type': file.content_type, | |
| 'file_size': len(file_content) | |
| } | |
| }).execute() | |
| if not job_id.data: | |
| raise HTTPException(status_code=500, detail="Failed to create processing job") | |
| job_id = job_id.data | |
| background_tasks.add_task( | |
| process_file_background, | |
| job_id, | |
| agent_id, | |
| account_id, | |
| file_content, | |
| file.filename, | |
| file.content_type or 'application/octet-stream' | |
| ) | |
| return { | |
| "job_id": job_id, | |
| "message": "File upload started. Processing in background.", | |
| "filename": file.filename | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error uploading file to agent {agent_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to upload file") | |
| async def update_knowledge_base_entry( | |
| entry_id: str, | |
| entry_data: UpdateKnowledgeBaseEntryRequest, | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Update an agent knowledge base entry""" | |
| try: | |
| client = await db.client | |
| # Get the entry and verify it exists in agent_knowledge_base_entries table | |
| entry_result = await client.table('agent_knowledge_base_entries').select('*').eq('entry_id', entry_id).execute() | |
| if not entry_result.data: | |
| raise HTTPException(status_code=404, detail="Knowledge base entry not found") | |
| entry = entry_result.data[0] | |
| agent_id = entry['agent_id'] | |
| # Verify agent access | |
| await verify_agent_access(client, agent_id, user_id) | |
| update_data = {} | |
| if entry_data.name is not None: | |
| update_data['name'] = entry_data.name | |
| if entry_data.description is not None: | |
| update_data['description'] = entry_data.description | |
| if entry_data.content is not None: | |
| update_data['content'] = entry_data.content | |
| if entry_data.usage_context is not None: | |
| update_data['usage_context'] = entry_data.usage_context | |
| if entry_data.is_active is not None: | |
| update_data['is_active'] = entry_data.is_active | |
| if not update_data: | |
| raise HTTPException(status_code=400, detail="No fields to update") | |
| result = await client.table('agent_knowledge_base_entries').update(update_data).eq('entry_id', entry_id).execute() | |
| if not result.data: | |
| raise HTTPException(status_code=500, detail="Failed to update knowledge base entry") | |
| updated_entry = result.data[0] | |
| logger.debug(f"Updated agent knowledge base entry {entry_id} for agent {agent_id}") | |
| return KnowledgeBaseEntryResponse( | |
| entry_id=updated_entry['entry_id'], | |
| name=updated_entry['name'], | |
| description=updated_entry['description'], | |
| content=updated_entry['content'], | |
| usage_context=updated_entry['usage_context'], | |
| is_active=updated_entry['is_active'], | |
| content_tokens=updated_entry.get('content_tokens'), | |
| created_at=updated_entry['created_at'], | |
| updated_at=updated_entry['updated_at'], | |
| source_type=updated_entry.get('source_type'), | |
| source_metadata=updated_entry.get('source_metadata'), | |
| file_size=updated_entry.get('file_size'), | |
| file_mime_type=updated_entry.get('file_mime_type') | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error updating knowledge base entry {entry_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to update knowledge base entry") | |
| async def delete_knowledge_base_entry( | |
| entry_id: str, | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Delete an agent knowledge base entry""" | |
| try: | |
| client = await db.client | |
| # Get the entry and verify it exists in agent_knowledge_base_entries table | |
| entry_result = await client.table('agent_knowledge_base_entries').select('entry_id, agent_id').eq('entry_id', entry_id).execute() | |
| if not entry_result.data: | |
| raise HTTPException(status_code=404, detail="Knowledge base entry not found") | |
| entry = entry_result.data[0] | |
| agent_id = entry['agent_id'] | |
| # Verify agent access | |
| await verify_agent_access(client, agent_id, user_id) | |
| result = await client.table('agent_knowledge_base_entries').delete().eq('entry_id', entry_id).execute() | |
| logger.debug(f"Deleted agent knowledge base entry {entry_id} for agent {agent_id}") | |
| return {"message": "Knowledge base entry deleted successfully"} | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error deleting knowledge base entry {entry_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to delete knowledge base entry") | |
| async def get_knowledge_base_entry( | |
| entry_id: str, | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Get a specific agent knowledge base entry""" | |
| try: | |
| client = await db.client | |
| # Get the entry from agent_knowledge_base_entries table only | |
| result = await client.table('agent_knowledge_base_entries').select('*').eq('entry_id', entry_id).execute() | |
| if not result.data: | |
| raise HTTPException(status_code=404, detail="Knowledge base entry not found") | |
| entry = result.data[0] | |
| agent_id = entry['agent_id'] | |
| # Verify agent access | |
| await verify_agent_access(client, agent_id, user_id) | |
| logger.debug(f"Retrieved agent knowledge base entry {entry_id} for agent {agent_id}") | |
| return KnowledgeBaseEntryResponse( | |
| entry_id=entry['entry_id'], | |
| name=entry['name'], | |
| description=entry['description'], | |
| content=entry['content'], | |
| usage_context=entry['usage_context'], | |
| is_active=entry['is_active'], | |
| content_tokens=entry.get('content_tokens'), | |
| created_at=entry['created_at'], | |
| updated_at=entry['updated_at'], | |
| source_type=entry.get('source_type'), | |
| source_metadata=entry.get('source_metadata'), | |
| file_size=entry.get('file_size'), | |
| file_mime_type=entry.get('file_mime_type') | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error getting knowledge base entry {entry_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to retrieve knowledge base entry") | |
| async def get_agent_processing_jobs( | |
| agent_id: str, | |
| limit: int = 10, | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Get processing jobs for an agent""" | |
| try: | |
| client = await db.client | |
| # Verify agent access | |
| await verify_agent_access(client, agent_id, user_id) | |
| result = await client.rpc('get_agent_kb_processing_jobs', { | |
| 'p_agent_id': agent_id, | |
| 'p_limit': limit | |
| }).execute() | |
| jobs = [] | |
| for job_data in result.data or []: | |
| job = ProcessingJobResponse( | |
| job_id=job_data['job_id'], | |
| job_type=job_data['job_type'], | |
| status=job_data['status'], | |
| source_info=job_data['source_info'], | |
| result_info=job_data['result_info'], | |
| entries_created=job_data['entries_created'], | |
| total_files=job_data['total_files'], | |
| created_at=job_data['created_at'], | |
| completed_at=job_data.get('completed_at'), | |
| error_message=job_data.get('error_message') | |
| ) | |
| jobs.append(job) | |
| return jobs | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error getting processing jobs for agent {agent_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to get processing jobs") | |
| async def process_file_background( | |
| job_id: str, | |
| agent_id: str, | |
| account_id: str, | |
| file_content: bytes, | |
| filename: str, | |
| mime_type: str | |
| ): | |
| """Background task to process uploaded files""" | |
| processor = FileProcessor() | |
| client = await processor.db.client | |
| try: | |
| await client.rpc('update_agent_kb_job_status', { | |
| 'p_job_id': job_id, | |
| 'p_status': 'processing' | |
| }).execute() | |
| result = await processor.process_file_upload( | |
| agent_id, account_id, file_content, filename, mime_type | |
| ) | |
| if result['success']: | |
| await client.rpc('update_agent_kb_job_status', { | |
| 'p_job_id': job_id, | |
| 'p_status': 'completed', | |
| 'p_result_info': result, | |
| 'p_entries_created': 1, | |
| 'p_total_files': 1 | |
| }).execute() | |
| else: | |
| await client.rpc('update_agent_kb_job_status', { | |
| 'p_job_id': job_id, | |
| 'p_status': 'failed', | |
| 'p_error_message': result.get('error', 'Unknown error') | |
| }).execute() | |
| except Exception as e: | |
| logger.error(f"Error in background file processing for job {job_id}: {str(e)}") | |
| try: | |
| await client.rpc('update_agent_kb_job_status', { | |
| 'p_job_id': job_id, | |
| 'p_status': 'failed', | |
| 'p_error_message': str(e) | |
| }).execute() | |
| except: | |
| pass | |
| async def get_agent_knowledge_base_context( | |
| agent_id: str, | |
| max_tokens: int = 4000, | |
| user_id: str = Depends(get_current_user_id_from_jwt) | |
| ): | |
| if not await is_enabled("knowledge_base"): | |
| raise HTTPException( | |
| status_code=403, | |
| detail="This feature is not available at the moment." | |
| ) | |
| """Get knowledge base context for agent prompts""" | |
| try: | |
| client = await db.client | |
| # Verify agent access | |
| await verify_agent_access(client, agent_id, user_id) | |
| result = await client.rpc('get_agent_knowledge_base_context', { | |
| 'p_agent_id': agent_id, | |
| 'p_max_tokens': max_tokens | |
| }).execute() | |
| context = result.data if result.data else None | |
| return { | |
| "context": context, | |
| "max_tokens": max_tokens, | |
| "agent_id": agent_id | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error getting knowledge base context for agent {agent_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Failed to retrieve agent knowledge base context") | |