Spaces:
Sleeping
Sleeping
File size: 8,203 Bytes
dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb d15462e dff68cb 8c3fb3c dff68cb 8c3fb3c dff68cb 8c3fb3c dff68cb 8c3fb3c dff68cb 5f9a01b dff68cb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
"""
Tool definitions for the Legacy Dependency Solver.
Includes Crawl4AI batch crawler for efficient multi-URL processing.
"""
from typing import List, Dict, Any
import json
import sys
import asyncio
import concurrent.futures
from pydantic import BaseModel, Field
from google.adk.tools import FunctionTool
from .utils import logger
from .config import get_memory_service # Import memory service factory
# --- 1. Define Schema (Module level for pickling) ---
class SearchResult(BaseModel):
    """Structured output schema for the LLM extraction phase.

    Declared at module level (not inside a function) so the schema object is
    picklable if it ever has to cross a subprocess boundary.
    """

    # Specific facts/numbers pulled from the crawled page.
    relevant_facts: List[str] = Field(..., description="Specific facts/numbers found.")
    # Short answer focused on the user's query.
    summary: str = Field(..., description="Concise summary related to the query.")
    # Free-text confidence label; the instruction asks for High/Medium/Low.
    confidence: str = Field(..., description="Confidence level (High/Medium/Low).")
# --- 2. Worker Functions (Run in Subprocess) ---
async def batch_crawl_tool(urls: List[str]) -> Dict[str, Any]:
    """
    Crawls a LIST of URLs in one go using AsyncWebCrawler directly.

    Args:
        urls: Candidate URLs; only the first three are crawled.

    Returns:
        Dict with "combined_content" (markdown sections, one per URL, errors
        inlined as bracketed notes) and "status" ("completed" or "failed").
    """
    logger.info(f"π Batch Tool Triggered: Processing {len(urls)} URLs...")
    # Deferred import: crawl4ai is only pulled in when the tool actually runs.
    from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

    browser_cfg = BrowserConfig(
        headless=True,
        ignore_https_errors=True,
        extra_args=["--ignore-certificate-errors", "--ignore-ssl-errors"],
    )
    run_cfg = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        word_count_threshold=10,
    )

    async def fetch_section(crawler, url: str) -> str:
        """Crawl one URL and format it as a labelled markdown section."""
        try:
            # Per-URL 30s cap so one slow site cannot stall the whole batch.
            outcome = await asyncio.wait_for(
                crawler.arun(url=url, config=run_cfg),
                timeout=30.0,
            )
        except asyncio.TimeoutError:
            return f"--- SOURCE: {url} ---\n[Error: Timeout]\n"
        except Exception as exc:
            return f"--- SOURCE: {url} ---\n[Exception: {str(exc)}]\n"
        if outcome.success:
            # Truncate to 15k chars to keep the combined payload bounded.
            return f"--- SOURCE: {url} ---\n{outcome.markdown[:15000]}\n"
        return f"--- SOURCE: {url} ---\n[Error: Failed to crawl]\n"

    try:
        async with AsyncWebCrawler(config=browser_cfg) as crawler:
            # Only the first three URLs are processed, sequentially.
            sections = [await fetch_section(crawler, url) for url in urls[:3]]
        return {
            "combined_content": "\n".join(sections),
            "status": "completed",
        }
    except Exception as e:
        logger.error(f"β Batch crawl failed: {e}")
        return {"combined_content": f"Error: {str(e)}", "status": "failed"}
async def adaptive_crawl_tool(start_url: str, user_query: str) -> Dict[str, Any]:
    """
    Performs adaptive crawl using AsyncWebCrawler directly.

    Two phases: (1) AdaptiveCrawler.digest explores from start_url guided by
    user_query; (2) the single most relevant page is re-crawled with an LLM
    extraction strategy that fills the SearchResult schema.

    Returns the parsed extraction dict on success, {"raw_output": ...} if the
    LLM output is not valid JSON, or {"error": ...} on any failure.
    """
    logger.info(f"π οΈ Tool Triggered: Adaptive Crawl on {start_url}")
    # Deferred imports keep crawl4ai out of module import time.
    from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, AdaptiveConfig, LLMConfig
    from crawl4ai.extraction_strategy import LLMExtractionStrategy
    from crawl4ai import AdaptiveCrawler
    browser_config = BrowserConfig(
        headless=True,
        verbose=True,
        # Tolerate self-signed/broken TLS — same policy as the batch tool.
        ignore_https_errors=True,
        extra_args=["--ignore-certificate-errors", "--ignore-ssl-errors"]
    )
    try:
        async with AsyncWebCrawler(config=browser_config) as crawler:
            # Phase 1: Discovery — bounded exploration (max 3 pages, top 2 links
            # per page, stop once confidence reaches 0.7).
            adaptive_config = AdaptiveConfig(
                max_pages=3,
                confidence_threshold=0.7,
                top_k_links=2,
            )
            adaptive = AdaptiveCrawler(crawler, config=adaptive_config)
            try:
                await adaptive.digest(start_url=start_url, query=user_query)
            except Exception as e:
                return {"error": f"Crawl failed during discovery: {str(e)}"}
            # Keep only the single best-matching page for extraction.
            top_content = adaptive.get_relevant_content(top_k=1)
            if not top_content:
                return {"error": "No relevant content found via adaptive crawling."}
            best_url = top_content[0]['url']
            # Phase 2: Extraction — query-specific instruction for the LLM.
            dynamic_instruction = f"""
Extract ONLY information matching this request: '{user_query}'.
If not found, state that in the summary. Do not hallucinate.
"""
            extraction_config = CrawlerRunConfig(
                cache_mode=CacheMode.BYPASS,
                word_count_threshold=1,
                page_timeout=60000,  # ms — presumably; confirm against crawl4ai docs
                extraction_strategy=LLMExtractionStrategy(
                    # Local Ollama model; "ollama" is a placeholder token.
                    llm_config=LLMConfig(provider="ollama/qwen2.5:7b", api_token="ollama"),
                    schema=SearchResult.model_json_schema(),
                    extraction_type="schema",
                    instruction=dynamic_instruction,
                ),
            )
            try:
                result = await crawler.arun(url=best_url, config=extraction_config)
                if result.extracted_content:
                    return json.loads(result.extracted_content)
                return {"error": "Extraction returned empty content."}
            except json.JSONDecodeError:
                # LLM produced non-JSON text; surface it raw rather than failing.
                return {"raw_output": result.extracted_content}
            except Exception as e:
                return {"error": f"Extraction failed: {str(e)}"}
    except Exception as e:
        logger.error(f"β Adaptive crawl failed: {e}")
        return {"error": f"Crawl failed: {str(e)}"}
# Convert to ADK Tools
# (presumably FunctionTool derives each tool's schema/description from the
# wrapped function's signature and docstring — verify against ADK docs)
batch_tool = FunctionTool(batch_crawl_tool)
adaptive_tool = FunctionTool(adaptive_crawl_tool)
# ===== STATE MANAGEMENT TOOLS =====
from google.adk.tools import ToolContext
# Global state dictionary for persistence across agents
# NOTE(review): process-wide mutable dict — every agent/session in this
# process shares it; confirm cross-session leakage is acceptable.
GLOBAL_STATE: Dict[str, Any] = {}
def save_context(tool_context: ToolContext, key: str, value: str) -> str:
    """Store a key/value pair in the process-wide GLOBAL_STATE dict.

    Returns a short confirmation string for the calling agent.
    """
    GLOBAL_STATE.update({key: value})
    logger.info(f"πΎ State Saved (Global): {key} = {value}")
    return f"Saved {key} to state."
def retrieve_context(tool_context: ToolContext, key: str) -> str:
    """Fetch a value previously stored in GLOBAL_STATE.

    Returns the stored value as a string, or "Not found" for unknown keys.
    """
    try:
        value = GLOBAL_STATE[key]
    except KeyError:
        value = "Not found"
    logger.info(f"π State Retrieved (Global): {key} = {value}")
    return str(value)
# Expose the state helpers as ADK tools.
save_context_tool = FunctionTool(save_context)
retrieve_context_tool = FunctionTool(retrieve_context)
def submit_queries(tool_context: ToolContext, queries: List[str]) -> str:
    """Record the generated search queries under GLOBAL_STATE['search_queries'].

    Returns a confirmation string for the calling agent.
    """
    GLOBAL_STATE['search_queries'] = list(queries) if queries is not None else queries
    logger.info(f"π Queries Submitted (Global): {queries}")
    return "Queries submitted successfully."
# Expose submit_queries as an ADK tool.
submit_queries_tool = FunctionTool(submit_queries)
def validate_requirements(tool_context: ToolContext, requirements_content: str) -> str:
    """Sanity-check the content of a generated dependency file.

    Args:
        tool_context: ADK tool invocation context (unused here).
        requirements_content: Raw text of the candidate dependency file.

    Returns:
        "SUCCESS" when the content passes the generic checks, otherwise a
        human-readable "Error: ..." string describing the failure.
    """
    # Guard against None / empty string up front.
    if not requirements_content:
        return "Error: Empty requirements content."
    # Relaxed validation for generic dependency files: formats vary
    # (requirements.txt, pom.xml, package.json, ...), so we only require
    # some minimal non-whitespace content.
    if len(requirements_content.strip()) < 5:
        return "Error: Content too short to be a valid dependency file."
    # Fix: the original success log was an unterminated string literal (a
    # garbled emoji split the line in two); restored as one literal.
    logger.info("Dependency file validation passed (Generic check).")
    return "SUCCESS"
# Expose validate_requirements as an ADK tool.
validate_tool = FunctionTool(validate_requirements)
# ===== MEMORY RETRIEVAL TOOL =====
async def retrieve_memory(query: str) -> str:
    """
    Searches long-term memory (Pinecone) for relevant past sessions.
    Use this to recall details from previous conversations.
    """
    # NOTE: docstring above doubles as the tool description shown to the LLM,
    # so its wording is kept intact.
    logger.info(f"π§ Searching Memory for: {query}")
    try:
        # Service is obtained per call via the config factory.
        service = get_memory_service()
        hits = await service.search_memory(query)
        if hits:
            joined = "\n---\n".join(hits)
            return f"Found relevant memories:\n{joined}"
        return "No relevant memories found."
    except Exception as e:
        logger.error(f"β Memory retrieval failed: {e}")
        return f"Error retrieving memory: {str(e)}"
# Expose retrieve_memory as an ADK tool.
retrieve_memory_tool = FunctionTool(retrieve_memory)
|