from functools import lru_cache
import json
import asyncio
from typing import Dict, Any, Optional, cast

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from dotenv import load_dotenv

from app.db.models import NarratorExtractionResponse, NarratorAnalysisResponse
from app.tools.scrape_shamela import ShamelaNarratorExtractor
from app.config.constants import EXTRACT_PROMPT, ANALYZE_PROMPT, SYNTHESIS_PROMPT

load_dotenv()


class LLMService:
    """Service class for LLM operations."""

    def __init__(self):
        self.model_name = "gemini-1.5-flash-latest"
        self._llm: Optional[ChatGoogleGenerativeAI] = None

    @property
    def llm(self) -> ChatGoogleGenerativeAI:
        """Lazily initialize the LLM on first access."""
        if self._llm is None:
            self._llm = ChatGoogleGenerativeAI(
                model=self.model_name,
                temperature=0.1,  # Low temperature for more consistent results
                max_output_tokens=2048,
            )
        return self._llm

    async def extract_narrators(self, hadith_text: str) -> NarratorExtractionResponse:
        """Extract narrators from hadith text."""
        try:
            # Parser for structured (Pydantic) output
            parser = PydanticOutputParser(pydantic_object=NarratorExtractionResponse)

            # Prompt template with the parser's format instructions baked in
            prompt_template = PromptTemplate(
                template=EXTRACT_PROMPT,
                input_variables=["hadith_text"],
                partial_variables={
                    "format_instructions": parser.get_format_instructions()
                },
            )

            # Build and invoke the chain: prompt -> LLM -> structured parser
            chain = prompt_template | self.llm | parser
            result = await chain.ainvoke({"hadith_text": hadith_text})
            return result
        except Exception as e:
            return NarratorExtractionResponse(
                narrators=[],
                sanad_chain="",
                success=False,
                message=f"Error extracting narrators: {str(e)}",
            )

    async def analyze_narrator(self, narrator_name: str) -> NarratorAnalysisResponse:
        """Analyze a narrator by combining Shamela scraper data with LLM reasoning."""
        try:
            # Step 1: Scrape data from Shamela; degrade gracefully on failure
            try:
                shamela_data = await ShamelaNarratorExtractor.extract_narrator_by_name(
                    narrator_name
                )
            except Exception as shamela_error:
                shamela_data = {"error": f"Extraction failed: {str(shamela_error)}"}

            # Step 2: Prepare context for LLM analysis
            try:
                shamela_context = self._format_shamela_data(shamela_data)
            except Exception as format_error:
                shamela_context = (
                    f"❌ Failed to format Shamela data: {str(format_error)}"
                )

            # Step 3: Create the enhanced prompt with Shamela data
            # (failures here propagate to the outer handler)
            parser = PydanticOutputParser(pydantic_object=NarratorAnalysisResponse)
            prompt_template = PromptTemplate(
                template=ANALYZE_PROMPT,
                input_variables=["narrator_name", "shamela_context"],
                partial_variables={
                    "format_instructions": parser.get_format_instructions()
                },
            )

            # Step 4: Invoke the enhanced analysis
            chain = prompt_template | self.llm | parser
            result = await chain.ainvoke(
                {"narrator_name": narrator_name, "shamela_context": shamela_context}
            )

            # Step 5: Enrich the response with extraction metadata (best effort:
            # if anything goes wrong here, return the analysis unenriched)
            try:
                total_scholars = 0
                if (
                    shamela_data
                    and isinstance(shamela_data, dict)
                    and not shamela_data.get("error")
                ):
                    metadata = shamela_data.get("extraction_metadata", {})
                    if isinstance(metadata, dict):
                        total_scholars = metadata.get("total_scholars", 0)

                result.message = (
                    f"Analysis completed using Shamela data "
                    f"({total_scholars} scholars) + LLM knowledge"
                )
                result.success = True
                return result
            except Exception:
                return result

        except Exception as e:
            return NarratorAnalysisResponse(
                narrator_name=narrator_name,
                reliability_grade="Majhul",
                confidence_level="Low",
                reasoning=f"Analysis failed due to technical error: {str(e)}",
                scholarly_consensus="Unable to determine due to system error",
                known_issues=None,
                biographical_info="Unable to retrieve information due to error",
                recommendation="Cannot provide recommendation due to analysis failure",
                success=False,
                message=f"Error analyzing narrator: {str(e)}",
            )

    async def analyze_narrator_chain(
        self, narrator_names: list[str]
    ) -> Dict[str, NarratorAnalysisResponse]:
        """Analyze a complete chain of narrators concurrently."""
        results: Dict[str, NarratorAnalysisResponse] = {}
        if not narrator_names:
            return results

        print(f"Analyzing chain of {len(narrator_names)} narrators concurrently...")

        # Fire off all analysis tasks at once
        tasks = [
            asyncio.create_task(self.analyze_narrator(name))
            for name in narrator_names
        ]

        # Wait for all to complete, capturing exceptions per task
        completed = await asyncio.gather(*tasks, return_exceptions=True)

        for name, outcome in zip(narrator_names, completed):
            if isinstance(outcome, Exception):
                print(f"Failed to analyze {name}: {outcome}")
                results[name] = NarratorAnalysisResponse(
                    narrator_name=name,
                    reliability_grade="Majhul",
                    confidence_level="Low",
                    reasoning=f"Chain analysis failed: {str(outcome)}",
                    scholarly_consensus="Unable to determine",
                    known_issues=None,
                    biographical_info="Error during analysis",
                    recommendation="Cannot recommend due to error",
                    success=False,
                    message=f"Error in chain analysis: {str(outcome)}",
                )
            else:
                results[name] = cast(NarratorAnalysisResponse, outcome)

        return results

    async def synthesize_chain_analysis(
        self, chain_results: Dict[str, NarratorAnalysisResponse]
    ) -> Dict[str, Any]:
        """Synthesize individual narrator analyses into an overall chain assessment."""
        try:
            # Summarize each narrator's result (truncate long reasoning to 200 chars)
            narrator_summaries = []
            for name, analysis in chain_results.items():
                narrator_summaries.append(
                    {
                        "name": name,
                        "grade": analysis.reliability_grade,
                        "confidence": analysis.confidence_level,
                        "reasoning": (
                            analysis.reasoning[:200] + "..."
                            if len(analysis.reasoning) > 200
                            else analysis.reasoning
                        ),
                        "issues": analysis.known_issues,
                    }
                )

            # Build the synthesis prompt and invoke the LLM
            # (no structured parser: the synthesis is free-form text)
            prompt_template = PromptTemplate(
                template=SYNTHESIS_PROMPT,
                input_variables=["narrator_summaries"],
            )
            summaries_json = json.dumps(
                narrator_summaries, ensure_ascii=False, indent=2
            )
            chain = prompt_template | self.llm
            synthesis_result = await chain.ainvoke(
                {"narrator_summaries": summaries_json}
            )

            # Normalize: chat models return a message object; fall back to raw value
            synthesis_text = getattr(synthesis_result, "content", synthesis_result)

            return {
                "overall_assessment": synthesis_text,
                "individual_results": chain_results,
                "chain_length": len(chain_results),
                "success": True,
            }
        except Exception as e:
            return {
                "overall_assessment": f"Synthesis failed: {str(e)}",
                "individual_results": chain_results,
                "chain_length": len(chain_results),
                "success": False,
            }

    def _format_shamela_data(self, narrator_info: Dict[str, Any]) -> str:
        """Format Shamela data for LLM consumption."""
        if not narrator_info or narrator_info.get("error"):
            return "❌ No data found on Shamela.ws or extraction failed"

        context_parts = []

        # Basic info
        if narrator_info.get("narrator_name"):
            context_parts.append(
                f"**Narrator Name (Shamela):** {narrator_info['narrator_name']}"
            )

        # Biographical information
        if narrator_info.get("biographical_info"):
            context_parts.append("**📋 Biographical Information:**")
            for key, value in narrator_info["biographical_info"].items():
                context_parts.append(f"  • {key}: {value}")
        else:
            context_parts.append("**📋 Biographical Information:** None found")

        # Scholarly critique
        if narrator_info.get("scholarly_critique"):
            context_parts.append(
                f"**📚 Scholarly Opinions "
                f"({len(narrator_info['scholarly_critique'])} scholars):**"
            )
            for i, scholar_critique in enumerate(
                narrator_info["scholarly_critique"], 1
            ):
                context_parts.append(f"\n  {i}. **{scholar_critique['scholar']}:**")
                for comment in scholar_critique["comments"]:
                    context_parts.append(f"     - {comment['text']}")
                    if comment.get("highlighted"):
                        context_parts.append(
                            f"       (Highlighted terms: "
                            f"{', '.join(comment['highlighted'])})"
                        )
        else:
            context_parts.append("**📚 Scholarly Opinions:** None found")

        # Extraction metadata / data-quality summary
        metadata = narrator_info.get("extraction_metadata", {})
        context_parts.append("\n**📊 Data Quality:**")
        context_parts.append(
            f"  • Total scholars cited: {metadata.get('total_scholars', 0)}"
        )
        context_parts.append(f"  • Total comments: {metadata.get('total_comments', 0)}")
        context_parts.append(
            f"  • Biographical fields: {metadata.get('biographical_fields', 0)}"
        )
        context_parts.append(
            f"  • Has critique section: {metadata.get('has_critique_section', False)}"
        )

        return "\n".join(context_parts)


@lru_cache()
def get_llm_service() -> LLMService:
    """Get cached LLM service instance."""
    return LLMService()
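

# A minimal end-to-end usage sketch (assumptions: this module is importable, a
# GOOGLE_API_KEY is available via the environment or .env, and each extracted
# narrator object exposes a `name` field — that attribute name is hypothetical,
# not confirmed by app.db.models). Illustrative only, not part of the service;
# it shows the intended flow: extract the sanad, analyze each narrator
# concurrently, then synthesize the chain-level verdict.
if __name__ == "__main__":

    async def _demo() -> None:
        service = get_llm_service()

        # 1. Extract the sanad (chain of narrators) from a raw hadith text.
        extraction = await service.extract_narrators("<hadith text here>")
        if not extraction.success:
            print(extraction.message)
            return

        # 2. Grade every narrator in the chain concurrently.
        names = [n.name for n in extraction.narrators]  # `.name` is assumed
        chain_results = await service.analyze_narrator_chain(names)

        # 3. Fold the per-narrator grades into one overall assessment.
        synthesis = await service.synthesize_chain_analysis(chain_results)
        print(synthesis["overall_assessment"])

    asyncio.run(_demo())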