| import asyncio |
| import uuid |
| from collections.abc import Sequence |
|
|
| from pydantic import BaseModel, ValidationError |
|
|
| from app import prompts |
| from app.domain import TranscriptAnalysis |
| from app.llm_schema import LLMTranscriptAnalysisResponse |
| from app.errors import AnalysisNotFoundError, InvalidTranscriptError, LLMCompletionError |
| from app.ports import LLm |
| from app.repositories import TranscriptAnalysisRepository |
|
|
|
|
| class TranscriptAnalysisService: |
| def __init__(self, llm: LLm, repository: TranscriptAnalysisRepository) -> None: |
| self._llm = llm |
| self._repository = repository |
|
|
| def analyze(self, transcript: str) -> TranscriptAnalysis: |
| clean_transcript = self._validate_transcript(transcript) |
| analysis = self._create_analysis(clean_transcript) |
| self._repository.save(analysis) |
| return analysis |
|
|
| async def analyze_many(self, transcripts: Sequence[str]) -> list[TranscriptAnalysis]: |
| if not transcripts: |
| raise InvalidTranscriptError("At least one transcript is required.") |
|
|
| clean_transcripts = [self._validate_transcript(transcript) for transcript in transcripts] |
| analyses = await asyncio.gather( |
| *(self._create_analysis_async(transcript) for transcript in clean_transcripts) |
| ) |
|
|
| for analysis in analyses: |
| self._repository.save(analysis) |
|
|
| return list(analyses) |
|
|
| def get(self, analysis_id: str) -> TranscriptAnalysis: |
| analysis = self._repository.get(analysis_id) |
| if analysis is None: |
| raise AnalysisNotFoundError(f"Transcript analysis '{analysis_id}' was not found.") |
| return analysis |
|
|
| def _create_analysis(self, transcript: str) -> TranscriptAnalysis: |
| response = self._run_completion(transcript) |
| return self._build_analysis(response) |
|
|
| async def _create_analysis_async(self, transcript: str) -> TranscriptAnalysis: |
| response = await self._run_completion_async(transcript) |
| return self._build_analysis(response) |
|
|
| @staticmethod |
| def _build_analysis(response: LLMTranscriptAnalysisResponse) -> TranscriptAnalysis: |
| return TranscriptAnalysis( |
| id=str(uuid.uuid4()), |
| summary=response.summary, |
| action_items=tuple(response.action_items), |
| ) |
|
|
| def _run_completion(self, transcript: str) -> LLMTranscriptAnalysisResponse: |
| user_prompt = prompts.RAW_USER_PROMPT.format(transcript=transcript) |
|
|
| try: |
| completion = self._llm.run_completion( |
| prompts.SYSTEM_PROMPT, |
| user_prompt, |
| LLMTranscriptAnalysisResponse, |
| ) |
| except Exception as exc: |
| raise LLMCompletionError("Transcript analysis failed.") from exc |
|
|
| return self._parse_completion_response(completion) |
|
|
| async def _run_completion_async(self, transcript: str) -> LLMTranscriptAnalysisResponse: |
| user_prompt = prompts.RAW_USER_PROMPT.format(transcript=transcript) |
|
|
| try: |
| completion = await self._llm.run_completion_async( |
| prompts.SYSTEM_PROMPT, |
| user_prompt, |
| LLMTranscriptAnalysisResponse, |
| ) |
| except Exception as exc: |
| raise LLMCompletionError("Transcript analysis failed.") from exc |
|
|
| return self._parse_completion_response(completion) |
|
|
| @staticmethod |
| def _parse_completion_response(completion: BaseModel | object) -> LLMTranscriptAnalysisResponse: |
| try: |
| if isinstance(completion, BaseModel): |
| return LLMTranscriptAnalysisResponse.model_validate(completion.model_dump()) |
| return LLMTranscriptAnalysisResponse.model_validate(completion) |
| except ValidationError as exc: |
| raise LLMCompletionError("Transcript analysis returned an invalid response.") from exc |
|
|
| @staticmethod |
| def _validate_transcript(transcript: str) -> str: |
| clean_transcript = transcript.strip() |
| if not clean_transcript: |
| raise InvalidTranscriptError("Transcript cannot be empty.") |
| return clean_transcript |
|
|