"""Agent tools: blog-grounded context retrieval and CEO follow-up hand-off."""

from __future__ import annotations

import json
import logging
from collections import defaultdict
from dataclasses import dataclass

from agents import RunContextWrapper, function_tool

from src.agents.state import AgentContext, RetrievedDocument
from src.utils.agent_utils import has_sufficient_context
from src.config import get_settings
from src.schemas import CEOContact, ChatCitation, ChatEmailNotification
from src.services.blog import blog_search_service
from src.services.email import lead_notification_email_service

logger = logging.getLogger(__name__)


@dataclass(slots=True)
class RetrievedChunk:
    """A single blog search hit, copied field-for-field from the search result."""

    # 1-based position of the hit in the list returned by the search service.
    index: int
    document_id: str
    title: str
    source_kind: str
    source_url: str | None
    content: str
    score: float
    vector_score: float
    overlap_count: int
    is_title_only: bool


class RetrievalToolService:
    """Searches the blog corpus and prepares the result for the agent."""

    def retrieve_relevant_chunks(self, question: str) -> list[RetrievedChunk]:
        """Run a blog search for ``question`` and wrap every hit.

        Returns:
            One ``RetrievedChunk`` per hit, numbered from 1 in the order the
            search service returned them.
        """
        chunks: list[RetrievedChunk] = []
        for position, hit in enumerate(blog_search_service.search(question), start=1):
            chunks.append(
                RetrievedChunk(
                    index=position,
                    document_id=hit.document_id,
                    title=hit.title,
                    source_kind=hit.source_kind,
                    source_url=hit.source_url,
                    content=hit.content,
                    score=hit.score,
                    vector_score=hit.vector_score,
                    overlap_count=hit.overlap_count,
                    is_title_only=hit.is_title_only,
                )
            )
        return chunks

    def retrieve_brand_context(self, context: AgentContext, question: str) -> str:
        """Retrieve blog material for ``question`` and record it on ``context``.

        Chunks are grouped per document, documents are ordered by descending
        score, and the outcome is written to ``context`` (``retrieved_documents``,
        ``citations``, ``citation_ids``, ``retrieval_status``).

        Returns:
            The text handed back to the agent: either the rendered documents or,
            when ``has_sufficient_context`` rejects them, a fallback instruction.
        """
        # Set before searching, so the flag is recorded even if the search raises.
        context.retrieved_tool_called = True

        # Per document: the first chunk seen supplies the metadata, while every
        # chunk contributes an excerpt. Dict insertion order keeps documents in
        # first-appearance order.
        lead_chunk_by_doc: dict[str, RetrievedChunk] = {}
        excerpts_by_doc: dict[str, list[str]] = defaultdict(list)
        for chunk in self.retrieve_relevant_chunks(question):
            doc_id = chunk.document_id
            excerpts_by_doc[doc_id].append(chunk.content)
            if doc_id not in lead_chunk_by_doc:
                lead_chunk_by_doc[doc_id] = chunk

        unordered: list[RetrievedDocument] = []
        for doc_id, lead in lead_chunk_by_doc.items():
            unordered.append(
                RetrievedDocument(
                    document_id=doc_id,
                    title=lead.title,
                    source_url=lead.source_url,
                    source_kind=lead.source_kind,
                    excerpts=excerpts_by_doc[doc_id],
                    score=lead.score,
                    vector_score=lead.vector_score,
                    overlap_count=lead.overlap_count,
                    is_title_only=lead.is_title_only,
                )
            )

        # Stable sort: documents with equal scores keep first-appearance order.
        documents = sorted(unordered, key=lambda doc: doc.score, reverse=True)
        context.retrieved_documents = documents

        if not has_sufficient_context(question, documents):
            context.citations = []
            context.citation_ids = set()
            context.retrieval_status = "insufficient"
            # NOTE(review): "do not use any tags" reads as if a tag name was lost
            # from this prompt - confirm the intended wording.
            return (
                "No sufficiently grounded Blink Helsinki blog material was found for this request. "
                "You may still answer with general professional marketing or branding guidance if that helps the user. "
                "If you do, make clear that it is general guidance rather than Blink Helsinki's published view, and do not use any tags."
            )

        citations: list[ChatCitation] = []
        for position, document in enumerate(documents, start=1):
            citations.append(
                ChatCitation(
                    index=position,
                    document_id=document.document_id,
                    title=document.title,
                    source_url=document.source_url,
                    source_kind=document.source_kind,
                )
            )
        context.citations = citations
        context.citation_ids = {citation.document_id for citation in context.citations}
        context.retrieval_status = "ok"
        return self._render_documents(documents)

    @staticmethod
    def _render_documents(documents: list[RetrievedDocument]) -> str:
        """Render documents as the plain-text block returned to the agent."""
        lines: list[str] = ["Use only the following verified Blink Helsinki material:"]
        for document in documents:
            lines.append(f'Document ID: {document.document_id}')
            lines.append(f"Title: {document.title}")
            if document.source_url:
                lines.append(f"URL: {document.source_url}")
            if document.is_title_only:
                lines.append("Corpus note: only title metadata is available for this article in the local corpus.")
            # At most two excerpts per document are shown.
            lines.extend(f"Excerpt: {excerpt}" for excerpt in document.excerpts[:2])
            lines.append("")
        # NOTE(review): this sentence does not say what to append; a citation
        # tag may be missing from the literal - confirm the intended wording.
        lines.append('When you rely on one of these documents, append immediately after the sentence.')
        return "\n".join(lines).strip()


class CEOInterestService:
    """Queues the lead notification and records the hand-off on the context."""

    @staticmethod
    def _contact_payload(question: str) -> CEOContact:
        """Build the CEO contact from settings (``question`` is not used)."""
        return CEOContact(email=get_settings().ceo_contact_email)

    def log_interest(self, context: AgentContext, request_summary: str, user_request: str) -> ChatEmailNotification:
        """Queue a lead notification and mark ``context`` as handed off.

        If ``context`` already holds a notification and is flagged as notified,
        that notification is returned and nothing is queued again.

        Returns:
            The notification returned by the email service (or the stored one).
        """
        existing = context.email_notification
        if existing and context.notified_ceo:
            return existing

        settings = get_settings()
        # The dedicated lead address wins; the CEO address is the fallback.
        notification_recipient = settings.lead_notification_to_email or settings.ceo_contact_email
        contact = self._contact_payload(context.question)

        notification = lead_notification_email_service.queue(
            recipient_email=notification_recipient,
            request_summary=request_summary,
            user_request=user_request,
            source_question=context.question,
        )

        log_payload = {
            "recipient": notification_recipient,
            "summary": request_summary,
            "user_request": user_request,
            "source_question": context.question,
            "logged_at": notification.logged_at,
            "delivery_mode": notification.mode,
            "delivery_status": notification.status,
            "provider": notification.provider,
        }
        logger.info("CEO_EMAIL_LOG %s", json.dumps(log_payload, ensure_ascii=False))

        context.should_handoff = True
        context.notified_ceo = True
        context.contact = contact
        context.email_notification = notification
        return notification


# Module-level singletons shared by the tool functions below.
_retrieval_service = RetrievalToolService()
_ceo_interest_service = CEOInterestService()


# NOTE(review): presumably function_tool exposes these docstrings to the model,
# so their wording is kept unchanged - confirm before editing them.
@function_tool
def retrieve_brand_context(ctx: RunContextWrapper[AgentContext], question: str) -> str:
    """Retrieve verified Blink Helsinki blog context before answering factual questions."""
    return _retrieval_service.retrieve_brand_context(ctx.context, question)


@function_tool(needs_approval=True)
async def hand_off_ceo(
    ctx: RunContextWrapper[AgentContext],
    request_summary: str,
    user_request: str,
) -> str:
    """Record a CEO follow-up when the user agrees to contact, asks for pricing, or wants Blink to follow up.

    request_summary: Write this for the CEO email body. Summarize the conversation so far:
    what you and the user explored (topic, constraints), ideas or solution directions you
    proposed, what the user intends to do or achieve, and why a leadership or specialist
    conversation would help. Synthesize from the full thread—do not only paste or paraphrase
    the user's last message alone.
    user_request: The concrete follow-up lines: what the user wants Blink to do next, their
    explicit consent to be contacted, pricing/contact ask, or a short verbatim quote of
    agreement when useful.
    """
    context = ctx.context

    # Only the contact fields that are actually set are forwarded.
    labelled_fields = (
        ("Name", context.user_name),
        ("Email", context.user_email),
        ("Phone", context.user_phone),
    )
    info_parts = [f"{label}: {value}" for label, value in labelled_fields if value]

    if info_parts:
        full_request = (
            "--- User Contact Info ---\n"
            + "\n".join(info_parts)
            + "\n\n--- User Request ---\n"
            + user_request
        )
    else:
        full_request = user_request

    _ceo_interest_service.log_interest(context, request_summary=request_summary, user_request=full_request)

    contact = context.contact
    if not contact:
        return (
            "CEO follow-up recorded successfully. "
            "You can now reply to the user naturally and confirm the specialist follow-up."
        )
    return (
        "CEO follow-up recorded successfully. "
        f"Specialist contact is available: {contact.email}. "
        "You can now reply to the user naturally and provide contact details as needed."
    )