import asyncio
from python.helpers import settings
from python.helpers.extension import Extension
from python.helpers.memory import Memory
from python.helpers.dirty_json import DirtyJson
from agent import LoopData
from python.helpers.log import LogItem
from python.tools.memory_load import DEFAULT_THRESHOLD as DEFAULT_MEMORY_THRESHOLD
class MemorizeSolutions(Extension):
    """Extension that asks the utility LLM to extract successful solutions
    from the conversation history and persists them to the agent's memory —
    either through the intelligent consolidation pipeline or as plain inserts
    (optionally replacing near-duplicate older solutions)."""

    async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
        """Kick off solution memorization in the background.

        Returns the created asyncio.Task (callers should keep a reference so
        the task is not garbage-collected), or None when memorization is
        disabled in settings.
        """
        cfg = settings.get_settings()  # renamed from `set` to avoid shadowing the builtin
        if not cfg["memory_memorize_enabled"]:
            return
        # show full util message
        log_item = self.agent.context.log.log(
            type="util",
            heading="Memorizing successful solutions...",  # fixed typo "succesful"
        )
        # memorize in background
        task = asyncio.create_task(self.memorize(loop_data, log_item))
        return task

    async def memorize(self, loop_data: LoopData, log_item: LogItem, **kwargs):
        """Find solutions in the chat history via the utility LLM and store them."""
        cfg = settings.get_settings()
        db = await Memory.get(self.agent)

        # get system message and chat history for util llm
        system = self.agent.read_prompt("memory.solutions_sum.sys.md")
        msgs_text = self.agent.concat_messages(self.agent.history)

        # log query streamed by LLM
        async def log_callback(content):
            log_item.stream(content=content)

        # call util llm to find solutions in history
        solutions_json = await self.agent.call_utility_model(
            system=system,
            message=msgs_text,
            callback=log_callback,
            background=True,
        )

        solutions = self._parse_solutions(solutions_json, log_item)
        if solutions is None:
            return

        solutions_txt = "\n\n".join(str(solution) for solution in solutions).strip()
        log_item.update(
            heading=f"{len(solutions)} successful solutions to memorize.",
            solutions=solutions_txt,
        )

        if cfg["memory_memorize_consolidation"]:
            await self._memorize_consolidated(solutions, solutions_txt, log_item)
        else:
            await self._memorize_plain(db, cfg, solutions, log_item)

    def _parse_solutions(self, solutions_json, log_item: LogItem):
        """Validate and parse the utility-model response.

        Returns a non-empty list of solutions, or None when nothing usable was
        produced (the reason is reported on log_item).
        """
        if not solutions_json or not isinstance(solutions_json, str):
            log_item.update(heading="No response from utility model.")
            return None
        # Strip any whitespace that might cause issues
        solutions_json = solutions_json.strip()
        if not solutions_json:
            log_item.update(heading="Empty response from utility model.")
            return None
        try:
            solutions = DirtyJson.parse_string(solutions_json)
        except Exception as e:
            log_item.update(heading=f"Failed to parse solutions response: {str(e)}")
            return None
        if solutions is None:
            log_item.update(heading="No valid solutions found in response.")
            return None
        # If solutions is not a list, wrap a single str/dict result into one
        if not isinstance(solutions, list):
            if isinstance(solutions, (str, dict)):
                solutions = [solutions]
            else:
                log_item.update(heading="Invalid solutions format received.")
                return None
        # (isinstance re-check removed: solutions is guaranteed a list here)
        if not solutions:
            log_item.update(heading="No successful solutions to memorize.")
            return None
        return solutions

    @staticmethod
    def _solution_text(solution) -> str:
        """Render one parsed solution as the markdown snippet stored in memory."""
        if isinstance(solution, dict):
            problem = solution.get('problem', 'Unknown problem')
            solution_text = solution.get('solution', 'Unknown solution')
            return f"# Problem\n {problem}\n# Solution\n {solution_text}"
        # If solution is not a dict, store its string form
        return f"# Solution\n {str(solution)}"

    async def _memorize_consolidated(self, solutions, solutions_txt, log_item: LogItem):
        """Store solutions through the intelligent memory-consolidation pipeline."""
        # project-local import kept at function scope (hoisted out of the loop)
        from python.helpers.memory_consolidation import create_memory_consolidator

        total_processed = 0
        total_consolidated = 0
        for solution in solutions:
            txt = self._solution_text(solution)
            try:
                consolidator = create_memory_consolidator(
                    self.agent,
                    similarity_threshold=DEFAULT_MEMORY_THRESHOLD,  # More permissive for discovery
                    max_similar_memories=6,  # Fewer for solutions (more complex)
                    max_llm_context_memories=3,
                )
                # Per-solution log disabled for now (too many utility messages);
                # consolidator accepts None and skips detailed tracking.
                result_obj = await consolidator.process_new_memory(
                    new_memory=txt,
                    area=Memory.Area.SOLUTIONS.value,
                    metadata={"area": Memory.Area.SOLUTIONS.value},
                    log_item=None,
                )
                if result_obj.get("success"):
                    total_consolidated += 1
                total_processed += 1
            except Exception as e:
                # Log error but continue processing remaining solutions
                log_item.update(consolidation_error=str(e))
                total_processed += 1

        # Final summary once, after all solutions are processed
        log_item.update(
            heading=f"Solution memorization completed: {total_processed} solutions processed, {total_consolidated} intelligently consolidated",
            solutions=solutions_txt,
            result=f"{total_processed} solutions processed, {total_consolidated} intelligently consolidated",
            solutions_processed=total_processed,
            solutions_consolidated=total_consolidated,
            update_progress="none",
        )

    async def _memorize_plain(self, db, cfg, solutions, log_item: LogItem):
        """Insert solutions directly, optionally replacing near-duplicate old ones."""
        rem = []
        for solution in solutions:
            txt = self._solution_text(solution)
            # remove previous solutions too similar to this one
            if cfg["memory_memorize_replace_threshold"] > 0:
                rem += await db.delete_documents_by_query(
                    query=txt,
                    threshold=cfg["memory_memorize_replace_threshold"],
                    filter=f"area=='{Memory.Area.SOLUTIONS.value}'",
                )
                if rem:
                    rem_txt = "\n\n".join(Memory.format_docs_plain(rem))
                    log_item.update(replaced=rem_txt)
            # insert new solution
            await db.insert_text(text=txt, metadata={"area": Memory.Area.SOLUTIONS.value})

        log_item.update(
            result=f"{len(solutions)} solutions memorized.",
            heading=f"{len(solutions)} solutions memorized.",
        )
        if rem:
            log_item.stream(result=f"\nReplaced {len(rem)} previous solutions.")
|