# Source snapshot: revision 741c3da, file size 9,011 bytes.
"""Simple synchronous tools for LlamaIndex ReActAgent."""
import os
import time
import hashlib
import json
from typing import Optional
from datetime import datetime
from tavily import TavilyClient # Use sync client
from llama_index.core.tools import FunctionTool
# Module-level, in-memory workflow state shared by all tools in this file.
# Nothing is persisted: the data lives only as long as the process does.
_workflow_state = dict(
    research_notes={},                  # notes_title -> notes text (filled by record_notes)
    report_content="Not written yet.",  # placeholder until write_report stores a report
    review="Review required.",          # placeholder until review_report stores a review
    structured_report=None,             # metadata dict built by write_report
)
# Bookkeeping for duplicate-call detection: maps a call hash (see
# _generate_call_hash) to the timestamp at which that call was recorded.
_tool_call_cache = dict()
# Number of seconds a recorded call keeps blocking identical calls.
_cache_timeout = 30
def _generate_call_hash(tool_name: str, **kwargs) -> str:
    """Return an MD5 hex digest that identifies a tool call.

    The tool name and keyword arguments are serialized to JSON with sorted
    keys, so the order in which keyword arguments are passed does not change
    the digest. The result is used as a key for call deduplication.
    """
    serialized = json.dumps({"tool": tool_name, "args": kwargs}, sort_keys=True)
    digest = hashlib.md5()
    digest.update(serialized.encode())
    return digest.hexdigest()
def _should_execute_call(tool_name: str, **kwargs) -> bool:
    """Decide whether a tool call should run or be skipped as a recent duplicate.

    A call counts as a duplicate when an identical call (same tool name and
    keyword arguments) is still present in ``_tool_call_cache``, i.e. it was
    recorded no more than ``_cache_timeout`` seconds ago. A call that is let
    through is recorded immediately, so this function both checks and updates
    the cache.

    Args:
        tool_name: Name of the tool being invoked.
        **kwargs: Keyword arguments of the invocation.

    Returns:
        True if the call should be executed, False if it is a duplicate.
    """
    # Use the monotonic clock for elapsed-time checks: the wall clock
    # (time.time()) can jump when the system time is adjusted, which would
    # expire entries too early or keep them alive far too long.
    now = time.monotonic()
    call_hash = _generate_call_hash(tool_name, **kwargs)

    # Drop expired entries first so stale records neither block calls nor pile up.
    expired_keys = [
        key
        for key, recorded_at in _tool_call_cache.items()
        if now - recorded_at > _cache_timeout
    ]
    for key in expired_keys:
        del _tool_call_cache[key]

    if call_hash in _tool_call_cache:
        return False

    # Record the call so identical calls inside the window are rejected.
    _tool_call_cache[call_hash] = now
    return True
def search_web(query: str) -> str:
    """Search the web with the synchronous Tavily client.

    Identical queries repeated within the deduplication window are not sent
    to the API again. If the search itself fails, the deduplication record for
    this query is removed so the same query can be retried right away instead
    of being rejected as a "duplicate" of a call that never produced a result.

    Args:
        query: Search query string.

    Returns:
        ``str()`` of the Tavily response on success, a duplicate-call notice,
        or a ``"Search failed: ..."`` message. Errors are reported through the
        return value rather than raised.
    """
    recorded = False  # True once this call has been entered into the dedup cache
    try:
        print(f"DEBUG: search_web called with query: '{query}'")

        # Check for duplicate calls
        if not _should_execute_call("search_web", query=query):
            return f"Duplicate search call detected for query: '{query}'. Skipping to avoid redundant API calls."
        recorded = True

        # Synchronous Tavily client; the API key is read from the environment.
        client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
        result = client.search(query)

        print(f"DEBUG: search_web executed successfully for query: '{query}'")
        return str(result)
    except Exception as e:
        if recorded:
            # The request failed after being recorded; forget it so that a
            # retry of the same query is not mistaken for a duplicate.
            _tool_call_cache.pop(_generate_call_hash("search_web", query=query), None)
        error_msg = f"Search failed: {str(e)}"
        print(f"ERROR: search_web failed: {e}")
        return error_msg
def record_notes(notes: str, notes_title: str) -> str:
    """Store research notes under ``notes_title`` in the shared workflow state.

    An identical call (same notes and title) that is still inside the
    deduplication window is skipped. Notes saved under an existing title
    replace the previous entry.

    Args:
        notes: Text of the notes to save.
        notes_title: Key under which the notes are stored.

    Returns:
        A confirmation message including the total number of stored notes, a
        duplicate notice, or a ``"Failed to record notes: ..."`` message.
        Errors are reported through the return value rather than raised.
    """
    try:
        print(f"DEBUG: record_notes called with title: '{notes_title}', notes length: {len(notes)}")

        is_new_call = _should_execute_call("record_notes", notes=notes, notes_title=notes_title)
        if is_new_call:
            saved_notes = _workflow_state["research_notes"]
            saved_notes[notes_title] = notes
            total = len(saved_notes)
            print(f"DEBUG: Notes stored. Total research notes: {total}")
            return f"Notes recorded successfully with title: '{notes_title}'. Total notes: {total}"

        # Same notes and title were recorded moments ago.
        return f"Duplicate notes recording detected for title: '{notes_title}'. Skipping to avoid redundant recording."
    except Exception as exc:
        failure = f"Failed to record notes: {exc}"
        print(f"ERROR: record_notes failed: {exc}")
        return failure
def _extract_section_titles(markdown: str) -> list:
    """Return the titles of level 1-3 markdown headers, in document order."""
    import re

    # "[ \t]+" (rather than "\s+") keeps the match on the header's own line:
    # "\s+" can consume a newline, which made a bare "#" line report the
    # *following* line as a section title.
    raw_titles = re.findall(r'^#{1,3}[ \t]+(.+)$', markdown, re.MULTILINE)
    # strip() removes trailing spaces and the "\r" left behind by CRLF input;
    # titles that are empty after stripping are not real sections.
    return [title.strip() for title in raw_titles if title.strip()]


def _extract_abstract(markdown: str) -> str:
    """Return the first non-blank line that does not start with '#', or ''."""
    for line in markdown.split('\n'):
        if line.strip() and not line.startswith('#'):
            return line.strip()
    return ""


def write_report(report_content: str, title: str = "Research Report") -> str:
    """Store a markdown report and its derived metadata in the workflow state.

    The report text is saved under ``report_content`` and a metadata dict
    (title, abstract, content, sections, word count, generation time, and the
    titles of the recorded research notes) under ``structured_report``. An
    identical call still inside the deduplication window is skipped.

    Args:
        report_content: Full markdown text of the report.
        title: Report title stored in the metadata.

    Returns:
        A success message with title, word count and section count, a
        duplicate notice, or a ``"Failed to write report: ..."`` message.
        Errors are reported through the return value rather than raised.
    """
    try:
        print(f"DEBUG: write_report FUNCTION ENTERED with title: '{title}', content length: {len(report_content)}")
        print(f"DEBUG: Function arguments - report_content type: {type(report_content)}, title type: {type(title)}")

        # Check for duplicate calls
        if not _should_execute_call("write_report", report_content=report_content, title=title):
            print("DEBUG: Duplicate call detected, returning early")
            return "Duplicate report writing detected. Skipping to avoid redundant report generation."

        print("DEBUG: Processing report content...")

        sections = _extract_section_titles(report_content)
        print(f"DEBUG: Found {len(sections)} sections: {sections}")

        # Word count is a simple whitespace split of the raw markdown.
        word_count = len(report_content.split())
        print(f"DEBUG: Word count: {word_count}")

        abstract = _extract_abstract(report_content)
        print(f"DEBUG: Abstract: {abstract[:100]}...")

        structured_report = {
            "title": title,
            # Long abstracts are cut to 200 characters and marked with "...".
            "abstract": abstract[:200] + "..." if len(abstract) > 200 else abstract,
            "content": report_content,
            "sections": sections,
            "word_count": word_count,
            "generated_at": datetime.now().isoformat(),
            "sources_used": list(_workflow_state["research_notes"].keys()),
        }
        print("DEBUG: Structured report created")

        # Store in global state
        print("DEBUG: Storing in global state...")
        _workflow_state["report_content"] = report_content
        _workflow_state["structured_report"] = structured_report

        print(f"DEBUG: Report stored successfully. Word count: {word_count}, Sections: {len(sections)}")
        print(f"DEBUG: State keys now: {list(_workflow_state.keys())}")
        print(f"DEBUG: State report_content length: {len(_workflow_state['report_content'])}")

        result = f"Report written successfully! Title: '{title}', Word count: {word_count}, Sections: {len(sections)}"
        print(f"DEBUG: Returning result: {result}")
        return result
    except Exception as e:
        error_msg = f"Failed to write report: {str(e)}"
        print(f"ERROR: write_report failed: {e}")
        import traceback
        traceback.print_exc()
        return error_msg
def review_report(review: str) -> str:
    """Store reviewer feedback for the report in the shared workflow state.

    An identical review that is still inside the deduplication window is
    skipped.

    Args:
        review: Review text to store.

    Returns:
        A confirmation containing at most the first 100 characters of the
        review (followed by "..." when it was longer), a duplicate notice, or
        a ``"Failed to review report: ..."`` message. Errors are reported
        through the return value rather than raised.
    """
    try:
        print(f"DEBUG: review_report called with review: '{review[:100]}...'")

        if _should_execute_call("review_report", review=review):
            _workflow_state["review"] = review
            print("DEBUG: Review stored successfully")

            # Only add the ellipsis when the preview actually cut text off.
            ellipsis = "..." if len(review) > 100 else ""
            preview = review[:100] + ellipsis
            return f"Report reviewed successfully. Review: {preview}"

        return "Duplicate review detected. Skipping to avoid redundant review submission."
    except Exception as exc:
        failure = f"Failed to review report: {exc}"
        print(f"ERROR: review_report failed: {exc}")
        return failure
def get_workflow_state() -> dict:
    """Return an independent snapshot of the current workflow state.

    A deep copy is returned because the state holds nested mutable values
    (the ``research_notes`` dict and the ``structured_report`` dict). With a
    shallow copy, a caller editing those nested objects would silently modify
    the live state.

    Returns:
        A new dict with the same keys and equal values as the workflow state.
    """
    import copy

    return copy.deepcopy(_workflow_state)
def reset_workflow_state():
    """Reset the workflow state and forget recently recorded tool calls.

    The state is replaced with a fresh dict holding the initial placeholder
    values. The duplicate-call cache is emptied as well: otherwise a new run
    that repeats a call from the previous run within the timeout window would
    be skipped as a "duplicate", leaving the freshly reset state empty.
    """
    global _workflow_state
    _workflow_state = {
        "research_notes": {},
        "report_content": "Not written yet.",
        "review": "Review required.",
        "structured_report": None,
    }
    # Cleared in place so every function keeps using the same cache object.
    _tool_call_cache.clear()
# LlamaIndex FunctionTool wrappers for the functions above. Each tool is
# registered under the same name as the function it wraps.
def _wrap_as_tool(fn, *sentences):
    """Build a FunctionTool for ``fn`` whose description is the sentences joined by single spaces."""
    return FunctionTool.from_defaults(
        fn=fn,
        name=fn.__name__,
        description=" ".join(sentences),
    )


search_web_tool = _wrap_as_tool(
    search_web,
    "Search the web for information on any topic.",
    "Input: A search query string.",
    "Output: Search results containing relevant information.",
    "Use this to gather facts and information about your research topic.",
)

record_notes_tool = _wrap_as_tool(
    record_notes,
    "Record research notes with a descriptive title.",
    "Input: notes (string) - the content to save, notes_title (string) - a title for the notes.",
    "Output: Confirmation that notes were saved.",
    "Use this after searching to save important information you found.",
)

write_report_tool = _wrap_as_tool(
    write_report,
    "Write a comprehensive markdown report.",
    "Input: report_content (string) - full markdown report content, title (string, optional) - report title.",
    "Output: Confirmation that report was written.",
    "The report_content should be well-structured markdown with headers, sections, and detailed content.",
)

review_report_tool = _wrap_as_tool(
    review_report,
    "Review a written report and provide feedback.",
    "Input: review (string) - your review and feedback on the report.",
    "Output: Confirmation that review was recorded.",
    "Start with 'APPROVED:' if the report is satisfactory, otherwise provide specific improvement suggestions.",
)