rajkumarrawal commited on
Commit
f6c1694
·
1 Parent(s): 75e5187

feat: add pickle serialization support to core engine classes

Browse files

Add __getstate__ and __setstate__ methods to AgentManager, SpacesApp, CachedReasoningEngine, OptimizedPlanningEngine, and other core classes to handle non-serializable objects like loggers during distributed processing. This enables proper object serialization for parallel/distributed execution environments by excluding logger objects from pickling and restoring them on unpickling.

Files changed (2) hide show
  1. app.py +32 -9
  2. autonomous_engine.py +67 -3
app.py CHANGED
@@ -211,20 +211,30 @@ class AgentManager:
211
  self.config = config
212
  self.agents = {}
213
  self.agent_stats = {}
214
- self._request_queue = None
215
  self._performance_metrics = {
216
- "total_requests": 0,
217
- "successful_requests": 0,
218
- "failed_requests": 0,
219
- "average_response_time": 0.0,
220
- "memory_usage": [],
221
- "cpu_usage": [],
222
- "timestamps": []
223
  }
224
 
225
  # Don't initialize agents during construction to avoid pickling issues
226
  # Agents will be created lazily when first requested
227
  logger.info("Agent manager initialized (agents will be created lazily)")
 
 
 
 
 
 
 
 
 
 
 
228
 
229
  def _initialize_default_agents(self):
230
  """Initialize default agent instances lazily."""
@@ -417,9 +427,22 @@ class SpacesApp:
417
 
418
  # Setup logging level
419
  logging.getLogger().setLevel(getattr(logging, self.config.config["log_level"]))
420
-
421
  logger.info("Spaces application initialized successfully")
422
 
 
 
 
 
 
 
 
 
 
 
 
 
 
423
  @property
424
  def validator(self):
425
  """Lazy initialization of RequestValidator."""
 
211
  self.config = config
212
  self.agents = {}
213
  self.agent_stats = {}
 
214
  self._performance_metrics = {
215
+ "total_requests": 0,
216
+ "successful_requests": 0,
217
+ "failed_requests": 0,
218
+ "average_response_time": 0.0,
219
+ "memory_usage": [],
220
+ "cpu_usage": [],
221
+ "timestamps": []
222
  }
223
 
224
  # Don't initialize agents during construction to avoid pickling issues
225
  # Agents will be created lazily when first requested
226
  logger.info("Agent manager initialized (agents will be created lazily)")
227
+
228
+ def __getstate__(self):
229
+ """Custom pickling to handle non-serializable objects."""
230
+ state = self.__dict__.copy()
231
+ # Remove any non-serializable objects if they exist
232
+ return state
233
+
234
+ def __setstate__(self, state):
235
+ """Custom unpickling to restore object state."""
236
+ self.__dict__.update(state)
237
+ # Reinitialize any necessary components
238
 
239
  def _initialize_default_agents(self):
240
  """Initialize default agent instances lazily."""
 
427
 
428
  # Setup logging level
429
  logging.getLogger().setLevel(getattr(logging, self.config.config["log_level"]))
430
+
431
  logger.info("Spaces application initialized successfully")
432
 
433
+ def __getstate__(self):
434
+ """Custom pickling to handle non-serializable objects."""
435
+ state = self.__dict__.copy()
436
+ # Remove any non-serializable objects
437
+ state['_validator'] = None # Will be reinitialized on unpickling
438
+ return state
439
+
440
+ def __setstate__(self, state):
441
+ """Custom unpickling to restore object state."""
442
+ self.__dict__.update(state)
443
+ # Reinitialize validator on demand
444
+ self._validator = None
445
+
446
  @property
447
  def validator(self):
448
  """Lazy initialization of RequestValidator."""
autonomous_engine.py CHANGED
@@ -273,7 +273,23 @@ class CachedReasoningEngine:
273
  self.agent_name = agent_name
274
  self.logger = logging.getLogger(f"{__name__}.{agent_name}")
275
  self.knowledge_base = {}
276
- self.decision_history = deque(maxlen=1000) # Keep last 1000 decisions
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
 
278
  @lru_cache(maxsize=1000)
279
  def _analyze_input_hash(self, user_input_hash: str) -> Dict[str, Any]:
@@ -633,6 +649,22 @@ class OptimizedPlanningEngine:
633
  self.logger = logging.getLogger(f"{__name__}.{agent_name}")
634
  self.plans = {}
635
  self.execution_history = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
636
 
637
  def create_plan(self, analysis: Dict[str, Any], user_input: str) -> Plan:
638
  """Create a comprehensive execution plan with validation."""
@@ -828,7 +860,23 @@ class OptimizedExecutionEngine:
828
  self.active_executions = {}
829
  self.execution_metrics = {}
830
  self.max_retries = 3
831
- self.retry_delay = 1.0 # seconds
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
832
 
833
  @asynccontextmanager
834
  async def execution_context(self, plan: Plan):
@@ -1230,8 +1278,24 @@ class RefactoredAutonomousAgent:
1230
  "failed_executions": 0,
1231
  "average_response_time": 0.0
1232
  }
1233
-
1234
  self.logger.info(f"Autonomous agent {agent_name} initialized")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1235
 
1236
  @rate_limit(calls_per_minute=100) # Rate limit: 100 requests per minute
1237
  @validate_input # Validate and sanitize input
 
273
  self.agent_name = agent_name
274
  self.logger = logging.getLogger(f"{__name__}.{agent_name}")
275
  self.knowledge_base = {}
276
+ self.decision_history = deque(maxlen=1000) # Keep last 1000 decisions
277
+
278
+ def __getstate__(self):
279
+ """Custom pickling to handle non-serializable objects."""
280
+ state = self.__dict__.copy()
281
+ # Remove logger as it's not serializable
282
+ state['logger'] = None
283
+ return state
284
+
285
+ def __setstate__(self, state):
286
+ """Custom unpickling to restore object state."""
287
+ self.__dict__.update(state)
288
+ # Restore logger
289
+ if hasattr(self, 'agent_name'):
290
+ self.logger = logging.getLogger(f"{__name__}.{self.agent_name}")
291
+ else:
292
+ self.logger = logging.getLogger(__name__)
293
 
294
  @lru_cache(maxsize=1000)
295
  def _analyze_input_hash(self, user_input_hash: str) -> Dict[str, Any]:
 
649
  self.logger = logging.getLogger(f"{__name__}.{agent_name}")
650
  self.plans = {}
651
  self.execution_history = []
652
+
653
+ def __getstate__(self):
654
+ """Custom pickling to handle non-serializable objects."""
655
+ state = self.__dict__.copy()
656
+ # Remove logger as it's not serializable
657
+ state['logger'] = None
658
+ return state
659
+
660
+ def __setstate__(self, state):
661
+ """Custom unpickling to restore object state."""
662
+ self.__dict__.update(state)
663
+ # Restore logger
664
+ if hasattr(self, 'agent_name'):
665
+ self.logger = logging.getLogger(f"{__name__}.{self.agent_name}")
666
+ else:
667
+ self.logger = logging.getLogger(__name__)
668
 
669
  def create_plan(self, analysis: Dict[str, Any], user_input: str) -> Plan:
670
  """Create a comprehensive execution plan with validation."""
 
860
  self.active_executions = {}
861
  self.execution_metrics = {}
862
  self.max_retries = 3
863
+ self.retry_delay = 1.0 # seconds
864
+
865
+ def __getstate__(self):
866
+ """Custom pickling to handle non-serializable objects."""
867
+ state = self.__dict__.copy()
868
+ # Remove logger as it's not serializable
869
+ state['logger'] = None
870
+ return state
871
+
872
+ def __setstate__(self, state):
873
+ """Custom unpickling to restore object state."""
874
+ self.__dict__.update(state)
875
+ # Restore logger
876
+ if hasattr(self, 'agent_name'):
877
+ self.logger = logging.getLogger(f"{__name__}.{self.agent_name}")
878
+ else:
879
+ self.logger = logging.getLogger(__name__)
880
 
881
  @asynccontextmanager
882
  async def execution_context(self, plan: Plan):
 
1278
  "failed_executions": 0,
1279
  "average_response_time": 0.0
1280
  }
1281
+
1282
  self.logger.info(f"Autonomous agent {agent_name} initialized")
1283
+
1284
+ def __getstate__(self):
1285
+ """Custom pickling to handle non-serializable objects."""
1286
+ state = self.__dict__.copy()
1287
+ # Remove logger as it's not serializable
1288
+ state['logger'] = None
1289
+ return state
1290
+
1291
+ def __setstate__(self, state):
1292
+ """Custom unpickling to restore object state."""
1293
+ self.__dict__.update(state)
1294
+ # Restore logger
1295
+ if hasattr(self, 'agent_name'):
1296
+ self.logger = logging.getLogger(f"{__name__}.{self.agent_name}")
1297
+ else:
1298
+ self.logger = logging.getLogger(__name__)
1299
 
1300
  @rate_limit(calls_per_minute=100) # Rate limit: 100 requests per minute
1301
  @validate_input # Validate and sanitize input