# IntegraChat — backend/api/services/result_merger.py
"""
Result Merger Utility
Merges and ranks results from parallel tool execution (RAG + Web).
"""
from typing import List, Dict, Any, Optional
def merge_parallel_results(results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Merge results from parallel tool execution (RAG + Web).

    Args:
        results: Dictionary with keys like "rag" and "web" containing tool outputs.
            Each value may be a dict wrapping a result list (under "results"/"hits"
            for RAG, "results"/"items" for Web), a bare list, or anything else
            (ignored).

    Returns:
        List of merged context entries, sorted by score (descending).
        RAG entries carry keys: source="internal_policy", text, score, doc_id,
        source_url. Web entries carry: source="live_web", text, score, url, title.
    """
    final_context: List[Dict[str, Any]] = []

    def _safe_score(value: Any, default: float) -> float:
        # Tools sometimes return {"score": None} or a non-numeric string;
        # dict.get's default does NOT fire on an explicit None, so coerce
        # defensively instead of letting float() raise.
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    # --- Extract RAG results ---
    rag_data = results.get("rag")
    if rag_data:
        # Handle different RAG response formats.
        if isinstance(rag_data, dict):
            hits = rag_data.get("results") or rag_data.get("hits") or []
        elif isinstance(rag_data, list):
            hits = rag_data
        else:
            hits = []
        for hit in hits:
            if isinstance(hit, dict):
                content = hit.get("text") or hit.get("content") or str(hit)
                score = _safe_score(hit.get("score", 0.0), 0.0)
                doc_id = hit.get("doc_id") or hit.get("id")
                source = hit.get("source") or hit.get("url") or "internal_doc"
            else:
                content = str(hit)
                score = 0.5  # Default score for non-dict hits
                doc_id = None
                source = "internal_doc"
            if content:
                final_context.append({
                    "source": "internal_policy",
                    "text": content,
                    "score": score,
                    "doc_id": doc_id,
                    "source_url": source if isinstance(source, str) else None,
                })

    # --- Extract Web results ---
    web_data = results.get("web")
    if web_data:
        # Handle different Web response formats.
        if isinstance(web_data, dict):
            items = web_data.get("results") or web_data.get("items") or []
        elif isinstance(web_data, list):
            items = web_data
        else:
            items = []
        for item in items:
            if isinstance(item, dict):
                title = item.get("title") or item.get("headline") or ""
                snippet = item.get("snippet") or item.get("summary") or item.get("text") or ""
                url = item.get("url") or item.get("link") or ""
                # Web results get a baseline confidence score.
                score = _safe_score(item.get("score", 0.5), 0.5)
            else:
                title = ""
                snippet = str(item)
                url = ""
                score = 0.5
            if snippet or title:
                # Combine title and snippet for better context.
                text = f"{title}\n{snippet}" if title else snippet
                final_context.append({
                    "source": "live_web",
                    "text": text,
                    "score": score,
                    "url": url,
                    "title": title,
                })

    # Sort by score descending (highest relevance first).
    final_context.sort(key=lambda x: x["score"], reverse=True)
    return final_context
def format_merged_context_for_prompt(merged_context: List[Dict[str, Any]],
                                     max_items: int = 10) -> str:
    """
    Format merged context into a readable prompt section.

    Args:
        merged_context: List of merged context entries from merge_parallel_results.
        max_items: Maximum number of items to include.

    Returns:
        Formatted string ready for an LLM prompt: one labelled section per
        entry, separated by "---" dividers. Empty string for empty input.
    """
    if not merged_context:
        return ""

    sections: List[str] = []
    for entry in merged_context[:max_items]:
        source_label = entry.get("source", "unknown")
        text = entry.get("text", "")
        # Format based on source type.
        if source_label == "internal_policy":
            source_url = entry.get("source_url")
            if source_url:
                sections.append(f"[INTERNAL DOCUMENT - {source_url}]\n{text}")
            else:
                sections.append(f"[INTERNAL DOCUMENT]\n{text}")
        elif source_label == "live_web":
            url = entry.get("url", "")
            title = entry.get("title", "")
            if url:
                sections.append(f"[WEB SOURCE - {url}]\n{title}\n{text}")
            else:
                sections.append(f"[WEB SOURCE]\n{title}\n{text}")
        else:
            # str() guard: an unexpected non-string label must not crash formatting.
            sections.append(f"[{str(source_label).upper()}]\n{text}")
    return "\n\n---\n\n".join(sections)