Spaces:
Running
Running
| import threading | |
| from pathlib import Path | |
| from typing import Optional, Callable | |
| from datetime import datetime | |
| from sqlalchemy.orm import Session | |
| import logging | |
| from models.agent import Agent, CompilationJob | |
| from modules.knowledge_compiler import KnowledgeCompiler, create_knowledge_compiler | |
| from modules.prompt_analyzer import PromptAnalyzer, create_prompt_analyzer | |
| logger = logging.getLogger(__name__) | |
def _run_compilation_thread(
    agent_id: int,
    job_id: int,
    storage_path: str,
    system_prompt: str,
    files_data: list
):
    """Execute the full compilation pipeline in a worker thread.

    Opens its own DB session (SQLAlchemy sessions must not be shared across
    threads), analyzes the system prompt, compiles the uploaded files into the
    agent's knowledge base, and records progress and terminal status on the
    CompilationJob row. On any failure the job and agent are marked "failed"
    so the UI never shows a permanently in-progress job.

    Args:
        agent_id: Primary key of the Agent being compiled.
        job_id: Primary key of the CompilationJob tracking this run.
        storage_path: Agent storage directory; the compiler root is its parent.
        system_prompt: The agent's system prompt, analyzed for domain info.
        files_data: List of dicts with "filename" and "content" keys.
    """
    # Imported locally (not at module top) to avoid a circular import at load time.
    from core.database import SessionLocal
    db = SessionLocal()
    job = None
    agent = None
    try:
        job = db.query(CompilationJob).filter(CompilationJob.id == job_id).first()
        agent = db.query(Agent).filter(Agent.id == agent_id).first()
        if not job or not agent:
            logger.error(f"Job or agent not found: job_id={job_id}, agent_id={agent_id}")
            return
        logger.info(f"Starting compilation for agent {agent.name}")

        # Step 1: Analyze prompt (10%)
        _update_progress(db, job, 10, "Analyzing system prompt")
        analyzer = create_prompt_analyzer()
        prompt_analysis = analyzer.analyze_prompt(system_prompt)
        logger.info(f"Prompt analysis complete: domain={prompt_analysis.get('domain')}")

        # Step 2: Initialize compiler (20%)
        _update_progress(db, job, 20, "Initializing knowledge compiler")
        compiler = create_knowledge_compiler(str(Path(storage_path).parent))

        # Step 3: Compile knowledge (20-80%)
        _update_progress(db, job, 30, "Compiling knowledge base")
        logger.info(f"Starting compilation with {len(files_data)} files")
        parsed_data = _parse_files_data(files_data)
        result = _compile_safely(compiler, agent.name, parsed_data,
                                 system_prompt, prompt_analysis)
        _update_progress(db, job, 80, "Saving to vector store")

        # Step 4: Update agent metadata (90%)
        _update_progress(db, job, 90, "Updating agent metadata")
        agent.status = "ready"
        agent.domain = prompt_analysis.get("domain", "general")
        agent.domain_keywords = prompt_analysis.get("domain_keywords", [])
        agent.entity_count = result.get("stats", {}).get("total_entries", 0)

        # Step 5: Complete (100%)
        job.status = "completed"
        job.progress = 100
        job.current_step = "Compilation complete"
        job.completed_at = datetime.utcnow()
        db.commit()
        logger.info(f"Agent {agent.name} compilation completed successfully")
    except Exception as e:
        logger.error(f"Compilation failed for job {job_id}: {str(e)}", exc_info=True)
        _mark_failed(db, job_id, agent_id, job, agent, e)
    finally:
        # CRITICAL: always release the session, even if status updates failed.
        try:
            db.close()
            logger.info(f"Database connection closed for job {job_id}")
        except Exception as close_error:
            logger.error(f"Error closing database: {close_error}")


def _parse_files_data(files_data: list) -> list:
    """Normalize uploaded file dicts into the compiler's expected input format.

    Each entry becomes {"source", "content", "type", "records"}; missing
    filenames fall back to "unknown" and missing content to "".
    """
    parsed_data = []
    for file_info in files_data:
        content = file_info.get("content", "")
        parsed_data.append({
            "source": file_info.get("filename", "unknown"),
            "content": content,
            "type": "text",
            # One record per line; empty content yields no records at all.
            "records": content.split("\n") if content else []
        })
    return parsed_data


def _compile_safely(compiler, agent_name: str, parsed_data: list,
                    system_prompt: str, prompt_analysis: dict) -> dict:
    """Run the compiler, returning an empty result dict instead of raising.

    Compilation errors are logged but deliberately swallowed: embeddings may
    still have been created, so the pipeline continues on a best-effort basis.
    """
    try:
        result = compiler.compile(
            agent_name=agent_name,
            parsed_data=parsed_data,
            system_prompt=system_prompt,
            prompt_analysis=prompt_analysis
        )
        logger.info(f"Compilation complete: {result.get('stats', {})}")
        return result
    except Exception as compile_error:
        logger.error(f"Compilation error (continuing): {compile_error}", exc_info=True)
        return {"stats": {}, "domain_signature": []}


def _mark_failed(db, job_id: int, agent_id: int, job, agent, error: Exception):
    """Best-effort: mark the job and agent as failed; never raises.

    Re-fetches the rows if the failure happened before they were loaded.
    Rolls back instead of raising if even the status update fails.
    """
    try:
        if not job:
            job = db.query(CompilationJob).filter(CompilationJob.id == job_id).first()
        if not agent:
            agent = db.query(Agent).filter(Agent.id == agent_id).first()
        if job:
            job.status = "failed"
            job.error_message = str(error)[:500]  # keep within a bounded column
            job.completed_at = datetime.utcnow()
            logger.error(f"Job {job_id} marked as failed")
        if agent:
            agent.status = "failed"
            logger.error(f"Agent {agent_id} marked as failed")
        db.commit()
    except Exception as update_error:
        logger.error(f"Failed to update error status: {update_error}")
        db.rollback()
def _update_progress(db: Session, job: CompilationJob, progress: int, step: str):
    """Persist a progress/step update on the job row and log it."""
    job.progress, job.current_step = progress, step
    db.commit()
    logger.info(f"Job {job.id} progress: {progress}% - {step}")
class CompilationWorker:
    """
    Background worker for compiling agent knowledge bases.
    Uses threading for reliable async execution on Windows.
    """

    def __init__(self):
        # Bookkeeping of threads we spawned: agent_id -> job info dict.
        self.active_jobs = {}

    def start_compilation(
        self,
        db: Session,
        agent: Agent,
        files_data: list,
        progress_callback: Optional[Callable] = None
    ) -> CompilationJob:
        """Start a background compilation job using threading."""
        # Persist a job row first so callers can poll progress immediately.
        job = CompilationJob(
            agent_id=agent.id,
            status="in_progress",
            progress=0,
            current_step="Initializing",
        )
        db.add(job)
        db.commit()
        db.refresh(job)
        logger.info(f"Created compilation job {job.id} for agent {agent.name}")

        # Daemon thread: must not keep the interpreter alive on shutdown.
        worker_thread = threading.Thread(
            target=_run_compilation_thread,
            args=(agent.id, job.id, agent.storage_path, agent.system_prompt, files_data),
            daemon=True,
        )
        worker_thread.start()

        self.active_jobs[agent.id] = dict(
            job_id=job.id,
            thread=worker_thread,
            status="in_progress",
        )
        logger.info(f"Started compilation thread for agent {agent.name}")
        return job

    def get_job_status(self, db: Session, agent_id: int) -> Optional[dict]:
        """Get the latest job status for an agent."""
        latest = (
            db.query(CompilationJob)
            .filter(CompilationJob.agent_id == agent_id)
            .order_by(CompilationJob.created_at.desc())
            .first()
        )
        if latest is None:
            return None
        fields = (
            "id", "status", "progress", "current_step",
            "error_message", "created_at", "completed_at",
        )
        return {name: getattr(latest, name) for name in fields}


# Singleton instance
compilation_worker = CompilationWorker()