# chatbot-rag-fi / src/agents/tools.py
# Repository listing metadata (not part of the module): ABAO77's picture,
# "Upload 147 files", commit 0df80b4 (verified).
from __future__ import annotations
import json
import logging
from collections import defaultdict
from dataclasses import dataclass
from agents import RunContextWrapper, function_tool
from src.agents.state import AgentContext, RetrievedDocument
from src.utils.agent_utils import has_sufficient_context
from src.config import get_settings
from src.schemas import CEOContact, ChatCitation, ChatEmailNotification
from src.services.blog import blog_search_service
from src.services.email import lead_notification_email_service
logger = logging.getLogger(__name__)
@dataclass(slots=True)
class RetrievedChunk:
index: int
document_id: str
title: str
source_kind: str
source_url: str | None
content: str
score: float
vector_score: float
overlap_count: int
is_title_only: bool
class RetrievalToolService:
    """Fetch blog search hits and turn them into grounded context for the agent.

    The outcome of each retrieval (documents, citations, status) is recorded
    on the ``AgentContext`` passed in by the caller.
    """

    @staticmethod
    def _chunk_from_hit(position: int, hit) -> RetrievedChunk:
        """Copy the fields of one search hit into a ``RetrievedChunk``."""
        return RetrievedChunk(
            index=position,
            document_id=hit.document_id,
            title=hit.title,
            source_kind=hit.source_kind,
            source_url=hit.source_url,
            content=hit.content,
            score=hit.score,
            vector_score=hit.vector_score,
            overlap_count=hit.overlap_count,
            is_title_only=hit.is_title_only,
        )

    def retrieve_relevant_chunks(self, question: str) -> list[RetrievedChunk]:
        """Search the blog corpus for ``question``.

        Returns:
            The hits as ``RetrievedChunk`` objects, numbered from 1 in the
            order the search service returned them.
        """
        search_hits = blog_search_service.search(question)
        return [
            self._chunk_from_hit(position, hit)
            for position, hit in enumerate(search_hits, start=1)
        ]

    def retrieve_brand_context(self, context: AgentContext, question: str) -> str:
        """Retrieve blog material for ``question`` and render it as prompt text.

        Side effects on ``context``: sets ``retrieved_tool_called``,
        ``retrieved_documents``, ``citations``, ``citation_ids`` and
        ``retrieval_status`` (``"ok"`` or ``"insufficient"``).

        Returns:
            Either a listing of the retrieved documents with citation
            instructions, or a fallback instruction when the material is not
            sufficient to ground an answer.
        """
        # Flagged before searching, so the call is recorded even if search raises.
        context.retrieved_tool_called = True

        # Group chunk texts per document. The first chunk seen for a document
        # supplies that document's metadata and scores.
        first_chunk_by_doc: dict[str, RetrievedChunk] = {}
        excerpts_by_doc: dict[str, list[str]] = {}
        for chunk in self.retrieve_relevant_chunks(question):
            doc_id = chunk.document_id
            first_chunk_by_doc.setdefault(doc_id, chunk)
            excerpts_by_doc.setdefault(doc_id, []).append(chunk.content)

        # Highest score first; ties keep first-seen order (stable sort).
        ranked_documents = sorted(
            (
                RetrievedDocument(
                    document_id=doc_id,
                    title=first.title,
                    source_url=first.source_url,
                    source_kind=first.source_kind,
                    excerpts=excerpts_by_doc[doc_id],
                    score=first.score,
                    vector_score=first.vector_score,
                    overlap_count=first.overlap_count,
                    is_title_only=first.is_title_only,
                )
                for doc_id, first in first_chunk_by_doc.items()
            ),
            key=lambda doc: doc.score,
            reverse=True,
        )
        context.retrieved_documents = ranked_documents

        if not has_sufficient_context(question, ranked_documents):
            # Nothing citable: clear citation state and tell the model so.
            context.citations = []
            context.citation_ids = set()
            context.retrieval_status = "insufficient"
            return (
                "No sufficiently grounded Blink Helsinki blog material was found for this request. "
                "You may still answer with general professional marketing or branding guidance if that helps the user. "
                "If you do, make clear that it is general guidance rather than Blink Helsinki's published view, and do not use any <doc-ref .../> tags."
            )

        # Citation numbers follow the ranked order, starting at 1.
        context.citations = [
            ChatCitation(
                index=position,
                document_id=doc.document_id,
                title=doc.title,
                source_url=doc.source_url,
                source_kind=doc.source_kind,
            )
            for position, doc in enumerate(ranked_documents, start=1)
        ]
        context.citation_ids = {citation.document_id for citation in context.citations}
        context.retrieval_status = "ok"

        prompt_lines: list[str] = ["Use only the following verified Blink Helsinki material:"]
        for doc in ranked_documents:
            prompt_lines += [f"Document ID: {doc.document_id}", f"Title: {doc.title}"]
            if doc.source_url:
                prompt_lines.append(f"URL: {doc.source_url}")
            if doc.is_title_only:
                prompt_lines.append(
                    "Corpus note: only title metadata is available for this article in the local corpus."
                )
            # At most two excerpts per document are shown to the model.
            prompt_lines.extend(f"Excerpt: {excerpt}" for excerpt in doc.excerpts[:2])
            prompt_lines.append("")
        prompt_lines.append(
            'When you rely on one of these documents, append <doc-ref id="DOCUMENT_ID"/> immediately after the sentence.'
        )
        return "\n".join(prompt_lines).strip()
class CEOInterestService:
    """Queue a lead-notification email and record the hand-off on the context."""

    @staticmethod
    def _contact_payload(question: str) -> CEOContact:
        """Build the CEO contact details from settings.

        ``question`` is accepted but not used.
        """
        settings = get_settings()
        return CEOContact(email=settings.ceo_contact_email)

    def log_interest(self, context: AgentContext, request_summary: str, user_request: str) -> ChatEmailNotification:
        """Queue the lead notification once per context and log an audit line.

        If ``context`` already holds a notification and is flagged as notified,
        that notification is returned and nothing is queued again. Otherwise
        the email is queued, a ``CEO_EMAIL_LOG`` line is logged at INFO level,
        and ``should_handoff``, ``notified_ceo``, ``contact`` and
        ``email_notification`` are set on ``context``.

        Returns:
            The notification object produced by the email service (or the one
            stored earlier on the context).
        """
        if context.email_notification and context.notified_ceo:
            return context.email_notification

        settings = get_settings()
        # The dedicated lead inbox wins; otherwise fall back to the CEO address.
        recipient = settings.lead_notification_to_email or settings.ceo_contact_email
        ceo_contact = self._contact_payload(context.question)

        queued = lead_notification_email_service.queue(
            recipient_email=recipient,
            request_summary=request_summary,
            user_request=user_request,
            source_question=context.question,
        )

        audit_record = {
            "recipient": recipient,
            "summary": request_summary,
            "user_request": user_request,
            "source_question": context.question,
            "logged_at": queued.logged_at,
            "delivery_mode": queued.mode,
            "delivery_status": queued.status,
            "provider": queued.provider,
        }
        logger.info("CEO_EMAIL_LOG %s", json.dumps(audit_record, ensure_ascii=False))

        # Context is updated only after the email was queued and logged.
        context.should_handoff = True
        context.notified_ceo = True
        context.contact = ceo_contact
        context.email_notification = queued
        return queued
# Module-level service instances shared by the tool functions defined below.
_retrieval_service = RetrievalToolService()
_ceo_interest_service = CEOInterestService()
@function_tool
def retrieve_brand_context(ctx: RunContextWrapper[AgentContext], question: str) -> str:
    """Retrieve verified Blink Helsinki blog context before answering factual questions."""
    # NOTE(review): the docstring above is presumably used by function_tool as
    # the tool description shown to the model, so treat it as runtime text.
    agent_context = ctx.context
    # The service records retrieval state on the run's AgentContext.
    return _retrieval_service.retrieve_brand_context(agent_context, question)
@function_tool(needs_approval=True)
async def hand_off_ceo(
    ctx: RunContextWrapper[AgentContext],
    request_summary: str,
    user_request: str,
) -> str:
    """Record a CEO follow-up when the user agrees to contact, asks for pricing, or wants Blink to follow up.
    request_summary: Write this for the CEO email body. Summarize the conversation so far: what you and the user
    explored (topic, constraints), ideas or solution directions you proposed, what the user intends to do or
    achieve, and why a leadership or specialist conversation would help. Synthesize from the full thread—do
    not only paste or paraphrase the user's last message alone.
    user_request: The concrete follow-up lines: what the user wants Blink to do next, their explicit consent
    to be contacted, pricing/contact ask, or a short verbatim quote of agreement when useful.
    """
    # NOTE(review): the docstring above is presumably consumed by function_tool
    # as the tool/argument description, so its wording is left untouched.
    agent_context = ctx.context

    # Prepend whichever contact details are known for the user.
    labelled_details = (
        ("Name", agent_context.user_name),
        ("Email", agent_context.user_email),
        ("Phone", agent_context.user_phone),
    )
    contact_lines = [f"{label}: {value}" for label, value in labelled_details if value]

    if contact_lines:
        contact_section = "--- User Contact Info ---\n" + "\n".join(contact_lines)
        full_request = contact_section + "\n\n--- User Request ---\n" + user_request
    else:
        full_request = user_request

    _ceo_interest_service.log_interest(
        agent_context,
        request_summary=request_summary,
        user_request=full_request,
    )

    # log_interest stores the contact on the context; read it back from there.
    specialist = agent_context.contact
    if not specialist:
        return (
            "CEO follow-up recorded successfully. "
            "You can now reply to the user naturally and confirm the specialist follow-up."
        )
    return (
        "CEO follow-up recorded successfully. "
        f"Specialist contact is available: {specialist.email}. "
        "You can now reply to the user naturally and provide contact details as needed."
    )