frdel committed
Commit cb26870 · 2 Parent(s): 604ccf4 3796451

memory consolidation merge
prompts/default/agent.system.tool.memory.md CHANGED
@@ -5,7 +5,7 @@ never refuse search memorize load personal info all belongs to user
 ### memory_load
 load memories via query threshold limit filter
 get memory content as metadata key-value pairs
-- threshold: 0=any 1=exact 0.6=default
+- threshold: 0=any 1=exact 0.7=default
 - limit: max results default=5
 - filter: python syntax using metadata keys
 usage:
@@ -18,7 +18,7 @@ usage:
 "tool_name": "memory_load",
 "tool_args": {
 "query": "File compression library for...",
-"threshold": 0.6,
+"threshold": 0.7,
 "limit": 5,
 "filter": "area=='main' and timestamp<'2024-01-01 00:00:00'",
 }
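
The new 0.7 default is referenced throughout this commit as `DEFAULT_THRESHOLD`, imported from `python.tools.memory_load`. That file is not part of this diff; a minimal sketch of what the constant presumably looks like there, assuming it simply centralizes the default in one place:

```python
# python/tools/memory_load.py (not shown in this diff) -- assumed sketch:
# a single module-level constant that the extensions and the consolidator
# import, so the default similarity threshold is defined once.
DEFAULT_THRESHOLD = 0.7  # 0=any match, 1=exact match
```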
prompts/default/memory.consolidation.msg.md ADDED
@@ -0,0 +1,15 @@
+Process the consolidation for this scenario:
+
+# Memory Context
+
+**Memory Area**: {{area}}
+**Current Timestamp**: {{current_timestamp}}
+
+**New Memory to Process**:
+{{new_memory}}
+
+**New Memory Metadata**:
+{{new_memory_metadata}}
+
+**Existing Similar Memories**:
+{{similar_memories}}
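
The `{{...}}` placeholders are filled by `agent.read_prompt`, as seen in the call in the added `memory_consolidation.py` below. A minimal sketch of the substitution, assuming it is a plain `{{name}}`-for-keyword-argument replacement (the real helper may differ):

```python
# Assumed sketch of the {{placeholder}} substitution performed by
# agent.read_prompt; names mirror the call in memory_consolidation.py.
def render_prompt(template: str, **kwargs) -> str:
    # replace each {{name}} placeholder with the matching keyword argument
    for name, value in kwargs.items():
        template = template.replace("{{" + name + "}}", str(value))
    return template

# usage mirroring the consolidation call:
# render_prompt(template, area="fragments", current_timestamp=ts,
#               new_memory=txt, new_memory_metadata=meta_json,
#               similar_memories=similar_text)
```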
prompts/default/memory.consolidation.sys.md ADDED
@@ -0,0 +1,124 @@
+# Memory Consolidation Analysis System
+
+You are an intelligent memory consolidation specialist for the Agent Zero memory management system. Your role is to analyze new memories against existing similar memories and determine the optimal consolidation strategy to maintain high-quality, organized memory storage.
+
+## Your Mission
+
+Analyze a new memory alongside existing similar memories and determine whether to:
+- **merge** memories into a consolidated version
+- **replace** outdated memories with newer information
+- **update** existing memories with additional information
+- **keep_separate** if memories serve different purposes
+- **skip** consolidation if no action is beneficial
+
+
+## Consolidation Analysis Guidelines
+
+### 0. Similarity Score Awareness
+- Each similar memory has been scored for similarity to the new memory
+- **High similarity scores** (>0.9) indicate very similar content suitable for replacement
+- **Moderate similarity scores** (0.7-0.9) suggest related but distinct content - use caution with REPLACE
+- **Lower similarity scores** (<0.7) indicate topically related but different content - avoid REPLACE
+
+### 1. Temporal Intelligence
+- **Newer information** generally supersedes older information
+- **Preserve historical context** when consolidating - don't lose important chronological details
+- **Consider recency** - more recent memories may be more relevant
+
+### 2. Content Relationships
+- **Complementary information** should be merged into comprehensive memories
+- **Contradictory information** requires careful analysis of which is more accurate/current
+- **Duplicate content** should be consolidated to eliminate redundancy
+- **Distinct but related topics** may be better kept separate
+
+### 3. Quality Assessment
+- **More detailed/complete** information should be preserved
+- **Vague or incomplete** memories can be enhanced with specific details
+- **Factual accuracy** takes precedence over speculation
+- **Practical applicability** should be maintained
+
+### 4. Metadata Preservation
+- **Timestamps** should be preserved to maintain chronological context
+- **Source information** should be consolidated when merging
+- **Importance scores** should reflect consolidated memory value
+
+### 5. Knowledge Source Awareness
+- **Knowledge Sources** (from imported files) vs **Conversation Memories** (from chat interactions)
+- **Knowledge sources** are generally more authoritative and should be preserved carefully
+- **Avoid consolidating** knowledge sources with conversation memories unless there's clear benefit
+- **Preserve source file information** when consolidating knowledge from different files
+- **Knowledge vs Experience**: Knowledge sources contain factual information, conversation memories contain experiential learning
+
+## Output Format
+
+Provide your analysis as a JSON object with this exact structure:
+
+```json
+{
+  "action": "merge|replace|keep_separate|update|skip",
+  "memories_to_remove": ["id1", "id2"],
+  "memories_to_update": [
+    {
+      "id": "memory_id",
+      "new_content": "updated memory content",
+      "metadata": {"additional": "metadata"}
+    }
+  ],
+  "new_memory_content": "final consolidated memory text",
+  "metadata": {
+    "consolidated_from": ["id1", "id2"],
+    "historical_notes": "summary of older information",
+    "importance_score": 0.8,
+    "consolidation_type": "description of consolidation performed"
+  },
+  "reasoning": "brief explanation of decision and consolidation strategy"
+}
+```
+
+## Action Definitions
+
+- **merge**: Combine multiple memories into one comprehensive memory, removing originals
+- **replace**: Replace outdated, incorrect, or superseded memories with new version, preserving important metadata. Use when new information directly contradicts or makes old information obsolete.
+- **keep_separate**: New memory addresses different aspects, keep all memories separate
+- **update**: Enhance existing memory with additional details from new memory
+- **skip**: No consolidation needed, use simple insertion for new memory
+
+## Example Consolidation Scenarios
+
+### Scenario 1: Merge Related Information
+**New**: "Alpine.js form validation should use x-on:submit.prevent to handle form submission"
+**Existing**: "Alpine.js forms need proper event handling for user interactions"
+**Action**: merge → Create comprehensive Alpine.js form handling memory
+
+### Scenario 2: Replace Outdated Information
+**New**: "Updated API endpoint is now /api/v2/users instead of /api/users"
+**Existing**: "User API endpoint is /api/users for getting user data"
+**Action**: replace → Update with new endpoint, note the change in historical_notes
+
+**REPLACE Criteria**: Use replace when:
+- **High similarity score** (>0.9) indicates very similar content
+- New information directly contradicts existing information
+- Version updates make previous versions obsolete
+- Bug fixes or corrections supersede previous information
+- Official changes override previous statements
+
+**REPLACE Safety**: Only replace memories with high similarity scores. For moderate similarity, prefer MERGE or KEEP_SEPARATE to preserve distinct information.
+
+### Scenario 3: Keep Separate for Different Contexts
+**New**: "Python async/await syntax for handling concurrent operations"
+**Existing**: "Python list comprehensions for efficient data processing"
+**Action**: keep_separate → Both are Python but different concepts
+
+## Quality Principles
+
+1. **Preserve Knowledge**: Never lose important information during consolidation
+2. **Improve Organization**: Create clearer, more accessible memory structure
+3. **Maintain Context**: Keep temporal and source information where relevant
+4. **Enhance Searchability**: Use consolidation to improve future memory retrieval
+5. **Reduce Redundancy**: Eliminate unnecessary duplication while preserving nuance
+
+## Instructions
+
+Analyze the provided memories and determine the optimal consolidation strategy. Consider the new memory content, the existing similar memories, their timestamps, source information, and metadata. Apply the consolidation analysis guidelines above to make an informed decision.
+
+Return your analysis as a properly formatted JSON response following the exact output format specified above.
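
The consuming side of this contract appears later in the diff, in `_analyze_memory_consolidation`: the utility model's reply is parsed leniently and the `action` string is mapped onto an enum, degrading to `skip` on anything malformed. A condensed, self-contained sketch of that defensive parse (the enum values match the added `memory_consolidation.py`):

```python
from enum import Enum

class ConsolidationAction(Enum):
    MERGE = "merge"
    REPLACE = "replace"
    KEEP_SEPARATE = "keep_separate"
    UPDATE = "update"
    SKIP = "skip"

def parse_action(result_json: dict) -> ConsolidationAction:
    # unknown or missing actions fall back to SKIP rather than raising,
    # so a malformed LLM reply can never break memorization
    try:
        return ConsolidationAction(result_json.get("action", "skip").lower())
    except ValueError:
        return ConsolidationAction.SKIP
```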
prompts/default/memory.keyword_extraction.msg.md ADDED
@@ -0,0 +1,4 @@
+Now analyze the provided memory content and extract relevant search keywords:
+
+**Memory Content:**
+{{memory_content}}
prompts/default/memory.keyword_extraction.sys.md ADDED
@@ -0,0 +1,53 @@
+# Memory Keyword Extraction System
+
+You are a specialized keyword extraction system for the Agent Zero memory management. Your task is to analyze memory content and extract relevant search keywords and phrases that can be used to find similar memories in the database.
+
+## Your Role
+
+Extract 2-4 search keywords or short phrases from the given memory content that would help find semantically similar memories. Focus on:
+
+1. **Key concepts and topics** mentioned in the memory
+2. **Important entities** (people, places, tools, technologies)
+3. **Action verbs** that describe what was done or learned
+4. **Domain-specific terms** that are central to the memory
+
+## Guidelines
+
+- Extract specific, meaningful terms rather than generic words
+- Include both single keywords and short phrases (2-3 words max)
+- Prioritize terms that are likely to appear in related memories
+- Avoid common stop words and overly generic terms
+- Focus on searchable content that would match similar memories
+
+## Input Format
+You will receive memory content to analyze.
+
+## Output Format
+Return ONLY a JSON array of strings containing the extracted keywords/phrases:
+
+```json
+["keyword1", "phrase example", "important concept", "domain term"]
+```
+
+## Examples
+
+**Memory Content**: "Successfully implemented OAuth authentication using JWT tokens for the user login system. The solution handles token refresh and validation properly."
+
+**Output**:
+```json
+["OAuth authentication", "JWT tokens", "user login", "token refresh", "authentication implementation"]
+```
+
+**Memory Content**: "Fixed the database connection timeout issue by increasing the connection pool size and optimizing slow queries with proper indexing."
+
+**Output**:
+```json
+["database connection", "timeout issue", "connection pool", "query optimization", "indexing"]
+```
+
+**Memory Content**: "Learned that Alpine.js x-data components should use camelCase for method names and snake_case for data properties to follow best practices."
+
+**Output**:
+```json
+["Alpine.js", "x-data components", "camelCase methods", "naming conventions"]
+```
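
This prompt pair is consumed by `_extract_search_keywords` in the added `memory_consolidation.py`. A condensed sketch of the call-and-parse round trip, with the lenient list/string handling copied from that method (the `agent` object and its `call_utility_model` signature are taken from the diff):

```python
from python.helpers.dirty_json import DirtyJson

async def extract_keywords(agent, system_prompt: str, message_prompt: str) -> list[str]:
    # ask the utility LLM for keywords, then accept a JSON list,
    # a bare string, or nothing at all without raising
    response = await agent.call_utility_model(
        system=system_prompt, message=message_prompt, background=True
    )
    parsed = DirtyJson.parse_string(response.strip())
    if isinstance(parsed, list):
        return [str(k) for k in parsed if k]
    if isinstance(parsed, str):
        return [parsed]
    return []
```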
python/api/import_knowledge.py CHANGED
@@ -16,12 +16,22 @@ class ImportKnowledge(ApiHandler):
         context = self.get_context(ctxid)
 
         file_list = request.files.getlist("files[]")
-        KNOWLEDGE_FOLDER = files.get_abs_path(memory.get_custom_knowledge_subdir_abs(context.agent0),"main")
+        KNOWLEDGE_FOLDER = files.get_abs_path(memory.get_custom_knowledge_subdir_abs(context.agent0), "main")
+
+        # Ensure knowledge folder exists (create if missing)
+        try:
+            os.makedirs(KNOWLEDGE_FOLDER, exist_ok=True)
+        except (OSError, PermissionError) as e:
+            raise Exception(f"Failed to create knowledge folder {KNOWLEDGE_FOLDER}: {e}")
+
+        # Verify the directory is accessible
+        if not os.access(KNOWLEDGE_FOLDER, os.W_OK):
+            raise Exception(f"Knowledge folder {KNOWLEDGE_FOLDER} is not writable")
 
         saved_filenames = []
 
         for file in file_list:
-            if file:
+            if file and file.filename:
                 filename = secure_filename(file.filename)  # type: ignore
                 file.save(os.path.join(KNOWLEDGE_FOLDER, filename))
                 saved_filenames.append(filename)
@@ -33,4 +43,4 @@ class ImportKnowledge(ApiHandler):
         return {
             "message": "Knowledge Imported",
             "filenames": saved_filenames[:5]
-        }
+        }
python/extensions/message_loop_prompts_after/_50_recall_memories.py CHANGED
@@ -2,6 +2,7 @@ import asyncio
 from python.helpers.extension import Extension
 from python.helpers.memory import Memory
 from agent import LoopData
+from python.tools.memory_load import DEFAULT_THRESHOLD as DEFAULT_MEMORY_THRESHOLD
 
 DATA_NAME_TASK = "_recall_memories_task"
 
@@ -10,8 +11,8 @@ class RecallMemories(Extension):
 
     INTERVAL = 3
    HISTORY = 10000
-    RESULTS = 3
-    THRESHOLD = 0.6
+    RESULTS = 5
+    THRESHOLD = DEFAULT_MEMORY_THRESHOLD
 
     async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
 
@@ -88,8 +89,7 @@ class RecallMemories(Extension):
         memories_text = ""
         for memory in memories:
             memories_text += memory.page_content + "\n\n"
-        memories_text = memories_text.strip()
-
+
         # log the full results
         log_item.update(memories=memories_text)
python/extensions/message_loop_prompts_after/_51_recall_solutions.py CHANGED
@@ -2,16 +2,18 @@ import asyncio
 from python.helpers.extension import Extension
 from python.helpers.memory import Memory
 from agent import LoopData
+from python.tools.memory_load import DEFAULT_THRESHOLD as DEFAULT_MEMORY_THRESHOLD
 
 DATA_NAME_TASK = "_recall_solutions_task"
 
+
 class RecallSolutions(Extension):
 
     INTERVAL = 3
     HISTORY = 10000
-    SOLUTIONS_COUNT = 2
-    INSTRUMENTS_COUNT = 2
-    THRESHOLD = 0.6
+    SOLUTIONS_COUNT = 3
+    INSTRUMENTS_COUNT = 3
+    THRESHOLD = DEFAULT_MEMORY_THRESHOLD
 
     async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
 
@@ -26,11 +28,11 @@ class RecallSolutions(Extension):
 
     async def search_solutions(self, loop_data: LoopData, **kwargs):
 
-        #cleanup
+        # cleanup
         extras = loop_data.extras_persistent
         if "solutions" in extras:
             del extras["solutions"]
-
+
         # try:
 
         # show full util message
python/extensions/monologue_end/_50_memorize_fragments.py CHANGED
@@ -4,12 +4,11 @@ from python.helpers.memory import Memory
 from python.helpers.dirty_json import DirtyJson
 from agent import LoopData
 from python.helpers.log import LogItem
+from python.tools.memory_load import DEFAULT_THRESHOLD as DEFAULT_MEMORY_THRESHOLD
 
 
 class MemorizeMemories(Extension):
 
-    REPLACE_THRESHOLD = 0.9
-
     async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
         # try:
 
@@ -20,7 +19,8 @@ class MemorizeMemories(Extension):
         )
 
         # memorize in background
-        asyncio.create_task(self.memorize(loop_data, log_item))
+        task = asyncio.create_task(self.memorize(loop_data, log_item))
+        return task
 
     async def memorize(self, loop_data: LoopData, log_item: LogItem, **kwargs):
 
@@ -77,37 +77,75 @@ class MemorizeMemories(Extension):
         else:
             log_item.update(heading=f"{len(memories)} entries to memorize.")
 
-        # save chat history
-        db = await Memory.get(self.agent)
-
+        # Process memories with intelligent consolidation
         memories_txt = ""
-        rem = []
+        total_processed = 0
+        total_consolidated = 0
+
         for memory in memories:
-            # solution to plain text:
+            # Convert memory to plain text
             txt = f"{memory}"
             memories_txt += "\n\n" + txt
-            log_item.update(memories=memories_txt.strip())
-
-            # remove previous fragments too similiar to this one
-            if self.REPLACE_THRESHOLD > 0:
-                rem += await db.delete_documents_by_query(
-                    query=txt,
-                    threshold=self.REPLACE_THRESHOLD,
-                    filter=f"area=='{Memory.Area.FRAGMENTS.value}'",
-                )
-            if rem:
-                rem_txt = "\n\n".join(Memory.format_docs_plain(rem))
-                log_item.update(replaced=rem_txt)
-
-            # insert new solution
-            await db.insert_text(text=txt, metadata={"area": Memory.Area.FRAGMENTS.value})
+
+            try:
+                # Use intelligent consolidation system
+                from python.helpers.memory_consolidation import create_memory_consolidator
+                consolidator = create_memory_consolidator(
+                    self.agent,
+                    similarity_threshold=DEFAULT_MEMORY_THRESHOLD,  # More permissive for discovery
+                    max_similar_memories=8,
+                    max_llm_context_memories=4
+                )
+
+                # Create memory item-specific log for detailed tracking
+                memory_log = self.agent.context.log.log(
+                    type="util",
+                    heading=f"Processing memory fragment: {txt[:50]}...",
+                    temp=False,
+                    update_progress="none"  # Don't affect status bar
+                )
+
+                # Process with intelligent consolidation
+                result_obj = await consolidator.process_new_memory(
+                    new_memory=txt,
+                    area=Memory.Area.FRAGMENTS.value,
+                    metadata={"area": Memory.Area.FRAGMENTS.value},
+                    log_item=memory_log
+                )
+
+                # Update the individual log item with completion status but keep it temporary
+                if result_obj.get("success"):
+                    total_consolidated += 1
+                    memory_log.update(
+                        result="Fragment processed successfully",
+                        heading=f"Memory fragment completed: {txt[:50]}...",
+                        temp=False,  # Show completion message
+                        update_progress="none"  # Show briefly then disappear
+                    )
+                else:
+                    memory_log.update(
+                        result="Fragment processing failed",
+                        heading=f"Memory fragment failed: {txt[:50]}...",
+                        temp=False,  # Show completion message
+                        update_progress="none"  # Show briefly then disappear
+                    )
+                total_processed += 1
+
+            except Exception as e:
+                # Log error but continue processing
+                log_item.update(consolidation_error=str(e))
+                total_processed += 1
 
+        # Update final results with structured logging
+        memories_txt = memories_txt.strip()
         log_item.update(
-            result=f"{len(memories)} entries memorized.",
-            heading=f"{len(memories)} entries memorized.",
+            heading=f"Memorization completed: {total_processed} memories processed, {total_consolidated} intelligently consolidated",
+            memories=memories_txt,
+            result=f"{total_processed} memories processed, {total_consolidated} intelligently consolidated",
+            memories_processed=total_processed,
+            memories_consolidated=total_consolidated,
+            update_progress="none"
         )
-        if rem:
-            log_item.stream(result=f"\nReplaced {len(rem)} previous memories.")
 
         # except Exception as e:
         # err = errors.format_error(e)
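
Both memorize extensions call `create_memory_consolidator`, which the visible hunks of the added `memory_consolidation.py` never reach (that file's diff is truncated below). Judging only from the call sites above and the `ConsolidationConfig` dataclass, it is presumably a thin factory along these lines (a sketch, not the actual definition):

```python
# Assumed sketch of the factory used by the extensions; the real definition
# sits in python/helpers/memory_consolidation.py beyond the visible hunk.
def create_memory_consolidator(agent, **overrides) -> "MemoryConsolidator":
    # build a ConsolidationConfig with any keyword overrides applied
    # (similarity_threshold, max_similar_memories, max_llm_context_memories, ...)
    config = ConsolidationConfig(**overrides)
    return MemoryConsolidator(agent, config)
```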
python/extensions/monologue_end/_51_memorize_solutions.py CHANGED
@@ -4,12 +4,11 @@ from python.helpers.memory import Memory
 from python.helpers.dirty_json import DirtyJson
 from agent import LoopData
 from python.helpers.log import LogItem
+from python.tools.memory_load import DEFAULT_THRESHOLD as DEFAULT_MEMORY_THRESHOLD
 
 
 class MemorizeSolutions(Extension):
 
-    REPLACE_THRESHOLD = 0.9
-
     async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
         # try:
 
@@ -20,7 +19,8 @@ class MemorizeSolutions(Extension):
         )
 
         # memorize in background
-        asyncio.create_task(self.memorize(loop_data, log_item))
+        task = asyncio.create_task(self.memorize(loop_data, log_item))
+        return task
 
     async def memorize(self, loop_data: LoopData, log_item: LogItem, **kwargs):
         # get system message and chat history for util llm
@@ -78,13 +78,13 @@ class MemorizeSolutions(Extension):
             heading=f"{len(solutions)} successful solutions to memorize."
         )
 
-        # save chat history
-        db = await Memory.get(self.agent)
-
+        # Process solutions with intelligent consolidation
         solutions_txt = ""
-        rem = []
+        total_processed = 0
+        total_consolidated = 0
+
         for solution in solutions:
-            # solution to plain text:
+            # Convert solution to structured text
             if isinstance(solution, dict):
                 problem = solution.get('problem', 'Unknown problem')
                 solution_text = solution.get('solution', 'Unknown solution')
@@ -94,28 +94,65 @@ class MemorizeSolutions(Extension):
                 txt = f"# Solution\n {str(solution)}"
             solutions_txt += txt + "\n\n"
 
-            # remove previous solutions too similiar to this one
-            if self.REPLACE_THRESHOLD > 0:
-                rem += await db.delete_documents_by_query(
-                    query=txt,
-                    threshold=self.REPLACE_THRESHOLD,
-                    filter=f"area=='{Memory.Area.SOLUTIONS.value}'",
-                )
-            if rem:
-                rem_txt = "\n\n".join(Memory.format_docs_plain(rem))
-                log_item.update(replaced=rem_txt)
-
-            # insert new solution
-            await db.insert_text(text=txt, metadata={"area": Memory.Area.SOLUTIONS.value})
+            try:
+                # Use intelligent consolidation system
+                from python.helpers.memory_consolidation import create_memory_consolidator
+                consolidator = create_memory_consolidator(
+                    self.agent,
+                    similarity_threshold=DEFAULT_MEMORY_THRESHOLD,  # More permissive for discovery
+                    max_similar_memories=6,  # Fewer for solutions (more complex)
+                    max_llm_context_memories=3
+                )
+
+                # Create solution-specific log for detailed tracking
+                solution_log = self.agent.context.log.log(
+                    type="util",
+                    heading=f"Processing solution: {txt[:50]}...",
+                    temp=False,
+                    update_progress="none"  # Don't affect status bar
+                )
+
+                # Process with intelligent consolidation
+                result_obj = await consolidator.process_new_memory(
+                    new_memory=txt,
+                    area=Memory.Area.SOLUTIONS.value,
+                    metadata={"area": Memory.Area.SOLUTIONS.value},
+                    log_item=solution_log
+                )
+
+                # Update the individual log item with completion status but keep it temporary
+                if result_obj.get("success"):
+                    total_consolidated += 1
+                    solution_log.update(
+                        result="Solution processed successfully",
+                        heading=f"Solution completed: {txt[:50]}...",
+                        temp=False,  # Show completion message
+                        update_progress="none"  # Show briefly then disappear
+                    )
+                else:
+                    solution_log.update(
+                        result="Solution processing failed",
+                        heading=f"Solution failed: {txt[:50]}...",
+                        temp=False,  # Show completion message
+                        update_progress="none"  # Show briefly then disappear
+                    )
+                total_processed += 1
+
+            except Exception as e:
+                # Log error but continue processing
+                log_item.update(consolidation_error=str(e))
+                total_processed += 1
 
+        # Update final results with structured logging
         solutions_txt = solutions_txt.strip()
-        log_item.update(solutions=solutions_txt)
         log_item.update(
-            result=f"{len(solutions)} solutions memorized.",
-            heading=f"{len(solutions)} solutions memorized.",
+            heading=f"Solution memorization completed: {total_processed} solutions processed, {total_consolidated} intelligently consolidated",
+            solutions=solutions_txt,
+            result=f"{total_processed} solutions processed, {total_consolidated} intelligently consolidated",
+            solutions_processed=total_processed,
+            solutions_consolidated=total_consolidated,
+            update_progress="none"
         )
-        if rem:
-            log_item.stream(result=f"\nReplaced {len(rem)} previous solutions.")
 
         # except Exception as e:
         # err = errors.format_error(e)
python/helpers/knowledge_import.py CHANGED
@@ -1,17 +1,13 @@
 import glob
 import os
 import hashlib
-import json
 from typing import Any, Dict, Literal, TypedDict
 from langchain_community.document_loaders import (
     CSVLoader,
-    JSONLoader,
     PyPDFLoader,
     TextLoader,
     UnstructuredHTMLLoader,
-    UnstructuredMarkdownLoader,
 )
-from python.helpers import files
 from python.helpers.log import LogItem
 from python.helpers.print_style import PrintStyle
 
@@ -41,34 +37,72 @@ def load_knowledge(
     metadata: dict[str, Any] = {},
     filename_pattern: str = "**/*",
 ) -> Dict[str, KnowledgeImport]:
+    """
+    Load knowledge files from a directory with change detection and metadata enhancement.
 
-    # from python.helpers.memory import Memory
+    This function now includes enhanced error handling and compatibility with the
+    intelligent memory consolidation system.
+    """
 
     # Mapping file extensions to corresponding loader classes
+    # Note: Using TextLoader for JSON and MD to avoid parsing issues with consolidation
     file_types_loaders = {
         "txt": TextLoader,
         "pdf": PyPDFLoader,
         "csv": CSVLoader,
         "html": UnstructuredHTMLLoader,
-        # "json": JSONLoader,
-        "json": TextLoader,
-        # "md": UnstructuredMarkdownLoader,
-        "md": TextLoader,
+        "json": TextLoader,  # Use TextLoader for better consolidation compatibility
+        "md": TextLoader,  # Use TextLoader for better consolidation compatibility
     }
 
     cnt_files = 0
     cnt_docs = 0
 
-    # for area in Memory.Area:
-    #     subdir = files.get_abs_path(knowledge_dir, area.value)
-
-    #     if not os.path.exists(knowledge_dir):
-    #         os.makedirs(knowledge_dir)
-    #         continue
+    # Validate and create knowledge directory if needed
+    if not knowledge_dir:
+        if log_item:
+            log_item.stream(progress="\nNo knowledge directory specified")
+        PrintStyle(font_color="yellow").print("No knowledge directory specified")
+        return index
+
+    if not os.path.exists(knowledge_dir):
+        try:
+            os.makedirs(knowledge_dir, exist_ok=True)
+            # Verify the directory was actually created and is accessible
+            if not os.path.exists(knowledge_dir) or not os.access(knowledge_dir, os.R_OK):
+                error_msg = f"Knowledge directory {knowledge_dir} was created but is not accessible"
+                if log_item:
+                    log_item.stream(progress=f"\n{error_msg}")
+                PrintStyle(font_color="red").print(error_msg)
+                return index
+
+            if log_item:
+                log_item.stream(progress=f"\nCreated knowledge directory: {knowledge_dir}")
+            PrintStyle(font_color="green").print(f"Created knowledge directory: {knowledge_dir}")
+        except (OSError, PermissionError) as e:
+            error_msg = f"Failed to create knowledge directory {knowledge_dir}: {e}"
+            if log_item:
+                log_item.stream(progress=f"\n{error_msg}")
+            PrintStyle(font_color="red").print(error_msg)
+            return index
+
+    # Final accessibility check for existing directories
+    if not os.access(knowledge_dir, os.R_OK):
+        error_msg = f"Knowledge directory {knowledge_dir} exists but is not readable"
+        if log_item:
+            log_item.stream(progress=f"\n{error_msg}")
+        PrintStyle(font_color="red").print(error_msg)
+        return index
 
     # Fetch all files in the directory with specified extensions
-    kn_files = glob.glob(knowledge_dir + "/" + filename_pattern, recursive=True)
-    kn_files = [f for f in kn_files if os.path.isfile(f)]
+    try:
+        kn_files = glob.glob(os.path.join(knowledge_dir, filename_pattern), recursive=True)
+        kn_files = [f for f in kn_files if os.path.isfile(f) and not os.path.basename(f).startswith('.')]
+    except Exception as e:
+        PrintStyle(font_color="red").print(f"Error scanning knowledge directory {knowledge_dir}: {e}")
+        if log_item:
+            log_item.stream(progress=f"\nError scanning directory: {e}")
        return index
 
     if kn_files:
         PrintStyle.standard(
@@ -80,48 +114,96 @@ def load_knowledge(
         )
 
     for file_path in kn_files:
-        ext = file_path.split(".")[-1].lower()
-        if ext in file_types_loaders:
+        try:
+            # Get file extension safely
+            file_parts = os.path.basename(file_path).split('.')
+            if len(file_parts) < 2:
+                continue  # Skip files without extensions
+
+            ext = file_parts[-1].lower()
+            if ext not in file_types_loaders:
+                continue  # Skip unsupported file types
+
             checksum = calculate_checksum(file_path)
-            file_key = file_path  # os.path.relpath(file_path, knowledge_dir)
+            if not checksum:
+                continue  # Skip files with checksum errors
 
-            # Load existing data from the index or create a new entry
-            file_data = index.get(file_key, {})
+            file_key = file_path
+
+            # Load existing data from the index or create a new entry
+            file_data: KnowledgeImport = index.get(file_key, {
+                "file": file_key,
+                "checksum": "",
+                "ids": [],
+                "state": "changed",
+                "documents": []
+            })
+
+            # Check if file has changed
             if file_data.get("checksum") == checksum:
                 file_data["state"] = "original"
             else:
                 file_data["state"] = "changed"
 
+            # Process changed files
             if file_data["state"] == "changed":
                 file_data["checksum"] = checksum
                 loader_cls = file_types_loaders[ext]
-                loader = loader_cls(
-                    file_path,
-                    **(
-                        text_loader_kwargs
-                        if ext in ["txt", "csv", "html", "md"]
-                        else {}
-                    ),
-                )
-                file_data["documents"] = loader.load_and_split()
-                for doc in file_data["documents"]:
-                    doc.metadata = {**doc.metadata, **metadata}
-                cnt_files += 1
-                cnt_docs += len(file_data["documents"])
-                # PrintStyle.standard(f"Imported {len(file_data['documents'])} documents from {file_path}")
+
+                try:
+                    loader = loader_cls(
+                        file_path,
+                        **(
+                            text_loader_kwargs
+                            if ext in ["txt", "csv", "html", "md"]
+                            else {}
+                        ),
+                    )
+                    documents = loader.load_and_split()
+
+                    # Enhanced metadata for better consolidation compatibility
+                    enhanced_metadata = {
+                        **metadata,
+                        "source_file": os.path.basename(file_path),
+                        "source_path": file_path,
+                        "file_type": ext,
+                        "knowledge_source": True,  # Flag to distinguish from conversation memories
+                        "import_timestamp": None,  # Will be set when inserted into memory
+                    }
+
+                    # Apply metadata to all documents
+                    for doc in documents:
+                        doc.metadata = {**doc.metadata, **enhanced_metadata}
+
+                    file_data["documents"] = documents
+                    cnt_files += 1
+                    cnt_docs += len(documents)
+
+                except Exception as e:
+                    PrintStyle(font_color="red").print(f"Error loading {file_path}: {e}")
+                    if log_item:
+                        log_item.stream(progress=f"\nError loading {os.path.basename(file_path)}: {e}")
+                    continue
 
             # Update the index
-            index[file_key] = file_data  # type: ignore
+            index[file_key] = file_data
+
+        except Exception as e:
+            PrintStyle(font_color="red").print(f"Error processing {file_path}: {e}")
+            continue
 
-    # loop index where state is not set and mark it as removed
-    for file_key, file_data in index.items():
-        if not file_data.get("state", ""):
+    # Mark removed files
+    current_files = set(kn_files)
+    for file_key, file_data in list(index.items()):
+        if file_key not in current_files and not file_data.get("state"):
             index[file_key]["state"] = "removed"
 
-    PrintStyle.standard(f"Processed {cnt_docs} documents from {cnt_files} files.")
-    if log_item:
-        log_item.stream(
-            progress=f"\nProcessed {cnt_docs} documents from {cnt_files} files."
-        )
+    # Log results
+    if cnt_files > 0 or cnt_docs > 0:
+        PrintStyle.standard(f"Processed {cnt_docs} documents from {cnt_files} files.")
+        if log_item:
+            log_item.stream(
+                progress=f"\nProcessed {cnt_docs} documents from {cnt_files} files."
+            )
+
     return index
python/helpers/memory_consolidation.py ADDED
@@ -0,0 +1,791 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import json
3
+ from dataclasses import dataclass, field
4
+ from datetime import datetime, timezone
5
+ from typing import Any, Dict, List, Optional
6
+ from enum import Enum
7
+
8
+ from langchain_core.documents import Document
9
+
10
+ from python.helpers.memory import Memory
11
+ from python.helpers.dirty_json import DirtyJson
12
+ from python.helpers.log import LogItem
13
+ from python.helpers.print_style import PrintStyle
14
+ from python.tools.memory_load import DEFAULT_THRESHOLD as DEFAULT_MEMORY_THRESHOLD
15
+ from agent import Agent
16
+
17
+
18
+ class ConsolidationAction(Enum):
19
+ """Actions that can be taken during memory consolidation."""
20
+ MERGE = "merge"
21
+ REPLACE = "replace"
22
+ KEEP_SEPARATE = "keep_separate"
23
+ UPDATE = "update"
24
+ SKIP = "skip"
25
+
26
+
27
+ @dataclass
28
+ class ConsolidationConfig:
29
+ """Configuration for memory consolidation behavior."""
30
+ similarity_threshold: float = DEFAULT_MEMORY_THRESHOLD
31
+ max_similar_memories: int = 10
32
+ consolidation_sys_prompt: str = "memory.consolidation.sys.md"
33
+ consolidation_msg_prompt: str = "memory.consolidation.msg.md"
34
+ max_llm_context_memories: int = 5
35
+ keyword_extraction_sys_prompt: str = "memory.keyword_extraction.sys.md"
36
+ keyword_extraction_msg_prompt: str = "memory.keyword_extraction.msg.md"
37
+ processing_timeout_seconds: int = 60
38
+ # Add safety threshold for REPLACE actions
39
+ replace_similarity_threshold: float = 0.9 # Higher threshold for replacement safety
40
+
41
+
42
+ @dataclass
43
+ class ConsolidationResult:
44
+ """Result of memory consolidation analysis."""
45
+ action: ConsolidationAction
46
+ memories_to_remove: List[str] = field(default_factory=list)
47
+ memories_to_update: List[Dict[str, Any]] = field(default_factory=list)
48
+ new_memory_content: str = ""
49
+ metadata: Dict[str, Any] = field(default_factory=dict)
50
+ reasoning: str = ""
51
+
52
+
53
+ @dataclass
54
+ class MemoryAnalysisContext:
55
+ """Context for LLM memory analysis."""
56
+ new_memory: str
57
+ similar_memories: List[Document]
58
+ area: str
59
+ timestamp: str
60
+ existing_metadata: Dict[str, Any]
61
+
62
+
63
+ class MemoryConsolidator:
64
+ """
65
+ Intelligent memory consolidation system that uses LLM analysis to determine
66
+ optimal memory organization and automatically consolidates related memories.
67
+ """
68
+
69
+ def __init__(self, agent: Agent, config: Optional[ConsolidationConfig] = None):
70
+ self.agent = agent
71
+ self.config = config or ConsolidationConfig()
72
+
73
+ async def process_new_memory(
74
+ self,
75
+ new_memory: str,
76
+ area: str,
77
+ metadata: Dict[str, Any],
78
+ log_item: Optional[LogItem] = None
79
+ ) -> dict:
80
+ """
81
+ Process a new memory through the intelligent consolidation pipeline.
82
+
83
+ Args:
84
+ new_memory: The new memory content to process
85
+ area: Memory area (MAIN, FRAGMENTS, SOLUTIONS, INSTRUMENTS)
86
+ metadata: Initial metadata for the memory
87
+ log_item: Optional log item for progress tracking
88
+
89
+ Returns:
90
+ dict: {"success": bool, "memory_ids": [str, ...]}
91
+ """
92
+ try:
93
+ # Start processing with timeout
94
+ processing_task = asyncio.create_task(
95
+ self._process_memory_with_consolidation(new_memory, area, metadata, log_item)
96
+ )
97
+
98
+ result = await asyncio.wait_for(
99
+ processing_task,
100
+ timeout=self.config.processing_timeout_seconds
101
+ )
102
+ return result
103
+
104
+ except asyncio.TimeoutError:
105
+ PrintStyle().error(f"Memory consolidation timeout for area {area}")
106
+ return {"success": False, "memory_ids": []}
107
+
108
+ except Exception as e:
109
+ PrintStyle().error(f"Memory consolidation error for area {area}: {str(e)}")
110
+ return {"success": False, "memory_ids": []}
111
+
112
+ async def _process_memory_with_consolidation(
113
+ self,
114
+ new_memory: str,
115
+ area: str,
116
+ metadata: Dict[str, Any],
117
+ log_item: Optional[LogItem] = None
118
+ ) -> dict:
119
+ """Execute the full consolidation pipeline."""
120
+
121
+ if log_item:
122
+ log_item.update(progress="Starting intelligent memory consolidation...")
123
+
124
+ # Step 1: Discover similar memories
125
+ similar_memories = await self._find_similar_memories(new_memory, area, log_item)
126
+
127
+ # this block always returns
128
+ if not similar_memories:
129
+ # No similar memories found, insert directly
130
+ if log_item:
131
+ log_item.update(
132
+ progress="No similar memories found, inserting new memory",
133
+ temp=True
134
+ )
135
+ try:
136
+ db = await Memory.get(self.agent)
137
+ if 'timestamp' not in metadata:
138
+ metadata['timestamp'] = self._get_timestamp()
139
+ memory_id = await db.insert_text(new_memory, metadata)
140
+ if log_item:
141
+ log_item.update(
142
+ result="Memory inserted successfully",
143
+ memory_ids=[memory_id],
144
+ consolidation_action="direct_insert"
145
+ )
146
+ return {"success": True, "memory_ids": [memory_id]}
147
+ except Exception as e:
148
+ PrintStyle().error(f"Direct memory insertion failed: {str(e)}")
149
+ if log_item:
150
+ log_item.update(result=f"Memory insertion failed: {str(e)}")
151
+ return {"success": False, "memory_ids": []}
152
+
153
+ if log_item:
154
+ log_item.update(
155
+ progress=f"Found {len(similar_memories)} similar memories, analyzing...",
156
+ temp=True,
157
+ similar_memories_count=len(similar_memories)
158
+ )
159
+
160
+ # Step 2: Validate that similar memories still exist (they might have been deleted by previous consolidations)
161
+ if similar_memories:
162
+ memory_ids_to_check = [doc.metadata.get('id') for doc in similar_memories if doc.metadata.get('id')]
163
+ # Filter out None values and ensure all IDs are strings
164
+ memory_ids_to_check = [str(id) for id in memory_ids_to_check if id is not None]
165
+ db = await Memory.get(self.agent)
166
+ still_existing = db.db.get_by_ids(memory_ids_to_check)
167
+ existing_ids = {doc.metadata.get('id') for doc in still_existing}
168
+
169
+ # Filter out deleted memories
170
+ valid_similar_memories = [doc for doc in similar_memories if doc.metadata.get('id') in existing_ids]
171
+
172
+ if len(valid_similar_memories) != len(similar_memories):
173
+ deleted_count = len(similar_memories) - len(valid_similar_memories)
174
+ if log_item:
175
+ log_item.update(
176
+ progress=f"Filtered out {deleted_count} deleted memories, {len(valid_similar_memories)} remain for analysis",
177
+ temp=True,
178
+ race_condition_detected=True,
179
+ deleted_similar_memories_count=deleted_count
180
+ )
181
+ similar_memories = valid_similar_memories
182
+
183
+ # If no valid similar memories remain after filtering, insert directly
184
+ if not similar_memories:
185
+ if log_item:
186
+ log_item.update(
187
+ progress="No valid similar memories remain, inserting new memory",
188
+ temp=True
189
+ )
190
+ try:
191
+ db = await Memory.get(self.agent)
192
+ if 'timestamp' not in metadata:
193
+ metadata['timestamp'] = self._get_timestamp()
194
+ memory_id = await db.insert_text(new_memory, metadata)
195
+ if log_item:
196
+ log_item.update(
197
+ result="Memory inserted successfully (no valid similar memories)",
198
+ memory_ids=[memory_id],
199
+ consolidation_action="direct_insert_filtered"
200
+ )
201
+ return {"success": True, "memory_ids": [memory_id]}
202
+ except Exception as e:
203
+ PrintStyle().error(f"Direct memory insertion failed: {str(e)}")
204
+ if log_item:
205
+ log_item.update(result=f"Memory insertion failed: {str(e)}")
206
+ return {"success": False, "memory_ids": []}
207
+
208
+ # Step 3: Analyze with LLM (now with validated memories)
209
+ analysis_context = MemoryAnalysisContext(
210
+ new_memory=new_memory,
211
+ similar_memories=similar_memories,
212
+ area=area,
213
+ timestamp=self._get_timestamp(),
214
+ existing_metadata=metadata
215
+ )
216
+
217
+ consolidation_result = await self._analyze_memory_consolidation(analysis_context, log_item)
218
+
219
+ if consolidation_result.action == ConsolidationAction.SKIP:
220
+ if log_item:
221
+ log_item.update(
222
+ progress="LLM analysis suggests skipping consolidation",
223
+ temp=True
224
+ )
225
+ try:
226
+ db = await Memory.get(self.agent)
227
+ if 'timestamp' not in metadata:
228
+ metadata['timestamp'] = self._get_timestamp()
229
+ memory_id = await db.insert_text(new_memory, metadata)
230
+ if log_item:
231
+ log_item.update(
232
+ result="Memory inserted (consolidation skipped)",
233
+ memory_ids=[memory_id],
234
+ consolidation_action="skip",
235
+ reasoning=consolidation_result.reasoning or "LLM analysis suggested skipping"
236
+ )
237
+ return {"success": True, "memory_ids": [memory_id]}
238
+ except Exception as e:
239
+ PrintStyle().error(f"Skip consolidation insertion failed: {str(e)}")
240
+ if log_item:
241
+ log_item.update(result=f"Memory insertion failed: {str(e)}")
242
+ return {"success": False, "memory_ids": []}
243
+
244
+ # Step 4: Apply consolidation decisions
245
+ memory_ids = await self._apply_consolidation_result(
246
+ consolidation_result,
247
+ area,
248
+ analysis_context.existing_metadata, # Pass original metadata
249
+ log_item
250
+ )
251
+
252
+ if log_item:
253
+ if memory_ids:
254
+ log_item.update(
255
+ result=f"Consolidation completed: {consolidation_result.action.value}",
256
+ memory_ids=memory_ids,
257
+ consolidation_action=consolidation_result.action.value,
258
+ reasoning=consolidation_result.reasoning or "No specific reasoning provided",
259
+ memories_processed=len(similar_memories) + 1 # +1 for new memory
260
+ )
261
+ else:
262
+ log_item.update(
263
+ result=f"Consolidation failed: {consolidation_result.action.value}",
264
+ consolidation_action=consolidation_result.action.value,
265
+ reasoning=consolidation_result.reasoning or "Consolidation operation failed"
266
+ )
267
+
268
+ return {"success": bool(memory_ids), "memory_ids": memory_ids or []}
269
+
270
+ async def _gather_consolidated_metadata(
271
+ self,
272
+ db,
273
+ result: ConsolidationResult,
274
+ original_metadata: Dict[str, Any]
275
+ ) -> Dict[str, Any]:
276
+ """
277
+ Gather and merge metadata from memories being consolidated to preserve important fields.
278
+ This ensures critical metadata like priority, source, etc. is preserved during consolidation.
279
+ """
280
+ try:
281
+ # Start with the new memory's metadata as base
282
+ consolidated_metadata = dict(original_metadata)
283
+
284
+ # Collect all memory IDs that will be involved in consolidation
285
+ memory_ids = []
286
+
287
+ # Add memories to be removed (MERGE, REPLACE actions)
288
+ if result.memories_to_remove:
289
+ memory_ids.extend(result.memories_to_remove)
290
+
291
+ # Add memories to be updated (UPDATE action)
292
+ if result.memories_to_update:
293
+ for update_info in result.memories_to_update:
294
+ memory_id = update_info.get('id')
295
+ if memory_id:
296
+ memory_ids.append(memory_id)
297
+
298
+ # Retrieve original memories to extract their metadata
299
+ if memory_ids:
300
+ original_memories = await db.aget_by_ids(memory_ids)
301
+
302
+ # Merge ALL metadata fields from original memories
303
+ for memory in original_memories:
304
+ memory_metadata = memory.metadata
305
+
306
+ # Process ALL metadata fields from the original memory
307
+ for field_name, field_value in memory_metadata.items():
308
+ if field_name not in consolidated_metadata:
309
+ # Field doesn't exist in consolidated metadata, add it
310
+ consolidated_metadata[field_name] = field_value
311
+ elif field_name in consolidated_metadata:
312
+ # Field exists in both - handle special merge cases
313
+ if field_name == 'tags' and isinstance(field_value, list) and isinstance(consolidated_metadata[field_name], list):
314
+ # Merge tags lists and remove duplicates
315
+ merged_tags = list(set(consolidated_metadata[field_name] + field_value))
316
+ consolidated_metadata[field_name] = merged_tags
317
+ # For all other fields, keep the new memory's value (don't overwrite)
318
+ # This preserves the new memory's metadata when there are conflicts
319
+
320
+ return consolidated_metadata
321
+
322
+ except Exception as e:
323
+ # If metadata gathering fails, return original metadata as fallback
324
+ PrintStyle(font_color="yellow").print(f"Failed to gather consolidated metadata: {str(e)}")
325
+ return original_metadata
326
+
327
+ async def _find_similar_memories(
328
+ self,
329
+ new_memory: str,
330
+ area: str,
331
+ log_item: Optional[LogItem] = None
332
+ ) -> List[Document]:
333
+ """
334
+ Find similar memories using both semantic similarity and keyword matching.
335
+ Now includes knowledge source awareness and similarity scores for validation.
336
+ """
337
+ db = await Memory.get(self.agent)
338
+
339
+ # Step 1: Extract keywords/queries for enhanced search
340
+ search_queries = await self._extract_search_keywords(new_memory, log_item)
341
+
342
+ all_similar = []
343
+
344
+ # Step 2: Semantic similarity search with scores
345
+ semantic_similar = await db.search_similarity_threshold(
346
+ query=new_memory,
347
+ limit=self.config.max_similar_memories,
348
+ threshold=self.config.similarity_threshold,
349
+ filter=f"area == '{area}'"
350
+ )
351
+ all_similar.extend(semantic_similar)
352
+
353
+ # Step 3: Keyword-based searches
354
+ for query in search_queries:
355
+ if query.strip():
356
+ # Fix division by zero: ensure len(search_queries) > 0
357
+ queries_count = max(1, len(search_queries)) # Prevent division by zero
358
+ keyword_similar = await db.search_similarity_threshold(
359
+ query=query.strip(),
360
+ limit=max(3, self.config.max_similar_memories // queries_count),
361
+ threshold=self.config.similarity_threshold,
362
+ filter=f"area == '{area}'"
363
+ )
364
+ all_similar.extend(keyword_similar)
365
+
366
+ # Step 4: Deduplicate by document ID and store similarity info
367
+ seen_ids = set()
368
+ unique_similar = []
369
+ for doc in all_similar:
370
+ doc_id = doc.metadata.get('id')
371
+ if doc_id and doc_id not in seen_ids:
372
+ seen_ids.add(doc_id)
373
+ unique_similar.append(doc)
374
+
375
+ # Step 5: Calculate similarity scores for replacement validation
376
+ # Since FAISS doesn't directly expose similarity scores, use ranking-based estimation
377
+ # CRITICAL: All documents must have similarity >= search_threshold since FAISS returned them
378
+ # FIXED: Use conservative scoring that keeps all scores in safe consolidation range
379
+ similarity_scores = {}
380
+ total_docs = len(unique_similar)
381
+ search_threshold = self.config.similarity_threshold
382
+ safety_threshold = self.config.replace_similarity_threshold
383
+
384
+ for i, doc in enumerate(unique_similar):
385
+ doc_id = doc.metadata.get('id')
386
+ if doc_id:
387
+ # Convert ranking to similarity score with conservative distribution
388
+ if total_docs == 1:
389
+ ranking_similarity = 1.0 # Single document gets perfect score
390
+ else:
391
+ # Use conservative scoring: distribute between safety_threshold and 1.0
392
+ # This ensures all scores are suitable for consolidation
393
+ # First document gets 1.0, last gets safety_threshold (0.9 by default)
394
+ ranking_factor = 1.0 - (i / (total_docs - 1))
395
+ score_range = 1.0 - safety_threshold # e.g., 1.0 - 0.9 = 0.1
396
+ ranking_similarity = safety_threshold + (score_range * ranking_factor)
397
+
398
+ # Ensure minimum score is search_threshold for logical consistency
399
+ ranking_similarity = max(ranking_similarity, search_threshold)
400
+
401
+ similarity_scores[doc_id] = ranking_similarity
402
+
403
+ # Step 6: Add similarity score to document metadata for LLM analysis
404
+ for doc in unique_similar:
405
+ doc_id = doc.metadata.get('id')
406
+ estimated_similarity = similarity_scores.get(doc_id, 0.7)
407
+ # Store for later validation
408
+ doc.metadata['_consolidation_similarity'] = estimated_similarity
409
+
410
+ # Step 7: Limit to max context for LLM
411
+ limited_similar = unique_similar[:self.config.max_llm_context_memories]
412
+
413
+ return limited_similar
414
+
415
+ async def _extract_search_keywords(
416
+ self,
417
+ new_memory: str,
418
+ log_item: Optional[LogItem] = None
419
+ ) -> List[str]:
420
+ """Extract search keywords/queries from new memory using utility LLM."""
421
+
422
+ try:
423
+ system_prompt = self.agent.read_prompt(
424
+ self.config.keyword_extraction_sys_prompt,
425
+ )
426
+
427
+ message_prompt = self.agent.read_prompt(
428
+ self.config.keyword_extraction_msg_prompt,
429
+ memory_content=new_memory
430
+ )
431
+
432
+ # Call utility LLM to extract search queries
433
+ keywords_response = await self.agent.call_utility_model(
434
+ system=system_prompt,
435
+ message=message_prompt,
436
+ background=True
437
+ )
438
+
439
+ # Parse the response - expect JSON array of strings
440
+ keywords_json = DirtyJson.parse_string(keywords_response.strip())
441
+
442
+ if isinstance(keywords_json, list):
443
+ return [str(k) for k in keywords_json if k]
444
+ elif isinstance(keywords_json, str):
445
+ return [keywords_json]
446
+ else:
447
+ return []
448
+
449
+ except Exception as e:
450
+ PrintStyle().warning(f"Keyword extraction failed: {str(e)}")
451
+ # Fallback: use intelligent truncation for search
452
+ # Take first 200 chars if short, or first sentence if longer, but cap at 200 chars
453
+ if len(new_memory) <= 200:
454
+ fallback_content = new_memory
455
+ else:
456
+ first_sentence = new_memory.split('.')[0]
457
+ fallback_content = first_sentence[:200] if len(first_sentence) <= 200 else new_memory[:200]
458
+ return [fallback_content.strip()]
459
+
+    async def _analyze_memory_consolidation(
+        self,
+        context: MemoryAnalysisContext,
+        log_item: Optional[LogItem] = None
+    ) -> ConsolidationResult:
+        """Use LLM to analyze memory consolidation options."""
+
+        try:
+            # Prepare similar memories text
+            similar_memories_text = ""
+            for i, doc in enumerate(context.similar_memories):
+                timestamp = doc.metadata.get('timestamp', 'unknown')
+                doc_id = doc.metadata.get('id', f'doc_{i}')
+                similar_memories_text += f"ID: {doc_id}\nTimestamp: {timestamp}\nContent: {doc.page_content}\n\n"
+
+            # Build system prompt
+            system_prompt = self.agent.read_prompt(
+                self.config.consolidation_sys_prompt,
+            )
+
+            # Build message prompt
+            message_prompt = self.agent.read_prompt(
+                self.config.consolidation_msg_prompt,
+                new_memory=context.new_memory,
+                similar_memories=similar_memories_text.strip(),
+                area=context.area,
+                current_timestamp=context.timestamp,
+                new_memory_metadata=json.dumps(context.existing_metadata, indent=2)
+            )
+
+            analysis_response = await self.agent.call_utility_model(
+                system=system_prompt,
+                message=message_prompt,
+                callback=None,
+                background=True
+            )
+
+            # Parse LLM response
+            result_json = DirtyJson.parse_string(analysis_response.strip())
+
+            if not isinstance(result_json, dict):
+                raise ValueError("LLM response is not a valid JSON object")
+
+            # Parse consolidation result
+            action_str = result_json.get('action', 'skip')
+            try:
+                action = ConsolidationAction(action_str.lower())
+            except ValueError:
+                action = ConsolidationAction.SKIP
+
+            # Determine appropriate fallback for new_memory_content based on action
+            if action in [ConsolidationAction.MERGE, ConsolidationAction.REPLACE]:
+                # For MERGE/REPLACE, if no content provided, it's an error - don't use original
+                default_content = ""
+            else:
+                # For KEEP_SEPARATE/UPDATE/SKIP, original memory is appropriate fallback
+                default_content = context.new_memory
+
+            return ConsolidationResult(
+                action=action,
+                memories_to_remove=result_json.get('memories_to_remove', []),
+                memories_to_update=result_json.get('memories_to_update', []),
+                new_memory_content=result_json.get('new_memory_content', default_content),
+                metadata=result_json.get('metadata', {}),
+                reasoning=result_json.get('reasoning', '')
+            )
+
+        except Exception as e:
+            PrintStyle().warning(f"LLM consolidation analysis failed: {str(e)}")
+            # Fallback: skip consolidation
+            return ConsolidationResult(
+                action=ConsolidationAction.SKIP,
+                reasoning=f"Analysis failed: {str(e)}"
+            )
+
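The parser above tolerates partial responses, but the expected shape of the utility model's reply looks like this (keys taken from the `result_json.get` calls; all values are illustrative, not from a real run):

```python
expected_response = {
    "action": "merge",                     # one of ConsolidationAction's values
    "memories_to_remove": ["id-123", "id-456"],
    "memories_to_update": [],
    "new_memory_content": "User prefers zstd over gzip for backups.",
    "metadata": {"topic": "preferences"},
    "reasoning": "Both memories describe the same preference; merged into one.",
}
```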
+    async def _apply_consolidation_result(
+        self,
+        result: ConsolidationResult,
+        area: str,
+        original_metadata: Dict[str, Any],  # Add original metadata parameter
+        log_item: Optional[LogItem] = None
+    ) -> list:
+        """Apply the consolidation decisions to the memory database."""
+
+        try:
+            db = await Memory.get(self.agent)
+
+            # Retrieve metadata from memories being consolidated to preserve important fields
+            consolidated_metadata = await self._gather_consolidated_metadata(db, result, original_metadata)
+
+            # Handle each action type specifically
+            if result.action == ConsolidationAction.KEEP_SEPARATE:
+                return await self._handle_keep_separate(db, result, area, consolidated_metadata, log_item)
+
+            elif result.action == ConsolidationAction.MERGE:
+                return await self._handle_merge(db, result, area, consolidated_metadata, log_item)
+
+            elif result.action == ConsolidationAction.REPLACE:
+                return await self._handle_replace(db, result, area, consolidated_metadata, log_item)
+
+            elif result.action == ConsolidationAction.UPDATE:
+                return await self._handle_update(db, result, area, consolidated_metadata, log_item)
+
+            else:
+                # Should not reach here, but handle gracefully
+                PrintStyle().warning(f"Unknown consolidation action: {result.action}")
+                return []
+
+        except Exception as e:
+            PrintStyle().error(f"Failed to apply consolidation result: {str(e)}")
+            return []
+
+    async def _handle_keep_separate(
+        self,
+        db,
+        result: ConsolidationResult,
+        area: str,
+        original_metadata: Dict[str, Any],  # Add original metadata parameter
+        log_item: Optional[LogItem] = None
+    ) -> list:
+        """Handle KEEP_SEPARATE action: Insert new memory without touching existing ones."""
+
+        if not result.new_memory_content:
+            return []
+
+        # Prepare metadata for new memory
+        # LLM metadata takes precedence over original metadata when there are conflicts
+        final_metadata = {
+            'area': area,
+            'timestamp': self._get_timestamp(),
+            'consolidation_action': result.action.value,
+            **original_metadata,  # Original metadata first
+            **result.metadata  # LLM metadata second (wins conflicts)
+        }
+
+        if result.reasoning:
+            final_metadata['consolidation_reasoning'] = result.reasoning
+
+        new_id = await db.insert_text(result.new_memory_content, final_metadata)
+        return [new_id]
+
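The "LLM metadata wins conflicts" comment relies on dict-literal ordering: later `**` unpacks override earlier keys. A two-line illustration with assumed keys:

```python
original_metadata = {"area": "main", "topic": "compression"}
llm_metadata = {"topic": "preferences"}
final = {"area": "fragments", **original_metadata, **llm_metadata}
print(final)  # {'area': 'main', 'topic': 'preferences'} - last writer wins
```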
+    async def _handle_merge(
+        self,
+        db,
+        result: ConsolidationResult,
+        area: str,
+        original_metadata: Dict[str, Any],  # Add original metadata parameter
+        log_item: Optional[LogItem] = None
+    ) -> list:
+        """Handle MERGE action: Combine memories, remove originals, insert consolidated version."""
+
+        # Step 1: Remove original memories being merged
+        if result.memories_to_remove:
+            await db.delete_documents_by_ids(result.memories_to_remove)
+
+        # Step 2: Insert consolidated memory
+        if result.new_memory_content:
+            # LLM metadata takes precedence over original metadata when there are conflicts
+            final_metadata = {
+                'area': area,
+                'timestamp': self._get_timestamp(),
+                'consolidation_action': result.action.value,
+                'consolidated_from': result.memories_to_remove,
+                **original_metadata,  # Original metadata first
+                **result.metadata  # LLM metadata second (wins conflicts)
+            }
+
+            if result.reasoning:
+                final_metadata['consolidation_reasoning'] = result.reasoning
+
+            new_id = await db.insert_text(result.new_memory_content, final_metadata)
+            return [new_id]
+        else:
+            return []
+
+    async def _handle_replace(
+        self,
+        db,
+        result: ConsolidationResult,
+        area: str,
+        original_metadata: Dict[str, Any],  # Add original metadata parameter
+        log_item: Optional[LogItem] = None
+    ) -> list:
+        """Handle REPLACE action: Remove old memories, insert new version with similarity validation."""
+
+        # Step 1: Validate similarity scores for replacement safety
+        if result.memories_to_remove:
+            # Get the memories to be removed and check their similarity scores
+            memories_to_check = await db.aget_by_ids(result.memories_to_remove)
+
+            unsafe_replacements = []
+            for memory in memories_to_check:
+                similarity = memory.metadata.get('_consolidation_similarity', 0.7)
+                if similarity < self.config.replace_similarity_threshold:
+                    unsafe_replacements.append({
+                        'id': memory.metadata.get('id'),
+                        'similarity': similarity,
+                        'content_preview': memory.page_content[:100]
+                    })
+
+            # If we have unsafe replacements, either block them or require explicit confirmation
+            if unsafe_replacements:
+                PrintStyle().warning(
+                    f"REPLACE blocked: {len(unsafe_replacements)} memories below "
+                    f"similarity threshold {self.config.replace_similarity_threshold}, converting to KEEP_SEPARATE"
+                )
+
+                # Instead of replace, just insert the new memory (keep separate)
+                if result.new_memory_content:
+                    final_metadata = {
+                        'area': area,
+                        'timestamp': self._get_timestamp(),
+                        'consolidation_action': 'keep_separate_safety',  # Indicate safety conversion
+                        'original_action': 'replace',
+                        'safety_reason': f'Similarity below threshold {self.config.replace_similarity_threshold}',
+                        **original_metadata,
+                        **result.metadata
+                    }
+
+                    if result.reasoning:
+                        final_metadata['consolidation_reasoning'] = result.reasoning
+
+                    new_id = await db.insert_text(result.new_memory_content, final_metadata)
+                    return [new_id]
+                else:
+                    return []
+
+        # Step 2: Proceed with normal replacement if similarity checks pass
+        if result.memories_to_remove:
+            await db.delete_documents_by_ids(result.memories_to_remove)
+
+        # Step 3: Insert replacement memory
+        if result.new_memory_content:
+            # LLM metadata takes precedence over original metadata when there are conflicts
+            final_metadata = {
+                'area': area,
+                'timestamp': self._get_timestamp(),
+                'consolidation_action': result.action.value,
+                'replaced_memories': result.memories_to_remove,
+                **original_metadata,  # Original metadata first
+                **result.metadata  # LLM metadata second (wins conflicts)
+            }
+
+            if result.reasoning:
+                final_metadata['consolidation_reasoning'] = result.reasoning
+
+            new_id = await db.insert_text(result.new_memory_content, final_metadata)
+            return [new_id]
+        else:
+            return []
+
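The safety gate above reduces to a single predicate; a sketch, with the threshold default assumed from the factory docstring further down:

```python
def replace_is_safe(similarities: list[float], replace_threshold: float = 0.9) -> bool:
    """REPLACE is destructive, so every removal candidate must clear the threshold."""
    return all(s >= replace_threshold for s in similarities)

print(replace_is_safe([0.95, 0.92]))  # True  -> delete and replace
print(replace_is_safe([0.95, 0.85]))  # False -> converted to KEEP_SEPARATE
```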
+    async def _handle_update(
+        self,
+        db,
+        result: ConsolidationResult,
+        area: str,
+        original_metadata: Dict[str, Any],  # Add original metadata parameter
+        log_item: Optional[LogItem] = None
+    ) -> list:
+        """Handle UPDATE action: Modify existing memories in place with additional information."""
+
+        updated_count = 0
+        updated_ids = []
+
+        # Step 1: Update existing memories
+        for update_info in result.memories_to_update:
+            memory_id = update_info.get('id')
+            new_content = update_info.get('new_content', '')
+
+            if memory_id and new_content:
+                # Validate that the memory exists before attempting to delete it
+                existing_docs = await db.aget_by_ids([memory_id])
+                if not existing_docs:
+                    PrintStyle().warning(f"Memory ID {memory_id} not found during update, skipping")
+                    continue
+
+                # Delete old version and insert updated version
+                await db.delete_documents_by_ids([memory_id])
+
+                # LLM metadata takes precedence over original metadata when there are conflicts
+                updated_metadata = {
+                    'area': area,
+                    'timestamp': self._get_timestamp(),
+                    'consolidation_action': result.action.value,
+                    'updated_from': memory_id,
+                    **original_metadata,  # Original metadata first
+                    **update_info.get('metadata', {})  # LLM metadata second (wins conflicts)
+                }
+
+                new_id = await db.insert_text(new_content, updated_metadata)
+                updated_count += 1
+                updated_ids.append(new_id)
+
+        # Step 2: Insert additional new memory if provided
+        new_memory_id = None
+        if result.new_memory_content:
+            # LLM metadata takes precedence over original metadata when there are conflicts
+            final_metadata = {
+                'area': area,
+                'timestamp': self._get_timestamp(),
+                'consolidation_action': result.action.value,
+                **original_metadata,  # Original metadata first
+                **result.metadata  # LLM metadata second (wins conflicts)
+            }
+
+            if result.reasoning:
+                final_metadata['consolidation_reasoning'] = result.reasoning
+
+            new_memory_id = await db.insert_text(result.new_memory_content, final_metadata)
+            updated_ids.append(new_memory_id)
+
+        return updated_ids
+
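Each `memories_to_update` entry consumed above is a small dict; an illustrative (not captured) example:

```python
update_info = {
    "id": "mem-42",                        # must exist, or the entry is skipped
    "new_content": "User switched from zip to tar+zstd.",
    "metadata": {"topic": "preferences"},  # optional; wins metadata conflicts
}
```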
+    def _get_timestamp(self) -> str:
+        """Get current timestamp in standard format."""
+        return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
+
+
+# Factory function for easy instantiation
+def create_memory_consolidator(agent: Agent, **config_overrides) -> MemoryConsolidator:
+    """
+    Create a MemoryConsolidator with optional configuration overrides.
+
+    Available configuration options:
+    - similarity_threshold: Discovery threshold for finding related memories (default 0.7)
+    - replace_similarity_threshold: Safety threshold for REPLACE actions (default 0.9)
+    - max_similar_memories: Maximum memories to discover (default 10)
+    - max_llm_context_memories: Maximum memories to send to LLM (default 5)
+    - processing_timeout_seconds: Timeout for consolidation processing (default 30)
+    """
+    config = ConsolidationConfig(**config_overrides)
+    return MemoryConsolidator(agent, config)
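Hypothetical usage of the factory, with override names taken from its docstring:

```python
consolidator = create_memory_consolidator(
    agent,
    similarity_threshold=0.75,           # cast a slightly wider discovery net
    replace_similarity_threshold=0.95,   # stricter gate for destructive REPLACE
    max_llm_context_memories=3,          # smaller LLM analysis context
)
```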
python/helpers/whisper.py CHANGED
@@ -68,9 +68,16 @@ async def _transcribe(model_name:str, audio_bytes_b64: str):
     audio_bytes = base64.b64decode(audio_bytes_b64)
 
     # Create temp audio file
+    import os
     with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as audio_file:
         audio_file.write(audio_bytes)
-
-    # Transcribe the audio file
-    result = _model.transcribe(audio_file.name, fp16=False)  # type: ignore
-    return result
+        temp_path = audio_file.name
+    try:
+        # Transcribe the audio file
+        result = _model.transcribe(temp_path, fp16=False)  # type: ignore
+        return result
+    finally:
+        try:
+            os.remove(temp_path)
+        except Exception:
+            pass  # ignore errors during cleanup
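The whisper fix follows a common temp-file pattern: `delete=False` so the file can be reopened by path after the `with` block closes it (required on Windows), then a `finally` block guaranteeing cleanup even when transcription raises. A self-contained sketch of the same pattern:

```python
import os
import tempfile

def process_temp_file(data: bytes) -> int:
    # Write, close, then hand the path to another API
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        f.write(data)
        path = f.name
    try:
        return os.path.getsize(path)  # stand-in for _model.transcribe(path)
    finally:
        try:
            os.remove(path)
        except OSError:
            pass  # best-effort cleanup
```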
python/tools/knowledge_tool._py CHANGED
@@ -1,4 +1,3 @@
-import os
 import asyncio
 from python.helpers import dotenv, memory, perplexity_search, duckduckgo_search
 from python.helpers.tool import Tool, Response
@@ -13,12 +12,17 @@ SEARCH_ENGINE_RESULTS = 10
 
 class Knowledge(Tool):
     async def execute(self, question="", **kwargs):
-        # Create tasks for all three search methods
+        if not question:
+            question = kwargs.get("query", "")
+        if not question:
+            return Response(message="No question provided", break_loop=False)
+
+        # Create tasks for all search methods
         tasks = [
             self.searxng_search(question),
             # self.perplexity_search(question),
             # self.duckduckgo_search(question),
-            self.mem_search(question),
+            self.mem_search_enhanced(question),
         ]
 
         # Run all tasks concurrently
@@ -31,8 +35,6 @@ class Knowledge(Tool):
         searxng_result = await self.searxng_document_qa(searxng_result, question)
 
         # Handle exceptions and format results
-        # perplexity_result = self.format_result(perplexity_result, "Perplexity")
-        # duckduckgo_result = self.format_result(duckduckgo_result, "DuckDuckGo")
         searxng_result = self.format_result_searxng(searxng_result, "Search Engine")
         memory_result = self.format_result(memory_result, "Memory")
 
@@ -102,6 +104,134 @@ class Knowledge(Tool):
         text = memory.Memory.format_docs_plain(docs)
         return "\n\n".join(text)
 
+    async def mem_search_enhanced(self, question: str):
+        """
+        Enhanced memory search with knowledge source awareness.
+        Separates and prioritizes knowledge sources vs conversation memories.
+        """
+        try:
+            db = await memory.Memory.get(self.agent)
+
+            # Search for knowledge sources (knowledge_source=True)
+            knowledge_docs = await db.search_similarity_threshold(
+                query=question, limit=5, threshold=DEFAULT_MEMORY_THRESHOLD,
+                filter="knowledge_source == True"
+            )
+
+            # Search for conversation memories (field doesn't exist or is not True)
+            conversation_docs = await db.search_similarity_threshold(
+                query=question, limit=5, threshold=DEFAULT_MEMORY_THRESHOLD,
+                filter="not knowledge_source if 'knowledge_source' in locals() else True"
+            )
+
+            # Combine and fallback to lower threshold if needed
+            all_docs = knowledge_docs + conversation_docs
+            threshold_note = ""
+
+            # If no results with default threshold, try with lower threshold
+            if not all_docs:
+                lower_threshold = DEFAULT_MEMORY_THRESHOLD * 0.8
+                knowledge_docs = await db.search_similarity_threshold(
+                    query=question, limit=5, threshold=lower_threshold,
+                    filter="knowledge_source == True"
+                )
+                conversation_docs = await db.search_similarity_threshold(
+                    query=question, limit=5, threshold=lower_threshold,
+                    filter="not knowledge_source if 'knowledge_source' in locals() else True"
+                )
+                all_docs = knowledge_docs + conversation_docs
+                if all_docs:
+                    threshold_note = f" (threshold: {lower_threshold})"
+
+            if not all_docs:
+                return await self._get_memory_diagnostics(db, question)
+
+            # Separate knowledge sources from conversation memories
+            knowledge_sources = knowledge_docs
+            conversation_memories = conversation_docs
+            result_parts = []
+
+            # Add search summary
+            result_parts.append(f"## 🔍 Search Results for: '{question}'")
+            result_parts.append(f"**Found:** {len(knowledge_sources)} knowledge sources, {len(conversation_memories)} conversation memories{threshold_note}")
+
+            # Show knowledge sources
+            if knowledge_sources:
+                result_parts.append("")
+                result_parts.append("## 📚 Knowledge Sources:")
+                for index, doc in enumerate(knowledge_sources):
+                    source_file = doc.metadata.get('source_file', 'Unknown source')
+                    file_type = doc.metadata.get('file_type', '').upper()
+                    area = doc.metadata.get('area', 'main').upper()
+
+                    result_parts.append(f"**Source:** {source_file} ({file_type}) [{area}]")
+                    result_parts.append(f"**Content:** {doc.page_content}")
+                    if index < len(knowledge_sources) - 1:
+                        result_parts.append("-" * 80)
+
+            # Show conversation memories
+            if conversation_memories:
+                if knowledge_sources:
+                    result_parts.append("")
+                result_parts.append("## 💭 Related Experience:")
+                for index, doc in enumerate(conversation_memories):
+                    timestamp = doc.metadata.get('timestamp', 'Unknown time')
+                    area = doc.metadata.get('area', 'main').upper()
+                    consolidation_action = doc.metadata.get('consolidation_action', '')
+
+                    metadata_info = f"{timestamp} [{area}]"
+                    if consolidation_action:
+                        metadata_info += f" (consolidated: {consolidation_action})"
+
+                    result_parts.append(f"**Experience:** {metadata_info}")
+                    result_parts.append(f"**Content:** {doc.page_content}")
+                    if index < len(conversation_memories) - 1:
+                        result_parts.append("-" * 80)
+
+            return "\n".join(result_parts)
+
+        except Exception as e:
+            handle_error(e)
+            return f"Memory search failed: {str(e)}"
+
+    async def _get_memory_diagnostics(self, db, query: str):
+        """Provide memory diagnostics when no search results are found."""
+        try:
+            # Get sample of all documents to see what's in memory
+            sample_docs = await db.search_similarity_threshold(
+                query="test", limit=20, threshold=0.0
+            )
+
+            if not sample_docs:
+                return f"## 🔍 No Results for: '{query}'\n**Memory database appears to be empty.**"
+
+            # Analyze what's in memory
+            area_counts: dict[str, int] = {}
+            knowledge_count = 0
+
+            for doc in sample_docs:
+                area = doc.metadata.get('area', 'unknown')
+                area_counts[area] = area_counts.get(area, 0) + 1
+                if doc.metadata.get('knowledge_source', False):
+                    knowledge_count += 1
+
+            result_parts = [
+                f"## 🔍 No Results for: '{query}'",
+                f"**Database contains:** {len(sample_docs)} total documents",
+                f"**Areas:** {', '.join([f'{area.upper()}: {count}' for area, count in area_counts.items()])}",
+                f"**Knowledge sources:** {knowledge_count} documents",
+                "",
+                "**Suggestions:**",
+                "- Try different or more general search terms",
+                "- Check if the information was recently memorized",
+                f"- Current search threshold: {DEFAULT_MEMORY_THRESHOLD}"
+            ]
+
+            return "\n".join(result_parts)
+
+        except Exception as e:
+            return f"Memory diagnostics failed: {str(e)}"
+
     def format_result(self, result, source):
         if isinstance(result, Exception):
             handle_error(result)
@@ -113,6 +243,9 @@ class Knowledge(Tool):
             handle_error(result)
             return f"{source} search failed: {str(result)}"
 
+        if not result or "results" not in result:
+            return ""
+
         outputs = []
         for item in result["results"]:
             if "qa" in item:
run_cli.py DELETED
@@ -1,116 +0,0 @@
-import asyncio
-import sys
-import threading, time, models, os
-from ansio import application_keypad, mouse_input, raw_input
-from ansio.input import InputEvent, get_input_event
-from agent import AgentContext, UserMessage
-from python.helpers.print_style import PrintStyle
-from python.helpers.files import read_file
-from python.helpers import files
-import python.helpers.timed_input as timed_input
-from initialize import initialize_agent
-from python.helpers.dotenv import load_dotenv
-
-
-context: AgentContext = None  # type: ignore
-input_lock = threading.Lock()
-
-
-# Main conversation loop
-async def chat(context: AgentContext):
-
-    # start the conversation loop
-    while True:
-        # ask user for message
-        with input_lock:
-            timeout = context.agent0.get_data("timeout")  # how long the agent is willing to wait
-            if not timeout:  # if agent wants to wait for user input forever
-                PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message ('e' to leave):")
-                if sys.platform != "win32": import readline  # this fixes arrow keys in terminal
-                user_input = input("> ")
-                PrintStyle(font_color="white", padding=False, log_only=True).print(f"> {user_input}")
-
-            else:  # otherwise wait for user input with a timeout
-                PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message ({timeout}s timeout, 'w' to wait, 'e' to leave):")
-                if sys.platform != "win32": import readline  # this fixes arrow keys in terminal
-                # user_input = timed_input("> ", timeout=timeout)
-                user_input = timeout_input("> ", timeout=timeout)
-
-                if not user_input:
-                    user_input = context.agent0.read_prompt("fw.msg_timeout.md")
-                    PrintStyle(font_color="white", padding=False).stream(f"{user_input}")
-                else:
-                    user_input = user_input.strip()
-                    if user_input.lower()=="w":  # the user needs more time
-                        user_input = input("> ").strip()
-                    PrintStyle(font_color="white", padding=False, log_only=True).print(f"> {user_input}")
-
-
-
-        # exit the conversation when the user types 'exit'
-        if user_input.lower() == 'e': break
-
-        # send message to agent0,
-        assistant_response = await context.communicate(UserMessage(user_input, [])).result()
-
-        # print agent0 response
-        PrintStyle(font_color="white",background_color="#1D8348", bold=True, padding=True).print(f"{context.agent0.agent_name}: reponse:")
-        PrintStyle(font_color="white").print(f"{assistant_response}")
-
-
-# User intervention during agent streaming
-def intervention():
-    if context.streaming_agent and not context.paused:
-        context.paused = True  # stop agent streaming
-        PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User intervention ('e' to leave, empty to continue):")
-
-        if sys.platform != "win32": import readline  # this fixes arrow keys in terminal
-        user_input = input("> ").strip()
-        PrintStyle(font_color="white", padding=False, log_only=True).print(f"> {user_input}")
-
-        if user_input.lower() == 'e': os._exit(0)  # exit the conversation when the user types 'exit'
-        if user_input: context.streaming_agent.intervention = UserMessage(user_input, [])  # set intervention message if non-empty
-        context.paused = False  # continue agent streaming
-
-
-# Capture keyboard input to trigger user intervention
-def capture_keys():
-    global input_lock
-    intervent=False
-    while True:
-        if intervent: intervention()
-        intervent = False
-        time.sleep(0.1)
-
-        if context.streaming_agent:
-            # with raw_input, application_keypad, mouse_input:
-            with input_lock, raw_input, application_keypad:
-                event: InputEvent | None = get_input_event(timeout=0.1)
-                if event and (event.shortcut.isalpha() or event.shortcut.isspace()):
-                    intervent=True
-                    continue

-# User input with timeout
-def timeout_input(prompt, timeout=10):
-    return timed_input.timeout_input(prompt=prompt, timeout=timeout)
-
-def run():
-    global context
-    PrintStyle.standard("Initializing framework...")
-
-    #load env vars
-    load_dotenv()
-
-    # initialize context
-    config = initialize_agent()
-    context = AgentContext(config)
-
-    # Start the key capture thread for user intervention during agent streaming
-    threading.Thread(target=capture_keys, daemon=True).start()
-
-    #start the chat
-    asyncio.run(chat(context))
-
-if __name__ == "__main__":
-    PrintStyle.standard("\n\n!!! run_cli.py is now discontinued. run_ui.py serves as both UI and API endpoint !!!\n\n")
-    run()
tests/mcp/stream_http_mcp_server.py DELETED
@@ -1,223 +0,0 @@
-#!/usr/bin/env python3
-"""
-Hello World MCP Server using FastMCP with Streamable HTTP Protocol
-
-This is a simple example demonstrating how to create an MCP server using
-the FastMCP framework with the streamable-http transport protocol.
-
-Features:
-- Hello world tool that greets users
-- Simple resource that provides server information
-- Basic prompt template for greeting
-- Runs using streamable-http transport for better scalability
-"""
-
-from fastmcp import FastMCP, Context
-import os
-from datetime import datetime
-
-
-# Create a FastMCP server instance
-mcp: FastMCP = FastMCP(
-    "Hello World Server 🚀",
-    dependencies=[]  # No special dependencies for this simple example
-)
-
-
-# ========== TOOLS ==========
-
-@mcp.tool()
-def hello_world(name: str = "World") -> str:
-    """Say hello to someone with a personalized greeting.
-
-    Args:
-        name: The name of the person to greet (defaults to "World")
-
-    Returns:
-        A friendly greeting message
-    """
-    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-    return f"Hello, {name}! 👋 Welcome to the FastMCP Hello World Server. Current time: {current_time}"
-
-
-@mcp.tool()
-def add_numbers(a: float, b: float) -> float:
-    """Add two numbers together.
-
-    Args:
-        a: First number
-        b: Second number
-
-    Returns:
-        The sum of the two numbers
-    """
-    result = a + b
-    return result
-
-
-@mcp.tool()
-async def get_server_status(ctx: Context) -> str:
-    """Get the current server status and information.
-
-    Returns:
-        Server status information including uptime and capabilities
-    """
-    # Log that someone is checking server status
-    await ctx.info("Server status requested")
-
-    # Get basic server info
-    server_info = {
-        "status": "running",
-        "protocol": "MCP (Model Context Protocol)",
-        "transport": "streamable-http",
-        "framework": "FastMCP 2.0",
-        "capabilities": ["tools", "resources", "prompts"],
-        "timestamp": datetime.now().isoformat()
-    }
-
-    return f"""
-🟢 Server Status: {server_info['status'].upper()}
-
-📊 Server Information:
-• Protocol: {server_info['protocol']}
-• Transport: {server_info['transport']}
-• Framework: {server_info['framework']}
-• Capabilities: {', '.join(server_info['capabilities'])}
-• Last checked: {server_info['timestamp']}
-
-✅ All systems operational!
-"""
-
-
-# ========== RESOURCES ==========
-
-@mcp.resource("info://server")
-def get_server_info() -> str:
-    """Static resource providing information about this MCP server."""
-    return """
-🚀 Hello World MCP Server
-
-This is a demonstration MCP server built with FastMCP, showcasing the
-streamable-http transport protocol.
-
-Available capabilities:
-• Tools: Interactive functions the LLM can call
-• Resources: Data sources for context
-• Prompts: Reusable message templates
-
-Built with FastMCP 2.0 for production-ready MCP applications.
-"""
-
-
-@mcp.resource("greeting://{user_name}")
-def get_personal_greeting(user_name: str) -> str:
-    """Dynamic resource template that provides personalized greetings.
-
-    Args:
-        user_name: The name of the user to create a greeting for
-
-    Returns:
-        A personalized greeting message
-    """
-    greetings = [
-        f"Welcome, {user_name}! 🎉",
-        f"Hello there, {user_name}! Great to see you! 👋",
-        f"Greetings, {user_name}! Hope you're having a wonderful day! ☀️"
-    ]
-
-    # Select greeting based on name length (simple example)
-    greeting_index = len(user_name) % len(greetings)
-    return greetings[greeting_index]
-
-
-# ========== PROMPTS ==========
-
-@mcp.prompt()
-def introduction_prompt(user_name: str = "friend") -> str:
-    """Generate a friendly introduction prompt.
-
-    Args:
-        user_name: Name of the person to introduce to
-
-    Returns:
-        A prompt for introducing the MCP server capabilities
-    """
-    return f"""
-Hello {user_name}! 👋
-
-I'm your Hello World MCP Server, here to demonstrate the power of the Model Context Protocol with FastMCP!
-
-Here's what I can help you with:
-
-🔧 **Tools I can execute:**
-• hello_world - Give you personalized greetings
-• add_numbers - Perform simple math operations
-• get_server_status - Check my current status
-
-📚 **Resources I can provide:**
-• Server information and documentation
-• Personalized greeting messages
-
-💡 **How to use me:**
-Try asking me to say hello, add some numbers, or check my status!
-
-What would you like to do first?
-"""
-
-
-@mcp.prompt()
-def math_prompt(operation: str = "addition") -> str:
-    """Create a prompt for helping with math operations.
-
-    Args:
-        operation: The type of math operation to help with
-
-    Returns:
-        A simple prompt for math assistance
-    """
-    return (f"I need help with {operation}. I'd be happy to help you with {operation}! "
-            f"I can add numbers together using my add_numbers tool. "
-            f"Just tell me which numbers you'd like me to work with.")
-
-
-# ========== SERVER LIFECYCLE ==========
-
-def main():
-    """Main function to run the MCP server."""
-    print("🚀 Starting Hello World MCP Server with Streamable HTTP...")
-    print("📡 Transport: streamable-http")
-    print("🌐 Framework: FastMCP 2.0")
-    print("🔗 Protocol: Model Context Protocol (MCP)")
-    print()
-
-    # Get configuration from environment or use defaults
-    host = os.getenv("MCP_HOST", "0.0.0.0")
-    port = int(os.getenv("MCP_PORT", "8000"))
-    path = os.getenv("MCP_PATH", "/mcp")
-
-    print(f"🏠 Host: {host}")
-    print(f"🚪 Port: {port}")
-    print(f"🛤️ Path: {path}")
-    print(f"📍 Full URL: http://{host}:{port}{path}")
-    print()
-    print("✅ Server is ready to accept MCP connections!")
-    print("💡 Use this server with MCP clients that support streamable-http transport")
-    print()
-
-    # Run the server with streamable-http transport
-    try:
-        mcp.run(
-            transport="streamable-http",
-            host=host,
-            port=port,
-            path=path
-        )
-    except KeyboardInterrupt:
-        print("\n👋 Server shutting down gracefully...")
-    except Exception as e:
-        print(f"❌ Server error: {e}")
-        raise
-
-
-if __name__ == "__main__":
-    main()
tests/mcp/stream_http_mcp_server_README.md DELETED
@@ -1,208 +0,0 @@
-# FastMCP Hello World Server with Streamable HTTP
-
-A comprehensive hello world example demonstrating how to build an MCP (Model Context Protocol) server using the FastMCP framework with streamable-http transport.
-
-## 🚀 Features
-
-This server demonstrates all three core MCP primitives:
-
-### 🔧 Tools (LLM-callable functions)
-- **hello_world** - Personalized greetings with timestamps
-- **add_numbers** - Simple math operations
-- **get_server_status** - Server status and information with context logging
-
-### 📚 Resources (Data sources)
-- **info://server** - Static server information
-- **greeting://{user_name}** - Dynamic personalized greetings template
-
-### 💡 Prompts (Reusable templates)
-- **introduction_prompt** - Server capability introduction
-- **math_prompt** - Math assistance template
-
-## 📋 Prerequisites
-
-- Python 3.10+
-- pip or uv package manager
-
-## 🛠️ Installation
-
-### Option 1: Using pip
-```bash
-# Install dependencies
-pip install -r stream_http_mcp_server_requirements.txt
-
-# Or install FastMCP directly
-pip install fastmcp
-```
-
-### Option 2: Using uv (recommended)
-```bash
-# Install FastMCP with uv
-uv pip install fastmcp
-```
-
-## ▶️ Running the Server
-
-### Basic Usage
-```bash
-# Run with default settings (localhost:8000/mcp)
-python stream_http_mcp_server.py
-```
-
-### Custom Configuration via Environment Variables
-```bash
-# Set custom host, port, and path
-export MCP_HOST=0.0.0.0
-export MCP_PORT=3000
-export MCP_PATH=/hello-mcp
-
-python stream_http_mcp_server.py
-```
-
-### Expected Output
-```
-🚀 Starting Hello World MCP Server with Streamable HTTP...
-📡 Transport: streamable-http
-🌐 Framework: FastMCP 2.0
-🔗 Protocol: Model Context Protocol (MCP)
-
-🏠 Host: 127.0.0.1
-🚪 Port: 8000
-🛤️ Path: /mcp
-📍 Full URL: http://127.0.0.1:8000/mcp
-
-✅ Server is ready to accept MCP connections!
-💡 Use this server with MCP clients that support streamable-http transport
-```
-
-## 🧪 Testing the Server
-
-### Method 1: Using MCP Inspector (Recommended)
-
-1. **Install MCP Inspector**:
-   ```bash
-   npm install -g @modelcontextprotocol/inspector
-   ```
-
-2. **Run the Inspector**:
-   ```bash
-   npx @modelcontextprotocol/inspector
-   ```
-
-3. **Connect to the Server**:
-   - Choose "Streamable HTTP" transport
-   - Enter URL: `http://localhost:8000/mcp`
-   - Click "Connect"
-
-4. **Test Tools**:
-   - Go to the "Tools" tab
-   - Try `hello_world` with `{"name": "Alice"}`
-   - Try `add_numbers` with `{"a": 5, "b": 3}`
-   - Try `get_server_status` (no parameters needed)
-
-5. **Test Resources**:
-   - Go to "Resources" tab
-   - View `info://server`
-   - Try `greeting://YourName`
-
-6. **Test Prompts**:
-   - Go to "Prompts" tab
-   - Try `introduction_prompt` with `{"user_name": "Developer"}`
-   - Try `math_prompt` with `{"operation": "multiplication"}`
-
-### Method 2: Agent Zero Integration
-
-Configure Agent Zero to use this server by adding to your MCP servers configuration:
-
-```json
-[
-  {
-    "name": "hello_world_server",
-    "type": "streamable-http",
-    "url": "http://localhost:8000/mcp",
-    "description": "Hello World FastMCP Server with streamable HTTP"
-  }
-]
-```
-
-### Method 3: Custom MCP Client
-
-Example using the MCP Python SDK:
-
-```python
-from mcp.client.streamable_http import streamablehttp_client
-from mcp import ClientSession
-
-async def test_server():
-    async with streamablehttp_client("http://localhost:8000/mcp") as (read, write, get_session_id):
-        async with ClientSession(read, write) as session:
-            await session.initialize()
-
-            # Test tool
-            result = await session.call_tool("hello_world", {"name": "Test"})
-            print(f"Tool result: {result}")
-
-            # Test resource
-            resource = await session.read_resource("info://server")
-            print(f"Resource: {resource}")
-
-# Run with: asyncio.run(test_server())
-```
-
-## 🔧 Configuration Options
-
-### Environment Variables
-- `MCP_HOST` - Server host (default: 127.0.0.1)
-- `MCP_PORT` - Server port (default: 8000)
-- `MCP_PATH` - Server path (default: /mcp)
-
-### Server Capabilities
-This server supports all MCP capabilities:
-- ✅ Tools (with async support and context logging)
-- ✅ Resources (static and dynamic templates)
-- ✅ Prompts (string and message-based)
-- ✅ Streamable HTTP transport
-- ✅ Session management
-
-## 🎯 Key Concepts Demonstrated
-
-1. **FastMCP Framework**: Modern, production-ready MCP server development
-2. **Streamable HTTP Transport**: Scalable transport for web deployments
-3. **Type Safety**: Full Python type hints and docstrings
-4. **Async Support**: Proper async/await patterns with context
-5. **Dynamic Resources**: Template-based resources with parameters
-6. **Context Logging**: Using MCP context for client communication
-7. **Error Handling**: Graceful startup and shutdown
-
-## 📚 Next Steps
-
-- **Scale Up**: Use FastMCP's server composition to mount multiple apps
-- **Add Auth**: Implement OAuth authentication for production
-- **Deploy**: Use Docker or cloud platforms for production deployment
-- **Integrate**: Connect with Claude Desktop, Agent Zero, or custom clients
-- **Extend**: Add more sophisticated tools, resources, and prompts
-
-## 🐛 Troubleshooting
-
-### Server Won't Start
-- Check if port 8000 is available: `lsof -i :8000`
-- Try a different port: `MCP_PORT=8001 python stream_http_mcp_server.py`
-
-### Connection Issues
-- Verify the URL in your client matches the server output
-- Check firewall settings for the port
-- Ensure you're using "streamable-http" transport type

-### Import Errors
-- Install FastMCP: `pip install fastmcp`
-- Check Python version: `python --version` (requires 3.10+)
-
-## 📖 Documentation Links
-
-- [FastMCP Documentation](https://gofastmcp.com/)
-- [MCP Specification](https://spec.modelcontextprotocol.io/)
-- [Agent Zero MCP Integration](../../docs/mcp_setup.md)
-
----
-
-Built with ❤️ using FastMCP 2.0 and the Model Context Protocol
tests/mcp/stream_http_mcp_server_requirements.txt DELETED
@@ -1,9 +0,0 @@
-# FastMCP Hello World Server Requirements
-# Install with: pip install -r stream_http_mcp_server_requirements.txt
-
-# FastMCP framework for building MCP servers
-fastmcp>=2.8.0
-
-# Optional: Additional dependencies that might be useful
-# uvicorn>=0.18.0  # ASGI server (may be included with FastMCP)
-# httpx>=0.24.0  # HTTP client (may be included with FastMCP)