"""Result Merger Utility

Merges and ranks results from parallel tool execution (RAG + Web).
"""

from typing import List, Dict, Any, Optional


def _coerce_score(value: Any, default: float) -> float:
    """Convert a raw score to ``float``, falling back to *default*.

    Tool payloads sometimes carry ``"score": None`` or a non-numeric value;
    a bare ``float(value)`` would raise ``TypeError``/``ValueError`` there.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _as_item_list(data: Any, *keys: str) -> List[Any]:
    """Normalize a tool payload into a list of result items.

    Accepts either a bare list, or a dict holding the list under the first
    of *keys* that maps to a non-empty value. Anything else yields ``[]``.
    """
    if isinstance(data, dict):
        for key in keys:
            items = data.get(key)
            if items:
                return items
        return []
    if isinstance(data, list):
        return data
    return []


def merge_parallel_results(results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Merge results from parallel tool execution (RAG + Web).

    Args:
        results: Dictionary with keys like "rag" and "web" containing
            tool outputs (each either a list of items or a dict wrapping one).

    Returns:
        List of merged context entries, sorted by score (descending).
        RAG entries carry keys: source, text, score, doc_id, source_url.
        Web entries carry keys: source, text, score, url, title.
    """
    final_context: List[Dict[str, Any]] = []

    # --- RAG results (internal documents) ---
    rag_data = results.get("rag")
    if rag_data:
        for hit in _as_item_list(rag_data, "results", "hits"):
            if isinstance(hit, dict):
                content = hit.get("text") or hit.get("content") or str(hit)
                # Missing or None score defaults to 0.0 for RAG hits.
                score = _coerce_score(hit.get("score"), 0.0)
                doc_id = hit.get("doc_id") or hit.get("id")
                source = hit.get("source") or hit.get("url") or "internal_doc"
            else:
                content = str(hit)
                score = 0.5  # Default score for non-dict hits
                doc_id = None
                source = "internal_doc"

            if content:
                final_context.append({
                    "source": "internal_policy",
                    "text": content,
                    "score": score,
                    "doc_id": doc_id,
                    "source_url": source if isinstance(source, str) else None,
                })

    # --- Web results ---
    web_data = results.get("web")
    if web_data:
        for item in _as_item_list(web_data, "results", "items"):
            if isinstance(item, dict):
                title = item.get("title") or item.get("headline") or ""
                snippet = (item.get("snippet") or item.get("summary")
                           or item.get("text") or "")
                url = item.get("url") or item.get("link") or ""
                # Web results get a baseline confidence score of 0.5.
                score = _coerce_score(item.get("score"), 0.5)
            else:
                title = ""
                snippet = str(item)
                url = ""
                score = 0.5

            if snippet or title:
                # Combine title and snippet for better context.
                text = f"{title}\n{snippet}" if title else snippet
                final_context.append({
                    "source": "live_web",
                    "text": text,
                    "score": score,
                    "url": url,
                    "title": title,
                })

    # Sort by score descending (highest relevance first); sort is stable,
    # so equal-score entries keep RAG-before-Web insertion order.
    final_context.sort(key=lambda entry: entry["score"], reverse=True)
    return final_context


def format_merged_context_for_prompt(merged_context: List[Dict[str, Any]],
                                     max_items: int = 10) -> str:
    """
    Format merged context into a readable prompt section.

    Args:
        merged_context: List of merged context entries from
            merge_parallel_results.
        max_items: Maximum number of items to include.

    Returns:
        Formatted string ready for LLM prompt ("" when there is no context).
    """
    if not merged_context:
        return ""

    sections = []
    for entry in merged_context[:max_items]:
        source_label = entry.get("source", "unknown")
        text = entry.get("text", "")

        # Format based on source type.
        if source_label == "internal_policy":
            source_url = entry.get("source_url")
            if source_url:
                sections.append(f"[INTERNAL DOCUMENT - {source_url}]\n{text}")
            else:
                sections.append(f"[INTERNAL DOCUMENT]\n{text}")
        elif source_label == "live_web":
            url = entry.get("url", "")
            title = entry.get("title", "")
            if url:
                sections.append(f"[WEB SOURCE - {url}]\n{title}\n{text}")
            else:
                sections.append(f"[WEB SOURCE]\n{title}\n{text}")
        else:
            sections.append(f"[{source_label.upper()}]\n{text}")

    return "\n\n---\n\n".join(sections)