Spaces:
Paused
Paused
| import asyncio | |
| from python.helpers import settings | |
| from python.helpers.extension import Extension | |
| from python.helpers.memory import Memory | |
| from python.helpers.dirty_json import DirtyJson | |
| from agent import LoopData | |
| from python.helpers.log import LogItem | |
| from python.tools.memory_load import DEFAULT_THRESHOLD as DEFAULT_MEMORY_THRESHOLD | |
| class MemorizeSolutions(Extension): | |
| async def execute(self, loop_data: LoopData = LoopData(), **kwargs): | |
| # try: | |
| set = settings.get_settings() | |
| if not set["memory_memorize_enabled"]: | |
| return | |
| # show full util message | |
| log_item = self.agent.context.log.log( | |
| type="util", | |
| heading="Memorizing succesful solutions...", | |
| ) | |
| # memorize in background | |
| task = asyncio.create_task(self.memorize(loop_data, log_item)) | |
| return task | |
| async def memorize(self, loop_data: LoopData, log_item: LogItem, **kwargs): | |
| set = settings.get_settings() | |
| db = await Memory.get(self.agent) | |
| # get system message and chat history for util llm | |
| system = self.agent.read_prompt("memory.solutions_sum.sys.md") | |
| msgs_text = self.agent.concat_messages(self.agent.history) | |
| # log query streamed by LLM | |
| async def log_callback(content): | |
| log_item.stream(content=content) | |
| # call util llm to find solutions in history | |
| solutions_json = await self.agent.call_utility_model( | |
| system=system, | |
| message=msgs_text, | |
| callback=log_callback, | |
| background=True, | |
| ) | |
| # Add validation and error handling for solutions_json | |
| if not solutions_json or not isinstance(solutions_json, str): | |
| log_item.update(heading="No response from utility model.") | |
| return | |
| # Strip any whitespace that might cause issues | |
| solutions_json = solutions_json.strip() | |
| if not solutions_json: | |
| log_item.update(heading="Empty response from utility model.") | |
| return | |
| try: | |
| solutions = DirtyJson.parse_string(solutions_json) | |
| except Exception as e: | |
| log_item.update(heading=f"Failed to parse solutions response: {str(e)}") | |
| return | |
| # Validate that solutions is a list or convertible to one | |
| if solutions is None: | |
| log_item.update(heading="No valid solutions found in response.") | |
| return | |
| # If solutions is not a list, try to make it one | |
| if not isinstance(solutions, list): | |
| if isinstance(solutions, (str, dict)): | |
| solutions = [solutions] | |
| else: | |
| log_item.update(heading="Invalid solutions format received.") | |
| return | |
| if not isinstance(solutions, list) or len(solutions) == 0: | |
| log_item.update(heading="No successful solutions to memorize.") | |
| return | |
| else: | |
| solutions_txt = "\n\n".join([str(solution) for solution in solutions]).strip() | |
| log_item.update( | |
| heading=f"{len(solutions)} successful solutions to memorize.", solutions=solutions_txt | |
| ) | |
| # Process solutions with intelligent consolidation | |
| total_processed = 0 | |
| total_consolidated = 0 | |
| rem = [] | |
| for solution in solutions: | |
| # Convert solution to structured text | |
| if isinstance(solution, dict): | |
| problem = solution.get('problem', 'Unknown problem') | |
| solution_text = solution.get('solution', 'Unknown solution') | |
| txt = f"# Problem\n {problem}\n# Solution\n {solution_text}" | |
| else: | |
| # If solution is not a dict, convert it to string | |
| txt = f"# Solution\n {str(solution)}" | |
| if set["memory_memorize_consolidation"]: | |
| try: | |
| # Use intelligent consolidation system | |
| from python.helpers.memory_consolidation import create_memory_consolidator | |
| consolidator = create_memory_consolidator( | |
| self.agent, | |
| similarity_threshold=DEFAULT_MEMORY_THRESHOLD, # More permissive for discovery | |
| max_similar_memories=6, # Fewer for solutions (more complex) | |
| max_llm_context_memories=3 | |
| ) | |
| # Create solution-specific log for detailed tracking | |
| solution_log = None # too many utility messages, skip log for now | |
| # solution_log = self.agent.context.log.log( | |
| # type="util", | |
| # heading=f"Processing solution: {txt[:50]}...", | |
| # temp=False, | |
| # update_progress="none" # Don't affect status bar | |
| # ) | |
| # Process with intelligent consolidation | |
| result_obj = await consolidator.process_new_memory( | |
| new_memory=txt, | |
| area=Memory.Area.SOLUTIONS.value, | |
| metadata={"area": Memory.Area.SOLUTIONS.value}, | |
| log_item=solution_log | |
| ) | |
| # Update the individual log item with completion status but keep it temporary | |
| if result_obj.get("success"): | |
| total_consolidated += 1 | |
| if solution_log: | |
| solution_log.update( | |
| result="Solution processed successfully", | |
| heading=f"Solution completed: {txt[:50]}...", | |
| temp=False, # Show completion message | |
| update_progress="none" # Show briefly then disappear | |
| ) | |
| else: | |
| if solution_log: | |
| solution_log.update( | |
| result="Solution processing failed", | |
| heading=f"Solution failed: {txt[:50]}...", | |
| temp=False, # Show completion message | |
| update_progress="none" # Show briefly then disappear | |
| ) | |
| total_processed += 1 | |
| except Exception as e: | |
| # Log error but continue processing | |
| log_item.update(consolidation_error=str(e)) | |
| total_processed += 1 | |
| # Update final results with structured logging | |
| log_item.update( | |
| heading=f"Solution memorization completed: {total_processed} solutions processed, {total_consolidated} intelligently consolidated", | |
| solutions=solutions_txt, | |
| result=f"{total_processed} solutions processed, {total_consolidated} intelligently consolidated", | |
| solutions_processed=total_processed, | |
| solutions_consolidated=total_consolidated, | |
| update_progress="none" | |
| ) | |
| else: | |
| # remove previous solutions too similiar to this one | |
| if set["memory_memorize_replace_threshold"] > 0: | |
| rem += await db.delete_documents_by_query( | |
| query=txt, | |
| threshold=set["memory_memorize_replace_threshold"], | |
| filter=f"area=='{Memory.Area.SOLUTIONS.value}'", | |
| ) | |
| if rem: | |
| rem_txt = "\n\n".join(Memory.format_docs_plain(rem)) | |
| log_item.update(replaced=rem_txt) | |
| # insert new solution | |
| await db.insert_text(text=txt, metadata={"area": Memory.Area.SOLUTIONS.value}) | |
| log_item.update( | |
| result=f"{len(solutions)} solutions memorized.", | |
| heading=f"{len(solutions)} solutions memorized.", | |
| ) | |
| if rem: | |
| log_item.stream(result=f"\nReplaced {len(rem)} previous solutions.") | |
| # except Exception as e: | |
| # err = errors.format_error(e) | |
| # self.agent.context.log.log( | |
| # type="error", heading="Memorize solutions extension error:", content=err | |
| # ) | |