Spaces:
Running
Running
| """ | |
| Database functionality for the agent monitoring system. | |
| This package provides database access and utilities for agent monitoring. | |
| """ | |
| import os | |
| from sqlalchemy import create_engine | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker, scoped_session | |
# Get the absolute path to the project root directory
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Database URL - Use in-memory SQLite for HF Spaces to avoid data sharing between users
# This ensures each container restart gets a fresh database
import uuid

from sqlalchemy.pool import StaticPool

# Short random id so HF Spaces log lines can be correlated to one container session.
session_id = str(uuid.uuid4())[:8]

# For HF Spaces: Use in-memory database to prevent user data sharing
# For local development: Use file database
if os.getenv("SPACE_ID"):  # HF Spaces environment
    DATABASE_URL = "sqlite:///:memory:"
    print(f"π HF Spaces: Using in-memory database (session: {session_id})")
    # A plain in-memory SQLite URL gives every pooled connection its OWN empty
    # database, so tables created by one thread are invisible to others.
    # StaticPool reuses a single connection so all sessions share one database.
    engine = create_engine(
        DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
else:
    db_path = os.path.join(ROOT_DIR, 'datasets/db/agent_monitoring.db')
    # SQLite will not create missing parent directories itself.
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    DATABASE_URL = f"sqlite:///{db_path}"
    print(f"πΎ Local: Using persistent database")
    engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})

# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create a scoped session for thread safety
Session = scoped_session(SessionLocal)
# Base class for models (overridden below by the models' own Base on import)
Base = declarative_base()
| # Function to get database session | |
def get_db():
    """Yield a scoped database session and guarantee it is closed afterwards.

    Intended as a generator dependency: the caller receives the session on
    the first iteration, and the ``finally`` clause closes it when the
    generator is finalized.
    """
    session = Session()
    try:
        yield session
    finally:
        session.close()
# Import models and utility functions.
# NOTE: this re-imports Base from backend.database.models, replacing the
# declarative_base() created above — init_db() therefore uses the models' metadata.
| from backend.database.models import KnowledgeGraph, Entity, Relation, Base | |
| from backend.database.utils import ( | |
| save_knowledge_graph, | |
| update_knowledge_graph_status, | |
| get_knowledge_graph, | |
| get_all_knowledge_graphs, | |
| delete_knowledge_graph | |
| ) | |
| # Function to initialize database | |
def init_db():
    """Initialize the database by creating all tables.

    Delegates to ``Base.metadata.create_all``; tables that already exist
    are left untouched.
    """
    Base.metadata.create_all(bind=engine)
def test_database_connection():
    """Test if database connection is working.

    Creates all tables and runs a trivial ``SELECT 1`` query.

    Returns:
        bool: True when the query succeeds, False otherwise (the full
        traceback is printed for diagnosis, never raised to the caller).
    """
    # text() is required for raw SQL strings on SQLAlchemy 1.4+/2.x;
    # plain-string execute() was removed in 2.0.
    from sqlalchemy import text
    session = None
    try:
        session = SessionLocal()
        # Try to create tables
        Base.metadata.create_all(bind=engine)
        # Test a simple query
        result = session.execute(text("SELECT 1")).fetchone()
        print(f"β Database connection test successful: {result}")
        return True
    except Exception as e:
        print(f"β Database connection test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Close the session even on failure (the original leaked it when
        # execute() raised).
        if session is not None:
            session.close()
def add_sample_data_for_hf():
    """Add simple sample data for HF Spaces using direct SQL.

    No-op unless the SPACE_ID env var is set (i.e. running on HF Spaces) and
    the traces/knowledge_graphs tables are still empty.  Inserts one demo
    trace, one knowledge graph, five entities and four relations inside a
    single transaction; any failure rolls the transaction back and is printed
    rather than raised.
    """
    if not os.getenv("SPACE_ID"):
        return  # Only run on HF Spaces
    print(f"π HF Spaces environment check:")
    print(f" β’ SPACE_ID: {os.getenv('SPACE_ID')}")
    print(f" β’ Database URL: {DATABASE_URL}")
    try:
        # Use direct connection to avoid session issues
        with engine.connect() as conn:
            # Begin transaction for atomic operations
            trans = conn.begin()
            try:
                # Check if data already exists (idempotence guard: the function
                # may be called on every container start)
                result = conn.execute("SELECT COUNT(*) FROM traces").fetchone()
                existing_traces = result[0] if result else 0
                result = conn.execute("SELECT COUNT(*) FROM knowledge_graphs").fetchone()
                existing_kgs = result[0] if result else 0
                print(f" β’ Existing traces: {existing_traces}")
                print(f" β’ Existing KGs: {existing_kgs}")
                if existing_traces > 0 or existing_kgs > 0:
                    # Early return: the with-block closes the connection and the
                    # uncommitted (read-only) transaction is discarded.
                    print("π Sample data already exists, skipping...")
                    return
                print("π― Adding sample data using direct SQL...")
                # Local imports: only needed on this insert path.
                # NOTE(review): json appears unused below — the JSON payloads
                # are literal strings; confirm before removing.
                import json
                import uuid
                import hashlib
                from datetime import datetime
                # Simple trace content: a six-turn multi-agent conversation,
                # including one simulated tool failure (CompensationAgent).
                sample_trace_content = '''[
{"role": "user", "content": "I need help with my delayed order #12345. This is frustrating!", "timestamp": "2024-08-31T10:00:00Z"},
{"role": "assistant", "name": "RouterAgent", "content": "Let me route you to our order specialist.", "timestamp": "2024-08-31T10:00:15Z"},
{"role": "assistant", "name": "OrderAgent", "content": "I found the issue - weather delay. Your package arrives today at 2 PM.", "timestamp": "2024-08-31T10:01:00Z"},
{"role": "assistant", "name": "CompensationAgent", "content": "I'll authorize a $10 credit for the inconvenience.", "timestamp": "2024-08-31T10:02:00Z", "error": "Payment system unavailable"},
{"role": "assistant", "name": "SupervisorAgent", "content": "I'll manually flag your account for the credit. Technical team notified.", "timestamp": "2024-08-31T10:03:00Z"},
{"role": "user", "content": "Thank you for the quick resolution!", "timestamp": "2024-08-31T10:04:00Z", "sentiment": "satisfied"}
]'''
                # Generate IDs: random trace id, content-addressed hash for dedup.
                trace_id = str(uuid.uuid4())
                content_hash = hashlib.sha256(sample_trace_content.encode()).hexdigest()
                # NOTE(review): naive UTC timestamp; utcnow() is deprecated in
                # Python 3.12+ — consider datetime.now(timezone.utc).
                now = datetime.utcnow()
                # Insert trace (16 columns, 16 qmark placeholders)
                conn.execute(
                    """INSERT INTO traces (trace_id, filename, title, description, content, content_hash,
                    upload_timestamp, update_timestamp, uploader, trace_type, trace_source,
                    character_count, turn_count, status, tags, trace_metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    (trace_id, "sample_demo.json", "Multi-Agent Customer Service Demo",
                     "Demo showing agent coordination and error handling", sample_trace_content, content_hash,
                     now, now, "AgentGraph Demo", "multi_agent", "sample", len(sample_trace_content), 6,
                     "processed", '["demo", "customer_service", "multi_agent"]',
                     '{"scenario": "customer_service", "agents": ["RouterAgent", "OrderAgent", "CompensationAgent", "SupervisorAgent"]}')
                )
                # Insert knowledge graph linked to the trace via trace_id
                conn.execute(
                    """INSERT INTO knowledge_graphs (filename, creator, entity_count, relation_count, namespace,
                    system_name, system_summary, status, trace_id, window_index, window_total, processing_run_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    ("demo_kg.json", "AgentGraph Demo", 5, 4, "demo", "Customer Service System",
                     "Multi-agent customer service system with error handling and coordination",
                     "completed", trace_id, 0, 1, "demo_run")
                )
                # Get the auto-generated KG primary key back; entities/relations
                # reference it as graph_id.
                kg_result = conn.execute("SELECT id FROM knowledge_graphs WHERE trace_id = ?", (trace_id,))
                kg_id = kg_result.fetchone()[0]
                # Insert sample entities: (entity_id, type, name, JSON properties)
                entities = [
                    ("agent_1", "agent", "RouterAgent", '{"role": "routing", "priority_handling": true}'),
                    ("agent_2", "agent", "OrderAgent", '{"role": "order_tracking", "data_sources": ["db", "api"]}'),
                    ("agent_3", "agent", "CompensationAgent", '{"role": "compensation", "max_credit": 50}'),
                    ("agent_4", "agent", "SupervisorAgent", '{"role": "escalation", "override_authority": true}'),
                    ("issue_1", "issue", "PaymentSystemFailure", '{"severity": "high", "impact": "service_disruption"}')
                ]
                for entity_id, entity_type, name, properties in entities:
                    conn.execute(
                        """INSERT INTO entities (graph_id, entity_id, type, name, properties, knowledge_graph_namespace)
                        VALUES (?, ?, ?, ?, ?, ?)""",
                        (kg_id, entity_id, entity_type, name, properties, "demo")
                    )
                # Insert sample relations: (relation_id, from, type, to, JSON properties)
                relations = [
                    ("rel_1", "agent_1", "routes_to", "agent_2", '{"priority": "high", "success": true}'),
                    ("rel_2", "agent_2", "escalates_to", "agent_3", '{"reason": "compensation_needed"}'),
                    ("rel_3", "agent_3", "escalates_to", "agent_4", '{"reason": "system_error"}'),
                    ("rel_4", "agent_4", "resolves", "issue_1", '{"method": "manual_override"}')
                ]
                for relation_id, from_entity, relation_type, to_entity, properties in relations:
                    conn.execute(
                        """INSERT INTO relations (graph_id, relation_id, from_entity_id, relation_type, to_entity_id,
                        properties, knowledge_graph_namespace) VALUES (?, ?, ?, ?, ?, ?, ?)""",
                        (kg_id, relation_id, from_entity, relation_type, to_entity, properties, "demo")
                    )
                # Commit transaction (all inserts become visible atomically)
                trans.commit()
                # Verify data: read back row counts on the same connection
                final_traces = conn.execute("SELECT COUNT(*) FROM traces").fetchone()[0]
                final_kgs = conn.execute("SELECT COUNT(*) FROM knowledge_graphs").fetchone()[0]
                final_entities = conn.execute("SELECT COUNT(*) FROM entities").fetchone()[0]
                final_relations = conn.execute("SELECT COUNT(*) FROM relations").fetchone()[0]
                print("β Sample data added successfully!")
                print(f" β’ Traces: {final_traces}")
                print(f" β’ Knowledge graphs: {final_kgs}")
                print(f" β’ Entities: {final_entities}")
                print(f" β’ Relations: {final_relations}")
            except Exception as e:
                # Undo any partial inserts, then let the outer handler report it.
                trans.rollback()
                raise e
    except Exception as e:
        # Best-effort: sample data is cosmetic, so never crash startup over it.
        print(f"β Failed to add sample data: {e}")
        import traceback
        print("Full error traceback:")
        traceback.print_exc()
# Public API of this package (consumed by `from backend.database import *`).
__all__ = [
    # session / lifecycle helpers defined in this module
    'get_db',
    'models',
    'init_db',
    'test_database_connection',
    'add_sample_data_for_hf',
    # knowledge-graph utilities re-exported from backend.database.utils
    'save_knowledge_graph',
    'update_knowledge_graph_status',
    'get_knowledge_graph',
    'get_all_knowledge_graphs',
    'delete_knowledge_graph',
    # ORM models re-exported from backend.database.models
    'KnowledgeGraph',
    'Entity',
    'Relation'
]