# SanadLLM — app/agent/services.py (branch: Hydra-Bolt, commit eef2a73 "restructured")
from functools import lru_cache
import json
from typing import Dict, Any, cast
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from dotenv import load_dotenv
from app.db.models import NarratorExtractionResponse, NarratorAnalysisResponse
from app.tools.scrape_shamela import ShamelaNarratorExtractor
from app.config.constants import EXTRACT_PROMPT, ANALYZE_PROMPT, SYNTHESIS_PROMPT
import asyncio
load_dotenv()
class LLMService:
    """Service class for LLM operations.

    Orchestrates three Gemini-backed workflows for hadith isnad study:
    narrator extraction from raw hadith text, per-narrator reliability
    analysis augmented with scraped Shamela.ws data, and synthesis of a
    whole-chain assessment. Public coroutines never raise; failures are
    surfaced through the returned objects' ``success``/``message`` fields.
    """

    def __init__(self):
        self.model_name = "gemini-1.5-flash-latest"
        # Created lazily by the `llm` property so constructing the service
        # does no credential/network work at import time.
        self._llm = None

    @property
    def llm(self) -> "ChatGoogleGenerativeAI":
        """Lazily build and cache the Gemini chat model on first access."""
        if self._llm is None:
            self._llm = ChatGoogleGenerativeAI(
                model=self.model_name,
                temperature=0.1,  # Low temperature for more consistent results
                max_output_tokens=2048,
            )
        return self._llm

    async def extract_narrators(self, hadith_text: str) -> "NarratorExtractionResponse":
        """Extract narrators from hadith text.

        Args:
            hadith_text: Raw hadith text containing the isnad (chain).

        Returns:
            A ``NarratorExtractionResponse``. On any failure a response with
            ``success=False`` and an explanatory message is returned instead
            of raising.
        """
        try:
            # Parser pins the LLM to the NarratorExtractionResponse schema.
            parser = PydanticOutputParser(pydantic_object=NarratorExtractionResponse)
            prompt_template = PromptTemplate(
                template=EXTRACT_PROMPT,
                input_variables=["hadith_text"],
                partial_variables={
                    "format_instructions": parser.get_format_instructions()
                },
            )
            chain = prompt_template | self.llm | parser
            return await chain.ainvoke({"hadith_text": hadith_text})
        except Exception as e:
            # Never propagate: callers rely on the success flag.
            return NarratorExtractionResponse(
                narrators=[],
                sanad_chain="",
                success=False,
                message=f"Error extracting narrators: {str(e)}",
            )

    async def analyze_narrator(self, narrator_name: str) -> "NarratorAnalysisResponse":
        """Enhanced narrator analyzer agent that uses Shamela scraper and LLM reasoning.

        Args:
            narrator_name: Name of the narrator to grade.

        Returns:
            A ``NarratorAnalysisResponse``. Scraping/formatting problems are
            tolerated (the LLM is still consulted); only prompt/LLM failures
            produce the ``success=False`` fallback response.
        """
        try:
            # Step 1: Scrape data from Shamela (best-effort; an error dict
            # keeps the pipeline going without biographical context).
            try:
                shamela_data = await ShamelaNarratorExtractor.extract_narrator_by_name(
                    narrator_name
                )
            except Exception as shamela_error:
                shamela_data = {"error": f"Extraction failed: {str(shamela_error)}"}

            # Step 2: Prepare context for LLM analysis (best-effort).
            try:
                shamela_context = self._format_shamela_data(shamela_data)
            except Exception as format_error:
                shamela_context = (
                    f"❌ Failed to format Shamela data: {str(format_error)}"
                )

            # Steps 3-4: Build the structured prompt and run the analysis
            # chain. Failures here are fatal and fall through to the outer
            # handler (the former per-step re-raise wrappers were no-ops).
            parser = PydanticOutputParser(pydantic_object=NarratorAnalysisResponse)
            prompt_template = PromptTemplate(
                template=ANALYZE_PROMPT,
                input_variables=["narrator_name", "shamela_context"],
                partial_variables={
                    "format_instructions": parser.get_format_instructions()
                },
            )
            chain = prompt_template | self.llm | parser
            result = await chain.ainvoke(
                {"narrator_name": narrator_name, "shamela_context": shamela_context}
            )

            # Step 5: Enhance the response with scrape metadata (cosmetic;
            # on any error the un-enriched analysis is still returned).
            try:
                total_scholars = 0
                if (
                    shamela_data
                    and isinstance(shamela_data, dict)
                    and not shamela_data.get("error")
                ):
                    metadata = shamela_data.get("extraction_metadata", {})
                    if isinstance(metadata, dict):
                        total_scholars = metadata.get("total_scholars", 0)
                result.message = f"Analysis completed using Shamela data ({total_scholars} scholars) + LLM knowledge"
                result.success = True
                return result
            except Exception:
                return result
        except Exception as e:
            # Fatal path: return a conservative "unknown narrator" grade.
            return NarratorAnalysisResponse(
                narrator_name=narrator_name,
                reliability_grade="Majhul",
                confidence_level="Low",
                reasoning=f"Analysis failed due to technical error: {str(e)}",
                scholarly_consensus="Unable to determine due to system error",
                known_issues=None,
                biographical_info="Unable to retrieve information due to error",
                recommendation="Cannot provide recommendation due to analysis failure",
                success=False,
                message=f"Error analyzing narrator: {str(e)}",
            )

    async def analyze_narrator_chain(
        self, narrator_names: list[str]
    ) -> "Dict[str, NarratorAnalysisResponse]":
        """Analyze a complete chain of narrators concurrently.

        Args:
            narrator_names: Narrator names in chain order; may be empty.

        Returns:
            A mapping that covers every input name: either its analysis, or a
            ``success=False`` fallback if that narrator's task raised.
        """
        results: Dict[str, NarratorAnalysisResponse] = {}
        if not narrator_names:
            return results
        print(f"Analyzing chain of {len(narrator_names)} narrators concurrently...")
        # Fire off all analysis tasks at once
        tasks = [
            asyncio.create_task(self.analyze_narrator(name)) for name in narrator_names
        ]
        # Wait for all to complete, capturing exceptions per-task
        completed = await asyncio.gather(*tasks, return_exceptions=True)
        for name, outcome in zip(narrator_names, completed):
            if isinstance(outcome, Exception):
                print(f"Failed to analyze {name}: {outcome}")
                results[name] = NarratorAnalysisResponse(
                    narrator_name=name,
                    reliability_grade="Majhul",
                    confidence_level="Low",
                    reasoning=f"Chain analysis failed: {str(outcome)}",
                    scholarly_consensus="Unable to determine",
                    known_issues=None,
                    biographical_info="Error during analysis",
                    recommendation="Cannot recommend due to error",
                    success=False,
                    message=f"Error in chain analysis: {str(outcome)}",
                )
            else:
                results[name] = cast(NarratorAnalysisResponse, outcome)
        return results

    async def synthesize_chain_analysis(
        self, chain_results: "Dict[str, NarratorAnalysisResponse]"
    ) -> "Dict[str, Any]":
        """Synthesize individual narrator analyses into an overall chain assessment.

        Args:
            chain_results: Per-narrator analyses from ``analyze_narrator_chain``.

        Returns:
            Dict with ``overall_assessment`` (LLM text, or an error string),
            ``individual_results``, ``chain_length`` and ``success`` keys.
        """
        try:
            # Compact each analysis (reasoning capped at 200 chars) to keep
            # the synthesis prompt small.
            narrator_summaries = []
            for name, analysis in chain_results.items():
                narrator_summaries.append(
                    {
                        "name": name,
                        "grade": analysis.reliability_grade,
                        "confidence": analysis.confidence_level,
                        "reasoning": (
                            analysis.reasoning[:200] + "..."
                            if len(analysis.reasoning) > 200
                            else analysis.reasoning
                        ),
                        "issues": analysis.known_issues,
                    }
                )
            prompt_template = PromptTemplate(
                template=SYNTHESIS_PROMPT,
                input_variables=["narrator_summaries"],
            )
            summaries_json = json.dumps(
                narrator_summaries, ensure_ascii=False, indent=2
            )
            chain = prompt_template | self.llm
            synthesis_result = await chain.ainvoke(
                {"narrator_summaries": summaries_json}
            )
            # Chat models return a Message with .content; plain LLMs return str.
            synthesis_text = getattr(synthesis_result, "content", synthesis_result)
            return {
                "overall_assessment": synthesis_text,
                "individual_results": chain_results,
                "chain_length": len(chain_results),
                "success": True,
            }
        except Exception as e:
            return {
                "overall_assessment": f"Synthesis failed: {str(e)}",
                "individual_results": chain_results,
                "chain_length": len(chain_results),
                "success": False,
            }

    def _format_shamela_data(self, narrator_info: Dict[str, Any]) -> str:
        """Format Shamela data for LLM consumption.

        Args:
            narrator_info: Raw dict from ``ShamelaNarratorExtractor``; may be
                empty/None or carry an ``error`` key.

        Returns:
            A markdown-ish context string for the analysis prompt.
        """
        if not narrator_info or narrator_info.get("error"):
            return "❌ No data found on Shamela.ws or extraction failed"
        context_parts = []
        # Basic info
        if narrator_info.get("narrator_name"):
            context_parts.append(
                f"**Narrator Name (Shamela):** {narrator_info['narrator_name']}"
            )
        # Biographical information
        if narrator_info.get("biographical_info"):
            context_parts.append("**πŸ“‹ Biographical Information:**")
            for key, value in narrator_info["biographical_info"].items():
                context_parts.append(f" β€’ {key}: {value}")
        else:
            context_parts.append("**πŸ“‹ Biographical Information:** None found")
        # Scholarly critique
        if narrator_info.get("scholarly_critique"):
            context_parts.append(
                f"**πŸ“š Scholarly Opinions ({len(narrator_info['scholarly_critique'])} scholars):**"
            )
            for i, scholar_critique in enumerate(
                narrator_info["scholarly_critique"], 1
            ):
                context_parts.append(f"\n {i}. **{scholar_critique['scholar']}:**")
                for comment in scholar_critique["comments"]:
                    context_parts.append(f" - {comment['text']}")
                    if comment.get("highlighted"):
                        context_parts.append(
                            f" (Highlighted terms: {', '.join(comment['highlighted'])})"
                        )
        else:
            context_parts.append("**πŸ“š Scholarly Opinions:** None found")
        # Metadata (was an f-string with no placeholders)
        metadata = narrator_info.get("extraction_metadata", {})
        context_parts.append("\n**πŸ“Š Data Quality:**")
        context_parts.append(
            f" β€’ Total scholars cited: {metadata.get('total_scholars', 0)}"
        )
        context_parts.append(f" β€’ Total comments: {metadata.get('total_comments', 0)}")
        context_parts.append(
            f" β€’ Biographical fields: {metadata.get('biographical_fields', 0)}"
        )
        context_parts.append(
            f" β€’ Has critique section: {metadata.get('has_critique_section', False)}"
        )
        return "\n".join(context_parts)
@lru_cache(maxsize=1)
def get_llm_service() -> LLMService:
    """Return the process-wide LLMService singleton.

    The function takes no arguments, so lru_cache memoizes exactly one
    instance and every call hands back the same object.
    """
    return LLMService()