Spaces:
Sleeping
Sleeping
File size: 4,702 Bytes
6d531e9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
"""
Result Merger Utility
Merges and ranks results from parallel tool execution (RAG + Web).
"""
from typing import List, Dict, Any, Optional
def merge_parallel_results(results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Merge results from parallel tool execution (RAG + Web).

    Args:
        results: Dictionary with keys like "rag" and "web" containing tool
            outputs. Each value may be a dict wrapping the result list
            (under "results"/"hits" for RAG, "results"/"items" for Web)
            or a bare list of hits/items.

    Returns:
        List of merged context entries, sorted by score (descending).
        RAG entries carry source="internal_policy"; Web entries carry
        source="live_web".
    """
    final_context: List[Dict[str, Any]] = []

    # --- Extract RAG results ---
    rag_data = results.get("rag")
    if rag_data:
        # Handle different RAG response formats: wrapped dict or bare list.
        if isinstance(rag_data, dict):
            hits = rag_data.get("results") or rag_data.get("hits") or []
        elif isinstance(rag_data, list):
            hits = rag_data
        else:
            hits = []
        for hit in hits:
            if isinstance(hit, dict):
                content = hit.get("text") or hit.get("content") or str(hit)
                # Payloads may contain an explicit "score": None; treat it
                # as absent so float(score) below cannot raise TypeError.
                score = hit.get("score")
                if score is None:
                    score = 0.0
                doc_id = hit.get("doc_id") or hit.get("id")
                source = hit.get("source") or hit.get("url") or "internal_doc"
            else:
                content = str(hit)
                score = 0.5  # Default score for non-dict hits
                doc_id = None
                source = "internal_doc"
            if content:
                final_context.append({
                    "source": "internal_policy",
                    "text": content,
                    "score": float(score),
                    "doc_id": doc_id,
                    "source_url": source if isinstance(source, str) else None,
                })

    # --- Extract Web results ---
    web_data = results.get("web")
    if web_data:
        # Handle different Web response formats: wrapped dict or bare list.
        if isinstance(web_data, dict):
            items = web_data.get("results") or web_data.get("items") or []
        elif isinstance(web_data, list):
            items = web_data
        else:
            items = []
        for item in items:
            if isinstance(item, dict):
                title = item.get("title") or item.get("headline") or ""
                snippet = (item.get("snippet") or item.get("summary")
                           or item.get("text") or "")
                url = item.get("url") or item.get("link") or ""
                # Web results get a baseline confidence score; an explicit
                # None is treated as absent to keep float() from crashing.
                score = item.get("score")
                if score is None:
                    score = 0.5
            else:
                title = ""
                snippet = str(item)
                url = ""
                score = 0.5
            if snippet or title:
                # Combine title and snippet for better context.
                text = f"{title}\n{snippet}" if title else snippet
                final_context.append({
                    "source": "live_web",
                    "text": text,
                    "score": float(score),
                    "url": url,
                    "title": title,
                })

    # Sort by score descending (highest relevance first).
    final_context.sort(key=lambda entry: entry["score"], reverse=True)
    return final_context
def format_merged_context_for_prompt(merged_context: List[Dict[str, Any]],
                                     max_items: int = 10) -> str:
    """
    Format merged context into a readable prompt section.

    Args:
        merged_context: List of merged context entries from
            merge_parallel_results.
        max_items: Maximum number of items to include.

    Returns:
        Formatted string ready for LLM prompt; empty string when there is
        no context to format.
    """
    if not merged_context:
        return ""

    rendered = []
    for item in merged_context[:max_items]:
        kind = item.get("source", "unknown")
        body = item.get("text", "")

        # Build the bracketed header per source type, then attach the body.
        if kind == "internal_policy":
            origin = item.get("source_url")
            header = f"[INTERNAL DOCUMENT - {origin}]" if origin else "[INTERNAL DOCUMENT]"
            rendered.append(f"{header}\n{body}")
        elif kind == "live_web":
            link = item.get("url", "")
            headline = item.get("title", "")
            header = f"[WEB SOURCE - {link}]" if link else "[WEB SOURCE]"
            rendered.append(f"{header}\n{headline}\n{body}")
        else:
            rendered.append(f"[{kind.upper()}]\n{body}")

    return "\n\n---\n\n".join(rendered)
|