| """ |
| CriticAgent for SPARKNET - LangChain Version |
| Reviews and validates outputs against VISTA quality standards |
| Uses LangChain chains for structured validation and feedback |
| """ |
|
|
| from typing import Optional, Dict, Any, List |
| from loguru import logger |
| import json |
|
|
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.output_parsers import JsonOutputParser |
| from langchain_core.messages import HumanMessage, SystemMessage |
|
|
| from .base_agent import BaseAgent, Task, Message |
| from ..llm.langchain_ollama_client import LangChainOllamaClient |
| from ..workflow.langgraph_state import ValidationResult |
|
|
|
|
class CriticAgent(BaseAgent):
    """
    Agent specialized in output validation and quality assurance.

    Uses LangChain chains with mistral for balanced analysis and
    ensures outputs meet VISTA quality standards.
    """

    # Quality rubrics keyed by output type. For each dimension:
    #   weight      - contribution to the weighted overall score (weights
    #                 within each output type sum to 1.0)
    #   threshold   - minimum per-dimension score for the output to count
    #                 as valid
    #   description - human-readable criterion, rendered into the LLM prompt
    QUALITY_CRITERIA = {
        'patent_analysis': {
            'completeness': {
                'weight': 0.30,
                'threshold': 0.90,
                'description': 'Must extract >90% of claims and key information',
            },
            'clarity': {
                'weight': 0.25,
                'threshold': 0.85,
                'description': 'Summaries and explanations must be clear and understandable',
            },
            'actionability': {
                'weight': 0.25,
                'threshold': 0.80,
                'description': 'Must include clear next steps and recommendations',
            },
            'accuracy': {
                'weight': 0.20,
                'threshold': 0.90,
                'description': 'Information must be factually correct',
            },
        },
        'legal_review': {
            'accuracy': {
                'weight': 0.35,
                'threshold': 0.95,
                'description': 'Risk identification must be precise',
            },
            'coverage': {
                'weight': 0.30,
                'threshold': 0.90,
                'description': 'Must check all major clauses and sections',
            },
            'compliance': {
                'weight': 0.25,
                'threshold': 1.00,
                'description': 'GDPR/Law 25 compliance must be 100%',
            },
            'actionability': {
                'weight': 0.10,
                'threshold': 0.85,
                'description': 'Must provide clear remediation steps',
            },
        },
        'stakeholder_matching': {
            'relevance': {
                'weight': 0.35,
                'threshold': 0.85,
                'description': 'Matches must be relevant to objectives',
            },
            'diversity': {
                'weight': 0.20,
                'threshold': 0.75,
                'description': 'Should include diverse perspectives',
            },
            'justification': {
                'weight': 0.25,
                'threshold': 0.80,
                'description': 'Must explain why matches are appropriate',
            },
            'actionability': {
                'weight': 0.20,
                'threshold': 0.85,
                'description': 'Must include concrete next steps',
            },
        },
        'general': {
            'completeness': {
                'weight': 0.30,
                'threshold': 0.80,
                'description': 'All required elements present',
            },
            'clarity': {
                'weight': 0.25,
                'threshold': 0.80,
                'description': 'Clear and understandable',
            },
            'accuracy': {
                'weight': 0.25,
                'threshold': 0.85,
                'description': 'Factually correct',
            },
            'actionability': {
                'weight': 0.20,
                'threshold': 0.75,
                'description': 'Provides next steps',
            },
        },
    }

    def __init__(
        self,
        llm_client: LangChainOllamaClient,
        memory_agent: Optional['MemoryAgent'] = None,
        temperature: float = 0.6,
    ):
        """
        Initialize CriticAgent with LangChain client.

        Args:
            llm_client: LangChain Ollama client
            memory_agent: Optional memory agent for context
            temperature: LLM temperature for validation
        """
        # NOTE(review): super().__init__() is deliberately not called here,
        # matching the original implementation -- confirm BaseAgent requires
        # no initialization of its own.
        self.llm_client = llm_client
        self.memory_agent = memory_agent
        self.temperature = temperature

        # Build both LCEL chains once; they are reused across all tasks.
        self.validation_chain = self._create_validation_chain()
        self.feedback_chain = self._create_feedback_chain()

        self.name = "CriticAgent"
        self.description = "Output validation and quality assurance"

        logger.info("Initialized CriticAgent with LangChain (complexity: analysis)")

    def _create_validation_chain(self):
        """
        Create LangChain chain for output validation.

        Returns:
            Runnable chain: prompt | llm | parser
        """
        # Doubled braces ({{ }}) are literal braces in ChatPromptTemplate
        # syntax; single-braced names are template variables.
        system_template = """You are a critical analysis agent for research valorization outputs.

Your role is to:
1. Review outputs from other agents objectively
2. Identify errors, inconsistencies, or gaps
3. Assess quality against specific criteria
4. Provide constructive feedback for improvement
5. Ensure alignment with VISTA project objectives

When reviewing output, evaluate:
- Completeness: Are all required elements present?
- Clarity: Is it easy to understand?
- Accuracy: Is the information correct?
- Actionability: Does it provide clear next steps?
- Relevance: Does it address the original task?

Be thorough but fair. Focus on constructive feedback that helps improve quality.

Output your assessment as JSON with this structure:
{{
"dimension_scores": {{"completeness": 0.85, "clarity": 0.90, ...}},
"issues": ["Issue 1", "Issue 2"],
"suggestions": ["Suggestion 1", "Suggestion 2"],
"details": {{}}
}}"""

        human_template = """Review the following output and assess its quality.

ORIGINAL TASK:
{task_description}

OUTPUT TO REVIEW:
{output_text}

QUALITY CRITERIA:
{criteria_text}

For each criterion, score from 0.0 to 1.0:
- 1.0 = Perfect
- 0.8-0.9 = Good, minor improvements possible
- 0.6-0.7 = Acceptable, some issues
- 0.4-0.5 = Poor, significant issues
- < 0.4 = Unacceptable

Provide:
1. Score for each dimension (dimension_scores)
2. List of specific issues found (issues)
3. Concrete suggestions for improvement (suggestions)
4. Additional details if needed (details)

Output JSON only."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_template),
            ("human", human_template)
        ])

        # "analysis" complexity selects the balanced model tier.
        llm = self.llm_client.get_llm(complexity="analysis", temperature=self.temperature)

        # Parses the model's JSON response into a plain dict.
        parser = JsonOutputParser()

        return prompt | llm | parser

    def _create_feedback_chain(self):
        """
        Create LangChain chain for generating constructive feedback.

        Returns:
            Runnable chain for feedback generation
        """
        system_template = """You are an expert at providing constructive feedback for improvement.

Your role is to:
1. Analyze validation results and identify key issues
2. Generate specific, actionable improvement suggestions
3. Prioritize suggestions by impact
4. Explain why each suggestion matters
5. Be encouraging while being honest about problems

Focus on feedback that:
- Is specific and concrete
- Can be acted upon immediately
- Addresses root causes, not symptoms
- Builds on strengths while fixing weaknesses"""

        human_template = """Generate constructive feedback for the following output.

VALIDATION RESULTS:
- Overall Score: {overall_score}
- Issues: {issues}
- Dimension Scores: {dimension_scores}

ORIGINAL OUTPUT:
{output_text}

Provide prioritized suggestions for improvement. Output as JSON:
{{
"priority_suggestions": ["Most important suggestion", "Second priority", ...],
"strengths": ["What worked well", ...],
"weaknesses": ["What needs improvement", ...],
"next_steps": ["Specific action 1", "Specific action 2", ...]
}}"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_template),
            ("human", human_template)
        ])

        llm = self.llm_client.get_llm(complexity="analysis", temperature=self.temperature)
        parser = JsonOutputParser()

        return prompt | llm | parser

    async def process_task(self, task: Task) -> Task:
        """
        Process validation task.

        Args:
            task: Task containing output to validate. Its metadata must
                include 'output_to_validate'; 'output_type' and 'criteria'
                are optional overrides.

        Returns:
            Updated task with validation result (status set to
            'completed' or 'failed'; errors are captured on task.error
            rather than propagated).
        """
        logger.info(f"CriticAgent validating output for task: {task.id}")
        task.status = "in_progress"

        try:
            if not task.metadata or 'output_to_validate' not in task.metadata:
                raise ValueError("No output provided for validation")

            output = task.metadata['output_to_validate']
            output_type = task.metadata.get('output_type', 'general')
            criteria_override = task.metadata.get('criteria')

            validation_result = await self.validate_output(
                output=output,
                task=task,
                output_type=output_type,
                criteria=criteria_override,
            )

            task.result = validation_result
            task.status = "completed"

            logger.info(f"Validation completed: {validation_result.overall_score:.2f} score")

        except Exception as e:
            # Failures are recorded on the task so the workflow can react,
            # instead of crashing the agent loop.
            logger.error(f"Validation failed: {e}")
            task.status = "failed"
            task.error = str(e)

        return task

    async def validate_output(
        self,
        output: Any,
        task: Task,
        output_type: str = 'general',
        criteria: Optional[Dict[str, Any]] = None,
    ) -> ValidationResult:
        """
        Validate output against quality criteria using LangChain.

        Args:
            output: Output to validate (can be str, dict, list, etc.)
            task: Original task that produced this output
            output_type: Type of output (determines criteria)
            criteria: Optional custom criteria

        Returns:
            ValidationResult with score, issues, and suggestions. On any
            chain failure a failed ValidationResult (score 0.0) is
            returned rather than raising.
        """
        # Fall back to the rubric for this output type ('general' when
        # the type is unknown).
        if criteria is None:
            criteria = self.get_vista_criteria(output_type)

        # Serialize structured outputs so the LLM sees readable JSON.
        if isinstance(output, (dict, list)):
            output_str = json.dumps(output, indent=2)
        else:
            output_str = str(output)

        # Truncate to keep the prompt within a manageable context size.
        output_str = output_str[:2000]

        # Render the rubric as bullet points for the prompt.
        criteria_text = "\n".join(
            f"- {dim.capitalize()} (threshold: {props['threshold']:.0%}): {props['description']}"
            for dim, props in criteria.items()
        )

        try:
            result = await self.validation_chain.ainvoke({
                "task_description": task.description,
                "output_text": output_str,
                "criteria_text": criteria_text
            })

            # Missing dimensions default to 0.0 below, so a partial LLM
            # response degrades the score instead of crashing.
            dimension_scores = result.get('dimension_scores', {})

            # Weighted average of per-dimension scores, normalized by the
            # total weight to tolerate rubrics that do not sum to 1.0.
            total_weight = sum(props['weight'] for props in criteria.values())
            overall_score = 0.0

            for dim, props in criteria.items():
                score = dimension_scores.get(dim, 0.0)
                weight = props['weight']
                overall_score += score * weight

            if total_weight > 0:
                overall_score /= total_weight

            # Valid only if EVERY dimension meets its own threshold.
            valid = all(
                dimension_scores.get(dim, 0.0) >= props['threshold']
                for dim, props in criteria.items()
            )

            return ValidationResult(
                valid=valid,
                overall_score=overall_score,
                dimension_scores=dimension_scores,
                issues=result.get('issues', []),
                suggestions=result.get('suggestions', []),
                details=result.get('details', {}),
            )

        except Exception as e:
            logger.error(f"Failed to validate with LangChain: {e}")
            logger.debug(f"Output was: {output_str[:500]}")

            # Best-effort fallback: report the failure as an invalid
            # result so callers can retry or surface the error.
            return ValidationResult(
                valid=False,
                overall_score=0.0,
                dimension_scores={},
                issues=[f"Failed to validate: {str(e)}"],
                suggestions=["Re-run validation with clearer output"],
                details={'error': str(e)},
            )

    async def suggest_improvements(
        self,
        validation_result: ValidationResult,
        original_output: Any,
    ) -> List[str]:
        """
        Generate actionable improvement suggestions using LangChain.

        Args:
            validation_result: Previous validation result
            original_output: The output that was validated

        Returns:
            List of improvement suggestions
        """
        # Fast path: nothing worth improving.
        if validation_result.valid and validation_result.overall_score >= 0.9:
            return ["Output is excellent. No major improvements needed."]

        # Reuse suggestions already produced during validation.
        if validation_result.suggestions:
            return validation_result.suggestions

        try:
            # Truncate to keep the feedback prompt small.
            output_str = str(original_output)[:1000]

            result = await self.feedback_chain.ainvoke({
                "overall_score": f"{validation_result.overall_score:.2f}",
                "issues": ", ".join(validation_result.issues),
                "dimension_scores": json.dumps(validation_result.dimension_scores),
                "output_text": output_str
            })

            suggestions = result.get('priority_suggestions', [])
            next_steps = result.get('next_steps', [])

            return suggestions + next_steps

        except Exception as e:
            logger.error(f"Failed to generate suggestions: {e}")

            # Fallback: derive suggestions mechanically from the
            # validation result instead of the LLM.
            suggestions = []
            for issue in validation_result.issues:
                suggestions.append(f"Address: {issue}")

            for dim, score in validation_result.dimension_scores.items():
                if score < 0.8:
                    suggestions.append(f"Improve {dim}: Current score {score:.2f}, aim for >0.80")

            return suggestions

    def get_feedback_for_iteration(
        self,
        validation_result: ValidationResult,
    ) -> str:
        """
        Format validation feedback for iterative improvement.

        Args:
            validation_result: Validation result

        Returns:
            Formatted feedback string
        """
        feedback_parts = []

        # Headline: overall verdict plus score.
        if validation_result.valid:
            feedback_parts.append(f"✓ Output is VALID (score: {validation_result.overall_score:.2f})")
        else:
            feedback_parts.append(f"✗ Output is INVALID (score: {validation_result.overall_score:.2f})")

        # Per-dimension scores; 0.8 is the display pass/fail cutoff.
        feedback_parts.append("\nQuality Dimensions:")
        for dim, score in validation_result.dimension_scores.items():
            status = "✓" if score >= 0.8 else "✗"
            feedback_parts.append(f" {status} {dim.capitalize()}: {score:.2f}")

        if validation_result.issues:
            feedback_parts.append("\nIssues Found:")
            for i, issue in enumerate(validation_result.issues, 1):
                feedback_parts.append(f" {i}. {issue}")

        if validation_result.suggestions:
            feedback_parts.append("\nSuggestions for Improvement:")
            for i, suggestion in enumerate(validation_result.suggestions, 1):
                feedback_parts.append(f" {i}. {suggestion}")

        return "\n".join(feedback_parts)

    def get_vista_criteria(self, output_type: str) -> Dict[str, Any]:
        """
        Get VISTA quality criteria for a specific output type.

        Args:
            output_type: Type of output

        Returns:
            Quality criteria dictionary ('general' rubric when the type
            is unknown)
        """
        return self.QUALITY_CRITERIA.get(output_type, self.QUALITY_CRITERIA['general'])
|