Spaces:
Sleeping
Sleeping
| """ | |
| Prisma Client Integration for NAVADA 2.0 | |
| Provides enhanced database operations with Prisma ORM | |
| """ | |
| import asyncio | |
| import json | |
| import base64 | |
| import logging | |
| from typing import List, Dict, Optional, Any | |
| from datetime import datetime | |
| import numpy as np | |
| import cv2 | |
| logger = logging.getLogger(__name__) | |
class PrismaManager:
    """Enhanced database manager using Prisma ORM.

    Wraps an optional Prisma client and exposes async helpers for documents,
    media files, knowledge-base entries and statistics. Operations are
    best-effort: when no client is configured, or when the underlying call
    raises, the method logs the problem and returns an "empty" value
    (``None``, ``False``, ``[]`` or ``{}``) instead of propagating the error.
    """

    def __init__(self):
        """Create the manager and attempt to initialise the Prisma client."""
        self.client = None
        self._init_client()

    def _init_client(self):
        """Initialize Prisma client.

        The actual Prisma import is currently disabled (see the commented
        lines), so ``self.client`` stays ``None`` and every database
        operation on this manager is a no-op until it is re-enabled.
        """
        try:
            # Import Prisma client (needs to be generated first)
            # from prisma import Prisma
            # self.client = Prisma()
            if self.client is None:
                # Report the real state instead of claiming success while
                # the client is still missing.
                logger.warning("Prisma client not available - run 'npm run prisma:generate'")
            else:
                logger.info("Prisma client initialized")
        except ImportError:
            # Only reachable once the import above is re-enabled.
            logger.warning("Prisma client not available - run 'npm run prisma:generate'")
            self.client = None
        except Exception as e:
            logger.error(f"Failed to initialize Prisma client: {e}")
            self.client = None

    async def connect(self):
        """Connect to database.

        Returns:
            True when the client connected, False when there is no client or
            the connection attempt raised.
        """
        if not self.client:
            return False
        try:
            await self.client.connect()
            logger.info("Connected to database via Prisma")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            return False

    async def disconnect(self):
        """Disconnect from database; errors are logged, never raised."""
        if not self.client:
            return
        try:
            await self.client.disconnect()
            logger.info("Disconnected from database")
        except Exception as e:
            logger.error(f"Error disconnecting: {e}")

    @staticmethod
    def _load_json_list(raw):
        """Decode a JSON-encoded column value, tolerating bad data.

        Returns ``[]`` for empty values and for values that are not valid
        JSON, so that one corrupt row cannot abort a whole result set.
        """
        if not raw:
            return []
        try:
            return json.loads(raw)
        except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
            logger.warning(f"Ignoring malformed JSON list value: {e}")
            return []

    # Document Management for Knowledge Retrieval
    async def add_document(self, title: str, content: str, content_type: str = "text",
                           tags: Optional[List[str]] = None, category: Optional[str] = None,
                           image_data: Optional[bytes] = None,
                           image_url: Optional[str] = None) -> Optional[int]:
        """
        Add document for knowledge retrieval

        Args:
            title: Document title
            content: Document content (text)
            content_type: "text", "image", "mixed"
            tags: List of tags (stored as a JSON string)
            category: Document category
            image_data: Binary image data
            image_url: URL to image

        Returns:
            Document ID if successful, None if there is no client or the
            insert failed
        """
        if not self.client:
            return None
        try:
            tags_str = json.dumps(tags) if tags else None
            document = await self.client.document.create(
                data={
                    'title': title,
                    'content': content,
                    'contentType': content_type,
                    'tags': tags_str,
                    'category': category,
                    'imageData': image_data,
                    'imageUrl': image_url
                }
            )
            # Create document chunks for better retrieval
            await self._create_document_chunks(document.id, content)
            logger.info(f"Added document: {title} (ID: {document.id})")
            return document.id
        except Exception as e:
            logger.error(f"Failed to add document: {e}")
            return None

    async def _create_document_chunks(self, document_id: int, content: str, chunk_size: int = 500):
        """Create chunks from document content for better retrieval.

        Splits ``content`` into consecutive slices of at most ``chunk_size``
        characters and stores each one with its zero-based index.

        Args:
            document_id: ID of the parent document
            content: Text to split; empty content creates no chunks
            chunk_size: Maximum characters per chunk; must be positive
        """
        if not self.client or not content:
            return
        if chunk_size <= 0:
            # range() rejects a zero step and a negative step would silently
            # yield nothing, so refuse both explicitly.
            logger.error(f"Failed to create document chunks: invalid chunk_size {chunk_size}")
            return
        try:
            for index, start in enumerate(range(0, len(content), chunk_size)):
                await self.client.documentchunk.create(
                    data={
                        'documentId': document_id,
                        'chunkIndex': index,
                        'content': content[start:start + chunk_size]
                    }
                )
        except Exception as e:
            logger.error(f"Failed to create document chunks: {e}")

    async def search_documents(self, query: str, content_type: Optional[str] = None,
                               category: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """
        Search documents by content, tags, or category

        Only active documents are considered; a document matches when the
        query is contained in its title, content or tags. Results are ordered
        newest first.

        Args:
            query: Search query
            content_type: Filter by content type
            category: Filter by category
            limit: Maximum results

        Returns:
            List of matching documents (empty on failure or without a client)
        """
        if not self.client:
            return []
        try:
            where_clause = {
                'isActive': True,
                'OR': [
                    {'title': {'contains': query}},
                    {'content': {'contains': query}},
                    {'tags': {'contains': query}}
                ]
            }
            if content_type:
                where_clause['contentType'] = content_type
            if category:
                where_clause['category'] = category
            documents = await self.client.document.find_many(
                where=where_clause,
                take=limit,
                order_by={'createdAt': 'desc'}
            )
            return [self._document_to_dict(doc) for doc in documents]
        except Exception as e:
            logger.error(f"Document search failed: {e}")
            return []

    def _document_to_dict(self, document) -> Dict:
        """Convert Prisma document to dictionary (tags decoded from JSON)."""
        return {
            'id': document.id,
            'title': document.title,
            'content': document.content,
            'content_type': document.contentType,
            'tags': self._load_json_list(document.tags),
            'category': document.category,
            'image_url': document.imageUrl,
            'created_at': document.createdAt,
            'updated_at': document.updatedAt
        }

    # Media File Management
    async def add_media_file(self, filename: str, filepath: str, mime_type: str,
                             file_size: int, image_data: Optional[bytes] = None,
                             description: Optional[str] = None,
                             tags: Optional[List[str]] = None) -> Optional[int]:
        """Add media file to database.

        Args:
            filename: Name of the file
            filepath: Path of the file
            mime_type: MIME type of the file
            file_size: File size
            image_data: Binary image data
            description: Free-text description
            tags: List of tags (stored as a JSON string)

        Returns:
            Media file ID if successful, None otherwise
        """
        if not self.client:
            return None
        try:
            tags_str = json.dumps(tags) if tags else None
            media_file = await self.client.mediafile.create(
                data={
                    'filename': filename,
                    'filepath': filepath,
                    'mimeType': mime_type,
                    'fileSize': file_size,
                    'imageData': image_data,
                    'description': description,
                    'tags': tags_str
                }
            )
            logger.info(f"Added media file: {filename} (ID: {media_file.id})")
            return media_file.id
        except Exception as e:
            logger.error(f"Failed to add media file: {e}")
            return None

    async def get_media_files(self, tags: Optional[List[str]] = None,
                              mime_type: Optional[str] = None,
                              limit: int = 50) -> List[Dict]:
        """Get media files with optional filtering.

        Args:
            tags: Keep files whose stored tags contain any of these values
            mime_type: Keep files whose MIME type contains this string
            limit: Maximum results

        Returns:
            List of active media files, newest first (empty on failure or
            without a client)
        """
        if not self.client:
            return []
        try:
            where_clause = {'isActive': True}
            if mime_type:
                where_clause['mimeType'] = {'contains': mime_type}
            if tags:
                # Search for any of the provided tags
                where_clause['OR'] = [{'tags': {'contains': tag}} for tag in tags]
            media_files = await self.client.mediafile.find_many(
                where=where_clause,
                take=limit,
                order_by={'createdAt': 'desc'}
            )
            return [self._media_file_to_dict(file) for file in media_files]
        except Exception as e:
            logger.error(f"Failed to get media files: {e}")
            return []

    def _media_file_to_dict(self, media_file) -> Dict:
        """Convert Prisma media file to dictionary (tags decoded from JSON)."""
        return {
            'id': media_file.id,
            'filename': media_file.filename,
            'filepath': media_file.filepath,
            'mime_type': media_file.mimeType,
            'file_size': media_file.fileSize,
            'description': media_file.description,
            'tags': self._load_json_list(media_file.tags),
            'created_at': media_file.createdAt
        }

    # Enhanced Knowledge Base Operations
    async def add_knowledge_entry(self, entity_type: str, entity_id: int, content: str,
                                  title: Optional[str] = None, description: Optional[str] = None,
                                  tags: Optional[List[str]] = None, category: Optional[str] = None,
                                  image_url: Optional[str] = None,
                                  text_content: Optional[str] = None) -> Optional[int]:
        """Add enhanced knowledge base entry.

        ``tags`` are stored JSON-encoded in the ``keywords`` column.

        Returns:
            Entry ID if successful, None otherwise
        """
        if not self.client:
            return None
        try:
            keywords_str = json.dumps(tags) if tags else None
            knowledge_entry = await self.client.knowledgebase.create(
                data={
                    'entityType': entity_type,
                    'entityId': entity_id,
                    'content': content,
                    'title': title,
                    'description': description,
                    'keywords': keywords_str,
                    'category': category,
                    'imageUrl': image_url,
                    'textContent': text_content
                }
            )
            logger.info(f"Added knowledge entry: {title or content[:50]}")
            return knowledge_entry.id
        except Exception as e:
            logger.error(f"Failed to add knowledge entry: {e}")
            return None

    async def search_knowledge(self, query: str, entity_type: Optional[str] = None,
                               category: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Enhanced knowledge search.

        An entry matches when the query is contained in its content, title,
        description, keywords or text content. Results are ordered newest
        first.

        Args:
            query: Search query
            entity_type: Filter by entity type
            category: Filter by category
            limit: Maximum results

        Returns:
            List of matching entries (empty on failure or without a client)
        """
        if not self.client:
            return []
        try:
            where_clause = {
                'OR': [
                    {'content': {'contains': query}},
                    {'title': {'contains': query}},
                    {'description': {'contains': query}},
                    {'keywords': {'contains': query}},
                    {'textContent': {'contains': query}}
                ]
            }
            if entity_type:
                where_clause['entityType'] = entity_type
            if category:
                where_clause['category'] = category
            entries = await self.client.knowledgebase.find_many(
                where=where_clause,
                take=limit,
                order_by={'createdAt': 'desc'}
            )
            return [self._knowledge_to_dict(entry) for entry in entries]
        except Exception as e:
            logger.error(f"Knowledge search failed: {e}")
            return []

    def _knowledge_to_dict(self, entry) -> Dict:
        """Convert Prisma knowledge entry to dictionary (keywords decoded from JSON)."""
        return {
            'id': entry.id,
            'entity_type': entry.entityType,
            'entity_id': entry.entityId,
            'content': entry.content,
            'title': entry.title,
            'description': entry.description,
            'keywords': self._load_json_list(entry.keywords),
            'category': entry.category,
            'image_url': entry.imageUrl,
            'text_content': entry.textContent,
            'created_at': entry.createdAt,
            'updated_at': entry.updatedAt
        }

    # Statistics and Analytics
    async def get_enhanced_stats(self) -> Dict:
        """Get comprehensive database statistics.

        Returns:
            Dict with counts for ``faces``, ``objects``, ``documents``,
            ``media_files``, ``knowledge_entries``, ``training_corrections``
            and ``recent_detections`` (detections created since midnight
            seven days ago). Empty dict on failure or without a client.
        """
        if not self.client:
            return {}
        # Local import: the module header only imports ``datetime``.
        from datetime import timedelta
        try:
            stats = {}
            # Basic counts
            stats['faces'] = await self.client.face.count(where={'isActive': True})
            stats['objects'] = await self.client.object.count(where={'isActive': True})
            stats['documents'] = await self.client.document.count(where={'isActive': True})
            stats['media_files'] = await self.client.mediafile.count(where={'isActive': True})
            stats['knowledge_entries'] = await self.client.knowledgebase.count()
            stats['training_corrections'] = await self.client.trainingcorrection.count()
            # Recent activity (last 7 days): start from today's midnight and
            # step back a full week; without the subtraction the window
            # would only cover the current day.
            today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
            seven_days_ago = today_midnight - timedelta(days=7)
            stats['recent_detections'] = await self.client.detectionhistory.count(
                where={'createdAt': {'gte': seven_days_ago}}
            )
            return stats
        except Exception as e:
            logger.error(f"Failed to get enhanced stats: {e}")
            return {}
| # Global Prisma manager instance: built once at import time and used by the *_sync wrapper functions in this module | |
| prisma_manager = PrismaManager() | |
| # Helper functions for async operations in Streamlit | |
def run_async(coro):
    """Run async function in Streamlit.

    Drives ``coro`` to completion on the current thread's event loop and
    returns its result. A fresh loop is created and installed when the thread
    has no event loop, or when the current one has already been closed
    (a closed loop cannot run anything).

    Args:
        coro: Awaitable to execute.

    Returns:
        Whatever ``coro`` returns.

    Note:
        This must be called from synchronous code; ``run_until_complete``
        refuses to run on a loop that is already running.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No event loop is set for this thread.
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)
| # Convenience functions | |
def add_document_sync(title: str, content: str, **kwargs) -> Optional[int]:
    """Blocking counterpart of ``prisma_manager.add_document``.

    Extra keyword arguments are forwarded unchanged; the coroutine is driven
    to completion through ``run_async`` and its result is returned.
    """
    pending = prisma_manager.add_document(title, content, **kwargs)
    return run_async(pending)
def search_documents_sync(query: str, **kwargs) -> List[Dict]:
    """Blocking counterpart of ``prisma_manager.search_documents``.

    Extra keyword arguments are forwarded unchanged; the coroutine is driven
    to completion through ``run_async`` and its result is returned.
    """
    pending = prisma_manager.search_documents(query, **kwargs)
    return run_async(pending)
def add_media_file_sync(filename: str, filepath: str, mime_type: str,
                        file_size: int, **kwargs) -> Optional[int]:
    """Blocking counterpart of ``prisma_manager.add_media_file``.

    Extra keyword arguments are forwarded unchanged; the coroutine is driven
    to completion through ``run_async`` and its result is returned.
    """
    pending = prisma_manager.add_media_file(
        filename, filepath, mime_type, file_size, **kwargs
    )
    return run_async(pending)
def get_enhanced_stats_sync() -> Dict:
    """Blocking counterpart of ``prisma_manager.get_enhanced_stats``.

    The coroutine is driven to completion through ``run_async`` and its
    result is returned.
    """
    pending = prisma_manager.get_enhanced_stats()
    return run_async(pending)