Spaces:
Sleeping
Sleeping
| """ | |
| Result Merger Utility | |
| Merges and ranks results from parallel tool execution (RAG + Web). | |
| """ | |
| from typing import List, Dict, Any, Optional | |
def merge_parallel_results(results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Merge results from parallel tool execution (RAG + Web).

    Args:
        results: Dictionary with keys like "rag" and "web" containing tool
            outputs. Each value may be a dict (hits under "results"/"hits"
            or "results"/"items"), a plain list of hits, or any other shape
            (silently ignored).

    Returns:
        List of merged context entries, sorted by score (descending).
        RAG entries are tagged source="internal_policy"; web entries are
        tagged source="live_web".
    """
    final_context: List[Dict[str, Any]] = []
    final_context.extend(_extract_rag_entries(results.get("rag")))
    final_context.extend(_extract_web_entries(results.get("web")))
    # Highest relevance first.
    final_context.sort(key=lambda entry: entry["score"], reverse=True)
    return final_context


def _coerce_score(raw: Any, default: float) -> float:
    """Convert *raw* to float, falling back to *default* on bad data.

    Fixes a crash in the original code: ``float(score)`` raised
    TypeError/ValueError when an upstream tool returned an explicit
    ``None`` or a non-numeric score value.
    """
    try:
        return float(raw)
    except (TypeError, ValueError):
        return default


def _as_hit_list(data: Any, *keys: str) -> List[Any]:
    """Extract a list of hits from a tool payload of unknown shape.

    Dict payloads are probed with *keys* in order (first non-empty wins);
    list payloads are returned as-is; anything else yields [].
    """
    if isinstance(data, dict):
        for key in keys:
            hits = data.get(key)
            if hits:
                return hits
        return []
    if isinstance(data, list):
        return data
    return []


def _extract_rag_entries(rag_data: Any) -> List[Dict[str, Any]]:
    """Normalize RAG tool output into merged-context entries."""
    entries: List[Dict[str, Any]] = []
    if not rag_data:
        return entries
    for hit in _as_hit_list(rag_data, "results", "hits"):
        if isinstance(hit, dict):
            content = hit.get("text") or hit.get("content") or str(hit)
            score = _coerce_score(hit.get("score"), 0.0)
            doc_id = hit.get("doc_id") or hit.get("id")
            source = hit.get("source") or hit.get("url") or "internal_doc"
        else:
            content = str(hit)
            score = 0.5  # Default score for non-dict hits
            doc_id = None
            source = "internal_doc"
        if content:
            entries.append({
                "source": "internal_policy",
                "text": content,
                "score": score,
                "doc_id": doc_id,
                "source_url": source if isinstance(source, str) else None,
            })
    return entries


def _extract_web_entries(web_data: Any) -> List[Dict[str, Any]]:
    """Normalize web-search tool output into merged-context entries."""
    entries: List[Dict[str, Any]] = []
    if not web_data:
        return entries
    for item in _as_hit_list(web_data, "results", "items"):
        if isinstance(item, dict):
            title = item.get("title") or item.get("headline") or ""
            snippet = (item.get("snippet") or item.get("summary")
                       or item.get("text") or "")
            url = item.get("url") or item.get("link") or ""
            # Web results get a baseline confidence score.
            score = _coerce_score(item.get("score"), 0.5)
        else:
            title = ""
            snippet = str(item)
            url = ""
            score = 0.5
        if snippet or title:
            # Combine title and snippet for better context.
            text = f"{title}\n{snippet}" if title else snippet
            entries.append({
                "source": "live_web",
                "text": text,
                "score": score,
                "url": url,
                "title": title,
            })
    return entries
def format_merged_context_for_prompt(merged_context: List[Dict[str, Any]],
                                     max_items: int = 10) -> str:
    """
    Format merged context into a readable prompt section.

    Args:
        merged_context: List of merged context entries from merge_parallel_results
        max_items: Maximum number of items to include

    Returns:
        Formatted string ready for an LLM prompt, with sections separated by
        "---" dividers. Empty string when merged_context is empty.
    """
    if not merged_context:
        return ""

    sections = []
    for entry in merged_context[:max_items]:
        source_label = entry.get("source", "unknown")
        text = entry.get("text", "")
        # NOTE: the original computed entry["score"] here but never used it;
        # the dead local has been removed.
        # Label each section so the model can distinguish internal documents
        # from live web results (and anything else) at a glance.
        if source_label == "internal_policy":
            source_url = entry.get("source_url")
            if source_url:
                sections.append(f"[INTERNAL DOCUMENT - {source_url}]\n{text}")
            else:
                sections.append(f"[INTERNAL DOCUMENT]\n{text}")
        elif source_label == "live_web":
            url = entry.get("url", "")
            title = entry.get("title", "")
            if url:
                sections.append(f"[WEB SOURCE - {url}]\n{title}\n{text}")
            else:
                sections.append(f"[WEB SOURCE]\n{title}\n{text}")
        else:
            sections.append(f"[{source_label.upper()}]\n{text}")
    return "\n\n---\n\n".join(sections)