| | """Ad-hoc analysis endpoints (paste/upload text, PDF).""" |
| |
|
| | from __future__ import annotations |
| |
|
| | import logging |
| | import io |
| | import re |
| | import json |
| | from datetime import datetime |
| | from typing import Any, Dict, List, Optional |
| |
|
| | from fastapi import APIRouter, File, Form, HTTPException, UploadFile |
| | from pydantic import BaseModel, Field |
| |
|
| | from .conversation_service import run_resource_agent_analysis |
| | from .storage_service import get_run_store, get_persona_store |
| | from config.settings import get_settings |
| | from backend.storage import RunRecord |
| |
|
# Routes are mounted at the application root and grouped under the "analysis"
# tag in the generated OpenAPI docs.
router = APIRouter(prefix="", tags=["analysis"])
logger = logging.getLogger(__name__)
| |
|
| |
|
class ExportMessage(BaseModel):
    """A single transcript message as returned to the client UI."""

    # Speaker role: "surveyor", "patient", or "transcript" for unlabeled text.
    role: str
    # Human-readable speaker label ("Surveyor", "Patient", or the source name).
    persona: Optional[str] = None
    # ISO-8601 timestamp string; set to the analysis export time.
    time: Optional[str] = None
    # The message body text.
    text: str
| |
|
| |
|
class AnalyzeTextRequest(BaseModel):
    """Request body for POST /analyze/text (pasted transcript analysis)."""

    text: str = Field(..., description="Raw transcript text to analyze")
    conversation_id: Optional[str] = Field(default=None, description="Optional client-generated id for this analysis run")
    source_name: Optional[str] = Field(default=None, description="Optional label for the uploaded/pasted source")
    # Accepted for backward compatibility but ignored by the server.
    analysis_attributes: Optional[List[str]] = Field(
        default=None,
        description="(Deprecated) analysis attributes are now configured per-pass server-side",
    )
    top_down_codebook_template_id: Optional[str] = Field(
        default=None,
        description="Top-down codebook template id to use for analysis (optional).",
    )
| |
|
| |
|
class AnalyzeTextResponse(BaseModel):
    """Response body for the /analyze/text and /analyze/file endpoints."""

    # Id of the persisted run; None when persistence failed.
    run_id: Optional[str] = None
    # True only when the sealed run was saved successfully.
    persisted: bool = False
    # Echo of the (possibly server-generated) conversation id.
    conversation_id: str
    # Parsed transcript messages for UI display.
    messages: List[ExportMessage]
    # Analysis output from the resource agent.
    resources: Dict[str, Any]
| |
|
| |
|
| | def _parse_transcript_text(text: str, source_name: Optional[str]) -> List[Dict[str, Any]]: |
| | normalized = (text or "").replace("\r\n", "\n").replace("\r", "\n").strip() |
| | if not normalized: |
| | return [] |
| |
|
| | label = source_name or "Uploaded transcript" |
| | lines = [line.rstrip() for line in normalized.split("\n")] |
| | labeled = False |
| | blocks: List[Dict[str, Any]] = [] |
| |
|
| | current_role: Optional[str] = None |
| | current_lines: List[str] = [] |
| |
|
| | def flush(): |
| | nonlocal current_role, current_lines |
| | content = "\n".join([l for l in current_lines]).strip() |
| | if content: |
| | role = current_role or "transcript" |
| | persona = "Surveyor" if role == "surveyor" else ("Patient" if role == "patient" else label) |
| | blocks.append({ |
| | "role": role, |
| | "persona": persona, |
| | "content": content, |
| | }) |
| | current_role = None |
| | current_lines = [] |
| |
|
| | pattern = re.compile(r"^(surveyor|interviewer|patient|respondent)\s*:\s*(.*)$", re.IGNORECASE) |
| |
|
| | for line in lines: |
| | stripped = line.strip() |
| | if not stripped: |
| | if current_lines: |
| | current_lines.append("") |
| | continue |
| |
|
| | match = pattern.match(stripped) |
| | if match: |
| | labeled = True |
| | flush() |
| | speaker = match.group(1).lower() |
| | current_role = "surveyor" if speaker in ("surveyor", "interviewer") else "patient" |
| | remainder = match.group(2).strip() |
| | if remainder: |
| | current_lines.append(remainder) |
| | continue |
| |
|
| | if current_role is None: |
| | current_role = "transcript" |
| | current_lines.append(line) |
| |
|
| | flush() |
| |
|
| | if labeled: |
| | return blocks |
| |
|
| | paragraphs = [p.strip() for p in re.split(r"\n\s*\n+", normalized) if p.strip()] |
| | return [{ |
| | "role": "transcript", |
| | "persona": label, |
| | "content": p, |
| | } for p in paragraphs] or [{ |
| | "role": "transcript", |
| | "persona": label, |
| | "content": normalized, |
| | }] |
| |
|
| |
|
async def _analyze_from_text(
    *,
    text: str,
    conversation_id: str,
    source_name: Optional[str],
    analysis_attributes: Optional[List[str]] = None,
    top_down_codebook_template_id: Optional[str] = None,
) -> AnalyzeTextResponse:
    """Parse raw text, run the resource-agent analysis, and persist the run.

    Args:
        text: Raw transcript text (pasted or extracted from a file).
        conversation_id: Client- or server-generated id; also used as the
            run id when the sealed run is persisted.
        source_name: Optional label for the source, stored in the run config.
        analysis_attributes: Deprecated and ignored; analysis attributes are
            configured per-pass server-side.
        top_down_codebook_template_id: Optional template id overriding the
            server-configured default analysis framework template.

    Returns:
        AnalyzeTextResponse with the parsed messages and analysis resources.
        ``persisted``/``run_id`` reflect whether saving the sealed run
        succeeded; persistence failures are logged, not raised.

    Raises:
        HTTPException: 400 when the text yields no content; 500 when a
            default template id is configured but its record is missing.
    """
    settings = get_settings()
    exported_at = datetime.now().isoformat()

    parsed_messages = _parse_transcript_text(text, source_name)
    if not parsed_messages:
        raise HTTPException(status_code=400, detail="No content to analyze")

    # Build the internal transcript (for analysis/persistence) and the UI
    # message list (for the response) in a single pass.
    transcript: List[Dict[str, Any]] = []
    ui_messages: List[ExportMessage] = []
    for idx, msg in enumerate(parsed_messages):
        transcript.append({
            "index": idx,
            "role": msg["role"],
            "persona": msg.get("persona"),
            "content": msg["content"],
            "timestamp": exported_at,
        })
        ui_messages.append(ExportMessage(
            role=msg["role"],
            persona=msg.get("persona"),
            time=exported_at,
            text=msg["content"],
        ))

    persona_store = get_persona_store()
    effective_analysis_system_prompt = await persona_store.get_setting("analysis_system_prompt")

    # Template resolution: an explicit request override wins; otherwise fall
    # back to the server-configured default. A configured-but-missing default
    # template is a server error.
    override = top_down_codebook_template_id.strip() if isinstance(top_down_codebook_template_id, str) else ""
    template_id = await persona_store.get_setting("top_down_codebook_template_id")
    template_id_str = template_id.strip() if isinstance(template_id, str) else ""
    template_record = await persona_store.get_analysis_template(override, include_deleted=False) if override else None
    if not template_record and template_id_str:
        template_record = await persona_store.get_analysis_template(template_id_str, include_deleted=False)
    if not template_record and template_id_str:
        raise HTTPException(status_code=500, detail="Default analysis framework template not found")

    resources = await run_resource_agent_analysis(
        transcript=transcript,
        llm_backend=settings.llm.backend,
        host=settings.llm.host,
        model=settings.llm.model,
        settings=settings,
        analysis_system_prompt=effective_analysis_system_prompt if isinstance(effective_analysis_system_prompt, str) else None,
        bottom_up_instructions=template_record.bottom_up_instructions if template_record else None,
        bottom_up_attributes=template_record.bottom_up_attributes if template_record else None,
        rubric_instructions=template_record.rubric_instructions if template_record else None,
        rubric_attributes=template_record.rubric_attributes if template_record else None,
        top_down_instructions=template_record.top_down_instructions if template_record else None,
        top_down_attributes=template_record.top_down_attributes if template_record else None,
        top_down_template_id=template_record.template_id if template_record else template_id_str,
        top_down_template_version_id=template_record.current_version_id if template_record else "",
        top_down_template_categories=template_record.categories if template_record else [],
    )

    # Best-effort persistence: the analysis result is still returned to the
    # caller even when saving the sealed run fails.
    persisted = False
    run_id = None
    try:
        run_store = get_run_store()
        run_id = conversation_id
        config_snapshot: Dict[str, Any] = {
            "llm": {
                "backend": settings.llm.backend,
                "host": settings.llm.host,
                "model": settings.llm.model,
                "timeout": settings.llm.timeout,
                "max_retries": settings.llm.max_retries,
                "retry_delay": settings.llm.retry_delay,
            },
            "text_analysis": {
                "source_name": source_name,
            },
            "analysis": {
                "analysis_system_prompt": effective_analysis_system_prompt if isinstance(effective_analysis_system_prompt, str) else None,
                "bottom_up_instructions": template_record.bottom_up_instructions if template_record else None,
                "bottom_up_attributes": template_record.bottom_up_attributes if template_record else None,
                "rubric_instructions": template_record.rubric_instructions if template_record else None,
                "rubric_attributes": template_record.rubric_attributes if template_record else None,
                "top_down_instructions": template_record.top_down_instructions if template_record else None,
                "top_down_attributes": template_record.top_down_attributes if template_record else None,
                "top_down_codebook_template_id": template_record.template_id if template_record else template_id_str,
                "top_down_codebook_template_version_id": template_record.current_version_id if template_record else "",
                "top_down_codebook_template_snapshot": template_record.categories if template_record else [],
            },
        }
        record = RunRecord(
            run_id=run_id,
            mode="text_analysis",
            status="completed",
            created_at=exported_at,
            ended_at=exported_at,
            sealed_at=exported_at,
            title=None,
            input_summary=source_name,
            config=config_snapshot,
            messages=transcript,
            analyses={"resource_agent_v2": resources},
            persona_snapshots={},
        )
        await run_store.save_sealed_run(record)
        persisted = True
    except Exception:
        # logger.exception records the traceback; lazy %-args avoid eager
        # string formatting.
        logger.exception("Failed to persist sealed text analysis %s", conversation_id)
        persisted = False
        run_id = None

    return AnalyzeTextResponse(
        run_id=run_id,
        persisted=persisted,
        conversation_id=conversation_id,
        messages=ui_messages,
        resources=resources,
    )
| |
|
| |
|
@router.post("/analyze/text")
async def analyze_text(payload: AnalyzeTextRequest) -> AnalyzeTextResponse:
    """Analyze pasted transcript text and return the analysis results."""
    text = payload.text
    # Reject missing or blank text up front with a client error.
    if not (isinstance(text, str) and text.strip()):
        raise HTTPException(status_code=400, detail="text is required")

    cid = payload.conversation_id
    if not cid:
        # Server-generated fallback id based on the current epoch second.
        cid = f"analysis_{int(datetime.now().timestamp())}"

    return await _analyze_from_text(
        text=text,
        conversation_id=cid,
        source_name=payload.source_name,
        analysis_attributes=payload.analysis_attributes,
        top_down_codebook_template_id=payload.top_down_codebook_template_id,
    )
| |
|
| |
|
@router.post("/analyze/file")
async def analyze_file(
    file: UploadFile = File(...),
    conversation_id: Optional[str] = Form(default=None),
    source_name: Optional[str] = Form(default=None),
    analysis_attributes_json: Optional[str] = Form(default=None),
    top_down_codebook_template_id: Optional[str] = Form(default=None),
) -> AnalyzeTextResponse:
    """Analyze an uploaded transcript file (plain text or PDF)."""
    data = await file.read()
    if not data:
        raise HTTPException(status_code=400, detail="Empty file")

    inferred_name = source_name or file.filename or "Uploaded file"
    cid = conversation_id or f"analysis_{int(datetime.now().timestamp())}"

    # Best-effort parse of the deprecated attributes field; malformed JSON or
    # a non-list payload is silently ignored.
    analysis_attributes: Optional[List[str]] = None
    if isinstance(analysis_attributes_json, str) and analysis_attributes_json.strip():
        try:
            decoded_attrs = json.loads(analysis_attributes_json)
        except Exception:
            decoded_attrs = None
        if isinstance(decoded_attrs, list):
            analysis_attributes = [str(x).strip() for x in decoded_attrs if isinstance(x, str) and str(x).strip()]

    filename = (file.filename or "").lower()
    content_type = (file.content_type or "").lower()

    if filename.endswith(".pdf") or content_type == "application/pdf":
        # PDF path: extract per-page text with pypdf.
        try:
            from pypdf import PdfReader
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"pypdf not available: {e}")

        try:
            reader = PdfReader(io.BytesIO(data))
            page_texts = ((page.extract_text() or "").strip() for page in reader.pages)
            text = "\n\n".join(t for t in page_texts if t).strip()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Failed to parse PDF: {e}")

        if not text:
            raise HTTPException(status_code=400, detail="No extractable text found in PDF")
    else:
        # Everything else is treated as UTF-8 text (undecodable bytes replaced).
        text = data.decode("utf-8", errors="replace").strip()
        if not text:
            raise HTTPException(status_code=400, detail="No text content found in file")

    return await _analyze_from_text(
        text=text,
        conversation_id=cid,
        source_name=inferred_name,
        analysis_attributes=analysis_attributes,
        top_down_codebook_template_id=top_down_codebook_template_id,
    )
| |
|