from dotenv import load_dotenv load_dotenv() """ AI Democracy - Multi-Model Consensus System Professional Edition with Clean Architecture """ from dataclasses import dataclass from typing import List, Dict, Any, Optional from datetime import datetime import json import traceback import re import logging from enum import Enum import os import uuid # pred Agent frameworks from agno.agent import Agent from agno.models.openai import OpenAIChat from agno.models.anthropic import Claude from agno.models.mistral import MistralChat from agno.models.sambanova import Sambanova from agno.knowledge.knowledge import Knowledge from agno.vectordb.pgvector import PgVector from agno.knowledge.embedder.huggingface import HuggingfaceCustomEmbedder from supabase import create_client import gradio as gr # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # DATA MODELS class ProblemDomain(Enum): MEDICAL = "medical" LEGAL = "legal" BUSINESS = "business" TECHNICAL = "technical" ETHICAL = "ethical" GENERAL = "general" @dataclass class ModelResponse: model_name: str response: str confidence: float reasoning: str timestamp: datetime tokens_used: int = 0 @dataclass class DebateRound: round_number: int responses: List[ModelResponse] consensus_score: float timestamp: datetime @dataclass class Problem: id: str title: str description: str domain: ProblemDomain context: str user_id: str timestamp: datetime # CONFIGURATION class Config: """Central configuration management""" def __init__(self): self.ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY") self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") self.MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") self.SAMBANOVA_API_KEY = os.getenv("SAMBANOVA_API_KEY") self.SUPABASE_URL = os.getenv("SUPABASE_URL") self.SUPABASE_KEY = os.getenv("SUPABASE_KEY") self.SUPABASE_DB_PASSWORD = os.getenv("SUPABASE_DB_PASSWORD") self._validate() def _validate(self): """Log warnings for missing keys""" keys = { 'ANTHROPIC_API_KEY': self.ANTHROPIC_API_KEY, 'OPENAI_API_KEY': self.OPENAI_API_KEY, 'MISTRAL_API_KEY': self.MISTRAL_API_KEY, 'SAMBANOVA_API_KEY': self.SAMBANOVA_API_KEY, } for name, value in keys.items(): if not value: logger.warning(f"{name} not configured") def has_ai_models(self) -> bool: """Check if at least one AI model is available""" return any([ self.ANTHROPIC_API_KEY, self.OPENAI_API_KEY, self.MISTRAL_API_KEY, self.SAMBANOVA_API_KEY ]) def has_database(self) -> bool: """Check if database is configured""" return all([self.SUPABASE_URL, self.SUPABASE_KEY, self.SUPABASE_DB_PASSWORD]) class TextUtils: """Text formatting and processing utilities""" @staticmethod def clean_markdown(text: str) -> str: """Remove markdown formatting""" text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) text = re.sub(r'\*([^*]+)\*', r'\1', text) text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE) text = re.sub(r'```[^`]*```', '[Code Block]', text, flags=re.DOTALL) text = re.sub(r'`([^`]+)`', r'\1', text) text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text) text = re.sub(r'^[\*\-\+]\s+', '• ', text, flags=re.MULTILINE) text = re.sub(r'^\d+\.\s+', '• ', text, flags=re.MULTILINE) return re.sub(r'\n\s*\n', '\n\n', text).strip() @staticmethod def wrap_text(text: str, width: int = 80) -> str: """Wrap text to specified width""" words = text.split() lines = [] current_line = [] current_length = 0 for word in words: if current_length + len(word) + 1 <= width: current_line.append(word) current_length += len(word) + 1 else: if current_line: lines.append(" ".join(current_line)) current_line = [word] current_length = len(word) if current_line: lines.append(" ".join(current_line)) return "\n".join(lines) class OutputFormatter: """Professional output formatting""" SEPARATOR = "─" * 80 SECTION_SEPARATOR = "─" * 60 @staticmethod def get_quality_label(score: float) -> str: """Get quality label for consensus score""" if score >= 0.8: return "Excellent Agreement" if score >= 0.7: return "High Agreement" if score >= 0.6: return "Good Agreement" if score >= 0.4: return "Moderate Agreement" if score >= 0.3: return "Low Agreement" return "Divergent Views" @staticmethod def get_confidence_label(confidence: float) -> str: """Get confidence level label""" if confidence >= 0.7: return "High" if confidence >= 0.5: return "Moderate" return "Low" @staticmethod def create_progress_bar(score: float, width: int = 50) -> str: """Create progress bar""" filled = int(score * width) return f"[{' ' * filled}{' ' * (width - filled)}] {score:.1%}" def format_results( self, problem: Problem, debate_round: DebateRound, save_success: bool ) -> tuple[str, str]: """Format complete analysis results""" main = self._format_main(problem, debate_round, save_success) summary = self._format_summary(problem, debate_round) return main, summary def _format_main(self, problem: Problem, debate_round: DebateRound, save_success: bool) -> str: """Format main output""" quality = self.get_quality_label(debate_round.consensus_score) db_status = "Saved successfully" if save_success else "Save failed" header = f"""ANALYSIS RESULTS {self.SEPARATOR} Problem: {problem.title} Domain: {problem.domain.value.title()} Consensus: {debate_round.consensus_score:.2f}/1.00 ({quality}) Completed: {debate_round.timestamp.strftime('%Y-%m-%d %H:%M:%S')} Models: {len(debate_round.responses)} agents Database: {db_status} Description: {TextUtils.wrap_text(problem.description, 75)} """ responses_section = self._format_responses(debate_round.responses) consensus_section = self._format_consensus(debate_round) metrics_section = self._format_metrics(debate_round.responses) return f"{header}\n\n{responses_section}\n\n{consensus_section}\n\n{metrics_section}" def _format_responses(self, responses: List[ModelResponse]) -> str: """Format agent responses""" lines = [f"AGENT RESPONSES\n{self.SEPARATOR}\n"] for i, resp in enumerate(responses, 1): conf_label = self.get_confidence_label(resp.confidence) clean_resp = TextUtils.clean_markdown(resp.response) lines.append(f"{i}. {resp.model_name}") lines.append(f"Confidence: {resp.confidence:.2f} ({conf_label}) | " f"Tokens: ~{int(resp.tokens_used)} | " f"Time: {resp.timestamp.strftime('%H:%M:%S')}") lines.append(self.SECTION_SEPARATOR) lines.append(TextUtils.wrap_text(clean_resp, 75)) lines.append("") return "\n".join(lines) def _format_consensus(self, debate_round: DebateRound) -> str: """Format consensus section""" quality = self.get_quality_label(debate_round.consensus_score) bar = self.create_progress_bar(debate_round.consensus_score) interpretation = self._get_interpretation(debate_round.consensus_score) return f"""CONSENSUS ANALYSIS {self.SEPARATOR} Agreement Level: Score: {debate_round.consensus_score:.2f}/1.00 ({quality}) {bar} Interpretation: {TextUtils.wrap_text(interpretation, 75)}""" def _format_metrics(self, responses: List[ModelResponse]) -> str: """Format quality metrics""" avg_conf = sum(r.confidence for r in responses) / len(responses) total_tokens = sum(r.tokens_used for r in responses) high = sum(1 for r in responses if r.confidence >= 0.7) med = sum(1 for r in responses if 0.5 <= r.confidence < 0.7) low = sum(1 for r in responses if r.confidence < 0.5) return f"""QUALITY METRICS {self.SEPARATOR} Average Confidence: {avg_conf:.2f}/1.00 Total Tokens: ~{int(total_tokens)} Distribution: High: {high}, Medium: {med}, Low: {low}""" def _format_summary(self, problem: Problem, debate_round: DebateRound) -> str: """Format executive summary""" avg_conf = sum(r.confidence for r in debate_round.responses) / len(debate_round.responses) quality = self.get_quality_label(debate_round.consensus_score) return f"""EXECUTIVE SUMMARY {self.SEPARATOR} Problem Domain: {problem.domain.value.title()} Models Consulted: {len(debate_round.responses)} Average Confidence: {avg_conf:.2f}/1.00 Consensus Quality: {quality} Key Insights: • Strategic planning considerations identified • Risk assessment and mitigation strategies discussed • Multiple perspectives analyzed • Evidence-based recommendations provided Reliability: All responses from genuine AI models Quality: {quality} ({debate_round.consensus_score:.2f}/1.00)""" @staticmethod def _get_interpretation(score: float) -> str: """Get consensus interpretation""" if score >= 0.8: return ("Excellent: Strong agreement on key points and approaches. " "High confidence in recommendations with consistent reasoning.") if score >= 0.6: return ("Good: Models generally align with some variation. " "Solid foundation for decision-making.") if score >= 0.4: return ("Moderate: Mixed agreement - different priorities identified. " "Consider multiple approaches.") return ("Divergent: Significant disagreement on complex issue. " "Explore different perspectives carefully.") class DatabaseManager: """Handles all database operations""" def __init__(self, config: Config): self.config = config self.client = None self.connected = False self._init_connection() def _init_connection(self): """Initialize database connection""" if not self.config.has_database(): logger.warning("Database not configured") return try: self.client = create_client( self.config.SUPABASE_URL, self.config.SUPABASE_KEY ) self.client.table('conversations').select('id').limit(1).execute() self.connected = True logger.info("Database connected") except Exception as e: logger.error(f"Database connection failed: {e}") self.connected = False def save_problem(self, problem: Problem) -> bool: """Save problem to database""" if not self.connected: return False try: data = { 'id': problem.id, 'title': problem.title, 'description': problem.description, 'domain': problem.domain.value, 'context': problem.context, 'user_id': problem.user_id, 'timestamp': problem.timestamp.isoformat() } self.client.table('problems').insert(data).execute() return True except Exception as e: logger.error(f"Failed to save problem: {e}") return False def save_responses(self, problem_id: str, responses: List[ModelResponse]) -> bool: """Save agent responses""" if not self.connected: return False try: for resp in responses: data = { 'session_id': problem_id, 'query': f"Analysis by {resp.model_name}", 'response': resp.response, 'context': json.dumps({ 'confidence': resp.confidence, 'tokens': resp.tokens_used }), 'timestamp': resp.timestamp.isoformat() } self.client.table('conversations').insert(data).execute() return True except Exception as e: logger.error(f"Failed to save responses: {e}") return False # CONSENSUS CALCULATion class ConsensusCalculator: """Calculate consensus and quality metrics""" @staticmethod def calculate_response_quality(response: str) -> float: """Calculate quality score for response""" if not response or len(response.strip()) < 10: return 0.1 words = response.split() if not words: return 0.1 # Length score length_score = min(1.0, len(words) / 100) # structure score sentences = response.split('.') structure_score = min(1.0, len(sentences) / 5) if sentences else 0.1 # evidence markers evidence = ['research', 'studies', 'data', 'analysis', 'according'] evidence_score = min(0.3, sum( 0.1 for m in evidence if m in response.lower() )) # reasoning markers reasoning = ['because', 'therefore', 'however', 'furthermore'] reasoning_score = min(0.3, sum( 0.05 for m in reasoning if m in response.lower() )) # uncertainty penalty uncertainty = ['maybe', 'possibly', 'might', 'unclear'] penalty = min(0.2, sum( 0.05 for m in uncertainty if m in response.lower() )) score = ( length_score * 0.25 + structure_score * 0.15 + evidence_score + reasoning_score - penalty ) return max(0.1, min(1.0, score)) @staticmethod def calculate_consensus(responses: List[ModelResponse]) -> float: """Calculate overall consensus score""" if not responses: return 0.0 try: # confidence avg_conf = sum(r.confidence for r in responses) / len(responses) # consistency lengths = [len(r.response.split()) for r in responses] if not lengths: return avg_conf * 0.7 mean_len = sum(lengths) / len(lengths) variance = sum((l - mean_len)**2 for l in lengths) / len(lengths) consistency = max(0, 1 - (variance / 1000)) return min(1.0, max(0.0, avg_conf * 0.7 + consistency * 0.3)) except Exception as e: logger.error(f"Consensus calculation error: {e}") return 0.5 class AgentManager: """Manages AI agent initialization""" INSTRUCTIONS = """You are an expert AI analyst. Provide thorough, evidence-based analysis. Focus on actionable insights, clear reasoning, and professional recommendations. Structure your response clearly without using markdown formatting.""" def __init__(self, config: Config, knowledge_base: Optional[Knowledge]): self.config = config self.knowledge_base = knowledge_base self.agents = [] self._initialize() def _initialize(self): """Initialize all available agents""" if self.config.ANTHROPIC_API_KEY: self._add_claude() if self.config.OPENAI_API_KEY: self._add_openai() if self.config.MISTRAL_API_KEY: self._add_mistral() if self.config.SAMBANOVA_API_KEY: self._add_sambanova() logger.info(f"Initialized {len(self.agents)} agents") def _add_claude(self): """Add Claude agent""" try: agent = Agent( name="Claude Analyst", role="Critical Analysis", model=Claude(id="claude-3-5-sonnet-20240620"), instructions=self.INSTRUCTIONS, knowledge=self.knowledge_base ) self.agents.append(agent) except Exception as e: logger.error(f"Claude agent failed: {e}") def _add_openai(self): """Add OpenAI agent""" try: agent = Agent( name="GPT-4 Strategist", role="Strategic Planning", model=OpenAIChat(id="gpt-4o"), instructions=self.INSTRUCTIONS, knowledge=self.knowledge_base ) self.agents.append(agent) except Exception as e: logger.error(f"OpenAI agent failed: {e}") def _add_mistral(self): """Add Mistral agent""" try: agent = Agent( name="Mistral Evaluator", role="Solution Evaluation", model=MistralChat( id="mistral-large-latest", api_key=self.config.MISTRAL_API_KEY ), instructions=self.INSTRUCTIONS, knowledge=self.knowledge_base ) self.agents.append(agent) except Exception as e: logger.error(f"Mistral agent failed: {e}") def _add_sambanova(self): """Add SambaNova agent""" try: agent = Agent( name="SambaNova Specialist", role="Technical Implementation", model=Sambanova(), instructions=self.INSTRUCTIONS, knowledge=self.knowledge_base ) self.agents.append(agent) except Exception as e: logger.error(f"SambaNova agent failed: {e}") class Agora: """Main AI Democracy system""" def __init__(self): self.config = Config() self.knowledge_base = self._setup_knowledge() self.db = DatabaseManager(self.config) self.agent_manager = AgentManager(self.config, self.knowledge_base) self.calculator = ConsensusCalculator() self.formatter = OutputFormatter() def _setup_knowledge(self) -> Optional[Knowledge]: """Setup knowledge base""" try: embedder = HuggingfaceCustomEmbedder() if not hasattr(embedder, 'embedding_dimension'): embedder.embedding_dimension = 384 if not self.config.has_database(): return None return Knowledge( embedder=embedder, vector_db=PgVector( host=self.config.SUPABASE_URL.replace("https://", "").split(".")[0], port=5432, user="postgres", password=self.config.SUPABASE_DB_PASSWORD, database="postgres", table_name="conversations_w_llm", embedding_dimension=384 ) ) except Exception as e: logger.error(f"Knowledge base setup failed: {e}") return None @property def agents(self) -> List[Agent]: """Get list of agents""" return self.agent_manager.agents def analyze(self, problem: Problem) -> DebateRound: """Run analysis on problem""" if not self.agents: raise Exception("No AI agents available") logger.info(f"Analyzing: {problem.title}") prompt = f"""Problem Analysis Request Title: {problem.title} Description: {problem.description} Domain: {problem.domain.value} Context: {problem.context} Provide expert analysis including: 1. Key considerations and challenges 2. Potential solutions or approaches 3. Risk and benefit assessment 4. Specific recommendations Be thorough and provide actionable insights.""" responses = [] for agent in self.agents: try: result = agent.run(prompt) if result and len(str(result).strip()) > 20: text = str(result).strip() confidence = self.calculator.calculate_response_quality(text) responses.append(ModelResponse( model_name=agent.name, response=text, confidence=confidence, reasoning=f"Analysis by {agent.role}", timestamp=datetime.now(), tokens_used=int(len(text.split()) * 1.3) )) logger.info(f"{agent.name} responded (conf: {confidence:.2f})") except Exception as e: logger.error(f"Error from {agent.name}: {e}") if not responses: raise Exception("No responses received from agents") consensus = self.calculator.calculate_consensus(responses) return DebateRound( round_number=1, responses=responses, consensus_score=consensus, timestamp=datetime.now() ) def save_results(self, problem: Problem, debate: DebateRound) -> bool: """Save results to database""" try: prob_saved = self.db.save_problem(problem) resp_saved = self.db.save_responses(problem.id, debate.responses) return prob_saved and resp_saved except Exception as e: logger.error(f"Save failed: {e}") return False def create_interface(): """Create professional Gradio interface""" agora = Agora() def analyze_problem(title, description, domain, user_id, context): """Analyze problem and return formatted results""" try: if not title or not description: return "Error: Title and description required", "" if not agora.agents: return "Error: No AI agents available", "" problem = Problem( id=str(uuid.uuid4()), title=title.strip(), description=description.strip(), domain=ProblemDomain(domain.lower()), context=context.strip() or "None provided", user_id=user_id.strip() or "anonymous", timestamp=datetime.now() ) debate = agora.analyze(problem) saved = agora.save_results(problem, debate) main, summary = agora.formatter.format_results(problem, debate, saved) return main, summary except Exception as e: logger.error(f"Analysis failed: {e}\n{traceback.format_exc()}") return f"Error: {str(e)}", "Analysis failed" with gr.Blocks() as demo: title="AI Democracy System", theme=gr.themes.Soft( primary_hue="blue", secondary_hue="slate", neutral_hue="slate" ), gr.Markdown(""" # AI Democracy - Multi-Model Consensus System Professional platform for AI model deliberation and consensus building. """) status_md = f""" ### System Status **Active Agents:** {len(agora.agents)} | **Database:** {'Connected' if agora.db.connected else 'Offline'} """ gr.Markdown(status_md) with gr.Row(): with gr.Column(scale=2): title = gr.Textbox( label="Problem Title", placeholder="Enter problem title", lines=1 ) description = gr.Textbox( label="Description", placeholder="Detailed problem description", lines=5 ) domain = gr.Dropdown( label="Domain", choices=[d.value.title() for d in ProblemDomain], value="General" ) context = gr.Textbox( label="Context (Optional)", placeholder="Additional context", lines=2 ) user_id = gr.Textbox( label="User ID (Optional)", placeholder="Your identifier", lines=1 ) analyze_btn = gr.Button("Analyze", variant="primary", size="lg") with gr.Column(scale=1): gr.Markdown(""" ### Analysis Process 1. Submit problem details 2. AI models analyze independently 3. Consensus calculated 4. Results formatted 5. Saved to database ### Features - Real AI responses - Quality metrics - Consensus scoring - Professional formatting """) gr.Markdown("## Results") with gr.Row(): results = gr.Markdown(label="Analysis Results") with gr.Row(): summary = gr.Markdown(label="Executive Summary") analyze_btn.click( fn=analyze_problem, inputs=[title, description, domain, user_id, context], outputs=[results, summary] ) gr.Markdown(""" --- **System Information** - Framework: Agno AI Agent Framework - Models: Claude 3.5, GPT-4, Mistral, SambaNova - Database: Supabase PostgreSQL - Version: 2.0 Professional Edition """) return demo def main(): """Run the application""" try: logger.info("Starting AI Democracy System") demo = create_interface() demo.launch( server_name="127.0.0.1", server_port=7860, share=False ) except Exception as e: logger.error(f"Failed to start: {e}\n{traceback.format_exc()}") if __name__ == "__main__": main()