Spaces:
Sleeping
Sleeping
| """ | |
| Reconstruction Service | |
| This service handles all database operations for prompt reconstruction, | |
| providing a clean interface between the database layer and the pure | |
| reconstruction functions in agentgraph.reconstruction. | |
| """ | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| from sqlalchemy.orm import Session | |
| from datetime import datetime, timezone | |
| import traceback | |
| from backend.database.models import PromptReconstruction, KnowledgeGraph | |
| from backend.database.utils import get_knowledge_graph_by_id, get_knowledge_graph | |
| from backend.database import get_db | |
| from backend.services.task_service import update_task_status | |
| logger = logging.getLogger(__name__) | |
class ReconstructionService:
    """
    Service for orchestrating prompt reconstruction with database operations.

    This service fetches data from the database, calls pure reconstruction functions
    from agentgraph.reconstruction, and saves the results back to the database.

    Most methods report failures by returning a dictionary containing an "error"
    key (or an empty list for ``get_prompt_reconstructions``) rather than raising.
    """

    def __init__(self, session: Session):
        """Store the SQLAlchemy session used by the instance's query methods."""
        self.session = session

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        Produces the same kind of value as the deprecated ``datetime.utcnow()``
        (no tzinfo, same ``isoformat()`` shape) without calling it.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _find_knowledge_graph(self, kg_identifier: Any) -> Any:
        """
        Look up a KnowledgeGraph row by numeric ID first, then by filename.

        Args:
            kg_identifier: Knowledge graph identifier (ID or filename)

        Returns:
            The first matching KnowledgeGraph row, or None when nothing matches
        """
        kg = None
        # Only attempt the ID lookup when the identifier looks numeric
        if str(kg_identifier).isdigit():
            logger.debug("Trying to find knowledge graph by ID: %s", kg_identifier)
            kg = self.session.query(KnowledgeGraph).filter_by(id=kg_identifier).first()
            if kg:
                logger.debug("Found knowledge graph by ID %s", kg_identifier)
        # A numeric-looking identifier may still be a filename, so fall back
        if not kg:
            logger.debug("Trying to find knowledge graph by filename: %s", kg_identifier)
            kg = self.session.query(KnowledgeGraph).filter_by(filename=kg_identifier).first()
            if kg:
                logger.debug("Found knowledge graph by filename %s", kg_identifier)
        return kg

    @staticmethod
    def _summarize_reconstructions(
        reconstructions: List[Dict[str, Any]],
        kg_identifier: Any
    ) -> Dict[str, Any]:
        """
        Build summary statistics from already-loaded reconstruction dictionaries.

        Args:
            reconstructions: Dictionaries shaped like the items returned by
                ``get_prompt_reconstructions``
            kg_identifier: Identifier echoed back in the summary

        Returns:
            Summary dictionary; "status" is "no_reconstructions_found" for an
            empty input and "reconstructions_available" otherwise
        """
        if not reconstructions:
            return {
                "total_reconstructions": 0,
                "knowledge_graph_identifier": kg_identifier,
                "status": "no_reconstructions_found"
            }

        referenced_relations = set()
        entity_count = 0
        for recon in reconstructions:
            # "dependencies" may be present but None, so .get(key, default)
            # alone is not enough to guarantee a dictionary here.
            deps = recon.get("dependencies") or {}
            entity_count += len(deps.get("entities") or [])
            # Resolving relation types would need an extra query; for now
            # only the unique relation IDs are counted.
            referenced_relations.update(deps.get("relations") or [])

        # Rows without a creation time carry None; skip them so max() never
        # compares None against an ISO string.
        latest = max(
            (recon["created_at"] for recon in reconstructions if recon.get("created_at")),
            default=None
        )

        return {
            "total_reconstructions": len(reconstructions),
            "unique_relations_referenced": len(referenced_relations),
            "total_entity_references": entity_count,
            "knowledge_graph_identifier": kg_identifier,
            "status": "reconstructions_available",
            "latest_reconstruction": latest
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def fetch_reconstruction_data(self, kg_identifier: str) -> Dict[str, Any]:
        """
        Fetch knowledge graph data needed for reconstruction from the database.

        Args:
            kg_identifier: Knowledge graph identifier (ID or filename)

        Returns:
            Dictionary with the keys "knowledge_graph", "knowledge_graph_id",
            "knowledge_graph_filename", "entities" and "relations" (the last
            two indexed by their "id"), or a dictionary with an "error" key
        """
        try:
            # Try to load by ID first (if numeric), then by filename
            kg = None
            if str(kg_identifier).isdigit():
                kg = get_knowledge_graph_by_id(self.session, kg_identifier)
            if not kg:
                kg = get_knowledge_graph(self.session, kg_identifier)
            if not kg:
                return {"error": f"Knowledge graph with identifier {kg_identifier} not found"}

            # Extract the actual graph data
            kg_data = kg.graph_data if hasattr(kg, "graph_data") else kg

            # Ensure the graph data has entities and relations
            if not kg_data or "entities" not in kg_data or "relations" not in kg_data:
                return {"error": f"Invalid knowledge graph data for {kg_identifier}"}

            entities = kg_data["entities"]
            relations = kg_data["relations"]
            reconstruction_data = {
                "knowledge_graph": kg_data,
                "knowledge_graph_id": kg.id,
                "knowledge_graph_filename": getattr(kg, "filename", str(kg_identifier)),
                "entities": {entity["id"]: entity for entity in entities},
                "relations": {relation["id"]: relation for relation in relations}
            }
            logger.info(
                "Successfully loaded knowledge graph %s with %d entities and %d relations",
                kg_identifier, len(entities), len(relations)
            )
            return reconstruction_data
        except Exception as e:
            logger.error("Error loading knowledge graph %s: %r", kg_identifier, e)
            return {"error": f"Failed to load knowledge graph: {repr(e)}"}

    def save_prompt_reconstructions(
        self,
        kg_identifier: str,
        reconstructed_relations: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Save reconstructed prompts to the database.

        Existing reconstructions for the knowledge graph are deleted and
        replaced, the graph status is set to "enriched", and everything is
        committed in one transaction. On any failure the session is rolled back.

        Args:
            kg_identifier: Knowledge graph identifier
            reconstructed_relations: List of relations with reconstructed prompts;
                each needs an "id" and may carry "prompt" and "dependencies"

        Returns:
            Dictionary with save results and metadata, or with an "error" key
        """
        try:
            logger.info("Saving prompt reconstructions for knowledge graph: %s", kg_identifier)

            kg = self._find_knowledge_graph(kg_identifier)
            if not kg:
                error_msg = f"Knowledge graph with identifier {kg_identifier} not found"
                logger.error(error_msg)
                return {"error": error_msg}
            logger.info(
                "Found knowledge graph: ID=%s, filename=%s, status=%s",
                kg.id, kg.filename, kg.status
            )

            # Delete existing prompt reconstructions for this knowledge graph (if any)
            existing_count = self.session.query(PromptReconstruction).filter_by(
                knowledge_graph_id=kg.id
            ).count()
            if existing_count > 0:
                logger.info("Deleting %s existing prompt reconstructions", existing_count)
                self.session.query(PromptReconstruction).filter_by(
                    knowledge_graph_id=kg.id
                ).delete()

            # Save new prompt reconstructions
            saved_count = 0
            for relation in reconstructed_relations:
                self.session.add(
                    PromptReconstruction(
                        knowledge_graph_id=kg.id,
                        relation_id=relation["id"],
                        reconstructed_prompt=relation.get("prompt", ""),
                        dependencies=relation.get("dependencies", {}),
                    )
                )
                saved_count += 1

            # Update the knowledge graph status to 'enriched'
            kg.status = "enriched"
            # NOTE(review): save_reconstructions() in this class writes
            # `update_timestamp` instead of `updated_at` - confirm against the
            # KnowledgeGraph model which attribute is the real column.
            kg.updated_at = self._utcnow()

            # Commit all changes
            self.session.commit()
            logger.info(
                "Successfully saved %s prompt reconstructions for KG %s",
                saved_count, kg_identifier
            )
            return {
                "status": "success",
                "knowledge_graph_id": kg.id,
                "knowledge_graph_filename": kg.filename,
                "saved_reconstructions": saved_count,
                "replaced_existing": existing_count
            }
        except Exception as e:
            logger.error("Error saving prompt reconstructions: %r", e)
            self.session.rollback()
            return {"error": f"Failed to save prompt reconstructions: {repr(e)}"}

    def run_prompt_reconstruction(
        self,
        kg_identifier: str,
        output_identifier: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Run prompt reconstruction with database operations.

        Args:
            kg_identifier: Knowledge graph identifier to reconstruct
            output_identifier: Optional output identifier (defaults to kg_identifier)

        Returns:
            Dictionary containing reconstruction results, or with an "error" key
        """
        # Fetch data from database
        reconstruction_data = self.fetch_reconstruction_data(kg_identifier)
        if "error" in reconstruction_data:
            return reconstruction_data

        try:
            # Import and call pure reconstruction function
            from agentgraph.reconstruction import reconstruct_prompts_from_knowledge_graph

            reconstructed_relations = reconstruct_prompts_from_knowledge_graph(
                reconstruction_data["knowledge_graph"]
            )

            # Presumably the reconstruction function may hand back an error
            # dictionary instead of a list (verify against
            # agentgraph.reconstruction); pass it through before anything is
            # deleted or written.
            if isinstance(reconstructed_relations, dict) and "error" in reconstructed_relations:
                return reconstructed_relations

            reconstruction_metadata = {
                "total_relations": len(reconstructed_relations),
                "timestamp": self._utcnow().isoformat()
            }

            # Save results to database
            save_results = self.save_prompt_reconstructions(
                kg_identifier=output_identifier or kg_identifier,
                reconstructed_relations=reconstructed_relations
            )
            if "error" in save_results:
                return save_results

            # Combine results
            return {
                "status": "success",
                "knowledge_graph_id": reconstruction_data["knowledge_graph_id"],
                "reconstruction_metadata": reconstruction_metadata,
                "save_results": save_results,
                "total_relations_processed": len(reconstructed_relations)
            }
        except Exception as e:
            logger.error("Error during prompt reconstruction: %r", e)
            return {"error": f"Reconstruction failed: {repr(e)}"}

    def get_prompt_reconstructions(
        self,
        kg_identifier: str
    ) -> List[Dict[str, Any]]:
        """
        Get saved prompt reconstructions from database.

        Args:
            kg_identifier: Knowledge graph identifier

        Returns:
            List of prompt reconstruction dictionaries; empty when the
            knowledge graph is not found or the query fails
        """
        try:
            kg = self._find_knowledge_graph(kg_identifier)
            if not kg:
                logger.error("Knowledge graph with identifier %s not found", kg_identifier)
                return []

            # Get prompt reconstructions
            reconstructions = self.session.query(PromptReconstruction).filter_by(
                knowledge_graph_id=kg.id
            ).all()
            return [
                {
                    "id": pr.id,
                    "relation_id": pr.relation_id,
                    "reconstructed_prompt": pr.reconstructed_prompt,
                    "dependencies": pr.dependencies,
                    "created_at": pr.created_at.isoformat() if pr.created_at else None
                }
                for pr in reconstructions
            ]
        except Exception as e:
            logger.error("Error retrieving prompt reconstructions: %r", e)
            return []

    def get_reconstruction_summary(self, kg_identifier: str) -> Dict[str, Any]:
        """
        Get a summary of reconstruction results for a knowledge graph.

        Args:
            kg_identifier: Knowledge graph identifier

        Returns:
            Dictionary containing summary statistics, or with an "error" key
        """
        try:
            reconstructions = self.get_prompt_reconstructions(kg_identifier)
            return self._summarize_reconstructions(reconstructions, kg_identifier)
        except Exception as e:
            logger.error("Error generating reconstruction summary: %r", e)
            return {"error": f"Failed to generate summary: {repr(e)}"}

    def reconstruct_single_relation(
        self,
        kg_identifier: str,
        relation_id: str
    ) -> Dict[str, Any]:
        """
        Reconstruct prompt for a single relation.

        Args:
            kg_identifier: Knowledge graph identifier
            relation_id: Specific relation ID to reconstruct

        Returns:
            Dictionary containing reconstruction result for the single relation,
            or with an "error" key
        """
        # Fetch data from database
        reconstruction_data = self.fetch_reconstruction_data(kg_identifier)
        if "error" in reconstruction_data:
            return reconstruction_data

        try:
            # Import and call pure reconstruction function
            from agentgraph.reconstruction import reconstruct_single_relation_prompt

            return reconstruct_single_relation_prompt(
                knowledge_graph_data=reconstruction_data["knowledge_graph"],
                relation_id=relation_id
            )
        except Exception as e:
            logger.error("Error reconstructing single relation %s: %r", relation_id, e)
            return {"error": f"Single relation reconstruction failed: {repr(e)}"}

    def enrich_knowledge_graph_with_prompts(
        self,
        kg_identifier: str,
        output_identifier: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Enrich a knowledge graph with reconstructed prompts and save to database.

        Args:
            kg_identifier: Knowledge graph identifier to enrich
            output_identifier: Optional output identifier

        Returns:
            Dictionary containing enrichment results, or with an "error" key
        """
        logger.info("Starting knowledge graph enrichment for %s", kg_identifier)

        # Run the prompt reconstruction
        results = self.run_prompt_reconstruction(kg_identifier, output_identifier)
        if "error" in results:
            return results

        return {
            "status": "enriched",
            "knowledge_graph": results.get("knowledge_graph_id"),
            "reconstruction_results": results
        }

    def save_reconstructions(self, kg_id: str, reconstructions: list):
        """
        Saves prompt reconstructions to the database and updates KG status.

        Unlike the other methods this one opens its own session via
        ``get_db()`` instead of using ``self.session``, and lets exceptions
        propagate to the caller (after rolling back and closing that session).

        Args:
            kg_id: ID of the knowledge graph the reconstructions belong to
            reconstructions: Dictionaries with "relation_id",
                "reconstructed_prompt" and optionally "dependencies"
        """
        session = next(get_db())
        try:
            for recon in reconstructions:
                session.add(
                    PromptReconstruction(
                        knowledge_graph_id=kg_id,
                        relation_id=recon["relation_id"],
                        reconstructed_prompt=recon["reconstructed_prompt"],
                        dependencies=recon.get("dependencies", {})
                    )
                )
            kg = get_knowledge_graph_by_id(session, kg_id)
            if kg:
                kg.status = "enriched"
                kg.update_timestamp = datetime.now(timezone.utc)
            session.commit()
        except Exception:
            # Discard the partial write before handing the error to the caller
            session.rollback()
            raise
        finally:
            session.close()
async def enrich_knowledge_graph_task(kg_id: str, task_id: str) -> bool:
    """
    Background task for enriching a knowledge graph with reconstructed prompts.

    Loads the knowledge graph by ID, reconstructs prompts with the pure
    ``reconstruct_prompts_from_knowledge_graph`` function, upserts one
    PromptReconstruction row per relation, marks the graph as "enriched" and
    reports progress through ``update_task_status``.

    Args:
        kg_id: ID of the knowledge graph to enrich
        task_id: ID of the task whose status is updated along the way

    Returns:
        True if successful, False otherwise. Exceptions are caught, logged and
        reported as a "FAILED" task status rather than raised.
    """
    logger.info(f"Starting knowledge graph enrichment task {task_id} for KG {kg_id}")
    update_task_status(task_id, "RUNNING", "Enriching knowledge graph")
    try:
        session = next(get_db())
        try:
            # Imported lazily, as in the rest of this module. Only the names
            # that are actually used are imported, so an unrelated missing
            # symbol in agentgraph.reconstruction cannot fail the task.
            from agentgraph.reconstruction import reconstruct_prompts_from_knowledge_graph
            from backend.database.models import PromptReconstruction

            # First get the knowledge graph to ensure it exists and to get its filename
            kg = get_knowledge_graph_by_id(session, kg_id)
            if not kg:
                logger.error(f"Knowledge graph with ID {kg_id} not found")
                update_task_status(task_id, "FAILED", f"Knowledge graph with ID {kg_id} not found")
                return False

            # The filename (or the ID when there is no filename) is only
            # logged here, to help debugging.
            output_identifier = kg.filename if kg.filename else str(kg_id)
            logger.info(f"Knowledge graph found: ID={kg.id}, filename={kg.filename}, status={kg.status}")
            logger.info(f"Using output_identifier: {output_identifier}")

            # Use pure function to reconstruct prompts from the stored graph data
            reconstructed_relations = reconstruct_prompts_from_knowledge_graph(kg.graph_data)

            # Save the prompt reconstructions to the database
            for relation in reconstructed_relations:
                # Check if prompt reconstruction already exists
                existing_pr = session.query(PromptReconstruction).filter_by(
                    knowledge_graph_id=kg.id,
                    relation_id=relation["id"]
                ).first()
                if existing_pr:
                    # Update existing
                    existing_pr.reconstructed_prompt = relation["prompt"]
                    existing_pr.dependencies = relation.get("dependencies", {})
                else:
                    # Create new
                    session.add(
                        PromptReconstruction(
                            knowledge_graph_id=kg.id,
                            relation_id=relation["id"],
                            reconstructed_prompt=relation["prompt"],
                            dependencies=relation.get("dependencies", {}),
                        )
                    )

            # Update the knowledge graph status
            kg.status = "enriched"
            kg.update_timestamp = datetime.now(timezone.utc)
            session.commit()

            update_task_status(task_id, "COMPLETED", "Knowledge graph enriched successfully")
            logger.info(f"Knowledge graph {kg_id} enriched successfully")
            return True
        except Exception:
            # Drop any partially applied changes before the session is closed;
            # the outer handler does the logging and status reporting.
            session.rollback()
            raise
        finally:
            session.close()
    except Exception as e:
        error_message = f"Error enriching knowledge graph: {str(e)}"
        # logger.exception records the message together with the full traceback
        logger.exception(error_message)
        update_task_status(task_id, "FAILED", error_message)
        return False