Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import logging | |
| from collections import defaultdict | |
| from dataclasses import dataclass | |
| from agents import RunContextWrapper, function_tool | |
| from src.agents.state import AgentContext, RetrievedDocument | |
| from src.utils.agent_utils import has_sufficient_context | |
| from src.config import get_settings | |
| from src.schemas import CEOContact, ChatCitation, ChatEmailNotification | |
| from src.services.blog import blog_search_service | |
| from src.services.email import lead_notification_email_service | |
| logger = logging.getLogger(__name__) | |
| class RetrievedChunk: | |
| index: int | |
| document_id: str | |
| title: str | |
| source_kind: str | |
| source_url: str | None | |
| content: str | |
| score: float | |
| vector_score: float | |
| overlap_count: int | |
| is_title_only: bool | |
class RetrievalToolService:
    """Fetch blog search hits and turn them into grounded context for the agent."""

    # Number of excerpts per document that are included in the rendered context.
    MAX_EXCERPTS_PER_DOCUMENT = 2

    def retrieve_relevant_chunks(self, question: str) -> list[RetrievedChunk]:
        """Search the blog corpus and wrap every hit in a ``RetrievedChunk``.

        Chunks are numbered from 1 in the order the search service returns them.
        """
        hits = blog_search_service.search(question)
        return [
            RetrievedChunk(
                index=index,
                document_id=hit.document_id,
                title=hit.title,
                source_kind=hit.source_kind,
                source_url=hit.source_url,
                content=hit.content,
                score=hit.score,
                vector_score=hit.vector_score,
                overlap_count=hit.overlap_count,
                is_title_only=hit.is_title_only,
            )
            for index, hit in enumerate(hits, start=1)
        ]

    def retrieve_brand_context(self, context: AgentContext, question: str) -> str:
        """Retrieve documents for ``question`` and return prompt text for the agent.

        Side effects on ``context``: marks the retrieval tool as called, stores
        the grouped documents, and sets ``citations``, ``citation_ids`` and
        ``retrieval_status`` ("ok" or "insufficient").

        Returns:
            The rendered document context, or a fallback instruction when
            ``has_sufficient_context`` rejects the retrieved documents.
        """
        context.retrieved_tool_called = True

        documents = self._group_into_documents(self.retrieve_relevant_chunks(question))
        context.retrieved_documents = documents

        if not has_sufficient_context(question, documents):
            # Nothing citable: clear citation state so no stale references survive.
            context.citations = []
            context.citation_ids = set()
            context.retrieval_status = "insufficient"
            return (
                "No sufficiently grounded Blink Helsinki blog material was found for this request. "
                "You may still answer with general professional marketing or branding guidance if that helps the user. "
                "If you do, make clear that it is general guidance rather than Blink Helsinki's published view, and do not use any <doc-ref .../> tags."
            )

        context.citations = [
            ChatCitation(
                index=index,
                document_id=document.document_id,
                title=document.title,
                source_url=document.source_url,
                source_kind=document.source_kind,
            )
            for index, document in enumerate(documents, start=1)
        ]
        context.citation_ids = {citation.document_id for citation in context.citations}
        context.retrieval_status = "ok"
        return self._render_context(documents)

    @staticmethod
    def _group_into_documents(chunks: list[RetrievedChunk]) -> list[RetrievedDocument]:
        """Merge chunks that share a document id into score-ranked documents."""
        excerpts_by_document: dict[str, list[str]] = defaultdict(list)
        first_hit_by_document: dict[str, RetrievedChunk] = {}
        for chunk in chunks:
            excerpts_by_document[chunk.document_id].append(chunk.content)
            # The first chunk seen for a document supplies its metadata and scores.
            first_hit_by_document.setdefault(chunk.document_id, chunk)

        documents = [
            RetrievedDocument(
                document_id=document_id,
                title=hit.title,
                source_url=hit.source_url,
                source_kind=hit.source_kind,
                excerpts=excerpts_by_document[document_id],
                score=hit.score,
                vector_score=hit.vector_score,
                overlap_count=hit.overlap_count,
                is_title_only=hit.is_title_only,
            )
            for document_id, hit in first_hit_by_document.items()
        ]
        # Highest score first; the sort is stable, so ties keep first-seen order.
        documents.sort(key=lambda item: item.score, reverse=True)
        return documents

    def _render_context(self, documents: list[RetrievedDocument]) -> str:
        """Render the documents as the text block handed back to the agent."""
        lines: list[str] = ["Use only the following verified Blink Helsinki material:"]
        for document in documents:
            lines.append(f'Document ID: {document.document_id}')
            lines.append(f"Title: {document.title}")
            if document.source_url:
                lines.append(f"URL: {document.source_url}")
            if document.is_title_only:
                lines.append("Corpus note: only title metadata is available for this article in the local corpus.")
            lines.extend(
                f"Excerpt: {excerpt}"
                for excerpt in document.excerpts[: self.MAX_EXCERPTS_PER_DOCUMENT]
            )
            lines.append("")  # blank separator line between documents
        lines.append('When you rely on one of these documents, append <doc-ref id="DOCUMENT_ID"/> immediately after the sentence.')
        return "\n".join(lines).strip()
class CEOInterestService:
    """Queue a lead-notification e-mail and record the CEO hand-off on the context."""

    @staticmethod
    def _contact_payload(question: str) -> CEOContact:
        """Build the contact payload from the configured CEO e-mail address.

        Declared static because the method takes no ``self`` but is invoked as
        ``self._contact_payload(...)``; without the decorator that call raises
        ``TypeError``. ``question`` is accepted but not used.
        """
        return CEOContact(email=get_settings().ceo_contact_email)

    def log_interest(self, context: AgentContext, request_summary: str, user_request: str) -> ChatEmailNotification:
        """Queue the notification e-mail once and mark the hand-off on ``context``.

        If ``context`` already holds a notification and is flagged as notified,
        that notification is returned and nothing is queued again.

        Side effects on ``context``: sets ``should_handoff``, ``notified_ceo``,
        ``contact`` and ``email_notification``.

        Returns:
            The notification returned by the e-mail service (or the stored one).
        """
        if context.email_notification and context.notified_ceo:
            return context.email_notification

        settings = get_settings()
        # Prefer the dedicated lead inbox; fall back to the CEO address.
        notification_recipient = settings.lead_notification_to_email or settings.ceo_contact_email
        contact = self._contact_payload(context.question)

        notification = lead_notification_email_service.queue(
            recipient_email=notification_recipient,
            request_summary=request_summary,
            user_request=user_request,
            source_question=context.question,
        )

        # NOTE(review): ``user_request`` is written to the log verbatim, including
        # any contact details the caller embedded in it - confirm this is intended.
        logger.info(
            "CEO_EMAIL_LOG %s",
            json.dumps(
                {
                    "recipient": notification_recipient,
                    "summary": request_summary,
                    "user_request": user_request,
                    "source_question": context.question,
                    "logged_at": notification.logged_at,
                    "delivery_mode": notification.mode,
                    "delivery_status": notification.status,
                    "provider": notification.provider,
                },
                ensure_ascii=False,
            ),
        )

        context.should_handoff = True
        context.notified_ceo = True
        context.contact = contact
        context.email_notification = notification
        return notification
# Shared service instances used by the tool entry points defined below.
_ceo_interest_service: CEOInterestService = CEOInterestService()
_retrieval_service: RetrievalToolService = RetrievalToolService()
@function_tool
def retrieve_brand_context(ctx: RunContextWrapper[AgentContext], question: str) -> str:
    """Retrieve verified Blink Helsinki blog context before answering factual questions."""
    # The docstring above is the tool description shown to the model, so its
    # wording is kept verbatim. The work is delegated to the shared service.
    return _retrieval_service.retrieve_brand_context(ctx.context, question)
@function_tool
async def hand_off_ceo(
    ctx: RunContextWrapper[AgentContext],
    request_summary: str,
    user_request: str,
) -> str:
    """Record a CEO follow-up when the user agrees to contact, asks for pricing, or wants Blink to follow up.

    request_summary: Write this for the CEO email body. Summarize the conversation so far: what you and the user
    explored (topic, constraints), ideas or solution directions you proposed, what the user intends to do or
    achieve, and why a leadership or specialist conversation would help. Synthesize from the full thread—do
    not only paste or paraphrase the user's last message alone.

    user_request: The concrete follow-up lines: what the user wants Blink to do next, their explicit consent
    to be contacted, pricing/contact ask, or a short verbatim quote of agreement when useful.
    """
    context = ctx.context

    # Collect whichever contact details are known; empty or missing values are skipped.
    labelled_details = (
        ("Name", context.user_name),
        ("Email", context.user_email),
        ("Phone", context.user_phone),
    )
    info_parts = [f"{label}: {value}" for label, value in labelled_details if value]

    full_request = user_request
    if info_parts:
        # Put the contact details ahead of the request so the e-mail reader sees them first.
        full_request = "--- User Contact Info ---\n" + "\n".join(info_parts) + "\n\n--- User Request ---\n" + user_request

    _ceo_interest_service.log_interest(context, request_summary=request_summary, user_request=full_request)

    # log_interest stores the contact payload on the context.
    contact = context.contact
    if contact:
        return (
            "CEO follow-up recorded successfully. "
            f"Specialist contact is available: {contact.email}. "
            "You can now reply to the user naturally and provide contact details as needed."
        )
    return (
        "CEO follow-up recorded successfully. "
        "You can now reply to the user naturally and confirm the specialist follow-up."
    )