# File: ask-the-web-agent/src/synthesis/formatter.py
# Hugging Face page header (not part of the module): debashis2007's picture
# Commit message: Upload folder using huggingface_hub
# Commit: 75bea1c (verified)
"""Response formatter for output generation."""

from __future__ import annotations

import re
from typing import Any
class ResponseFormatter:
    """Formats responses for different output formats.

    The formatter keeps no instance state; every method derives its result
    purely from its arguments.
    """

    # Inclusive lower bounds for each confidence label, checked in order.
    # Anything below the last bound is reported as "Very Low".
    _CONFIDENCE_LEVELS = (
        (0.9, "Very High"),
        (0.7, "High"),
        (0.5, "Moderate"),
        (0.3, "Low"),
    )

    # Marker appended to answers shortened by ``truncate_answer``.
    _ELLIPSIS = "..."

    # Title used when a source has no usable "title" entry.
    _DEFAULT_SOURCE_TITLE = "Source"

    def __init__(self):
        """Initialize the formatter."""

    def _source_fields(self, source: dict[str, str]) -> tuple[str, str]:
        """Return the ``(title, url)`` pair to display for one source.

        A missing, empty or ``None`` title falls back to the default title so
        a citation never renders as an empty link or the text "None". A
        missing or ``None`` URL becomes the empty string.
        """
        title = source.get("title") or self._DEFAULT_SOURCE_TITLE
        url = source.get("url") or ""
        return title, url

    def format_markdown(
        self,
        answer: str,
        sources: list[dict[str, str]] | None = None,
        follow_ups: list[str] | None = None,
        confidence: float | None = None,
    ) -> str:
        """Format response as Markdown.

        Args:
            answer: Main answer text
            sources: Optional list of source citations; each entry is read
                through its "title" and "url" keys
            follow_ups: Optional follow-up questions
            confidence: Optional confidence score

        Returns:
            Formatted Markdown string
        """
        parts = [answer]

        # Numbered citation list; sources with a URL become Markdown links.
        if sources:
            parts.append("\n\n---\n**Sources:**")
            for i, source in enumerate(sources, 1):
                title, url = self._source_fields(source)
                if url:
                    parts.append(f"\n[{i}] [{title}]({url})")
                else:
                    parts.append(f"\n[{i}] {title}")

        # Bulleted list of follow-up questions.
        if follow_ups:
            parts.append("\n\n---\n*Related questions you might find helpful:*")
            parts.extend(f"\n- {q}" for q in follow_ups)

        # ``0.0`` is a valid score, so test against None rather than truthiness.
        if confidence is not None:
            confidence_text = self._confidence_to_text(confidence)
            parts.append(f"\n\n*Confidence: {confidence_text}*")

        return "".join(parts)

    def format_json(
        self,
        answer: str,
        sources: list[dict[str, str]] | None = None,
        follow_ups: list[str] | None = None,
        confidence: float | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Format response as JSON-serializable dictionary.

        Args:
            answer: Main answer text
            sources: Optional list of source citations
            follow_ups: Optional follow-up questions
            confidence: Optional confidence score
            metadata: Optional additional metadata

        Returns:
            Dictionary with response data. "confidence"/"confidence_text" are
            present only when a confidence is given, and "metadata" only when
            it is non-empty.
        """
        # Shallow-copy the lists so callers mutating the result do not
        # accidentally modify the lists they passed in.
        result: dict[str, Any] = {
            "answer": answer,
            "sources": list(sources) if sources else [],
            "follow_up_questions": list(follow_ups) if follow_ups else [],
        }

        if confidence is not None:
            result["confidence"] = confidence
            result["confidence_text"] = self._confidence_to_text(confidence)

        if metadata:
            result["metadata"] = metadata

        return result

    def format_plain_text(
        self,
        answer: str,
        sources: list[dict[str, str]] | None = None,
    ) -> str:
        """Format response as plain text.

        Args:
            answer: Main answer text
            sources: Optional list of source citations

        Returns:
            Plain text string
        """
        parts = [answer]

        if sources:
            parts.append("\n\nSources:")
            for i, source in enumerate(sources, 1):
                title, url = self._source_fields(source)
                parts.append(f"\n{i}. {title}")
                # The URL, when present, goes on its own line under the title.
                if url:
                    parts.append(f"\n {url}")

        return "".join(parts)

    def format_streaming_chunk(self, chunk: str, is_final: bool = False) -> dict[str, Any]:
        """Format a streaming response chunk.

        Args:
            chunk: Text chunk
            is_final: Whether this is the final chunk

        Returns:
            Dictionary with chunk data
        """
        return {
            "type": "chunk",
            "content": chunk,
            "is_final": is_final,
        }

    def _confidence_to_text(self, confidence: float) -> str:
        """Convert confidence score to text description.

        Args:
            confidence: Score from 0.0 to 1.0

        Returns:
            Text description; scores below every threshold (including NaN,
            which fails all comparisons) map to "Very Low"
        """
        for lower_bound, label in self._CONFIDENCE_LEVELS:
            if confidence >= lower_bound:
                return label
        return "Very Low"

    def truncate_answer(self, answer: str, max_length: int = 2000) -> str:
        """Truncate answer to maximum length.

        The returned string never exceeds ``max_length`` characters: room for
        the trailing "..." is reserved inside the limit.

        Args:
            answer: Answer text
            max_length: Maximum character length of the result

        Returns:
            The answer unchanged when it already fits, otherwise a shortened
            answer ending in "...". When ``max_length`` leaves no room for any
            text before the ellipsis, the answer is hard-cut without one.
        """
        if len(answer) <= max_length:
            return answer

        # Too small to hold any text plus the ellipsis: hard cut, and clamp
        # negatives so the slice never counts from the end of the string.
        if max_length <= len(self._ELLIPSIS):
            return answer[:max(max_length, 0)]

        budget = max_length - len(self._ELLIPSIS)
        truncated = answer[:budget]

        # Prefer a word boundary, but only when it costs less than 20% of the
        # available text; otherwise keep the hard cut.
        last_space = truncated.rfind(" ")
        if last_space > budget * 0.8:
            truncated = truncated[:last_space]

        return truncated + self._ELLIPSIS

    def highlight_keywords(self, text: str, keywords: list[str]) -> str:
        """Highlight keywords in text using markdown bold.

        Matching is case-insensitive and the original casing of each match is
        kept. All keywords are applied in one pass, so text that is already
        highlighted is never matched again.

        Args:
            text: Input text
            keywords: Keywords to highlight; empty strings are ignored

        Returns:
            Text with highlighted keywords
        """
        # An empty keyword would match at every position, so drop it. Sort
        # longest first because regex alternation takes the first branch that
        # matches: "web agent" must be tried before "web".
        candidates = sorted(
            dict.fromkeys(k for k in keywords if k),
            key=len,
            reverse=True,
        )
        if not candidates:
            return text

        pattern = re.compile("|".join(map(re.escape, candidates)), re.IGNORECASE)
        return pattern.sub(lambda m: f"**{m.group()}**", text)