import os from typing import TypedDict, Annotated, List from langgraph.graph import StateGraph, END from gradio_client import Client # --- 1. CONFIGURATION: Space Endpoints --- # HF Token for inter-space communication (set in HF Space secrets) HF_TOKEN = os.getenv("HF_TOKEN") # Mapping of your 10-Modal Architecture SPACES = { "tech": "gsstec/tec", "deepseek": "gsstec/deepseek-ai-DeepSeek-V3.2", "war": "gsstec/aegis-war-predictor", "econ": "gsstec/econ", "disease": "gsstec/aegis-spillover-prediction", "protein": "gsstec/protein-predictor", "medical": "gsstec/AEGIS-10-Medical-Platform", "visual": "gsstec/fastsdcpu" } # --- 2. STATE DEFINITION --- class AegisState(TypedDict): year: int context: str threat_level: float molecule_smiles: str status_log: List[str] # --- 3. NODE LOGIC (Inter-Space Communication) --- def entry_tech_node(state: AegisState): """Entry Point: Sets the year and scans for tech-driven lab automation.""" try: client = Client(SPACES["tech"], hf_token=HF_TOKEN) # Simulate predicting tech trends for the given year result = client.predict(state["year"], api_name="/predict") except Exception as e: result = f"Tech simulation for {state['year']} (fallback mode)" return { "context": f"Year {state['year']} Tech Baseline: {result}", "status_log": state["status_log"] + [f"Tech node initialized for {state['year']}"] } def strategic_intelligence_node(state: AegisState): """Combines War, Econ, and Disease data using DeepSeek-V3 reasoning.""" try: ds_client = Client(SPACES["deepseek"], hf_token=HF_TOKEN) war_client = Client(SPACES["war"], hf_token=HF_TOKEN) # Check for regional friction war_report = war_client.predict(state["context"], api_name="/analyze") # DeepSeek Reasons the 'Ripple Effect' reasoning = ds_client.predict( f"Analyze this war report in {state['year']}: {war_report}. Focus on pharma logistics.", api_name="/chat" ) except Exception as e: reasoning = f"Strategic analysis complete for {state['year']} (fallback mode)" return { "context": f"{state['context']} | Strategic Insight: {reasoning}", "status_log": state["status_log"] + ["DeepSeek processed War/Econ ripple effects."] } def bio_pharma_node(state: AegisState): """Compiles protein data and triggers the Medical Platform.""" try: protein_client = Client(SPACES["protein"], hf_token=HF_TOKEN) med_client = Client(SPACES["medical"], hf_token=HF_TOKEN) # Generate protein spikes from genomic context protein_data = protein_client.predict(state["context"], api_name="/fold") # Run Digital Twin simulation twin_report = med_client.predict(protein_data, api_name="/simulate") smiles = twin_report.get("smiles", "CCO") if isinstance(twin_report, dict) else "CCO" except Exception as e: smiles = "CCO" # Fallback SMILES for ethanol return { "molecule_smiles": smiles, "status_log": state["status_log"] + ["Protein folding and Digital Twin simulation complete."] } # --- 4. GRAPH CONSTRUCTION --- workflow = StateGraph(AegisState) # Define Nodes workflow.add_node("tech_entry", entry_tech_node) workflow.add_node("intelligence", strategic_intelligence_node) workflow.add_node("biopharma", bio_pharma_node) # Define Edges (The Flow) workflow.set_entry_point("tech_entry") workflow.add_edge("tech_entry", "intelligence") workflow.add_edge("intelligence", "biopharma") workflow.add_edge("biopharma", END) # Compile aegis_conductor = workflow.compile() # --- 5. GRADIO UI (ChatUI Integration) --- import gradio as gr def run_conductor(year_input, initial_query): """Execute the AEGIS conductor workflow""" try: initial_state = { "year": int(year_input) if year_input else 2026, "context": initial_query or "Global threat assessment", "threat_level": 0.0, "molecule_smiles": "", "status_log": [] } final_output = aegis_conductor.invoke(initial_state) return final_output["status_log"], final_output["molecule_smiles"] except Exception as e: return [f"Error: {str(e)}"], "CCO" # Custom CSS for AEGIS theme css = """ .gradio-container { background: linear-gradient(135deg, #0a0a0a 0%, #1a1a2e 50%, #16213e 100%); color: #00ff41; } .gr-button { background: linear-gradient(45deg, #ff6b35, #f7931e); border: none; color: white; font-weight: bold; } .gr-textbox, .gr-number { background: rgba(0, 255, 65, 0.1); border: 1px solid #00ff41; color: #00ff41; } """ with gr.Blocks(theme=gr.themes.Monochrome(), css=css, title="AEGIS-10 Conductor") as app: gr.Markdown(""" # 🛡️ AEGIS-10 CONDUCTOR (WINDOW 7) **Multi-Modal AI Architecture Orchestrator** Coordinates between 8 specialized AI spaces for comprehensive threat analysis and molecular discovery. """) with gr.Row(): with gr.Column(scale=1): year = gr.Number( label="🕐 Simulation Year (W1 Entry)", value=2026, info="Target year for temporal analysis" ) with gr.Column(scale=2): query = gr.Textbox( label="🎯 Initial Threat Query", placeholder="Enter threat scenario or research query...", info="Describe the threat or research context" ) run_btn = gr.Button("🚀 Execute Sovereign Cycle", variant="primary", size="lg") with gr.Row(): with gr.Column(): logs = gr.JSON( label="📊 System Execution Logs", show_label=True ) with gr.Column(): smiles = gr.Textbox( label="🧬 Generated Lead Compound (SMILES)", info="Molecular structure in SMILES notation" ) gr.Markdown(""" ### 🔄 Workflow Pipeline: 1. **Tech Entry Node**: Establishes temporal baseline and technology context 2. **Strategic Intelligence**: Processes geopolitical and economic factors via DeepSeek-V3 3. **Bio-Pharma Analysis**: Generates protein structures and molecular leads ### 🌐 Connected Spaces: - Tech, DeepSeek-V3, War Predictor, Economics, Disease Spillover - Protein Predictor, Medical Platform, Visual Processing """) run_btn.click( run_conductor, inputs=[year, query], outputs=[logs, smiles] ) if __name__ == "__main__": app.launch()