File size: 4,111 Bytes
62151d3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | import asyncio
import uuid
from collections.abc import Sequence
from pydantic import BaseModel, ValidationError
from app import prompts
from app.domain import TranscriptAnalysis
from app.llm_schema import LLMTranscriptAnalysisResponse
from app.errors import AnalysisNotFoundError, InvalidTranscriptError, LLMCompletionError
from app.ports import LLm
from app.repositories import TranscriptAnalysisRepository
class TranscriptAnalysisService:
    """Analyze transcripts with an LLM and persist the resulting analyses.

    A transcript is stripped and validated, sent to the LLM together with the
    system prompt, and the structured response is turned into a
    ``TranscriptAnalysis`` (with a freshly generated UUID) that is saved
    through the repository.
    """

    def __init__(self, llm: LLm, repository: TranscriptAnalysisRepository) -> None:
        """Store the LLM port and the repository used by every operation."""
        self._llm = llm
        self._repository = repository

    def analyze(self, transcript: str) -> TranscriptAnalysis:
        """Analyze a single transcript, save the analysis, and return it.

        Raises:
            InvalidTranscriptError: If the transcript is not a string or is
                empty after stripping whitespace.
            LLMCompletionError: If the completion call fails or its response
                does not validate.
        """
        clean_transcript = self._validate_transcript(transcript)
        analysis = self._create_analysis(clean_transcript)
        self._repository.save(analysis)
        return analysis

    async def analyze_many(self, transcripts: Sequence[str]) -> list[TranscriptAnalysis]:
        """Analyze several transcripts concurrently and save every analysis.

        All transcripts are validated before any completion is started, and
        analyses are saved only after every completion has succeeded. The
        returned list is in the same order as ``transcripts``.

        Raises:
            InvalidTranscriptError: If ``transcripts`` is empty, is a single
                string rather than a sequence of strings, or contains an
                invalid transcript.
            LLMCompletionError: If any completion fails or returns an invalid
                response.
        """
        if not transcripts:
            raise InvalidTranscriptError("At least one transcript is required.")
        if isinstance(transcripts, str):
            # A bare string satisfies Sequence[str]; iterating it would send
            # one completion request per character.
            raise InvalidTranscriptError(
                "Transcripts must be a sequence of strings, not a single string."
            )

        clean_transcripts = [self._validate_transcript(transcript) for transcript in transcripts]
        tasks = [
            asyncio.create_task(self._create_analysis_async(transcript))
            for transcript in clean_transcripts
        ]
        try:
            analyses = await asyncio.gather(*tasks)
        except BaseException:
            # gather() propagates the first failure but leaves the remaining
            # awaitables running. Their results would be discarded, so stop
            # the outstanding completions instead of letting them finish.
            for task in tasks:
                task.cancel()
            raise

        for analysis in analyses:
            self._repository.save(analysis)
        return list(analyses)

    def get(self, analysis_id: str) -> TranscriptAnalysis:
        """Return the stored analysis with the given id.

        Raises:
            AnalysisNotFoundError: If the repository returns ``None`` for
                ``analysis_id``.
        """
        analysis = self._repository.get(analysis_id)
        if analysis is None:
            raise AnalysisNotFoundError(f"Transcript analysis '{analysis_id}' was not found.")
        return analysis

    def _create_analysis(self, transcript: str) -> TranscriptAnalysis:
        """Run a synchronous completion and build the analysis from it."""
        response = self._run_completion(transcript)
        return self._build_analysis(response)

    async def _create_analysis_async(self, transcript: str) -> TranscriptAnalysis:
        """Run an asynchronous completion and build the analysis from it."""
        response = await self._run_completion_async(transcript)
        return self._build_analysis(response)

    @staticmethod
    def _build_analysis(response: LLMTranscriptAnalysisResponse) -> TranscriptAnalysis:
        """Convert a validated LLM response into a ``TranscriptAnalysis``.

        The analysis receives a new random UUID (as a string) and the action
        items are copied into a tuple.
        """
        return TranscriptAnalysis(
            id=str(uuid.uuid4()),
            summary=response.summary,
            action_items=tuple(response.action_items),
        )

    @staticmethod
    def _build_user_prompt(transcript: str) -> str:
        """Fill the raw user prompt template with the transcript text."""
        return prompts.RAW_USER_PROMPT.format(transcript=transcript)

    def _run_completion(self, transcript: str) -> LLMTranscriptAnalysisResponse:
        """Request a completion synchronously and parse the response.

        Raises:
            LLMCompletionError: If the LLM call raises, or if the response
                fails validation. The original exception is chained.
        """
        user_prompt = self._build_user_prompt(transcript)
        try:
            completion = self._llm.run_completion(
                prompts.SYSTEM_PROMPT,
                user_prompt,
                LLMTranscriptAnalysisResponse,
            )
        except Exception as exc:
            raise LLMCompletionError("Transcript analysis failed.") from exc
        return self._parse_completion_response(completion)

    async def _run_completion_async(self, transcript: str) -> LLMTranscriptAnalysisResponse:
        """Request a completion asynchronously and parse the response.

        Raises:
            LLMCompletionError: If the LLM call raises, or if the response
                fails validation. The original exception is chained.
        """
        user_prompt = self._build_user_prompt(transcript)
        try:
            completion = await self._llm.run_completion_async(
                prompts.SYSTEM_PROMPT,
                user_prompt,
                LLMTranscriptAnalysisResponse,
            )
        except Exception as exc:
            # asyncio.CancelledError derives from BaseException, so task
            # cancellation is not converted into LLMCompletionError here.
            raise LLMCompletionError("Transcript analysis failed.") from exc
        return self._parse_completion_response(completion)

    @staticmethod
    def _parse_completion_response(completion: BaseModel | object) -> LLMTranscriptAnalysisResponse:
        """Validate a raw completion as an ``LLMTranscriptAnalysisResponse``.

        A pydantic model is first dumped with ``model_dump()`` and the result
        is re-validated; any other object is passed to ``model_validate``
        unchanged.

        Raises:
            LLMCompletionError: If validation raises ``ValidationError``.
        """
        try:
            if isinstance(completion, BaseModel):
                return LLMTranscriptAnalysisResponse.model_validate(completion.model_dump())
            return LLMTranscriptAnalysisResponse.model_validate(completion)
        except ValidationError as exc:
            raise LLMCompletionError("Transcript analysis returned an invalid response.") from exc

    @staticmethod
    def _validate_transcript(transcript: str) -> str:
        """Return the transcript with surrounding whitespace removed.

        Raises:
            InvalidTranscriptError: If the transcript is not a string, or is
                empty once stripped.
        """
        if not isinstance(transcript, str):
            # Report bad input as a domain error instead of letting
            # ``.strip()`` fail with AttributeError on None or other types.
            raise InvalidTranscriptError("Transcript must be a string.")
        clean_transcript = transcript.strip()
        if not clean_transcript:
            raise InvalidTranscriptError("Transcript cannot be empty.")
        return clean_transcript
|