"""Chat API: start a CCAI conversation, stream SSE, drive failsafe-pause continues, and export results. """ from __future__ import annotations import csv import io import json import logging import time from fastapi import APIRouter, HTTPException, Request from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field from app.config import settings from app.middleware.rate_limit import ( DAILY_LIMIT, check_rate_limit, record_conversation, ) from typing import Any from app.clients.hana_client import hana_client from app.services import human_io from app.services.credential import ( build_human_credential_from_profile, normalize_one_credential, ) from app.services.extra_personas import EXTRA_PERSONAS, get_extra_persona from app.services.json_calls import orchestrator_call from app.services.resilience import build_substitution_chain from app.services.prompts import ( CREDENTIAL_INTAKE_EMPTY_TRANSCRIPT, CREDENTIAL_INTAKE_TURN_PROMPT, ) from app.services.models import ( CONVERSATION_LIMIT_BOUNDS, CONVERSATION_LIMIT_DESCRIPTIONS, ConversationLimits, DEFAULT_MAX_PARTICIPANTS, MAX_MAX_PARTICIPANTS, MIN_MAX_PARTICIPANTS, Participant, Phase, Session, clamp_conversation_limits, ) from app.services.auto_select import auto_select_participants from app.services.model_recommend import suggest_model_for_persona from app.services.prompts.catalog import build_prompt_catalog from app.services.orchestrator import ( create_session, get_session, run_conversation, ) from app.services.persona import ( generate_role_prompt, generate_role_prompt_freeform, ) router = APIRouter() LOG = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Request models # --------------------------------------------------------------------------- class GenerateRoleRequest(BaseModel): """Structured persona fields → role prompt. `model_id` is ignored (legacy).""" model_id: str = "" name: str = "" profile: str = "" identity: str = "" samples: str = "" role_style: str = "exact" orchestrator_model_id: str | None = None class GenerateRoleFreeformRequest(BaseModel): """Freeform persona text → role prompt. `model_id` is ignored (legacy).""" model_id: str = "" name: str = "" text: str = "" role_style: str = "ai_completed" orchestrator_model_id: str | None = None class AvailableModelPayload(BaseModel): """One row of the builder's live model list (from allModelsFlat).""" id: str name: str = "" provider: str = "" kind: str = "provider" class PanelMemberPayload(BaseModel): """Another participant already in the panel (for diversity hints).""" name: str = "" model_id: str = "" provider: str = "" class SuggestModelRequest(BaseModel): """Body of POST /api/chat/suggest-model.""" persona_name: str = "" source_text: str = "" role_prompt: str = "" available_models: list[AvailableModelPayload] panel_context: list[PanelMemberPayload] = Field(default_factory=list) orchestrator_model_id: str | None = None class SetOrchestratorRequest(BaseModel): model_id: str class SetSpeedPriorityRequest(BaseModel): enabled: bool class ExpertPersonaPayload(BaseModel): """Expert Persona created by the user via the popup. Already carries a finished `role_prompt` (the frontend calls /generate-role-* for that).""" participant_id: str name: str model_id: str role_prompt: str class ParticipantSelectionPayload(BaseModel): """Reference to a participant the user has chosen for this conversation.""" participant_id: str kind: str # "neon" | "extra" | "expert" # For Neon entries: the model_id IS the persona id (neon:model@ver:persona) # For extra/expert: defaults to the persona's bound model_id, but the # user can override via per-participant model_assignments. name: str role_prompt: str | None = None model_id_override: str | None = None class AutoSelectCandidate(BaseModel): """One row of the candidate pool sent to the auto-select endpoint.""" participant_id: str name: str role_prompt: str = "" kind: str = "" model_id: str = "" class AutoSelectRequest(BaseModel): """Body of POST /api/chat/auto-select-participants.""" question: str count: int = 5 candidates: list[AutoSelectCandidate] # Optional: pin the orchestrator model used for ranking. Defaults # to the configured global orchestrator model. orchestrator_model_id: str | None = None class HumanCredentialPayload(BaseModel): """Structured credential summary for the in-the-loop human. Generated from the user's profile text via /credentials/from-profile. The orchestrator prepends this entry to the LLM-built credential summary so the human always appears first in the modal / exports. """ participant_id: str name: str expertise: str = "" personality: str = "" credibility_for_question: float = 0.5 bias_to_watch: str = "" class StartChatRequest(BaseModel): question: str | None = None participants: list[ParticipantSelectionPayload] expert_personas: list[ExpertPersonaPayload] = Field(default_factory=list) model_assignments: dict[str, str] = Field(default_factory=dict) orchestrator_model_id: str | None = None summarizer_model_id: str | None = None max_participants: int = DEFAULT_MAX_PARTICIPANTS # User-supplied overrides for the conversation's repetition / # failsafe limits. Any field that is missing or out of range is # silently clamped to the server-side default; see # `clamp_conversation_limits` in services.models. limits: dict[str, int] | None = None # Optional in-the-loop human participant's pre-authored credential # summary. Must reference a participant in the `participants` list # that has kind == "human". Capped at one human per session. human_credential: HumanCredentialPayload | None = None # Conversation-format plugin selection. IDs come from # app.services.conversation.STRUCTURE_REGISTRY / # DECISION_REGISTRY. Missing or unknown IDs are silently coerced # to the defaults (collaborative + consensus) at start time. conversation_structure_id: str | None = None decision_method_id: str | None = None # --------------------------------------------------------------------------- # Settings endpoints (orchestrator default + speed priority) # --------------------------------------------------------------------------- @router.get("/chat/orchestrator") async def api_get_orchestrator(): return {"model_id": settings.orchestrator_model} @router.put("/chat/orchestrator") async def api_set_orchestrator(req: SetOrchestratorRequest): settings.orchestrator_model = req.model_id return {"model_id": settings.orchestrator_model} @router.get("/chat/speed-priority") async def api_get_speed_priority(): return {"enabled": settings.speed_priority} @router.get("/chat/conversation-formats") async def api_conversation_formats(): """Catalog the available conversation structures + decision methods. The frontend reads this once to populate the "Conversation format" accordion in Settings. IDs returned here are the same values accepted by /chat/start under `conversation_structure_id` and `decision_method_id`. """ from app.services.conversation import ( list_structures, list_decisions, DEFAULT_STRUCTURE_ID, DEFAULT_DECISION_ID, ) return { "structures": list_structures(), "decisions": list_decisions(), "default_structure_id": DEFAULT_STRUCTURE_ID, "default_decision_id": DEFAULT_DECISION_ID, } @router.put("/chat/speed-priority") async def api_set_speed_priority(req: SetSpeedPriorityRequest): settings.speed_priority = req.enabled return {"enabled": settings.speed_priority} # --------------------------------------------------------------------------- # Role-prompt generation (used by the Expert Persona modal) # --------------------------------------------------------------------------- async def _builder_neon_model_ids() -> list[str]: """Flat neon:model@ver:persona ids for neutral writer fallback.""" ids: list[str] = [] try: for nm in await hana_client.get_models(): base = nm.get("model_id") or "" for p in nm.get("personas") or []: if p.get("enabled") is False: continue pname = p.get("persona_name") or "" if base and pname: ids.append(f"neon:{base}:{pname}") except Exception as exc: LOG.warning("Could not list Neon models for role writer pick: %s", exc) return ids @router.post("/chat/generate-role") async def api_generate_role(req: GenerateRoleRequest): neon_ids = await _builder_neon_model_ids() orchestrator_id = req.orchestrator_model_id or settings.orchestrator_model result = await generate_role_prompt( name=req.name, profile=req.profile, identity=req.identity, samples=req.samples, role_style=req.role_style, orchestrator_model_id=orchestrator_id, extra_model_ids=neon_ids, ) if result.get("error"): raise HTTPException(status_code=400, detail=result["error"]) return result @router.post("/chat/generate-role-freeform") async def api_generate_role_freeform(req: GenerateRoleFreeformRequest): neon_ids = await _builder_neon_model_ids() orchestrator_id = req.orchestrator_model_id or settings.orchestrator_model result = await generate_role_prompt_freeform( name=req.name, text=req.text, role_style=req.role_style, orchestrator_model_id=orchestrator_id, extra_model_ids=neon_ids, ) if result.get("error"): raise HTTPException(status_code=400, detail=result["error"]) return result @router.post("/chat/suggest-model") async def api_suggest_model(req: SuggestModelRequest): """Recommend an LLM for an Expert Persona from the builder's model list. Returns: {"recommended_model_id": "...", "rationale": "..."} on success, or {"error": "..."} when the description is empty, the model list is empty, or the orchestrator call fails / returns an invalid id. """ if not req.available_models: raise HTTPException(400, "At least one available model is required") orchestrator_id = req.orchestrator_model_id or settings.orchestrator_model neon_ids = await _builder_neon_model_ids() result = await suggest_model_for_persona( orchestrator_model_id=orchestrator_id, persona_name=req.persona_name, source_text=req.source_text, role_prompt=req.role_prompt, available_models=[m.model_dump() for m in req.available_models], panel_context=[p.model_dump() for p in req.panel_context], ) if result.get("error"): raise HTTPException(status_code=400, detail=result["error"]) return result # --------------------------------------------------------------------------- # Prompt catalog (Transparency: "View current chat prompts") # --------------------------------------------------------------------------- @router.get("/chat/prompts/catalog") async def api_chat_prompts_catalog(): """Return every prompt template the orchestrator and participants use during a chat, grouped by phase, each with a short purpose and a list of runtime template variables. Used by the PromptCatalogModal in the settings menu's Transparency section. Shape: {"groups": [{"title": "...", "items": [{"name", "purpose", "variables", "template"}]}, ...]} """ return build_prompt_catalog() # --------------------------------------------------------------------------- # Auto-select participants (LLM-based ranking for "Select N Automatically") # --------------------------------------------------------------------------- @router.post("/chat/auto-select-participants") async def api_auto_select_participants(req: AutoSelectRequest): """Rank the candidate pool by relevance to the question and return the top `count` participant_ids. The frontend calls this just before /chat/start when the user has the auto-select toggle on. Returns: {"selected": [id, ...], "rationale": "short string"}. `selected` is exactly `count` long unless the candidate pool is smaller. Invalid / hallucinated ids are silently dropped and padded with the next unused candidates. """ if not req.question or not req.question.strip(): raise HTTPException(400, "Question is required") if not req.candidates: raise HTTPException(400, "At least one candidate is required") if req.count < 1: raise HTTPException(400, "count must be >= 1") candidates_payload = [c.dict() for c in req.candidates] orchestrator_id = req.orchestrator_model_id or settings.orchestrator_model result = await auto_select_participants( orchestrator_model_id=orchestrator_id, question=req.question, candidates=candidates_payload, count=req.count, ) return result # --------------------------------------------------------------------------- # Conversation limits (steppers in the settings menu) # --------------------------------------------------------------------------- @router.get("/chat/limits/defaults") async def api_chat_limits_defaults(): """Return defaults, bounds, and human-readable descriptions for the `ConversationLimits` knobs the user can tune in the settings menu. The frontend uses the `defaults` to initialize the steppers, the `bounds` to set min/max and clamp on input, and the `descriptions` to render the section headers and per-field help text. Keeping this server-driven means we add a knob in one place (services.models) and the UI picks it up without a frontend change beyond rendering. """ defaults = ConversationLimits() return { "defaults": { field_name: getattr(defaults, field_name) for field_name in CONVERSATION_LIMIT_BOUNDS.keys() }, "bounds": { field_name: {"min": lo, "max": hi} for field_name, (lo, hi) in CONVERSATION_LIMIT_BOUNDS.items() }, "descriptions": CONVERSATION_LIMIT_DESCRIPTIONS, } # --------------------------------------------------------------------------- # Demo questions # --------------------------------------------------------------------------- @router.get("/demo-questions") async def api_demo_questions(): from pathlib import Path path = Path(__file__).resolve().parent.parent / "data" / "demo_questions.json" with open(path, "r", encoding="utf-8") as f: data = json.load(f) return data # --------------------------------------------------------------------------- # Start chat # --------------------------------------------------------------------------- def _neon_role_prompt_from_model_id(model_id: str) -> str: """Look up a Neon persona's HANA system_prompt from the client cache.""" if not model_id.startswith("neon:"): return "" parts = model_id.split(":", 2) if len(parts) != 3: return "" sp = hana_client.get_persona_system_prompt(parts[1], parts[2]) return (sp or "").strip() def _build_participant( sel: ParticipantSelectionPayload, expert_lookup: dict[str, ExpertPersonaPayload], model_assignments: dict[str, str], ) -> Participant: """Resolve a selection payload into a runnable Participant. Resolution order for the model: 1. Explicit per-conversation override in `model_assignments` 2. The persona's selection-time `model_id_override` 3. The bundled extra persona's default model 4. For Neon participants, the model_id is the participant_id itself 5. For Expert personas, the persona's bound model_id Resolution order for the role_prompt: 1. The selection's role_prompt (most flexible) 2. The matching expert persona's role_prompt 3. The bundled extra persona's role_prompt 4. For Neon participants: a thin role wrapper just naming the persona """ pid = sel.participant_id kind = sel.kind name = sel.name role_prompt = sel.role_prompt or "" model_id = sel.model_id_override or model_assignments.get(pid, "") if kind == "expert": ep = expert_lookup.get(pid) if ep is None: raise HTTPException(400, f"Expert persona payload missing for id {pid}") if not role_prompt: role_prompt = ep.role_prompt if not model_id: model_id = ep.model_id if not name: name = ep.name elif kind == "extra": ep = get_extra_persona(pid) if ep is None: raise HTTPException(400, f"Unknown extra persona: {pid}") if not role_prompt: role_prompt = ep.role_prompt if not model_id: model_id = ep.default_model_id if not name: name = ep.name elif kind == "neon": # The participant_id IS the model id for Neon personas, so it's # required to be a `neon:model@ver:persona` style string. if not pid.startswith("neon:"): raise HTTPException( 400, f"Neon participant_id must start with 'neon:': {pid}", ) if not model_id: model_id = pid if not role_prompt: role_prompt = _neon_role_prompt_from_model_id(model_id) if not role_prompt: role_prompt = ( f"You are {name}, a Neon.ai persona. Speak naturally in your " "own voice and bring the perspective your background suggests." ) elif kind == "human": # Human participants don't use an LLM at all; the orchestrator # pauses for their typed input. They still need a participant # row so the rest of the state machine (credential summary, # alliance detection, addressed-to routing, etc.) can refer to # them by id and name. if not name: raise HTTPException(400, "Human participant requires a name") return Participant( participant_id=pid, name=name, role_prompt="", model_id="", kind="human", enabled=True, display_name="Human participant", ) else: raise HTTPException(400, f"Unknown participant kind: {kind}") # When the user assigns a Neon model to a non-Neon persona, surface # that model's HANA persona prompt for inference context. if not role_prompt and model_id.startswith("neon:"): role_prompt = _neon_role_prompt_from_model_id(model_id) resolved = settings.resolve_model(model_id) if not resolved: raise HTTPException(400, f"Unknown model: {model_id}") return Participant( participant_id=pid, name=name, role_prompt=role_prompt, model_id=resolved["model_id"], kind=kind, enabled=True, base_url=resolved.get("base_url", ""), api_key=resolved.get("api_key", ""), display_name=resolved.get("display_name", model_id), is_neon=resolved.get("is_neon", False), hana_model_id=resolved.get("hana_model_id", ""), persona_name=resolved.get("persona_name", ""), neon_direct_vllm=resolved.get("neon_direct_vllm", False), vllm_base_url=resolved.get("vllm_base_url", ""), vllm_api_key=resolved.get("vllm_api_key", ""), ) @router.post("/chat/start") async def api_start_chat(req: StartChatRequest, request: Request): """Create a session and return a streaming SSE response for the conversation.""" if not req.question or not req.question.strip(): raise HTTPException(400, "Question is required") allowed, _ = check_rate_limit(request) if not allowed: return JSONResponse( status_code=429, content={ "detail": ( f"Daily conversation limit reached ({DAILY_LIMIT}/day). " "Sign in with HuggingFace as a neongeckocom org member " "for unlimited access." ), "remaining": 0, }, ) expert_lookup = {ep.participant_id: ep for ep in req.expert_personas} # Populate the HANA persona prompt cache so Neon role_prompt # resolution works even if /api/models hasn't been called yet. try: await hana_client.get_models() except Exception as exc: LOG.warning("HANA get_models during chat start failed: %s", exc) max_p = max(MIN_MAX_PARTICIPANTS, min(MAX_MAX_PARTICIPANTS, req.max_participants)) if len(req.participants) < 2: raise HTTPException(400, "Need at least 2 participants") if len(req.participants) > max_p: raise HTTPException( 400, f"Got {len(req.participants)} participants but max is {max_p}", ) participants: list[Participant] = [] for sel in req.participants: participants.append(_build_participant(sel, expert_lookup, req.model_assignments)) humans = [p for p in participants if p.kind == "human"] if len(humans) > 1: raise HTTPException(400, "Only one human participant is supported per session.") if humans and req.human_credential is None: raise HTTPException( 400, "Human participant requires a human_credential payload.", ) if humans and req.human_credential.participant_id != humans[0].participant_id: raise HTTPException( 400, "human_credential.participant_id must match the human participant.", ) record_conversation(request) session = create_session() session.question = req.question.strip() session.participants = participants session.orchestrator_model_id = req.orchestrator_model_id session.summarizer_model_id = req.summarizer_model_id session.max_participants = max_p # Attach the user-tunable limits and seed the runtime failsafe # caps from them. clamp_conversation_limits silently coerces any # missing or out-of-range values back to the defaults / bounds. session.limits = clamp_conversation_limits(req.limits) session.participant_message_cap = session.limits.participant_message_pause_at session.orchestrator_call_cap = session.limits.orchestrator_call_pause_at # Resilience: precompute the per-session alternate-participant pool # and LLM substitution chain so the orchestrator doesn't have to # walk the catalog at runtime when a participant fails. These are # only *consulted* when settings.speed_priority is on; building # them unconditionally keeps the start path simple and the work is # cheap (one cached HANA fetch + a few synchronous list builds). selected_ids = {p.participant_id for p in participants} candidate_pool: list[Participant] = [] # Extra personas first (general-purpose lenses; resolvable as # long as the matching API key is configured). for spec in EXTRA_PERSONAS: if spec.participant_id in selected_ids: continue resolved = settings.resolve_model(spec.default_model_id) if not resolved: continue candidate_pool.append(Participant( participant_id=spec.participant_id, name=spec.name, role_prompt=spec.role_prompt, model_id=resolved["model_id"], kind="extra", base_url=resolved.get("base_url", ""), api_key=resolved.get("api_key", ""), display_name=resolved.get("display_name", spec.default_model_id), )) # Neon personas next, from whatever HANA returns. HANA being # unreachable just yields an empty list — fine, the pool's already # got the extras. neon_hana_model_ids: list[str] = [] try: neon_models = await hana_client.get_models() except Exception as exc: # noqa: BLE001 LOG.warning("HANA models unavailable while building candidate pool: %s", exc) neon_models = [] for nm in neon_models or []: hana_mid = nm.get("model_id") if hana_mid: neon_hana_model_ids.append(hana_mid) for persona in nm.get("personas", []) or []: if persona.get("enabled") is False: continue persona_name = persona.get("persona_name") or "" pn_lower = persona_name.lower() if "vanilla" in pn_lower or "rag" in pn_lower: continue pid = f"neon:{hana_mid}:{persona_name}" if pid in selected_ids: continue resolved = settings.resolve_model(pid) if not resolved: continue candidate_pool.append(Participant( participant_id=pid, name=persona_name or hana_mid.split("/")[-1], role_prompt=( f"You are {persona_name}, a Neon.ai persona. Speak " "naturally in your own voice and bring the perspective " "your background suggests." ), model_id=resolved["model_id"], kind="neon", base_url=resolved.get("base_url", ""), api_key=resolved.get("api_key", ""), display_name=resolved.get("display_name", pid), is_neon=True, hana_model_id=resolved.get("hana_model_id", ""), persona_name=resolved.get("persona_name", ""), neon_direct_vllm=resolved.get("neon_direct_vllm", False), vllm_base_url=resolved.get("vllm_base_url", ""), vllm_api_key=resolved.get("vllm_api_key", ""), )) session.candidate_pool = candidate_pool session.substitution_chain = build_substitution_chain(neon_hana_model_ids) # Conversation-format plugin selection. Coerce unknown IDs to the # defaults via the get_structure/get_decision resolvers. from app.services.conversation import ( get_structure as _get_structure_cls, get_decision as _get_decision_cls, STRUCTURE_REGISTRY as _STRUCT_REG, DECISION_REGISTRY as _DEC_REG, ) if req.conversation_structure_id and req.conversation_structure_id in _STRUCT_REG: session.conversation_structure_id = req.conversation_structure_id if req.decision_method_id and req.decision_method_id in _DEC_REG: session.decision_method_id = req.decision_method_id if humans and req.human_credential is not None: session.human_credential = normalize_one_credential({ "participant_id": req.human_credential.participant_id, "name": req.human_credential.name, "expertise": req.human_credential.expertise, "personality": req.human_credential.personality, "credibility_for_question": req.human_credential.credibility_for_question, "bias_to_watch": req.human_credential.bias_to_watch, "is_human": True, }) async def event_stream(): yield ( "event: session\ndata: " + json.dumps({ "session_id": session.session_id, "participants": [ { "participant_id": p.participant_id, "name": p.name, "model_id": p.model_id, "model_display": p.display_name, "kind": p.kind, } for p in session.participants ], "max_participants": session.max_participants, "orchestrator_model_id": session.orchestrator_model_id or settings.orchestrator_model, "summarizer_model_id": session.summarizer_model_id, }) + "\n\n" ) async for chunk in run_conversation(session): yield chunk return StreamingResponse(event_stream(), media_type="text/event-stream") @router.post("/chat/{session_id}/continue") async def api_continue(session_id: str, reason: str = "messages"): session = get_session(session_id) if not session: raise HTTPException(404, "Session not found") if not session.paused_for_continue: raise HTTPException(409, "Session is not paused") session.pending_continue = True return {"ok": True, "reason": reason} # --------------------------------------------------------------------------- # Human participant: turn response + credential intake Q&A # --------------------------------------------------------------------------- class HumanResponseRequest(BaseModel): """POST body for the human's response to a pending turn. `skip` flips this turn into a "declined to comment" note from the orchestrator rather than a participant message; `text` is ignored when skip is true. """ text: str = "" skip: bool = False @router.post("/chat/{session_id}/human-response") async def api_human_response(session_id: str, req: HumanResponseRequest): """Deliver the human participant's text for the current pending turn. Wakes the orchestrator coroutine waiting on the `human_io` slot for this session.""" session = get_session(session_id) if not session: raise HTTPException(404, "Session not found") if session.awaiting_human is None: raise HTTPException(409, "Session is not awaiting a human turn") if not req.skip and not (req.text or "").strip(): raise HTTPException(400, "text is required unless skip is true") delivered = human_io.deliver_human_response( session_id, req.text, skip=req.skip, ) if not delivered: raise HTTPException(409, "No pending human turn for this session") return {"ok": True, "skipped": req.skip} class HumanCredentialEditRequest(BaseModel): """PATCH body for editing the human's credential summary mid-chat.""" name: str | None = None expertise: str | None = None personality: str | None = None credibility_for_question: float | None = None bias_to_watch: str | None = None @router.patch("/chat/{session_id}/credentials/human") async def api_edit_human_credential( session_id: str, req: HumanCredentialEditRequest, ): """Update the human participant's credential summary in place. The View Credential Summary modal lets the user tweak the human's entry (name, expertise, style, credibility, bias). Only fields provided in the body are changed; others are left as-is. The updated entry is reflected in subsequent participant prompts (the credentials_to_block call rebuilds the prompt block each turn). """ session = get_session(session_id) if not session: raise HTTPException(404, "Session not found") if session.human_credential is None: raise HTTPException(404, "Session has no human participant") updated = dict(session.human_credential) for field_name in ( "name", "expertise", "personality", "credibility_for_question", "bias_to_watch", ): value = getattr(req, field_name) if value is not None: updated[field_name] = value updated["is_human"] = True updated = normalize_one_credential(updated) session.human_credential = updated # Also patch the entry inside session.credential_summary so the # View Credential Summary modal reflects the edit without waiting # for the next phase-refresh. for i, c in enumerate(session.credential_summary or []): if c.get("participant_id") == updated["participant_id"]: session.credential_summary[i] = updated break return {"ok": True, "credential": updated} class HumanCredentialFromProfileRequest(BaseModel): """Body for POST /api/chat/credentials/from-profile.""" name: str question: str = "" profile_text: str participant_id: str = "" orchestrator_model_id: str | None = None @router.post("/chat/credentials/from-profile") async def api_human_credential_from_profile(req: HumanCredentialFromProfileRequest): """Generate a structured credential summary from a human's profile text. The orchestrator assesses the self-description the same way it would a participant role prompt when building the Phase-1 Credential Summary (expertise, style, credibility on this question, biases). """ if not req.name.strip(): raise HTTPException(400, "name is required") if not req.profile_text.strip(): raise HTTPException(400, "profile_text is required") orchestrator_id = req.orchestrator_model_id or settings.orchestrator_model credential = await build_human_credential_from_profile( orchestrator_model_id=orchestrator_id, question=(req.question or "").strip(), name=req.name.strip(), profile_text=req.profile_text.strip(), participant_id=req.participant_id.strip(), ) return {"credential": credential} # Module-level registry of in-flight credential drafts. Each draft is a # tiny piece of state: the question being discussed, the human's name, # the question/answer history, and the configured cap. Drafts are # transient (lifetime = a few seconds of Q&A in the modal) so we don't # bother persisting them; the registry is cleared by the API when the # draft is finalized or abandoned. _credential_drafts: dict[str, dict[str, Any]] = {} class CredentialDraftStartRequest(BaseModel): """Body for POST /api/chat/credentials/draft - kicks off a Q&A.""" name: str question: str max_questions: int = 6 orchestrator_model_id: str | None = None class CredentialDraftAnswerRequest(BaseModel): """Body for POST /api/chat/credentials/draft/{draft_id}/answer.""" answer: str = "" def _intake_transcript(history: list[dict[str, str]]) -> str: """Render the Q&A history into a transcript snippet for the prompt. Each entry of history is {"q": "...", "a": "..."}. The last entry may have only "q" (the question the user is currently answering) when called BEFORE the first answer, but in practice we render history only after the LLM has emitted a question and the user has answered, so both keys are present. """ if not history: return CREDENTIAL_INTAKE_EMPTY_TRANSCRIPT lines: list[str] = [] for i, qa in enumerate(history, start=1): q = (qa.get("q") or "").strip() a = (qa.get("a") or "").strip() lines.append(f"Q{i}: {q}") lines.append(f"A{i}: {a}" if a else f"A{i}: (no answer yet)") return "\n".join(lines) async def _intake_turn(draft: dict[str, Any]) -> dict[str, Any]: """Run one orchestrator turn for the credential intake Q&A. Returns either {"kind": "question", "text": ...} or {"kind": "summary", "summary": {...}} as parsed from the orchestrator's JSON output. Falls back to a safe default question if parsing fails. """ transcript = _intake_transcript(draft["history"]) prompt = CREDENTIAL_INTAKE_TURN_PROMPT.format( name=draft["name"], question=draft["question"], max_questions=draft["max_questions"], questions_asked=draft["questions_asked"], transcript=transcript, ) _raw, parsed = await orchestrator_call( orchestrator_model_id=draft["orchestrator_model_id"], user_prompt=prompt, label="credential_intake", api_log=draft.get("api_log"), max_tokens=512, ) if isinstance(parsed, dict): kind = parsed.get("kind") if kind == "summary" and isinstance(parsed.get("summary"), dict): return {"kind": "summary", "summary": parsed["summary"]} if kind == "question" and isinstance(parsed.get("text"), str): return {"kind": "question", "text": parsed["text"].strip()} # Defensive fallback: if the model returned garbage, ask a sensible # next-question rather than crashing the modal. if draft["questions_asked"] >= draft["max_questions"]: return { "kind": "summary", "summary": { "name": draft["name"], "expertise": "(intake LLM did not return a summary)", "personality": "", "credibility_for_question": 0.5, "bias_to_watch": "", }, } return { "kind": "question", "text": ( "Could you tell me a bit about your background relevant to " f'this question: "{draft["question"]}"?' ), } @router.post("/chat/credentials/draft") async def api_credential_draft_start(req: CredentialDraftStartRequest): """Kick off a new credential-intake Q&A. Returns the draft id plus the LLM's first question (or, if it bailed immediately, a final summary).""" if not req.name.strip(): raise HTTPException(400, "name is required") if not req.question.strip(): raise HTTPException(400, "question is required") max_q = max(1, min(10, int(req.max_questions or 6))) import uuid as _uuid draft_id = str(_uuid.uuid4()) draft: dict[str, Any] = { "draft_id": draft_id, "name": req.name.strip(), "question": req.question.strip(), "max_questions": max_q, "questions_asked": 0, "history": [], "orchestrator_model_id": ( req.orchestrator_model_id or settings.orchestrator_model ), "api_log": [], } result = await _intake_turn(draft) if result["kind"] == "question": draft["questions_asked"] += 1 draft["history"].append({"q": result["text"], "a": ""}) _credential_drafts[draft_id] = draft return { "draft_id": draft_id, "kind": "question", "question": result["text"], "questions_asked": draft["questions_asked"], "max_questions": max_q, } # The intake LLM jumped straight to a summary (no answers needed). return { "draft_id": draft_id, "kind": "summary", "summary": result["summary"], "questions_asked": 0, "max_questions": max_q, } @router.post("/chat/credentials/draft/{draft_id}/answer") async def api_credential_draft_answer( draft_id: str, req: CredentialDraftAnswerRequest, ): """Submit the human's answer to the last question; receive either the LLM's next question or the final credential summary.""" draft = _credential_drafts.get(draft_id) if draft is None: raise HTTPException(404, "Draft not found or already finalized") if not draft["history"]: raise HTTPException(409, "Draft has no pending question to answer") # Stamp the answer onto the last question. draft["history"][-1]["a"] = (req.answer or "").strip() result = await _intake_turn(draft) if result["kind"] == "question": draft["questions_asked"] += 1 draft["history"].append({"q": result["text"], "a": ""}) return { "draft_id": draft_id, "kind": "question", "question": result["text"], "questions_asked": draft["questions_asked"], "max_questions": draft["max_questions"], } # Final summary; clear the draft from the registry. _credential_drafts.pop(draft_id, None) return { "draft_id": draft_id, "kind": "summary", "summary": result["summary"], "questions_asked": draft["questions_asked"], "max_questions": draft["max_questions"], } @router.delete("/chat/credentials/draft/{draft_id}") async def api_credential_draft_cancel(draft_id: str): """User abandoned the AI Q&A (e.g. closed the modal). No-op if already gone.""" _credential_drafts.pop(draft_id, None) return {"ok": True} # --------------------------------------------------------------------------- # Exports # --------------------------------------------------------------------------- @router.get("/chat/{session_id}/export") async def api_export_chat(session_id: str, fmt: str = "txt"): session = get_session(session_id) if not session: raise HTTPException(404, "Session not found") if fmt == "md": return _export_md(session) if fmt == "csv-table": return _export_csv_table(session) return _export_txt(session) @router.get("/chat/{session_id}/credentials") async def api_credentials(session_id: str): """Return the orchestrator-generated Credential Summary for the current session. Built after Phase 1 and refreshed once after Phase 2 critique - so the response can be empty if the user opens the modal before Phase 1 finishes. """ session = get_session(session_id) if not session: raise HTTPException(404, "Session not found") return { "session_id": session_id, "question": session.question, "credentials": session.credential_summary or [], } @router.get("/chat/{session_id}/api-log") async def api_export_log(session_id: str): session = get_session(session_id) if not session: raise HTTPException(404, "Session not found") return {"session_id": session_id, "log": session.api_log} @router.get("/chat/{session_id}/table") async def api_table_view(session_id: str): session = get_session(session_id) if not session: raise HTTPException(404, "Session not found") from app.services.orchestrator import ensure_contribution_summaries try: await ensure_contribution_summaries(session) except Exception as exc: LOG.warning("Failed to build contribution summaries: %s", exc) rows = [] for p in session.participants: first = (session.initial_opinions or {}).get(p.participant_id, "") contribution = (session.contribution_summaries or {}).get(p.participant_id, "") revised = (session.final_opinions or {}).get(p.participant_id, "") final_msg = _last_consensus_message_for(session, p.participant_id) or revised rows.append({ "participant_id": p.participant_id, "name": p.name, "model_display": p.display_name, "first_opinion": first, "contribution_summary": contribution, "revised_opinion": revised, "final_opinion": final_msg, }) final_report = (session.final_report or {}).get("text", "") return { "session_id": session_id, "question": session.question, "final_report": final_report, "final_report_kind": (session.final_report or {}).get("kind", ""), "rows": rows, } def _last_consensus_message_for(session: Session, participant_id: str) -> str: """Return the participant's most recent message in the consensus or finalization phase - used as the 'final opinion' column.""" for m in reversed(session.messages): if m.get("speaker_id") != participant_id: continue if m.get("phase") in {Phase.CONSENSUS.value, Phase.FINALIZATION.value}: return m.get("text", "") return "" # --------------------------------------------------------------------------- # Export helpers # --------------------------------------------------------------------------- def _format_participants_block(session: Session) -> list[str]: return [ f"- {p.name} ({p.display_name})" for p in session.participants ] def _credentials_intro_line() -> str: """One-liner that explains where this block came from. Repeated in TXT and MD exports so the file is self-explanatory without the UI.""" return ( "Orchestrator-generated assessments of each participant's " "expertise, debating style, credibility on this question, and " "biases to watch. Built after Phase 1 (initial opinions) and " "refreshed once after Phase 2 (critique)." ) def _format_credential_block_txt(session: Session) -> list[str]: """Plain-text Credential Summary section. Returns [] if no summary has been built yet (e.g. user exports mid-Phase-1).""" creds = session.credential_summary or [] if not creds: return [] lines = ["Credential Summary", "-" * 40, _credentials_intro_line(), ""] for c in creds: name = c.get("name") or c.get("participant_id") or "(unknown)" lines.append(f"{name}") if c.get("expertise"): lines.append(f" Expertise: {c['expertise']}") if c.get("personality"): lines.append(f" Style: {c['personality']}") if c.get("credibility_for_question") is not None: try: score = float(c["credibility_for_question"]) lines.append(f" Credibility: {score:.2f} (0-1)") except (TypeError, ValueError): pass if c.get("bias_to_watch"): lines.append(f" Bias: {c['bias_to_watch']}") lines.append("") return lines def _format_credential_block_md(session: Session) -> list[str]: """Markdown Credential Summary section. Returns [] when empty.""" creds = session.credential_summary or [] if not creds: return [] lines = ["## Credential Summary", "", f"_{_credentials_intro_line()}_", ""] for c in creds: name = c.get("name") or c.get("participant_id") or "(unknown)" lines.append(f"### {name}") lines.append("") if c.get("expertise"): lines.append(f"- **Expertise:** {c['expertise']}") if c.get("personality"): lines.append(f"- **Style:** {c['personality']}") if c.get("credibility_for_question") is not None: try: score = float(c["credibility_for_question"]) lines.append(f"- **Credibility on this question:** {score:.2f} (0-1)") except (TypeError, ValueError): pass if c.get("bias_to_watch"): lines.append(f"- **Bias to watch:** {c['bias_to_watch']}") lines.append("") return lines def _credential_for(session: Session, participant_id: str) -> dict: """Lookup helper used by the CSV writer. Empty dict if not built yet.""" for c in session.credential_summary or []: if c.get("participant_id") == participant_id: return c return {} def _format_credibility_score(value: object) -> str: """CSV-safe formatting for the credibility number (rounded float). Returns "" when the value is missing or not numeric.""" if value is None: return "" try: return f"{float(value):.2f}" except (TypeError, ValueError): return "" def _export_txt(session: Session) -> dict: lines = ["CCAI Conversation Log", "=" * 40, ""] lines.append("Question:") lines.append(session.question) lines.append("") lines.append("Participants:") lines.extend(_format_participants_block(session)) lines.append("") cred_lines = _format_credential_block_txt(session) if cred_lines: lines.extend(cred_lines) for m in session.messages: speaker = m.get("speaker_name") or "(anon)" if m.get("role") == "orchestrator": speaker = "Orchestrator" lines.append(f"{speaker}: {m.get('text', '')}") lines.append("") if session.final_report and session.final_report.get("text"): lines.append("---") lines.append("Final Report:") lines.append(session.final_report["text"]) return {"filename": "ccai_chat.txt", "content": "\n".join(lines)} def _export_md(session: Session) -> dict: lines = ["# CCAI Conversation Log", ""] lines.append("## Question") lines.append("") lines.append(f"> {session.question}") lines.append("") lines.append("## Participants") lines.append("") for p in session.participants: lines.append(f"- **{p.name}** (*{p.display_name}*)") lines.append("") cred_lines = _format_credential_block_md(session) if cred_lines: lines.extend(cred_lines) lines.append("---") lines.append("") for m in session.messages: speaker = m.get("speaker_name") or "(anon)" is_orch = m.get("role") == "orchestrator" if is_orch: speaker = "Orchestrator" text = m.get("text", "") if is_orch: lines.append(f"_**{speaker}:**_ {text}") else: lines.append(f"**{speaker}:** {text}") lines.append("") if session.final_report and session.final_report.get("text"): lines.append("\n---\n") lines.append("## Final Report") lines.append("") lines.append(session.final_report["text"]) return {"filename": "ccai_chat.md", "content": "\n".join(lines)} def _export_csv_table(session: Session) -> dict: """RFC-4180 compliant CSV. csv.writer handles quoting/escaping. Columns include the orchestrator-generated Credential Summary so the table is self-contained: who each participant is (per the orchestrator's read), then what they said and how it evolved. """ buf = io.StringIO() writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL, lineterminator="\n") writer.writerow(["Question", session.question]) final_text = (session.final_report or {}).get("text", "") writer.writerow(["Final Group Opinion", final_text]) writer.writerow([]) writer.writerow([ "Participant", "Expertise (orchestrator's read)", "Style", "Credibility on this question (0-1)", "Bias to watch", "First opinion", "Conversation contribution", "Revised opinion", "Final opinion", ]) for p in session.participants: cred = _credential_for(session, p.participant_id) writer.writerow([ p.name, cred.get("expertise", ""), cred.get("personality", ""), _format_credibility_score(cred.get("credibility_for_question")), cred.get("bias_to_watch", ""), (session.initial_opinions or {}).get(p.participant_id, ""), (session.contribution_summaries or {}).get(p.participant_id, ""), (session.final_opinions or {}).get(p.participant_id, ""), _last_consensus_message_for(session, p.participant_id), ]) return {"filename": "ccai_chat_table.csv", "content": buf.getvalue()}