WhoDat55 commited on
Commit
da950ab
·
verified ·
1 Parent(s): 1aa5492

Update core_mcp.py

Browse files
Files changed (1) hide show
  1. core_mcp.py +107 -0
core_mcp.py CHANGED
@@ -5,10 +5,12 @@ import time
5
  import uuid
6
  import requests
7
  import uvicorn
 
8
  from datetime import datetime
9
  from configparser import ConfigParser
10
  from typing import List, Dict, Tuple, Optional, Any
11
  from fastapi import FastAPI, HTTPException, BackgroundTasks
 
12
  from pydantic import BaseModel
13
 
14
  # --- CONFIGURATION & REGISTRY ---
@@ -471,7 +473,18 @@ class ConductorEngine:
471
  gemini_role = 'model' if role == 'assistant' else 'user'
472
  self.strategist_history.append({"role": gemini_role, "parts": [{"text": content}]})
473
 
 
 
 
 
 
 
 
 
 
 
474
  def execute_workflow(self, blueprint: str, intent: str) -> Dict[str, Any]:
 
475
  self.coder_history = []
476
  self.strategist_history = []
477
  self.system_log = []
@@ -531,6 +544,84 @@ class ConductorEngine:
531
  self.log_system("FATAL: Loop limit reached.")
532
  return self._compile_results(coder_response)
533
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
534
  def _compile_results(self, final_output: str) -> Dict[str, Any]:
535
  return {
536
  "system_log": self.system_log,
@@ -559,9 +650,25 @@ class ExecutionResponse(BaseModel):
559
 
560
  @app.post("/execute", response_model=ExecutionResponse)
561
  async def execute_conductor(req: ExecutionRequest):
 
562
  engine = ConductorEngine(req.coder_model_label, req.strategist_model_label)
563
  result = engine.execute_workflow(req.blueprint, req.intent)
564
  return result
565
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
566
  if __name__ == "__main__":
567
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
5
  import uuid
6
  import requests
7
  import uvicorn
8
+ import asyncio
9
  from datetime import datetime
10
  from configparser import ConfigParser
11
  from typing import List, Dict, Tuple, Optional, Any
12
  from fastapi import FastAPI, HTTPException, BackgroundTasks
13
+ from fastapi.responses import StreamingResponse
14
  from pydantic import BaseModel
15
 
16
  # --- CONFIGURATION & REGISTRY ---
 
473
  gemini_role = 'model' if role == 'assistant' else 'user'
474
  self.strategist_history.append({"role": gemini_role, "parts": [{"text": content}]})
475
 
476
+ def _current_state(self) -> Dict[str, Any]:
477
+ """Package current state for transmission"""
478
+ return {
479
+ "system_log": self.system_log,
480
+ "coder_log": self.coder_history,
481
+ "strategist_log": self.strategist_history,
482
+ "audit_log": self.audit_log,
483
+ "final_output": ""
484
+ }
485
+
486
  def execute_workflow(self, blueprint: str, intent: str) -> Dict[str, Any]:
487
+ """ORIGINAL: Non-streaming version (kept for backwards compatibility)"""
488
  self.coder_history = []
489
  self.strategist_history = []
490
  self.system_log = []
 
544
  self.log_system("FATAL: Loop limit reached.")
545
  return self._compile_results(coder_response)
546
 
547
+ def execute_workflow_streaming(self, blueprint: str, intent: str):
548
+ """NEW: Streaming version that yields updates as they happen"""
549
+ self.coder_history = []
550
+ self.strategist_history = []
551
+ self.system_log = []
552
+ self.audit_log = []
553
+ self.loop_count = 0
554
+
555
+ self.log_system("INITIATING CONDUCTOR ENGINE...")
556
+ yield self._current_state()
557
+
558
+ for log_msg in self.model_resolution_logs:
559
+ self.log_system(log_msg)
560
+ yield self._current_state()
561
+
562
+ if self.fatal_init_error:
563
+ self.log_system("FATAL: Model configuration invalid.")
564
+ yield self._current_state()
565
+ return
566
+
567
+ self.log_system(f"Engaged: Coder ({self.coder_model_id}), Strategist ({self.strategist_model_id})")
568
+ yield self._current_state()
569
+
570
+ initial_prompt = format_initial_prompt(blueprint)
571
+ self.log_coder("user", initial_prompt)
572
+ yield self._current_state()
573
+
574
+ while self.loop_count < self.max_loops:
575
+ self.loop_count += 1
576
+ self.log_system(f"Executing Cycle {self.loop_count}/{self.max_loops}")
577
+ yield self._current_state()
578
+
579
+ coder_response = call_ai_api(self.coder_url, CONFIG['CODER_API_KEY'], self.coder_history)
580
+ self.log_coder("assistant", coder_response)
581
+ yield self._current_state()
582
+
583
+ if "DETECTED" not in coder_response:
584
+ self.log_system("MISSION COMPLETE: Ambiguity resolved or Code generated.")
585
+ yield self._current_state()
586
+ return
587
+
588
+ self.log_system("Ambiguity DETECTED. Escalating to Strategist.")
589
+ yield self._current_state()
590
+
591
+ strategist_prompt = format_strategist_prompt(blueprint, intent, coder_response)
592
+ self.log_strategist("user", strategist_prompt)
593
+ yield self._current_state()
594
+
595
+ strategist_response = call_ai_api(self.strategist_url, CONFIG['STRATEGIST_API_KEY'], self.strategist_history)
596
+ self.log_strategist("assistant", strategist_response)
597
+ yield self._current_state()
598
+
599
+ tool_request = detect_tool_usage(strategist_response)
600
+ if tool_request:
601
+ self.log_system(f"TOOL REQUEST: {tool_request['tool_name']}")
602
+ yield self._current_state()
603
+
604
+ diagnostic = execute_tool(tool_request['tool_name'], tool_request['params'])
605
+ self.audit_log.append(diagnostic)
606
+ self.log_system(f"Tool Result: {diagnostic['status']}")
607
+ yield self._current_state()
608
+
609
+ tool_result_text = f"TOOL: {diagnostic['tool_name']}\nSTATUS: {diagnostic['status']}\nOUTPUT: {json.dumps(diagnostic['response_payload'])}"
610
+ synthesis_prompt = format_strategist_tool_synthesis_prompt(blueprint, intent, coder_response, tool_result_text)
611
+ self.log_strategist("user", synthesis_prompt)
612
+ yield self._current_state()
613
+
614
+ strategist_response = call_ai_api(self.strategist_url, CONFIG['STRATEGIST_API_KEY'], self.strategist_history)
615
+ self.log_strategist("assistant", strategist_response)
616
+ yield self._current_state()
617
+
618
+ resolution_prompt = format_resolution_prompt(blueprint, strategist_response)
619
+ self.log_coder("user", resolution_prompt)
620
+ yield self._current_state()
621
+
622
+ self.log_system("FATAL: Loop limit reached.")
623
+ yield self._current_state()
624
+
625
  def _compile_results(self, final_output: str) -> Dict[str, Any]:
626
  return {
627
  "system_log": self.system_log,
 
650
 
651
  @app.post("/execute", response_model=ExecutionResponse)
652
  async def execute_conductor(req: ExecutionRequest):
653
+ """ORIGINAL: Non-streaming endpoint (kept for backwards compatibility)"""
654
  engine = ConductorEngine(req.coder_model_label, req.strategist_model_label)
655
  result = engine.execute_workflow(req.blueprint, req.intent)
656
  return result
657
 
658
+ @app.post("/execute_stream")
659
+ async def execute_conductor_stream(req: ExecutionRequest):
660
+ """NEW: Streaming endpoint that yields real-time updates"""
661
+
662
+ async def generate_updates():
663
+ engine = ConductorEngine(req.coder_model_label, req.strategist_model_label)
664
+
665
+ for update in engine.execute_workflow_streaming(req.blueprint, req.intent):
666
+ yield f"data: {json.dumps(update)}\n\n"
667
+ await asyncio.sleep(0)
668
+
669
+ yield "data: {\"status\": \"complete\"}\n\n"
670
+
671
+ return StreamingResponse(generate_updates(), media_type="text/event-stream")
672
+
673
  if __name__ == "__main__":
674
  uvicorn.run(app, host="0.0.0.0", port=7860)