File size: 8,203 Bytes
dff68cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d15462e
 
dff68cb
d15462e
dff68cb
d15462e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dff68cb
 
 
d15462e
 
 
 
 
dff68cb
 
 
 
d15462e
 
dff68cb
 
 
 
 
 
 
d15462e
 
 
dff68cb
d15462e
dff68cb
d15462e
dff68cb
d15462e
 
 
 
 
 
 
 
 
 
 
 
 
 
dff68cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d15462e
 
 
 
dff68cb
 
 
 
 
 
 
 
 
 
8c3fb3c
 
 
dff68cb
8c3fb3c
 
dff68cb
 
 
8c3fb3c
 
dff68cb
 
 
 
 
 
8c3fb3c
 
dff68cb
 
 
 
 
 
 
5f9a01b
 
 
 
 
 
 
dff68cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
"""
Tool definitions for the Legacy Dependency Solver.
Includes Crawl4AI batch crawler for efficient multi-URL processing.
"""
from typing import List, Dict, Any
import json
import sys
import asyncio
import concurrent.futures
from pydantic import BaseModel, Field

from google.adk.tools import FunctionTool
from .utils import logger
from .config import get_memory_service # Import memory service factory

# --- 1. Define Schema (Module level for pickling) ---
class SearchResult(BaseModel):
    """Structured output schema for LLM extraction in adaptive_crawl_tool.

    Defined at module level (not nested in a function) so it can be pickled
    when tool execution crosses process boundaries.
    """
    relevant_facts: List[str] = Field(..., description="Specific facts/numbers found.")
    summary: str = Field(..., description="Concise summary related to the query.")
    confidence: str = Field(..., description="Confidence level (High/Medium/Low).")

# --- 2. Worker Functions (Run in Subprocess) ---


async def batch_crawl_tool(urls: List[str]) -> Dict[str, Any]:
    """
    Crawls a LIST of URLs in one go using AsyncWebCrawler directly.
    """
    logger.info(f"πŸš€ Batch Tool Triggered: Processing {len(urls)} URLs...")

    # Deferred import: crawl4ai is only needed when the tool actually runs.
    from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

    browser_config = BrowserConfig(
        headless=True,
        ignore_https_errors=True,
        extra_args=["--ignore-certificate-errors", "--ignore-ssl-errors"],
    )
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        word_count_threshold=10,
    )

    async def _crawl_one(crawler, url: str) -> str:
        """Fetch a single URL and render it as one section of the combined output."""
        try:
            # Hard 30s cap per page so one slow site cannot stall the batch.
            page = await asyncio.wait_for(
                crawler.arun(url=url, config=run_config),
                timeout=30.0,
            )
            if page.success:
                # Truncate to 15k chars to keep downstream LLM context bounded.
                return f"--- SOURCE: {url} ---\n{page.markdown[:15000]}\n"
            return f"--- SOURCE: {url} ---\n[Error: Failed to crawl]\n"
        except asyncio.TimeoutError:
            return f"--- SOURCE: {url} ---\n[Error: Timeout]\n"
        except Exception as e:
            return f"--- SOURCE: {url} ---\n[Exception: {str(e)}]\n"

    try:
        async with AsyncWebCrawler(config=browser_config) as crawler:
            # Only the first three URLs are processed, to bound total runtime.
            sections = [await _crawl_one(crawler, url) for url in urls[:3]]
        return {
            "combined_content": "\n".join(sections),
            "status": "completed",
        }
    except Exception as e:
        logger.error(f"❌ Batch crawl failed: {e}")
        return {"combined_content": f"Error: {str(e)}", "status": "failed"}

async def adaptive_crawl_tool(start_url: str, user_query: str) -> Dict[str, Any]:
    """
    Performs adaptive crawl using AsyncWebCrawler directly.
    """
    logger.info(f"πŸ› οΈ Tool Triggered: Adaptive Crawl on {start_url}")

    # Deferred imports: crawl4ai is only needed when the tool actually runs.
    from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, AdaptiveConfig, LLMConfig
    from crawl4ai.extraction_strategy import LLMExtractionStrategy
    from crawl4ai import AdaptiveCrawler

    browser_config = BrowserConfig(
        headless=True,
        verbose=True,
        ignore_https_errors=True,
        extra_args=["--ignore-certificate-errors", "--ignore-ssl-errors"],
    )

    try:
        async with AsyncWebCrawler(config=browser_config) as crawler:
            # Phase 1: discovery — let the adaptive crawler explore outward
            # from start_url and rank pages by relevance to the query.
            explorer = AdaptiveCrawler(
                crawler,
                config=AdaptiveConfig(
                    max_pages=3,
                    confidence_threshold=0.7,
                    top_k_links=2,
                ),
            )

            try:
                await explorer.digest(start_url=start_url, query=user_query)
            except Exception as e:
                return {"error": f"Crawl failed during discovery: {str(e)}"}

            ranked_pages = explorer.get_relevant_content(top_k=1)
            if not ranked_pages:
                return {"error": "No relevant content found via adaptive crawling."}

            target_url = ranked_pages[0]['url']

            # Phase 2: schema-constrained LLM extraction from the single best page.
            extraction_instruction = f"""
            Extract ONLY information matching this request: '{user_query}'.
            If not found, state that in the summary. Do not hallucinate.
            """

            extraction_config = CrawlerRunConfig(
                cache_mode=CacheMode.BYPASS,
                word_count_threshold=1,
                page_timeout=60000,
                extraction_strategy=LLMExtractionStrategy(
                    llm_config=LLMConfig(provider="ollama/qwen2.5:7b", api_token="ollama"),
                    schema=SearchResult.model_json_schema(),
                    extraction_type="schema",
                    instruction=extraction_instruction,
                ),
            )

            try:
                extraction_result = await crawler.arun(url=target_url, config=extraction_config)
                if extraction_result.extracted_content:
                    return json.loads(extraction_result.extracted_content)
                return {"error": "Extraction returned empty content."}
            except json.JSONDecodeError:
                # Model produced non-JSON text; surface it raw rather than failing.
                return {"raw_output": extraction_result.extracted_content}
            except Exception as e:
                return {"error": f"Extraction failed: {str(e)}"}

    except Exception as e:
        logger.error(f"❌ Adaptive crawl failed: {e}")
        return {"error": f"Crawl failed: {str(e)}"}


# Convert to ADK Tools
# FunctionTool wraps each async function so agents can invoke it by name;
# the function signature and docstring become the tool's schema/description.
batch_tool = FunctionTool(batch_crawl_tool)
adaptive_tool = FunctionTool(adaptive_crawl_tool)


# ===== STATE MANAGEMENT TOOLS =====
from google.adk.tools import ToolContext

# Global state dictionary for persistence across agents
# NOTE(review): module-level mutable state — shared only within this process;
# confirm the agents run in a single process, or this will not persist.
GLOBAL_STATE: Dict[str, Any] = {}

def save_context(tool_context: ToolContext, key: str, value: str) -> str:
    """Store a key/value pair in the process-wide GLOBAL_STATE dict.

    tool_context is unused; it is part of the ADK tool calling convention.
    Returns a confirmation message naming the saved key.
    """
    confirmation = f"Saved {key} to state."
    GLOBAL_STATE[key] = value
    logger.info(f"πŸ’Ύ State Saved (Global): {key} = {value}")
    return confirmation

def retrieve_context(tool_context: ToolContext, key: str) -> str:
    """Look up a value in the process-wide GLOBAL_STATE dict.

    tool_context is unused; it is part of the ADK tool calling convention.
    Returns the stored value as a string, or "Not found" for missing keys.
    """
    stored = GLOBAL_STATE.get(key, "Not found")
    logger.info(f"πŸ“‚ State Retrieved (Global): {key} = {stored}")
    return str(stored)

# Expose the state helpers to agents as ADK tools.
save_context_tool = FunctionTool(save_context)
retrieve_context_tool = FunctionTool(retrieve_context)

def submit_queries(tool_context: ToolContext, queries: List[str]) -> str:
    """Record the agent's search queries under the fixed 'search_queries' state key.

    tool_context is unused; it is part of the ADK tool calling convention.
    """
    GLOBAL_STATE["search_queries"] = queries
    logger.info(f"πŸš€ Queries Submitted (Global): {queries}")
    return "Queries submitted successfully."

# ADK tool wrapper for query submission.
submit_queries_tool = FunctionTool(submit_queries)

def validate_requirements(tool_context: ToolContext, requirements_content: str) -> str:
    """Loosely validate generated dependency-file content.

    The check is deliberately generic so any dependency format
    (requirements.txt, pom.xml, package.json, ...) can pass: the content
    must be non-empty and contain more than a handful of non-whitespace
    characters.

    Args:
        tool_context: ADK tool context (unused; part of the tool convention).
        requirements_content: Raw text of the candidate dependency file.

    Returns:
        "SUCCESS" when validation passes, otherwise an "Error: ..." message.
    """
    if not requirements_content:
        return "Error: Empty requirements content."

    # Relaxed validation for generic dependency files:
    # reject only content that is (almost) pure whitespace.
    if len(requirements_content.strip()) < 5:
        return "Error: Content too short to be a valid dependency file."

    logger.info("βœ… Dependency file validation passed (Generic check).")
    return "SUCCESS"

# ADK tool wrapper for dependency-file validation.
validate_tool = FunctionTool(validate_requirements)

# ===== MEMORY RETRIEVAL TOOL =====
async def retrieve_memory(query: str) -> str:
    """
    Searches long-term memory (Pinecone) for relevant past sessions.
    Use this to recall details from previous conversations.
    """
    logger.info(f"🧠 Searching Memory for: {query}")
    try:
        # Service is constructed on demand; get_memory_service decides
        # whether that means a fresh instance or a configured singleton.
        memory_service = get_memory_service()
        hits = await memory_service.search_memory(query)

        if not hits:
            return "No relevant memories found."

        joined = "\n---\n".join(hits)
        return f"Found relevant memories:\n{joined}"

    except Exception as e:
        logger.error(f"❌ Memory retrieval failed: {e}")
        return f"Error retrieving memory: {str(e)}"

# ADK tool wrapper for long-term memory search.
retrieve_memory_tool = FunctionTool(retrieve_memory)