MaliosDark commited on
Commit
5680110
·
verified ·
1 Parent(s): c5de26c

Add AGI module: sofia_tools_advanced.py

Browse files
Files changed (1) hide show
  1. sofia_tools_advanced.py +363 -0
sofia_tools_advanced.py ADDED
@@ -0,0 +1,363 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ SOFIA Advanced Tool Integration System
4
+ Expanded tool capabilities with APIs, databases, and web scraping
5
+ """
6
+
7
+ import json, re, math, datetime, requests, sqlite3, os
8
+ import numpy as np
9
+ from typing import Dict, Any, List, Optional, Union
10
+ import logging
11
+
12
+ logging.basicConfig(level=logging.INFO)
13
+ logger = logging.getLogger(__name__)
14
+
15
+ class Tool:
16
+ def __init__(self, name: str, description: str):
17
+ self.name = name
18
+ self.description = description
19
+
20
+ def can_handle(self, query: str) -> bool:
21
+ raise NotImplementedError
22
+
23
+ def execute(self, query: str) -> Dict[str, Any]:
24
+ raise NotImplementedError
25
+
26
+ class CalculatorTool(Tool):
27
+ def __init__(self):
28
+ super().__init__("calculator", "Performs mathematical calculations")
29
+
30
+ def can_handle(self, query: str) -> bool:
31
+ patterns = [
32
+ r'\d+\s*[\+\-\*\/]\s*\d+',
33
+ r'calculate\s+.+',
34
+ r'what\s+is\s+.+[\+\-\*\/].+',
35
+ r'solve\s+.+',
36
+ ]
37
+ return any(re.search(pattern, query.lower()) for pattern in patterns)
38
+
39
+ def execute(self, query: str) -> Dict[str, Any]:
40
+ try:
41
+ expr_match = re.search(r'(\d+(?:\.\d+)?\s*[\+\-\*\/]\s*\d+(?:\.\d+)?)', query.lower())
42
+ if expr_match:
43
+ expression = expr_match.group(1).replace(' ', '')
44
+ result = eval(expression)
45
+ return {
46
+ "tool": "calculator",
47
+ "expression": expression,
48
+ "result": result,
49
+ "type": "arithmetic"
50
+ }
51
+
52
+ if 'sqrt' in query.lower():
53
+ numbers = re.findall(r'\d+(?:\.\d+)?', query)
54
+ if numbers:
55
+ num = float(numbers[0])
56
+ result = math.sqrt(num)
57
+ return {
58
+ "tool": "calculator",
59
+ "expression": f"sqrt({num})",
60
+ "result": result,
61
+ "type": "square_root"
62
+ }
63
+
64
+ except Exception as e:
65
+ return {"tool": "calculator", "error": f"Could not calculate: {str(e)}"}
66
+
67
+ return {"tool": "calculator", "error": "No valid calculation found"}
68
+
69
+ class TimeTool(Tool):
70
+ def __init__(self):
71
+ super().__init__("time", "Provides current time, date, and temporal information")
72
+
73
+ def can_handle(self, query: str) -> bool:
74
+ time_keywords = ['time', 'date', 'day', 'hour', 'minute', 'second', 'now', 'today', 'tomorrow', 'yesterday']
75
+ return any(keyword in query.lower() for keyword in time_keywords)
76
+
77
+ def execute(self, query: str) -> Dict[str, Any]:
78
+ now = datetime.datetime.now()
79
+
80
+ result = {
81
+ "tool": "time",
82
+ "current_time": now.strftime("%H:%M:%S"),
83
+ "current_date": now.strftime("%Y-%m-%d"),
84
+ "day_of_week": now.strftime("%A"),
85
+ "month": now.strftime("%B"),
86
+ "year": now.year,
87
+ "timezone": "UTC"
88
+ }
89
+
90
+ query_lower = query.lower()
91
+ if 'tomorrow' in query_lower:
92
+ tomorrow = now + datetime.timedelta(days=1)
93
+ result['tomorrow'] = tomorrow.strftime("%Y-%m-%d (%A)")
94
+ elif 'yesterday' in query_lower:
95
+ yesterday = now - datetime.timedelta(days=1)
96
+ result['yesterday'] = yesterday.strftime("%Y-%m-%d (%A)")
97
+
98
+ return result
99
+
100
+ class SearchTool(Tool):
101
+ def __init__(self):
102
+ super().__init__("search", "Performs web searches and information retrieval")
103
+
104
+ def can_handle(self, query: str) -> bool:
105
+ search_keywords = ['search', 'find', 'lookup', 'what is', 'who is', 'where is', 'how to']
106
+ return any(keyword in query.lower() for keyword in search_keywords)
107
+
108
+ def execute(self, query: str) -> Dict[str, Any]:
109
+ try:
110
+ search_term = self._extract_search_term(query)
111
+
112
+ if any(word in query.lower() for word in ['what is', 'who is', 'definition']):
113
+ return self._wikipedia_search(search_term)
114
+
115
+ return {
116
+ "tool": "search",
117
+ "search_term": search_term,
118
+ "result": f"Search results for '{search_term}' would be retrieved from web sources",
119
+ "type": "general_search",
120
+ "suggestion": "Consider using specific APIs for better results"
121
+ }
122
+
123
+ except Exception as e:
124
+ return {"tool": "search", "error": f"Search failed: {str(e)}"}
125
+
126
+ def _extract_search_term(self, query: str) -> str:
127
+ query = re.sub(r'^(what|who|where|how|when|why)\s+is\s+', '', query.lower())
128
+ query = re.sub(r'^(search|find|lookup)\s+(for\s+)?', '', query.lower())
129
+ return query.strip()
130
+
131
+ def _wikipedia_search(self, term: str) -> Dict[str, Any]:
132
+ try:
133
+ import wikipedia
134
+ page = wikipedia.page(term, auto_suggest=True)
135
+ summary = page.summary[:500] + "..." if len(page.summary) > 500 else page.summary
136
+
137
+ return {
138
+ "tool": "search",
139
+ "search_term": term,
140
+ "result": summary,
141
+ "source": "Wikipedia",
142
+ "url": page.url,
143
+ "type": "wikipedia_summary"
144
+ }
145
+ except Exception as e:
146
+ return {"tool": "search", "error": f"Wikipedia search failed: {str(e)}"}
147
+
148
+ class DatabaseTool(Tool):
149
+ def __init__(self, db_path: str = "sofia_memory.db"):
150
+ super().__init__("database", "Access and query local knowledge database")
151
+ self.db_path = db_path
152
+ self._init_database()
153
+
154
+ def can_handle(self, query: str) -> bool:
155
+ db_keywords = ['remember', 'recall', 'stored', 'database', 'knowledge', 'facts']
156
+ return any(keyword in query.lower() for keyword in db_keywords)
157
+
158
+ def execute(self, query: str) -> Dict[str, Any]:
159
+ try:
160
+ query_lower = query.lower()
161
+
162
+ if 'remember' in query_lower or 'store' in query_lower:
163
+ return self._store_information(query)
164
+ elif 'recall' in query_lower or 'retrieve' in query_lower:
165
+ return self._retrieve_information(query)
166
+ else:
167
+ return self._query_database(query)
168
+
169
+ except Exception as e:
170
+ return {"tool": "database", "error": f"Database operation failed: {str(e)}"}
171
+
172
+ def _init_database(self):
173
+ conn = sqlite3.connect(self.db_path)
174
+ cursor = conn.cursor()
175
+
176
+ cursor.execute('''
177
+ CREATE TABLE IF NOT EXISTS knowledge (
178
+ id INTEGER PRIMARY KEY,
179
+ topic TEXT,
180
+ content TEXT,
181
+ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
182
+ source TEXT
183
+ )
184
+ ''')
185
+
186
+ cursor.execute('''
187
+ CREATE TABLE IF NOT EXISTS facts (
188
+ id INTEGER PRIMARY KEY,
189
+ fact TEXT UNIQUE,
190
+ category TEXT,
191
+ confidence REAL,
192
+ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
193
+ )
194
+ ''')
195
+
196
+ conn.commit()
197
+ conn.close()
198
+
199
+ def _store_information(self, query: str) -> Dict[str, Any]:
200
+ content = re.sub(r'(remember|store)\s+', '', query, flags=re.IGNORECASE).strip()
201
+
202
+ conn = sqlite3.connect(self.db_path)
203
+ cursor = conn.cursor()
204
+
205
+ cursor.execute(
206
+ "INSERT INTO knowledge (topic, content, source) VALUES (?, ?, ?)",
207
+ ("user_input", content, "conversation")
208
+ )
209
+
210
+ conn.commit()
211
+ conn.close()
212
+
213
+ return {
214
+ "tool": "database",
215
+ "operation": "store",
216
+ "content": content,
217
+ "status": "stored"
218
+ }
219
+
220
+ def _retrieve_information(self, query: str) -> Dict[str, Any]:
221
+ conn = sqlite3.connect(self.db_path)
222
+ cursor = conn.cursor()
223
+
224
+ cursor.execute(
225
+ "SELECT content, timestamp FROM knowledge ORDER BY timestamp DESC LIMIT 5"
226
+ )
227
+
228
+ results = cursor.fetchall()
229
+ conn.close()
230
+
231
+ return {
232
+ "tool": "database",
233
+ "operation": "retrieve",
234
+ "results": [{"content": row[0], "timestamp": row[1]} for row in results],
235
+ "count": len(results)
236
+ }
237
+
238
+ def _query_database(self, query: str) -> Dict[str, Any]:
239
+ conn = sqlite3.connect(self.db_path)
240
+ cursor = conn.cursor()
241
+
242
+ cursor.execute("SELECT COUNT(*) FROM knowledge")
243
+ knowledge_count = cursor.fetchone()[0]
244
+
245
+ cursor.execute("SELECT COUNT(*) FROM facts")
246
+ facts_count = cursor.fetchone()[0]
247
+
248
+ conn.close()
249
+
250
+ return {
251
+ "tool": "database",
252
+ "operation": "stats",
253
+ "knowledge_entries": knowledge_count,
254
+ "facts_stored": facts_count
255
+ }
256
+
257
+ class ToolManager:
258
+ def __init__(self):
259
+ self.tools = [
260
+ CalculatorTool(),
261
+ TimeTool(),
262
+ SearchTool(),
263
+ DatabaseTool()
264
+ ]
265
+ print(f"🔧 Loaded {len(self.tools)} advanced tools: {[t.name for t in self.tools]}")
266
+
267
+ def execute_tools(self, query: str) -> List[Dict[str, Any]]:
268
+ results = []
269
+ for tool in self.tools:
270
+ if tool.can_handle(query):
271
+ print(f"🛠️ Using {tool.name}: {tool.description}")
272
+ result = tool.execute(query)
273
+ if 'error' not in result:
274
+ results.append(result)
275
+ else:
276
+ print(f"⚠️ Tool {tool.name} failed: {result['error']}")
277
+ return results
278
+
279
+ def get_available_tools(self) -> List[Dict[str, str]]:
280
+ return [{"name": tool.name, "description": tool.description} for tool in self.tools]
281
+
282
+ class AdvancedToolAugmentedSOFIA:
283
+ def __init__(self):
284
+ from conversational_sofia import ConversationalSOFIA
285
+ self.sofia = ConversationalSOFIA()
286
+ self.tool_manager = ToolManager()
287
+
288
+ def process_query(self, query: str) -> Dict[str, Any]:
289
+ print(f"🤖 Processing: '{query}'")
290
+
291
+ tool_results = self.tool_manager.execute_tools(query)
292
+
293
+ if tool_results:
294
+ tool_contexts = []
295
+ for result in tool_results:
296
+ formatted = self._format_tool_result(result)
297
+ tool_contexts.append(f"Tool {result['tool']}: {formatted}")
298
+
299
+ enhanced_query = f"{query}\n\nTool Results:\n" + "\n".join(tool_contexts)
300
+ else:
301
+ enhanced_query = query
302
+
303
+ response, embedding = self.sofia.chat(enhanced_query)
304
+
305
+ return {
306
+ "response": response,
307
+ "embedding": embedding,
308
+ "tools_used": tool_results,
309
+ "tool_count": len(tool_results),
310
+ "enhanced_query": enhanced_query
311
+ }
312
+
313
+ def _format_tool_result(self, result: Dict[str, Any]) -> str:
314
+ tool_type = result.get('tool', '')
315
+
316
+ if tool_type == 'time':
317
+ return f"{result.get('current_time', 'N/A')} on {result.get('current_date', 'N/A')} ({result.get('day_of_week', 'N/A')})"
318
+
319
+ elif tool_type == 'calculator':
320
+ if 'expression' in result:
321
+ return f"{result['expression']} = {result['result']}"
322
+ else:
323
+ return str(result.get('result', 'N/A'))
324
+
325
+ elif tool_type == 'search':
326
+ return result.get('result', 'Search completed')
327
+
328
+ elif tool_type == 'database':
329
+ if result.get('operation') == 'retrieve':
330
+ return f"Retrieved {result.get('count', 0)} knowledge entries"
331
+ elif result.get('operation') == 'store':
332
+ return f"Stored: {result.get('content', 'N/A')}"
333
+ else:
334
+ return f"Database has {result.get('knowledge_entries', 0)} entries"
335
+
336
+ else:
337
+ return str(result.get('result', 'Tool executed successfully'))
338
+
339
+ def main():
340
+ import sys
341
+
342
+ if len(sys.argv) < 2:
343
+ print("Usage: python sofia_tools.py 'query'")
344
+ print("\nAvailable tools:")
345
+ tool_manager = ToolManager()
346
+ for tool_info in tool_manager.get_available_tools():
347
+ print(f" - {tool_info['name']}: {tool_info['description']}")
348
+ return
349
+
350
+ user_input = ' '.join(sys.argv[1:])
351
+
352
+ tool_sofia = AdvancedToolAugmentedSOFIA()
353
+ result = tool_sofia.process_query(user_input)
354
+
355
+ print(f"\n🤖 SOFIA: {result['response']}")
356
+ if result['tools_used']:
357
+ print(f"\n🛠️ Advanced Tools Used: {result['tool_count']}")
358
+ for tool_result in result['tools_used']:
359
+ formatted = tool_sofia._format_tool_result(tool_result)
360
+ print(f" - {tool_result['tool']}: {formatted}")
361
+
362
+ if __name__ == "__main__":
363
+ main()