# NOTE(review): the original header lines here were code-hosting-UI scraping
# artifacts ("Spaces:", "Running", "File size: 7,368 Bytes", commit b0b150b,
# and a line-number gutter) — not part of the module. Preserved as a comment
# so the file parses as valid Python.
import threading
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
from sqlalchemy.orm import Session
import logging
from models.agent import Agent, CompilationJob
from modules.knowledge_compiler import KnowledgeCompiler, create_knowledge_compiler
from modules.prompt_analyzer import PromptAnalyzer, create_prompt_analyzer
logger = logging.getLogger(__name__)
def _parse_files(files_data: list) -> list:
    """Normalize uploaded file payloads into the compiler's input format.

    Each payload dict (expected keys: "filename", "content") becomes an
    entry with a source name, raw content, a "text" type tag, and the
    content split into newline-delimited records.
    """
    parsed = []
    for info in files_data:
        content = info.get("content", "")
        parsed.append({
            "source": info.get("filename", "unknown"),
            "content": content,
            "type": "text",
            # Empty content yields no records rather than [""]
            "records": content.split("\n") if content else [],
        })
    return parsed


def _mark_failed(db, job, agent, job_id: int, agent_id: int, error: Exception):
    """Best-effort: persist "failed" status on the job and agent rows.

    Either row may be None if the failure happened before the initial
    lookups, so each is re-queried on demand. Never raises: a secondary
    failure is logged and the session rolled back.
    """
    try:
        if not job:
            job = db.query(CompilationJob).filter(CompilationJob.id == job_id).first()
        if not agent:
            agent = db.query(Agent).filter(Agent.id == agent_id).first()
        if job:
            job.status = "failed"
            job.error_message = str(error)[:500]  # Cap length for the DB column
            job.completed_at = datetime.utcnow()
            logger.error(f"Job {job_id} marked as failed")
        if agent:
            agent.status = "failed"
            logger.error(f"Agent {agent_id} marked as failed")
        db.commit()
    except Exception as update_error:
        logger.error(f"Failed to update error status: {update_error}")
        db.rollback()


def _run_compilation_thread(
    agent_id: int,
    job_id: int,
    storage_path: str,
    system_prompt: str,
    files_data: list
):
    """Execute the compilation process in a separate thread.

    Opens a dedicated DB session (sessions are not shared across threads),
    then runs the pipeline: prompt analysis -> compiler init -> knowledge
    compilation -> agent metadata update, recording progress checkpoints on
    the CompilationJob row. Any failure marks both the job and the agent as
    "failed"; the session is always closed on exit.
    """
    # Imported here to avoid a circular import at module load time
    # (presumably — TODO confirm against core.database).
    from core.database import SessionLocal
    db = SessionLocal()
    job = None
    agent = None
    try:
        job = db.query(CompilationJob).filter(CompilationJob.id == job_id).first()
        agent = db.query(Agent).filter(Agent.id == agent_id).first()
        if not job or not agent:
            logger.error(f"Job or agent not found: job_id={job_id}, agent_id={agent_id}")
            return
        logger.info(f"Starting compilation for agent {agent.name}")

        # Step 1: Analyze prompt (10%)
        _update_progress(db, job, 10, "Analyzing system prompt")
        analyzer = create_prompt_analyzer()
        prompt_analysis = analyzer.analyze_prompt(system_prompt)
        logger.info(f"Prompt analysis complete: domain={prompt_analysis.get('domain')}")

        # Step 2: Initialize compiler (20%)
        _update_progress(db, job, 20, "Initializing knowledge compiler")
        compiler = create_knowledge_compiler(str(Path(storage_path).parent))

        # Step 3: Compile knowledge (20-80%)
        _update_progress(db, job, 30, "Compiling knowledge base")
        logger.info(f"Starting compilation with {len(files_data)} files")
        parsed_data = _parse_files(files_data)

        # Best-effort compile: embeddings may still have been created even if
        # the compiler raises, so fall back to an empty result and continue
        # rather than failing the whole job.
        try:
            result = compiler.compile(
                agent_name=agent.name,
                parsed_data=parsed_data,
                system_prompt=system_prompt,
                prompt_analysis=prompt_analysis
            )
            logger.info(f"Compilation complete: {result.get('stats', {})}")
        except Exception as compile_error:
            logger.error(f"Compilation error (continuing): {compile_error}", exc_info=True)
            result = {"stats": {}, "domain_signature": []}
        _update_progress(db, job, 80, "Saving to vector store")

        # Step 4: Update agent metadata (90%)
        _update_progress(db, job, 90, "Updating agent metadata")
        agent.status = "ready"
        agent.domain = prompt_analysis.get("domain", "general")
        agent.domain_keywords = prompt_analysis.get("domain_keywords", [])
        agent.entity_count = result.get("stats", {}).get("total_entries", 0)

        # Step 5: Complete (100%)
        job.status = "completed"
        job.progress = 100
        job.current_step = "Compilation complete"
        job.completed_at = datetime.utcnow()
        db.commit()
        logger.info(f"Agent {agent.name} compilation completed successfully")
    except Exception as e:
        logger.error(f"Compilation failed for job {job_id}: {str(e)}", exc_info=True)
        # CRITICAL: always record failure on the job and agent rows
        _mark_failed(db, job, agent, job_id, agent_id, e)
    finally:
        # CRITICAL: ensure the database session is released
        try:
            db.close()
            logger.info(f"Database connection closed for job {job_id}")
        except Exception as close_error:
            logger.error(f"Error closing database: {close_error}")
def _update_progress(db: Session, job: CompilationJob, progress: int, step: str):
    """Persist a progress checkpoint (percentage plus step label) on *job*."""
    job.progress, job.current_step = progress, step
    db.commit()
    logger.info(f"Job {job.id} progress: {progress}% - {step}")
class CompilationWorker:
    """
    Background worker for compiling agent knowledge bases.
    Uses threading for reliable async execution on Windows.
    """

    def __init__(self):
        # agent_id -> {"job_id", "thread", "status"} for threads this worker spawned
        self.active_jobs = {}

    def start_compilation(
        self,
        db: Session,
        agent: Agent,
        files_data: list,
        progress_callback: Optional[Callable] = None
    ) -> CompilationJob:
        """Start a background compilation job using threading.

        NOTE(review): progress_callback is accepted but never invoked here —
        progress is written to the CompilationJob row instead; confirm whether
        callers rely on it before removing.
        """
        # Persist the job row first so its id exists before the thread starts.
        job = CompilationJob(
            agent_id=agent.id,
            status="in_progress",
            progress=0,
            current_step="Initializing"
        )
        db.add(job)
        db.commit()
        db.refresh(job)
        logger.info(f"Created compilation job {job.id} for agent {agent.name}")

        # Daemon thread so an in-flight compilation never blocks interpreter exit.
        worker_thread = threading.Thread(
            target=_run_compilation_thread,
            args=(agent.id, job.id, agent.storage_path, agent.system_prompt, files_data),
            daemon=True
        )
        worker_thread.start()
        self.active_jobs[agent.id] = {
            "job_id": job.id,
            "thread": worker_thread,
            "status": "in_progress"
        }
        logger.info(f"Started compilation thread for agent {agent.name}")
        return job

    def get_job_status(self, db: Session, agent_id: int) -> Optional[dict]:
        """Get the latest job status for an agent."""
        latest = (
            db.query(CompilationJob)
            .filter(CompilationJob.agent_id == agent_id)
            .order_by(CompilationJob.created_at.desc())
            .first()
        )
        if not latest:
            return None
        fields = (
            "id", "status", "progress", "current_step",
            "error_message", "created_at", "completed_at",
        )
        return {name: getattr(latest, name) for name in fields}
# Module-level singleton: import this shared instance (rather than constructing
# a new CompilationWorker) so every caller sees the same active_jobs registry.
compilation_worker = CompilationWorker()