Spaces:
Running
Running
| from __future__ import annotations | |
| import re | |
| import time | |
| import logging | |
| import os | |
| from typing import List, Dict, Any, Optional | |
| import html | |
| from urllib.parse import quote_plus, urlparse | |
| from urllib.parse import parse_qs, unquote | |
| import httpx | |
| from app.config import ( | |
| TAVILY_API_KEY, | |
| NEWSAPI_KEY, | |
| ALPHAVANTAGE_API_KEY, | |
| JINA_READER_BASE, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Module-level connection pool | |
| _http_pool = httpx.Client( | |
| timeout=30, | |
| limits=httpx.Limits(max_connections=10, max_keepalive_connections=5), | |
| ) | |
| # Simple TTL cache for market quotes (5 min) | |
| _quote_cache: Dict[str, Dict[str, Any]] = {} # {symbol: {"data": ..., "ts": ...}} | |
| _QUOTE_TTL = 300 # seconds | |
| _deep_read_cache: Dict[str, Dict[str, Any]] = {} | |
| _DEEP_READ_TTL = 900 | |
| _deep_search_cache: Dict[str, Dict[str, Any]] = {} | |
| _DEEP_SEARCH_TTL = 600 | |
| URL_PATTERN = re.compile(r"https?://\S+") | |
| TICKER_PATTERN = re.compile(r"\$([A-Z]{1,5})\b") | |
| _GENERIC_TRUSTED_DOMAINS = { | |
| "reuters.com": 0.95, | |
| "apnews.com": 0.92, | |
| "bloomberg.com": 0.95, | |
| "ft.com": 0.92, | |
| "wsj.com": 0.92, | |
| "cnbc.com": 0.86, | |
| "marketwatch.com": 0.84, | |
| "investopedia.com": 0.8, | |
| "wikipedia.org": 0.72, | |
| "sec.gov": 1.0, | |
| "federalreserve.gov": 1.0, | |
| "treasury.gov": 1.0, | |
| "ecb.europa.eu": 1.0, | |
| "imf.org": 0.98, | |
| "worldbank.org": 0.98, | |
| "nvidia.com": 0.9, | |
| "investor.nvidia.com": 0.97, | |
| } | |
| _LOWER_CONFIDENCE_DOMAINS = { | |
| "substack.com": 0.58, | |
| "blogspot.com": 0.45, | |
| "medium.com": 0.55, | |
| "dev.to": 0.5, | |
| } | |
| _BROWSER_FAVOR_DOMAINS = { | |
| "cnbc.com", | |
| "finance.yahoo.com", | |
| "seekingalpha.com", | |
| "futurumgroup.com", | |
| "marketwatch.com", | |
| } | |
| def extract_urls(text: str) -> List[str]: | |
| return URL_PATTERN.findall(text or "") | |
| def extract_ticker(text: str) -> Optional[str]: | |
| match = TICKER_PATTERN.search(text or "") | |
| if match: | |
| return match.group(1) | |
| return None | |
| def jina_read(url: str) -> str: | |
| try: | |
| target = url.replace("https://", "").replace("http://", "") | |
| full_url = f"{JINA_READER_BASE}{target}" | |
| with httpx.Client(timeout=30) as client: | |
| response = client.get(full_url) | |
| if response.status_code >= 400: | |
| return "" | |
| return response.text[:4000] | |
| except Exception: | |
| return "" | |
| def duckduckgo_search(query: str, max_results: int = 5) -> List[Dict[str, Any]]: | |
| results: List[Dict[str, Any]] = [] | |
| try: | |
| search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}" | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (compatible; Janus/1.0; +https://janus.local)" | |
| } | |
| response = _http_pool.get(search_url, headers=headers, follow_redirects=True) | |
| if response.status_code >= 400: | |
| return [] | |
| pattern = re.compile( | |
| r'class="result__a"[^>]*href="([^"]+)"[^>]*>(.*?)</a>', re.IGNORECASE | |
| ) | |
| seen = set() | |
| for match in pattern.finditer(response.text): | |
| raw_url = html.unescape(match.group(1) or "") | |
| url = raw_url | |
| if raw_url.startswith("//duckduckgo.com/l/?"): | |
| parsed = urlparse(f"https:{raw_url}") | |
| uddg = parse_qs(parsed.query).get("uddg", [""])[0] | |
| url = unquote(uddg) if uddg else "" | |
| title = re.sub(r"<[^>]+>", "", match.group(2)).strip() | |
| if not url or not url.startswith("http") or url in seen: | |
| continue | |
| seen.add(url) | |
| results.append({"title": title or url, "url": url, "source": "duckduckgo"}) | |
| if len(results) >= max_results: | |
| break | |
| except Exception as e: | |
| logger.warning(f"DuckDuckGo search error: {e}") | |
| return results | |
| def _direct_page_extract(url: str) -> str: | |
| try: | |
| headers = {"User-Agent": "Mozilla/5.0 (compatible; Janus/1.0)"} | |
| response = _http_pool.get(url, headers=headers, follow_redirects=True) | |
| if response.status_code >= 400: | |
| return "" | |
| text = re.sub(r"<script[^>]*>.*?</script>", "", response.text, flags=re.DOTALL) | |
| text = re.sub(r"<style[^>]*>.*?</style>", "", text, flags=re.DOTALL) | |
| text = re.sub(r"<[^>]+>", " ", text) | |
| text = re.sub(r"\s+", " ", text).strip() | |
| return text[:8000] | |
| except Exception: | |
| return "" | |
| def _sanitize_extracted_text(text: str, limit: int = 8000) -> str: | |
| cleaned = str(text or "") | |
| noise_patterns = [ | |
| r"Title:\s*", | |
| r"URL Source:\s*[^\n]+", | |
| r"Published Time:\s*[^\n]+", | |
| r"Markdown Content:\s*", | |
| r"\bSkip Navigation\b", | |
| r"\bLivestream\b", | |
| r"\bMenu\b", | |
| r"\bSearch\b", | |
| r"\bSign In\b", | |
| r"\bSubscribe\b", | |
| r"\[Image [^\]]+\]", | |
| ] | |
| for pattern in noise_patterns: | |
| cleaned = re.sub(pattern, " ", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"!\[[^\]]*\]\([^)]+\)", " ", cleaned) | |
| cleaned = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", cleaned) | |
| cleaned = re.sub(r"[#*_`>-]+", " ", cleaned) | |
| cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) | |
| cleaned = re.sub(r"\s+", " ", cleaned).strip() | |
| return cleaned[:limit] | |
| def _best_excerpt(text: str, limit: int = 280) -> str: | |
| cleaned = _sanitize_extracted_text(text, limit=3000) | |
| if not cleaned: | |
| return "" | |
| sentences = re.split(r"(?<=[.!?])\s+", cleaned) | |
| for sentence in sentences: | |
| candidate = sentence.strip().lstrip("|:- ") | |
| if len(candidate) < 60: | |
| continue | |
| lowered = candidate.lower() | |
| if any( | |
| token in lowered | |
| for token in [ | |
| "cookie", | |
| "subscribe", | |
| "sign in", | |
| "javascript", | |
| "skip to main content", | |
| "accessibility", | |
| "stock advisor", | |
| "join the motley fool", | |
| ] | |
| ): | |
| continue | |
| return candidate[:limit] | |
| return cleaned[:limit] | |
| def crawler_read(url: str) -> Dict[str, Any]: | |
| try: | |
| from app.services.crawler import JanusCrawler | |
| crawler = JanusCrawler() | |
| result = crawler.crawl_sync(url) | |
| if result.success: | |
| return { | |
| "title": result.title or url, | |
| "content": _sanitize_extracted_text(result.markdown, limit=12000), | |
| "links": result.links[:20], | |
| "metadata": result.metadata, | |
| "source": "janus_crawler", | |
| } | |
| except Exception as e: | |
| logger.debug(f"Janus crawler unavailable for {url}: {e}") | |
| return {} | |
| def _base_domain(domain: str) -> str: | |
| parts = [part for part in (domain or "").lower().split(".") if part] | |
| if len(parts) >= 2: | |
| return ".".join(parts[-2:]) | |
| return domain.lower() | |
| def score_source_credibility(url: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
| metadata = metadata or {} | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.lower().replace("www.", "") | |
| root = _base_domain(domain) | |
| try: | |
| from app.domain_packs.finance.source_checker import check_source_credibility | |
| finance_assessment = check_source_credibility(url) | |
| if finance_assessment.get("reason") == "known_trusted_source": | |
| return { | |
| "score": finance_assessment.get("credibility_score", 0.5), | |
| "reason": finance_assessment.get("reason", "known_trusted_source"), | |
| "domain": finance_assessment.get("domain", domain), | |
| } | |
| except Exception: | |
| pass | |
| if domain in _GENERIC_TRUSTED_DOMAINS or root in _GENERIC_TRUSTED_DOMAINS: | |
| return { | |
| "score": _GENERIC_TRUSTED_DOMAINS.get(domain, _GENERIC_TRUSTED_DOMAINS.get(root, 0.9)), | |
| "reason": "trusted_domain", | |
| "domain": domain, | |
| } | |
| if domain.endswith(".gov") or domain.endswith(".edu"): | |
| return {"score": 0.96, "reason": "government_or_education", "domain": domain} | |
| if domain in _LOWER_CONFIDENCE_DOMAINS or root in _LOWER_CONFIDENCE_DOMAINS: | |
| return { | |
| "score": _LOWER_CONFIDENCE_DOMAINS.get(domain, _LOWER_CONFIDENCE_DOMAINS.get(root, 0.5)), | |
| "reason": "opinion_or_blog_domain", | |
| "domain": domain, | |
| } | |
| score = 0.58 | |
| reason = "unknown_domain" | |
| path = parsed.path.lower() | |
| if any(token in path for token in ["/news", "/press", "/press-release", "/investor", "/earnings"]): | |
| score += 0.08 | |
| reason = "official_or_news_path" | |
| if metadata.get("published_at"): | |
| score += 0.04 | |
| if metadata.get("author"): | |
| score += 0.02 | |
| return {"score": min(score, 0.9), "reason": reason, "domain": domain} | |
| def plan_deep_research(query: str, max_results: int = 5, follow_links: int = 1) -> Dict[str, Any]: | |
| query_lower = (query or "").lower() | |
| deep_terms = [ | |
| "across the web", | |
| "deep web", | |
| "research", | |
| "investigate", | |
| "compare", | |
| "earnings", | |
| "guidance", | |
| "supply chain", | |
| "data center", | |
| "competition", | |
| "regulation", | |
| "policy", | |
| ] | |
| score = sum(1 for term in deep_terms if term in query_lower) | |
| word_count = len(re.findall(r"[a-z0-9_]+", query_lower)) | |
| if word_count > 10: | |
| score += 1 | |
| planned_results = max_results | |
| planned_hops = follow_links | |
| if score >= 3: | |
| planned_results = max(max_results, 6) | |
| planned_hops = max(follow_links, 2) | |
| elif score >= 1: | |
| planned_results = max(max_results, 4) | |
| planned_hops = max(follow_links, 1) | |
| if os.getenv("JANUS_PREFER_BROWSER_CRAWL", "false").lower() == "true": | |
| planned_results = min(planned_results, 4) | |
| planned_hops = min(planned_hops, 1) | |
| return { | |
| "max_results": min(planned_results, 8), | |
| "follow_links": min(planned_hops, 2), | |
| } | |
| def expand_research_queries(query: str, max_variants: int = 4) -> List[str]: | |
| query = (query or "").strip() | |
| if not query: | |
| return [] | |
| if os.getenv("JANUS_PREFER_BROWSER_CRAWL", "false").lower() == "true": | |
| max_variants = min(max_variants, 2) | |
| variants = [query] | |
| lower = query.lower() | |
| if any(term in lower for term in ["earnings", "guidance", "revenue", "margin"]): | |
| variants.extend( | |
| [ | |
| f"{query} investor relations", | |
| f"{query} official results", | |
| f"{query} analyst reaction", | |
| ] | |
| ) | |
| elif any(term in lower for term in ["stock", "company", "market", "demand", "supply", "ai"]): | |
| variants.extend( | |
| [ | |
| f"{query} official source", | |
| f"{query} latest analysis", | |
| f"{query} news and filings", | |
| ] | |
| ) | |
| else: | |
| variants.extend( | |
| [ | |
| f"{query} official source", | |
| f"{query} latest developments", | |
| ] | |
| ) | |
| deduped = [] | |
| seen = set() | |
| for item in variants: | |
| normalized = item.strip().lower() | |
| if normalized and normalized not in seen: | |
| seen.add(normalized) | |
| deduped.append(item.strip()) | |
| if len(deduped) >= max_variants: | |
| break | |
| return deduped | |
| def _should_prefer_browser(url: str, query: str = "") -> bool: | |
| if os.getenv("JANUS_PREFER_BROWSER_CRAWL", "false").lower() == "true": | |
| return True | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.lower().replace("www.", "") | |
| root = _base_domain(domain) | |
| if domain in _BROWSER_FAVOR_DOMAINS or root in _BROWSER_FAVOR_DOMAINS: | |
| return True | |
| query_lower = (query or "").lower() | |
| return any(term in query_lower for term in ["headline", "latest", "live", "filing", "press release"]) | |
| def _choose_follow_links( | |
| links: List[str], base_url: str, query: str, limit: int = 1 | |
| ) -> List[str]: | |
| query_words = { | |
| token for token in re.findall(r"[a-z0-9_]+", query.lower()) if len(token) >= 4 | |
| } | |
| base_netloc = urlparse(base_url).netloc | |
| candidates = [] | |
| seen = set() | |
| for link in links: | |
| parsed = urlparse(link) | |
| if parsed.scheme not in {"http", "https"}: | |
| continue | |
| if not parsed.netloc or parsed.netloc != base_netloc: | |
| continue | |
| if link in seen: | |
| continue | |
| seen.add(link) | |
| score = sum(1 for word in query_words if word in link.lower()) | |
| if any(token in link.lower() for token in ["earnings", "results", "revenue", "guidance", "ai", "data-center", "investor", "quarter"]): | |
| score += 2 | |
| candidates.append((score, link)) | |
| candidates.sort(key=lambda item: item[0], reverse=True) | |
| return [link for _, link in candidates[:limit]] | |
| def _follow_relevant_links( | |
| seed_url: str, | |
| links: List[str], | |
| query: str, | |
| remaining_hops: int, | |
| ) -> List[Dict[str, Any]]: | |
| if remaining_hops <= 0 or not links: | |
| return [] | |
| reads = [] | |
| for link in _choose_follow_links(links, seed_url, query, limit=1): | |
| nested = deep_read_url(link, follow_links=0, query=query) | |
| if not nested: | |
| continue | |
| credibility = score_source_credibility(link, nested.get("metadata", {})) | |
| nested["credibility"] = credibility | |
| nested["domain"] = credibility.get("domain") | |
| reads.append( | |
| { | |
| "url": link, | |
| "title": nested.get("title") or link, | |
| "content": nested.get("content", "")[:2500], | |
| "credibility_score": credibility.get("score", 0.5), | |
| "credibility_reason": credibility.get("reason", "unknown_domain"), | |
| } | |
| ) | |
| if remaining_hops > 1 and nested.get("related_reads"): | |
| reads.extend(nested.get("related_reads", [])[:1]) | |
| return reads | |
| def deep_read_url(url: str, follow_links: int = 0, query: str = "") -> Dict[str, Any]: | |
| cache_key = f"{url}|{follow_links}|{query[:120]}|{os.getenv('JANUS_PREFER_BROWSER_CRAWL', 'false')}" | |
| cached = _deep_read_cache.get(cache_key) | |
| if cached and (time.time() - cached["ts"]) < _DEEP_READ_TTL: | |
| return cached["data"] | |
| prefer_browser = _should_prefer_browser(url, query) | |
| content = "" | |
| source = "" | |
| links: List[str] = [] | |
| metadata: Dict[str, Any] = {} | |
| title = url | |
| if prefer_browser: | |
| crawled = crawler_read(url) | |
| if crawled: | |
| content = crawled.get("content", "") | |
| links = crawled.get("links", []) | |
| metadata = crawled.get("metadata", {}) | |
| title = crawled.get("title") or title | |
| source = crawled.get("source", "crawler") | |
| if not content: | |
| content = jina_read(url) | |
| source = "jina_reader" if content else source | |
| if not content: | |
| crawled = crawler_read(url) | |
| if crawled: | |
| content = crawled.get("content", "") | |
| links = crawled.get("links", []) | |
| metadata = crawled.get("metadata", {}) | |
| title = crawled.get("title") or title | |
| source = crawled.get("source", "crawler") | |
| if not content: | |
| content = _direct_page_extract(url) | |
| source = "direct_fetch" if content else source | |
| if not content: | |
| return {} | |
| related_reads = [] | |
| if follow_links > 0 and links: | |
| related_reads = _follow_relevant_links(url, links, query, remaining_hops=follow_links) | |
| credibility = score_source_credibility(url, metadata) | |
| result = { | |
| "title": metadata.get("title") or title, | |
| "url": url, | |
| "content": _sanitize_extracted_text(content, limit=8000), | |
| "source": source, | |
| "metadata": metadata, | |
| "domain": credibility.get("domain"), | |
| "credibility": credibility, | |
| "related_reads": related_reads, | |
| } | |
| _deep_read_cache[cache_key] = {"data": result, "ts": time.time()} | |
| return result | |
| def deep_web_search( | |
| query: str, max_results: int = 5, follow_links: int = 1 | |
| ) -> List[Dict[str, Any]]: | |
| cache_key = f"{query}|{max_results}|{follow_links}|{os.getenv('JANUS_PREFER_BROWSER_CRAWL', 'false')}" | |
| cached = _deep_search_cache.get(cache_key) | |
| if cached and (time.time() - cached["ts"]) < _DEEP_SEARCH_TTL: | |
| return cached["data"] | |
| plan = plan_deep_research(query, max_results=max_results, follow_links=follow_links) | |
| max_results = plan["max_results"] | |
| follow_links = plan["follow_links"] | |
| started_at = time.time() | |
| candidates: List[Dict[str, Any]] = [] | |
| tavily_results = tavily_search(query, max_results=max_results) | |
| for item in tavily_results: | |
| candidates.append( | |
| { | |
| "title": item.get("title", item.get("url", "")), | |
| "url": item.get("url", ""), | |
| "snippet": item.get("content", ""), | |
| "source": "tavily", | |
| } | |
| ) | |
| ddg_results = duckduckgo_search(query, max_results=max_results) | |
| candidates.extend(ddg_results) | |
| seen_urls = set() | |
| merged = [] | |
| for item in candidates: | |
| url = item.get("url", "") | |
| if not url or url in seen_urls: | |
| continue | |
| seen_urls.add(url) | |
| merged.append(item) | |
| if len(merged) >= max_results: | |
| break | |
| enriched = [] | |
| for item in merged: | |
| if time.time() - started_at > 20: | |
| break | |
| deep = deep_read_url(item.get("url", ""), follow_links=follow_links, query=query) | |
| if not deep: | |
| continue | |
| enriched.append( | |
| { | |
| "title": deep.get("title") or item.get("title", "Untitled"), | |
| "url": item.get("url", ""), | |
| "content": deep.get("content", "") or item.get("snippet", ""), | |
| "source": item.get("source", deep.get("source", "web")), | |
| "domain": deep.get("domain"), | |
| "credibility_score": (deep.get("credibility") or {}).get("score", 0.5), | |
| "credibility_reason": (deep.get("credibility") or {}).get("reason", "unknown_domain"), | |
| "metadata": deep.get("metadata", {}), | |
| "related_reads": deep.get("related_reads", []), | |
| } | |
| ) | |
| enriched.sort( | |
| key=lambda item: ( | |
| item.get("credibility_score", 0.0), | |
| len(item.get("content", "")), | |
| ), | |
| reverse=True, | |
| ) | |
| result = enriched[:max_results] | |
| _deep_search_cache[cache_key] = {"data": result, "ts": time.time()} | |
| return result | |
| def deep_web_research_bundle( | |
| query: str, | |
| max_results: int = 6, | |
| follow_links: int = 1, | |
| max_variants: int = 4, | |
| ) -> Dict[str, Any]: | |
| plan = plan_deep_research(query, max_results=max_results, follow_links=follow_links) | |
| variants = expand_research_queries(query, max_variants=max_variants) | |
| merged: Dict[str, Dict[str, Any]] = {} | |
| started_at = time.time() | |
| for variant in variants: | |
| if time.time() - started_at > 30: | |
| break | |
| results = deep_web_search( | |
| variant, | |
| max_results=max(3, min(plan["max_results"], max_results)), | |
| follow_links=plan["follow_links"], | |
| ) | |
| for item in results: | |
| url = item.get("url", "") | |
| if not url: | |
| continue | |
| current = merged.get(url) | |
| candidate = {**item, "query_variant": variant} | |
| if current is None or candidate.get("credibility_score", 0.0) > current.get( | |
| "credibility_score", 0.0 | |
| ): | |
| merged[url] = candidate | |
| ranked = sorted( | |
| merged.values(), | |
| key=lambda item: ( | |
| item.get("credibility_score", 0.0), | |
| len(item.get("content", "")), | |
| ), | |
| reverse=True, | |
| ) | |
| if ranked and ranked[0].get("credibility_score", 0.0) >= 0.85 and len(ranked) >= 1: | |
| if os.getenv("JANUS_PREFER_BROWSER_CRAWL", "false").lower() == "true": | |
| break | |
| results = sorted( | |
| merged.values(), | |
| key=lambda item: ( | |
| item.get("credibility_score", 0.0), | |
| len(item.get("content", "")), | |
| ), | |
| reverse=True, | |
| )[:max_results] | |
| synthesis = synthesize_deep_web_results(query, results) | |
| return { | |
| "query": query, | |
| "query_variants": variants, | |
| "results": results, | |
| "synthesis": synthesis, | |
| } | |
| def synthesize_deep_web_results(query: str, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| if not results: | |
| return { | |
| "summary": "No credible deep-web results retrieved.", | |
| "top_sources": [], | |
| "key_points": [], | |
| "avg_credibility": 0.0, | |
| } | |
| top_sources = [ | |
| { | |
| "title": item.get("title"), | |
| "url": item.get("url"), | |
| "domain": item.get("domain"), | |
| "credibility_score": item.get("credibility_score", 0.0), | |
| "credibility_reason": item.get("credibility_reason", "unknown"), | |
| } | |
| for item in results[:4] | |
| ] | |
| avg_credibility = sum(item.get("credibility_score", 0.0) for item in results[: min(len(results), 5)]) / max(min(len(results), 5), 1) | |
| key_points = [] | |
| for item in results[:3]: | |
| point = _best_excerpt(item.get("content", ""), limit=280) | |
| if point: | |
| key_points.append( | |
| { | |
| "point": point, | |
| "source": item.get("title") or item.get("domain"), | |
| "credibility_score": item.get("credibility_score", 0.0), | |
| } | |
| ) | |
| source_lines = "; ".join( | |
| f"{item.get('title', item.get('domain', 'source'))} [{item.get('credibility_score', 0.0):.2f}]" | |
| for item in top_sources[:3] | |
| ) | |
| summary = ( | |
| f"Janus reviewed {len(results)} deep public-web results for '{query}'. " | |
| f"Highest-ranked sources: {source_lines}. " | |
| f"Average credibility of the top results: {avg_credibility:.2f}." | |
| ) | |
| return { | |
| "summary": summary, | |
| "top_sources": top_sources, | |
| "key_points": key_points, | |
| "avg_credibility": round(avg_credibility, 3), | |
| } | |
| def tavily_search(query: str, max_results: int = 5) -> List[Dict[str, Any]]: | |
| if not TAVILY_API_KEY: | |
| return [] | |
| try: | |
| payload = { | |
| "api_key": TAVILY_API_KEY, | |
| "query": query, | |
| "search_depth": "basic", | |
| "max_results": max_results, | |
| } | |
| response = _http_pool.post("https://api.tavily.com/search", json=payload) | |
| if response.status_code >= 400: | |
| logger.warning(f"Tavily search returned {response.status_code}") | |
| return [] | |
| data = response.json() | |
| return data.get("results", []) | |
| except Exception as e: | |
| logger.error(f"Tavily search error: {e}") | |
| return [] | |
| def news_search(query: str, page_size: int = 5) -> List[Dict[str, Any]]: | |
| if not NEWSAPI_KEY: | |
| return [] | |
| try: | |
| params = { | |
| "q": query, | |
| "pageSize": page_size, | |
| "language": "en", | |
| "sortBy": "publishedAt", | |
| "apiKey": NEWSAPI_KEY, | |
| } | |
| response = _http_pool.get("https://newsapi.org/v2/everything", params=params) | |
| if response.status_code >= 400: | |
| logger.warning(f"NewsAPI returned {response.status_code}") | |
| return [] | |
| data = response.json() | |
| return data.get("articles", []) | |
| except Exception as e: | |
| logger.error(f"NewsAPI error: {e}") | |
| return [] | |
| def market_quote(symbol: str) -> Dict[str, Any]: | |
| if not ALPHAVANTAGE_API_KEY or not symbol: | |
| return {} | |
| # Check cache first | |
| cached = _quote_cache.get(symbol) | |
| if cached and (time.time() - cached["ts"]) < _QUOTE_TTL: | |
| logger.debug(f"Market quote cache hit: {symbol}") | |
| return cached["data"] | |
| try: | |
| params = { | |
| "function": "GLOBAL_QUOTE", | |
| "symbol": symbol, | |
| "apikey": ALPHAVANTAGE_API_KEY, | |
| } | |
| response = _http_pool.get("https://www.alphavantage.co/query", params=params) | |
| if response.status_code >= 400: | |
| logger.warning(f"Alpha Vantage returned {response.status_code}") | |
| return {} | |
| data = response.json() | |
| quote = data.get("Global Quote", {}) | |
| # Cache the result | |
| _quote_cache[symbol] = {"data": quote, "ts": time.time()} | |
| return quote | |
| except Exception as e: | |
| logger.error(f"Alpha Vantage error: {e}") | |
| return {} | |
| def build_external_context(user_input: str) -> str: | |
| chunks: List[str] = [] | |
| urls = extract_urls(user_input) | |
| for url in urls[:2]: | |
| content = jina_read(url) | |
| if content: | |
| chunks.append(f"[Jina Reader for {url}]\n{content}") | |
| search_results = tavily_search(user_input, max_results=4) | |
| if search_results: | |
| formatted = [] | |
| for item in search_results[:4]: | |
| formatted.append( | |
| f"- {item.get('title', 'Untitled')}\n {item.get('url', '')}\n {item.get('content', '')[:300]}" | |
| ) | |
| chunks.append("[Tavily Search]\n" + "\n".join(formatted)) | |
| deep_results = deep_web_search(user_input, max_results=3, follow_links=1) | |
| if deep_results: | |
| synthesized = synthesize_deep_web_results(user_input, deep_results) | |
| chunks.append(f"[Deep Web Brief]\n{synthesized.get('summary', '')}") | |
| formatted = [] | |
| for item in deep_results[:3]: | |
| related = item.get("related_reads", []) | |
| related_text = "" | |
| if related: | |
| related_text = "\n Related: " + " | ".join( | |
| f"{r.get('url', '')} [{r.get('credibility_score', 0.0):.2f}]: {str(r.get('content', ''))[:140]}" for r in related[:1] | |
| ) | |
| formatted.append( | |
| f"- {item.get('title', 'Untitled')}\n {item.get('url', '')}\n Credibility: {item.get('credibility_score', 0.0):.2f} ({item.get('credibility_reason', 'unknown')})\n {str(item.get('content', ''))[:500]}{related_text}" | |
| ) | |
| chunks.append("[Deep Web Search]\n" + "\n".join(formatted)) | |
| articles = news_search(user_input, page_size=4) | |
| if articles: | |
| formatted = [] | |
| for item in articles[:4]: | |
| formatted.append( | |
| f"- {item.get('title', 'Untitled')}\n {item.get('url', '')}\n {str(item.get('description', ''))[:300]}" | |
| ) | |
| chunks.append("[NewsAPI]\n" + "\n".join(formatted)) | |
| ticker = extract_ticker(user_input) | |
| if ticker: | |
| quote = market_quote(ticker) | |
| if quote: | |
| chunks.append(f"[Alpha Vantage Quote for {ticker}]\n{quote}") | |
| if not chunks: | |
| return "No external API context available." | |
| return "\n\n".join(chunks) | |