# Mexar/backend/workers/compilation_worker.py
# Author: Devrajsinh Bharatsinh Gohil
# Initial commit of MEXAR Ultimate - Phase 2 cleanup complete (commit b0b150b)
import threading
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
from sqlalchemy.orm import Session
import logging
from models.agent import Agent, CompilationJob
from modules.knowledge_compiler import KnowledgeCompiler, create_knowledge_compiler
from modules.prompt_analyzer import PromptAnalyzer, create_prompt_analyzer
logger = logging.getLogger(__name__)
def _parse_files(files_data: list) -> list:
    """Normalize uploaded file dicts into the compiler's record format.

    Args:
        files_data: List of dicts with "filename" and "content" keys.

    Returns:
        List of dicts with "source", "content", "type", and "records" keys.
    """
    parsed = []
    for file_info in files_data:
        content = file_info.get("content", "")
        parsed.append({
            "source": file_info.get("filename", "unknown"),
            "content": content,
            "type": "text",
            # Empty content yields no records instead of [""].
            "records": content.split("\n") if content else [],
        })
    return parsed


def _run_compilation_thread(
    agent_id: int,
    job_id: int,
    storage_path: str,
    system_prompt: str,
    files_data: list
):
    """Execute the compilation process in a separate thread.

    Runs the full pipeline for one agent — prompt analysis, knowledge
    compilation, and agent metadata update — while reporting progress on
    the CompilationJob row. On any error the job and agent are marked
    "failed" so the UI never shows a permanently in-progress job.

    Args:
        agent_id: Primary key of the Agent being compiled.
        job_id: Primary key of the CompilationJob tracking this run.
        storage_path: Agent storage path; the compiler is rooted at its
            parent directory.
        system_prompt: The agent's system prompt, analyzed for domain info.
        files_data: List of dicts with "filename" and "content" keys.
    """
    # Imported here (not at module top) so the worker thread creates its
    # own session — SQLAlchemy sessions are not thread-safe — and to avoid
    # import cycles at module load time.
    from core.database import SessionLocal
    db = SessionLocal()
    job = None
    agent = None
    try:
        job = db.query(CompilationJob).filter(CompilationJob.id == job_id).first()
        agent = db.query(Agent).filter(Agent.id == agent_id).first()
        if not job or not agent:
            logger.error("Job or agent not found: job_id=%s, agent_id=%s", job_id, agent_id)
            return
        logger.info("Starting compilation for agent %s", agent.name)

        # Step 1: Analyze prompt (10%)
        _update_progress(db, job, 10, "Analyzing system prompt")
        analyzer = create_prompt_analyzer()
        prompt_analysis = analyzer.analyze_prompt(system_prompt)
        logger.info("Prompt analysis complete: domain=%s", prompt_analysis.get("domain"))

        # Step 2: Initialize compiler (20%)
        _update_progress(db, job, 20, "Initializing knowledge compiler")
        compiler = create_knowledge_compiler(str(Path(storage_path).parent))

        # Step 3: Compile knowledge (20-80%)
        _update_progress(db, job, 30, "Compiling knowledge base")
        logger.info("Starting compilation with %d files", len(files_data))

        # Parse files into the expected format.
        parsed_data = _parse_files(files_data)

        # Run compilation with error handling. A compile failure is treated
        # as non-fatal: embeddings may still have been created, so we fall
        # back to an empty result rather than failing the whole job.
        try:
            result = compiler.compile(
                agent_name=agent.name,
                parsed_data=parsed_data,
                system_prompt=system_prompt,
                prompt_analysis=prompt_analysis
            )
            logger.info("Compilation complete: %s", result.get("stats", {}))
        except Exception as compile_error:
            logger.error("Compilation error (continuing): %s", compile_error, exc_info=True)
            result = {"stats": {}, "domain_signature": []}

        _update_progress(db, job, 80, "Saving to vector store")

        # Step 4: Update agent metadata (90%)
        _update_progress(db, job, 90, "Updating agent metadata")
        agent.status = "ready"
        agent.domain = prompt_analysis.get("domain", "general")
        agent.domain_keywords = prompt_analysis.get("domain_keywords", [])
        agent.entity_count = result.get("stats", {}).get("total_entries", 0)

        # Step 5: Complete (100%)
        job.status = "completed"
        job.progress = 100
        job.current_step = "Compilation complete"
        job.completed_at = datetime.utcnow()
        db.commit()
        logger.info("Agent %s compilation completed successfully", agent.name)
    except Exception as e:
        logger.error("Compilation failed for job %s: %s", job_id, e, exc_info=True)
        # CRITICAL: always update job and agent status on error.
        try:
            # If the failure was a flush/commit error, the session is
            # unusable until rolled back — reset it before writing the
            # failure status, otherwise the commit below would raise
            # PendingRollbackError and the job would stay "in_progress".
            db.rollback()
            if not job:
                job = db.query(CompilationJob).filter(CompilationJob.id == job_id).first()
            if not agent:
                agent = db.query(Agent).filter(Agent.id == agent_id).first()
            if job:
                job.status = "failed"
                job.error_message = str(e)[:500]  # Limit error message length
                job.completed_at = datetime.utcnow()
                logger.error("Job %s marked as failed", job_id)
            if agent:
                agent.status = "failed"
                logger.error("Agent %s marked as failed", agent_id)
            db.commit()
        except Exception as update_error:
            logger.error("Failed to update error status: %s", update_error)
            db.rollback()
    finally:
        # CRITICAL: ensure the thread-local database connection is released.
        try:
            db.close()
            logger.info("Database connection closed for job %s", job_id)
        except Exception as close_error:
            logger.error("Error closing database: %s", close_error)
def _update_progress(db: Session, job: CompilationJob, progress: int, step: str):
    """Persist a progress update on a compilation job.

    Commits immediately so pollers reading the job row see the new state
    right away.

    Args:
        db: Active session the *job* row is attached to.
        job: The CompilationJob row to update.
        progress: Completion percentage (0-100).
        step: Human-readable description of the current step.
    """
    job.progress = progress
    job.current_step = step
    db.commit()
    # Lazy %-style args defer string formatting until the record is emitted.
    logger.info("Job %s progress: %d%% - %s", job.id, progress, step)
class CompilationWorker:
    """
    Background worker that compiles agent knowledge bases.

    Compilation runs on daemon threads, which execute reliably across
    platforms (including Windows) without an async event loop.
    """

    def __init__(self):
        # Maps agent_id -> {"job_id", "thread", "status"} for jobs this
        # process has launched.
        self.active_jobs = {}

    def start_compilation(
        self,
        db: Session,
        agent: Agent,
        files_data: list,
        progress_callback: Optional[Callable] = None
    ) -> CompilationJob:
        """Start a background compilation job using threading."""
        # Persist the job record first so callers can poll it immediately.
        # NOTE(review): progress_callback is accepted but currently unused.
        new_job = CompilationJob(
            agent_id=agent.id,
            status="in_progress",
            progress=0,
            current_step="Initializing"
        )
        db.add(new_job)
        db.commit()
        db.refresh(new_job)
        logger.info(f"Created compilation job {new_job.id} for agent {agent.name}")

        # Hand the heavy work off to a daemon thread; it opens its own
        # database session, so only plain values are passed in.
        worker_thread = threading.Thread(
            target=_run_compilation_thread,
            args=(agent.id, new_job.id, agent.storage_path, agent.system_prompt, files_data),
            daemon=True
        )
        worker_thread.start()

        self.active_jobs[agent.id] = {
            "job_id": new_job.id,
            "thread": worker_thread,
            "status": "in_progress"
        }
        logger.info(f"Started compilation thread for agent {agent.name}")
        return new_job

    def get_job_status(self, db: Session, agent_id: int) -> Optional[dict]:
        """Get the latest job status for an agent."""
        latest = (
            db.query(CompilationJob)
            .filter(CompilationJob.agent_id == agent_id)
            .order_by(CompilationJob.created_at.desc())
            .first()
        )
        if latest is None:
            return None
        return {
            "id": latest.id,
            "status": latest.status,
            "progress": latest.progress,
            "current_step": latest.current_step,
            "error_message": latest.error_message,
            "created_at": latest.created_at,
            "completed_at": latest.completed_at
        }
# Singleton instance: importers share this one worker so that active_jobs
# tracking lives in a single place per process.
compilation_worker = CompilationWorker()