"""LLM service for hadith narrator extraction, analysis, and chain synthesis."""

import asyncio
import json
from functools import lru_cache
from typing import Any, Dict, Optional, cast

from dotenv import load_dotenv
from langchain.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI

from app.config.constants import ANALYZE_PROMPT, EXTRACT_PROMPT, SYNTHESIS_PROMPT
from app.db.models import NarratorAnalysisResponse, NarratorExtractionResponse
from app.tools.scrape_shamela import ShamelaNarratorExtractor

# Load environment variables (e.g. the Google API key) before the first LLM call.
load_dotenv()


class LLMService:
    """Service class for LLM operations."""

    def __init__(self):
        self.model_name = "gemini-1.5-flash-latest"
        self._llm: Optional[ChatGoogleGenerativeAI] = None

    @property
    def llm(self) -> ChatGoogleGenerativeAI:
        """Lazily create and cache the underlying chat model."""
        if self._llm is None:
            self._llm = ChatGoogleGenerativeAI(
                model=self.model_name,
                temperature=0.1,  # low temperature keeps structured output stable
                max_output_tokens=2048,
            )
        return self._llm

    async def extract_narrators(self, hadith_text: str) -> NarratorExtractionResponse:
        """Extract narrators from hadith text."""
        try:
            parser = PydanticOutputParser(pydantic_object=NarratorExtractionResponse)

            prompt_template = PromptTemplate(
                template=EXTRACT_PROMPT,
                input_variables=["hadith_text"],
                partial_variables={
                    "format_instructions": parser.get_format_instructions()
                },
            )

            # LCEL pipeline: prompt -> model -> structured (Pydantic) output.
            chain = prompt_template | self.llm | parser
            return await chain.ainvoke({"hadith_text": hadith_text})

        except Exception as e:
            return NarratorExtractionResponse(
                narrators=[],
                sanad_chain="",
                success=False,
                message=f"Error extracting narrators: {str(e)}",
            )

    async def analyze_narrator(self, narrator_name: str) -> NarratorAnalysisResponse:
        """Enhanced narrator analyzer agent that uses Shamela scraper and LLM reasoning."""
        try:
            # Scrape biographical and critique data from Shamela.ws; on failure,
            # record an error marker so the LLM can still answer from its own knowledge.
            try:
                shamela_data = await ShamelaNarratorExtractor.extract_narrator_by_name(
                    narrator_name
                )
            except Exception as shamela_error:
                shamela_data = {"error": f"Extraction failed: {str(shamela_error)}"}

            try:
                shamela_context = self._format_shamela_data(shamela_data)
            except Exception as format_error:
                shamela_context = f"Failed to format Shamela data: {str(format_error)}"

            parser = PydanticOutputParser(pydantic_object=NarratorAnalysisResponse)
            prompt_template = PromptTemplate(
                template=ANALYZE_PROMPT,
                input_variables=["narrator_name", "shamela_context"],
                partial_variables={
                    "format_instructions": parser.get_format_instructions()
                },
            )

            chain = prompt_template | self.llm | parser
            result = await chain.ainvoke(
                {"narrator_name": narrator_name, "shamela_context": shamela_context}
            )

            # Annotate the result with how much Shamela evidence backed the analysis.
            try:
                total_scholars = 0
                if (
                    shamela_data
                    and isinstance(shamela_data, dict)
                    and not shamela_data.get("error")
                ):
                    metadata = shamela_data.get("extraction_metadata", {})
                    if isinstance(metadata, dict):
                        total_scholars = metadata.get("total_scholars", 0)
                result.message = (
                    f"Analysis completed using Shamela data "
                    f"({total_scholars} scholars) + LLM knowledge"
                )
                result.success = True
                return result
            except Exception:
                # The analysis itself succeeded; return it even if annotation failed.
                return result

        except Exception as e:
            return NarratorAnalysisResponse(
                narrator_name=narrator_name,
                reliability_grade="Majhul",
                confidence_level="Low",
                reasoning=f"Analysis failed due to technical error: {str(e)}",
                scholarly_consensus="Unable to determine due to system error",
                known_issues=None,
                biographical_info="Unable to retrieve information due to error",
                recommendation="Cannot provide recommendation due to analysis failure",
                success=False,
                message=f"Error analyzing narrator: {str(e)}",
            )

    async def analyze_narrator_chain(
        self, narrator_names: list[str]
    ) -> Dict[str, NarratorAnalysisResponse]:
        """Analyze a complete chain of narrators concurrently."""
        results: Dict[str, NarratorAnalysisResponse] = {}

        if not narrator_names:
            return results

        print(f"Analyzing chain of {len(narrator_names)} narrators concurrently...")

        tasks = [
            asyncio.create_task(self.analyze_narrator(name)) for name in narrator_names
        ]

        # return_exceptions=True keeps one failed narrator from cancelling the rest;
        # failures come back as exception objects, in submission order.
        completed = await asyncio.gather(*tasks, return_exceptions=True)

        for name, outcome in zip(narrator_names, completed):
            if isinstance(outcome, Exception):
                print(f"Failed to analyze {name}: {outcome}")
                results[name] = NarratorAnalysisResponse(
                    narrator_name=name,
                    reliability_grade="Majhul",
                    confidence_level="Low",
                    reasoning=f"Chain analysis failed: {str(outcome)}",
                    scholarly_consensus="Unable to determine",
                    known_issues=None,
                    biographical_info="Error during analysis",
                    recommendation="Cannot recommend due to error",
                    success=False,
                    message=f"Error in chain analysis: {str(outcome)}",
                )
            else:
                results[name] = cast(NarratorAnalysisResponse, outcome)

        return results

    async def synthesize_chain_analysis(
        self, chain_results: Dict[str, NarratorAnalysisResponse]
    ) -> Dict[str, Any]:
        """Synthesize individual narrator analyses into an overall chain assessment."""
        try:
            # Compress each analysis into a short summary so the synthesis prompt
            # stays well within the model's context window.
            narrator_summaries = []
            for name, analysis in chain_results.items():
                narrator_summaries.append(
                    {
                        "name": name,
                        "grade": analysis.reliability_grade,
                        "confidence": analysis.confidence_level,
                        "reasoning": (
                            analysis.reasoning[:200] + "..."
                            if len(analysis.reasoning) > 200
                            else analysis.reasoning
                        ),
                        "issues": analysis.known_issues,
                    }
                )

            prompt_template = PromptTemplate(
                template=SYNTHESIS_PROMPT,
                input_variables=["narrator_summaries"],
            )

            summaries_json = json.dumps(
                narrator_summaries, ensure_ascii=False, indent=2
            )
            # No output parser here: the synthesis is free-form text.
            chain = prompt_template | self.llm
            synthesis_result = await chain.ainvoke(
                {"narrator_summaries": summaries_json}
            )

            # Chat models return a message object; fall back to the raw value.
            synthesis_text = getattr(synthesis_result, "content", synthesis_result)

            return {
                "overall_assessment": synthesis_text,
                "individual_results": chain_results,
                "chain_length": len(chain_results),
                "success": True,
            }

        except Exception as e:
            return {
                "overall_assessment": f"Synthesis failed: {str(e)}",
                "individual_results": chain_results,
                "chain_length": len(chain_results),
                "success": False,
            }

    def _format_shamela_data(self, narrator_info: Dict[str, Any]) -> str:
        """Format Shamela data for LLM consumption."""
        if not narrator_info or narrator_info.get("error"):
            return "No data found on Shamela.ws or extraction failed"

        context_parts = []

        if narrator_info.get("narrator_name"):
            context_parts.append(
                f"**Narrator Name (Shamela):** {narrator_info['narrator_name']}"
            )

        if narrator_info.get("biographical_info"):
            context_parts.append("**Biographical Information:**")
            for key, value in narrator_info["biographical_info"].items():
                context_parts.append(f"  • {key}: {value}")
        else:
            context_parts.append("**Biographical Information:** None found")

        if narrator_info.get("scholarly_critique"):
            context_parts.append(
                f"**Scholarly Opinions ({len(narrator_info['scholarly_critique'])} scholars):**"
            )
            for i, scholar_critique in enumerate(
                narrator_info["scholarly_critique"], 1
            ):
                context_parts.append(f"\n  {i}. **{scholar_critique['scholar']}:**")
                for comment in scholar_critique["comments"]:
                    context_parts.append(f"     - {comment['text']}")
                    if comment.get("highlighted"):
                        context_parts.append(
                            f"       (Highlighted terms: {', '.join(comment['highlighted'])})"
                        )
        else:
            context_parts.append("**Scholarly Opinions:** None found")

        metadata = narrator_info.get("extraction_metadata", {})
        context_parts.append("\n**Data Quality:**")
        context_parts.append(
            f"  • Total scholars cited: {metadata.get('total_scholars', 0)}"
        )
        context_parts.append(f"  • Total comments: {metadata.get('total_comments', 0)}")
        context_parts.append(
            f"  • Biographical fields: {metadata.get('biographical_fields', 0)}"
        )
        context_parts.append(
            f"  • Has critique section: {metadata.get('has_critique_section', False)}"
        )

        return "\n".join(context_parts)

@lru_cache()
def get_llm_service() -> LLMService:
    """Get cached LLM service instance."""
    return LLMService()
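
# Usage sketch (illustrative only): a minimal end-to-end flow through the
# service. Two assumptions here are not pinned down by this module: the Google
# API key is supplied via the .env file loaded above, and
# NarratorExtractionResponse.narrators is a list of narrator-name strings (if
# it holds richer objects, map them to name strings first).
if __name__ == "__main__":

    async def _demo() -> None:
        service = get_llm_service()
        extraction = await service.extract_narrators(
            "<hadith text with its sanad goes here>"
        )
        if not extraction.success:
            print(extraction.message)
            return
        chain_results = await service.analyze_narrator_chain(extraction.narrators)
        synthesis = await service.synthesize_chain_analysis(chain_results)
        print(synthesis["overall_assessment"])

    asyncio.run(_demo())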