# TheAgora / app.py
# Addyk24's picture
# Refactor: Redefined entire Gradio interface and updated core logic
# 8cf2f0f
from dotenv import load_dotenv
load_dotenv()
"""
AI Democracy - Multi-Model Consensus System
Professional Edition with Clean Architecture
"""
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime
import json
import traceback
import re
import logging
from enum import Enum
import os
import uuid
# Agent frameworks
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.models.anthropic import Claude
from agno.models.mistral import MistralChat
from agno.models.sambanova import Sambanova
from agno.knowledge.knowledge import Knowledge
from agno.vectordb.pgvector import PgVector
from agno.knowledge.embedder.huggingface import HuggingfaceCustomEmbedder
from supabase import create_client
import gradio as gr
# Configure logging
# Root logger: INFO level, every record prefixed with timestamp, logger name
# and severity.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the classes and functions defined in this file.
logger = logging.getLogger(__name__)
# DATA MODELS
class ProblemDomain(Enum):
    """Closed set of categories a submitted problem can be filed under.

    Each member's value is the lowercase string form of its name; that
    string is what the rest of the file reads via ``.value``.
    """
    MEDICAL = "medical"
    LEGAL = "legal"
    BUSINESS = "business"
    TECHNICAL = "technical"
    ETHICAL = "ethical"
    GENERAL = "general"
@dataclass
class ModelResponse:
    """One agent's answer to a problem together with its recorded metadata."""
    model_name: str      # name of the agent that produced the answer
    response: str        # answer text
    confidence: float    # quality score assigned to the answer
    reasoning: str       # short note describing where the answer came from
    timestamp: datetime  # when the response object was created
    tokens_used: int = 0  # token count for the answer; 0 when not supplied
@dataclass
class DebateRound:
    """The collected agent responses for one round plus their consensus score."""
    round_number: int               # ordinal of this round
    responses: List[ModelResponse]  # one entry per agent that answered
    consensus_score: float          # agreement score computed over `responses`
    timestamp: datetime             # when the round object was created
@dataclass
class Problem:
    """A user-submitted problem to be analysed by the agents."""
    id: str                # unique identifier of the submission
    title: str             # short headline
    description: str       # full problem statement
    domain: ProblemDomain  # category the problem is filed under
    context: str           # optional extra background supplied by the user
    user_id: str           # identifier of the submitting user
    timestamp: datetime    # when the problem object was created
# CONFIGURATION
class Config:
    """Snapshot of the API keys and database settings found in the environment.

    Every setting is read once, at construction time, with ``os.getenv`` and
    is ``None`` when the variable is not set.
    """

    def __init__(self):
        read = os.getenv
        # AI provider credentials
        self.ANTHROPIC_API_KEY = read("ANTHROPIC_API_KEY")
        self.OPENAI_API_KEY = read("OPENAI_API_KEY")
        self.MISTRAL_API_KEY = read("MISTRAL_API_KEY")
        self.SAMBANOVA_API_KEY = read("SAMBANOVA_API_KEY")
        # Supabase / Postgres settings
        self.SUPABASE_URL = read("SUPABASE_URL")
        self.SUPABASE_KEY = read("SUPABASE_KEY")
        self.SUPABASE_DB_PASSWORD = read("SUPABASE_DB_PASSWORD")
        self._validate()

    def _validate(self):
        """Emit one warning per AI provider key that is missing or empty."""
        provider_keys = (
            'ANTHROPIC_API_KEY',
            'OPENAI_API_KEY',
            'MISTRAL_API_KEY',
            'SAMBANOVA_API_KEY',
        )
        for name in provider_keys:
            if getattr(self, name):
                continue
            logger.warning(f"{name} not configured")

    def has_ai_models(self) -> bool:
        """Return True when at least one AI provider key is set."""
        return bool(
            self.ANTHROPIC_API_KEY
            or self.OPENAI_API_KEY
            or self.MISTRAL_API_KEY
            or self.SAMBANOVA_API_KEY
        )

    def has_database(self) -> bool:
        """Return True only when URL, key and DB password are all set."""
        return bool(
            self.SUPABASE_URL
            and self.SUPABASE_KEY
            and self.SUPABASE_DB_PASSWORD
        )
class TextUtils:
    """Text formatting and processing utilities."""

    @staticmethod
    def clean_markdown(text: str) -> str:
        """Strip common Markdown syntax and return plain text.

        Fenced code blocks become the placeholder ``[Code Block]``; headers,
        emphasis, inline code and links are reduced to their visible text;
        list items (``*``, ``-``, ``+`` or ``1.``) get a ``•`` bullet; runs of
        blank lines collapse to a single blank line.

        Args:
            text: Markdown source.

        Returns:
            The cleaned text, stripped of leading/trailing whitespace.
        """
        # Fences first, and non-greedy, so code that itself contains a
        # backtick is still swallowed as a single block.
        text = re.sub(r'```.*?```', '[Code Block]', text, flags=re.DOTALL)
        text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
        # List markers must be converted BEFORE emphasis is stripped:
        # otherwise "* one\n* two" is read as one italic span and both
        # bullets are lost.
        text = re.sub(r'^[\*\-\+]\s+', '• ', text, flags=re.MULTILINE)
        text = re.sub(r'^\d+\.\s+', '• ', text, flags=re.MULTILINE)
        text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
        text = re.sub(r'\*([^*]+)\*', r'\1', text)
        text = re.sub(r'`([^`]+)`', r'\1', text)
        text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
        return re.sub(r'\n\s*\n', '\n\n', text).strip()

    @staticmethod
    def wrap_text(text: str, width: int = 80) -> str:
        """Greedily re-flow ``text`` into lines of at most ``width`` characters.

        All whitespace (including newlines) is treated as a word separator,
        so existing line breaks are not preserved. A single word longer than
        ``width`` is kept intact on a line of its own.

        Args:
            text: Text to wrap.
            width: Maximum line length in characters.

        Returns:
            The wrapped lines joined with newlines; "" for blank input.
        """
        lines = []
        current_line = []
        current_length = 0
        for word in text.split():
            # The joining space only costs a column when the line already
            # holds a word; charging it unconditionally wrapped lines one
            # character too early.
            separator = 1 if current_line else 0
            if current_line and current_length + separator + len(word) > width:
                lines.append(" ".join(current_line))
                current_line = [word]
                current_length = len(word)
            else:
                current_line.append(word)
                current_length += separator + len(word)
        if current_line:
            lines.append(" ".join(current_line))
        return "\n".join(lines)
class OutputFormatter:
    """Render a finished analysis as plain-text reports.

    ``format_results`` is the entry point; it returns the detailed report and
    a short executive summary. The remaining methods build individual
    sections or map numeric scores to labels.
    """

    SEPARATOR = "─" * 80
    SECTION_SEPARATOR = "─" * 60

    @staticmethod
    def get_quality_label(score: float) -> str:
        """Return the agreement label for a consensus score."""
        if score >= 0.8:
            return "Excellent Agreement"
        if score >= 0.7:
            return "High Agreement"
        if score >= 0.6:
            return "Good Agreement"
        if score >= 0.4:
            return "Moderate Agreement"
        if score >= 0.3:
            return "Low Agreement"
        return "Divergent Views"

    @staticmethod
    def get_confidence_label(confidence: float) -> str:
        """Return "High", "Moderate" or "Low" for a confidence value."""
        if confidence >= 0.7:
            return "High"
        if confidence >= 0.5:
            return "Moderate"
        return "Low"

    @staticmethod
    def create_progress_bar(score: float, width: int = 50) -> str:
        """Draw ``score`` as a text bar followed by its percentage.

        Args:
            score: Fraction to display; the bar fill is clamped to 0..1.
            width: Number of cells in the bar.

        Returns:
            A string such as ``[█████░░░░░] 50.0%``.
        """
        # Filled and empty cells need distinct glyphs; with the same
        # character for both the bar looks identical for every score.
        clamped = max(0.0, min(1.0, score))
        filled = int(clamped * width)
        return f"[{'█' * filled}{'░' * (width - filled)}] {score:.1%}"

    @staticmethod
    def _average_confidence(responses: "List[ModelResponse]") -> float:
        """Return the mean confidence, or 0.0 when there are no responses."""
        if not responses:
            return 0.0
        return sum(r.confidence for r in responses) / len(responses)

    def format_results(
        self,
        problem: "Problem",
        debate_round: "DebateRound",
        save_success: bool
    ) -> tuple[str, str]:
        """Format the complete analysis.

        Args:
            problem: The analysed problem.
            debate_round: Agent responses and their consensus score.
            save_success: Whether persisting the results succeeded.

        Returns:
            ``(main_report, executive_summary)``.
        """
        main = self._format_main(problem, debate_round, save_success)
        summary = self._format_summary(problem, debate_round)
        return main, summary

    def _format_main(self, problem: "Problem", debate_round: "DebateRound", save_success: bool) -> str:
        """Build the detailed report: header, responses, consensus, metrics."""
        quality = self.get_quality_label(debate_round.consensus_score)
        db_status = "Saved successfully" if save_success else "Save failed"
        header = "\n".join([
            "ANALYSIS RESULTS",
            self.SEPARATOR,
            f"Problem: {problem.title}",
            f"Domain: {problem.domain.value.title()}",
            f"Consensus: {debate_round.consensus_score:.2f}/1.00 ({quality})",
            f"Completed: {debate_round.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
            f"Models: {len(debate_round.responses)} agents",
            f"Database: {db_status}",
            "Description:",
            TextUtils.wrap_text(problem.description, 75),
            "",  # the header block ends with a newline
        ])
        responses_section = self._format_responses(debate_round.responses)
        consensus_section = self._format_consensus(debate_round)
        metrics_section = self._format_metrics(debate_round.responses)
        return f"{header}\n\n{responses_section}\n\n{consensus_section}\n\n{metrics_section}"

    def _format_responses(self, responses: "List[ModelResponse]") -> str:
        """Build the numbered per-agent section."""
        lines = [f"AGENT RESPONSES\n{self.SEPARATOR}\n"]
        for i, resp in enumerate(responses, 1):
            conf_label = self.get_confidence_label(resp.confidence)
            clean_resp = TextUtils.clean_markdown(resp.response)
            lines.append(f"{i}. {resp.model_name}")
            lines.append(f"Confidence: {resp.confidence:.2f} ({conf_label}) | "
                         f"Tokens: ~{int(resp.tokens_used)} | "
                         f"Time: {resp.timestamp.strftime('%H:%M:%S')}")
            lines.append(self.SECTION_SEPARATOR)
            lines.append(TextUtils.wrap_text(clean_resp, 75))
            lines.append("")
        return "\n".join(lines)

    def _format_consensus(self, debate_round: "DebateRound") -> str:
        """Build the consensus section: score, bar and interpretation."""
        quality = self.get_quality_label(debate_round.consensus_score)
        bar = self.create_progress_bar(debate_round.consensus_score)
        interpretation = self._get_interpretation(debate_round.consensus_score)
        return "\n".join([
            "CONSENSUS ANALYSIS",
            self.SEPARATOR,
            "Agreement Level:",
            f"Score: {debate_round.consensus_score:.2f}/1.00 ({quality})",
            bar,
            "Interpretation:",
            TextUtils.wrap_text(interpretation, 75),
        ])

    def _format_metrics(self, responses: "List[ModelResponse]") -> str:
        """Build the metrics section; safe to call with no responses."""
        avg_conf = self._average_confidence(responses)
        total_tokens = sum(r.tokens_used for r in responses)
        high = sum(1 for r in responses if r.confidence >= 0.7)
        med = sum(1 for r in responses if 0.5 <= r.confidence < 0.7)
        low = sum(1 for r in responses if r.confidence < 0.5)
        return "\n".join([
            "QUALITY METRICS",
            self.SEPARATOR,
            f"Average Confidence: {avg_conf:.2f}/1.00",
            f"Total Tokens: ~{int(total_tokens)}",
            f"Distribution: High: {high}, Medium: {med}, Low: {low}",
        ])

    def _format_summary(self, problem: "Problem", debate_round: "DebateRound") -> str:
        """Build the executive summary; safe to call with no responses.

        The "Key Insights" and "Reliability" lines are fixed text, not
        derived from the responses.
        """
        avg_conf = self._average_confidence(debate_round.responses)
        quality = self.get_quality_label(debate_round.consensus_score)
        return "\n".join([
            "EXECUTIVE SUMMARY",
            self.SEPARATOR,
            f"Problem Domain: {problem.domain.value.title()}",
            f"Models Consulted: {len(debate_round.responses)}",
            f"Average Confidence: {avg_conf:.2f}/1.00",
            f"Consensus Quality: {quality}",
            "Key Insights:",
            "• Strategic planning considerations identified",
            "• Risk assessment and mitigation strategies discussed",
            "• Multiple perspectives analyzed",
            "• Evidence-based recommendations provided",
            "Reliability:",
            "All responses from genuine AI models",
            f"Quality: {quality} ({debate_round.consensus_score:.2f}/1.00)",
        ])

    @staticmethod
    def _get_interpretation(score: float) -> str:
        """Return the explanatory sentence pair for a consensus score."""
        if score >= 0.8:
            return ("Excellent: Strong agreement on key points and approaches. "
                    "High confidence in recommendations with consistent reasoning.")
        if score >= 0.6:
            return ("Good: Models generally align with some variation. "
                    "Solid foundation for decision-making.")
        if score >= 0.4:
            return ("Moderate: Mixed agreement - different priorities identified. "
                    "Consider multiple approaches.")
        return ("Divergent: Significant disagreement on complex issue. "
                "Explore different perspectives carefully.")
class DatabaseManager:
    """Persist problems and agent responses through the Supabase client.

    ``connected`` is True only after the client was created and a probe
    query succeeded. Both ``save_*`` methods return False without touching
    the client while ``connected`` is False.
    """

    def __init__(self, config: "Config"):
        self.config = config
        self.client = None
        self.connected = False
        self._init_connection()

    def _init_connection(self):
        """Create the client and verify it with a one-row probe query.

        Any failure is logged and leaves ``connected`` False; nothing is
        raised to the caller.
        """
        if not self.config.has_database():
            logger.warning("Database not configured")
            return
        try:
            self.client = create_client(
                self.config.SUPABASE_URL,
                self.config.SUPABASE_KEY
            )
            # Probe query: fails fast on bad credentials or a missing table.
            self.client.table('conversations').select('id').limit(1).execute()
            self.connected = True
            logger.info("Database connected")
        except Exception as e:
            logger.error(f"Database connection failed: {e}")
            self.connected = False

    def save_problem(self, problem: "Problem") -> bool:
        """Insert ``problem`` into the ``problems`` table.

        Returns:
            True on success; False when offline or when the insert fails
            (the failure is logged).
        """
        if not self.connected:
            return False
        try:
            data = {
                'id': problem.id,
                'title': problem.title,
                'description': problem.description,
                'domain': problem.domain.value,
                'context': problem.context,
                'user_id': problem.user_id,
                'timestamp': problem.timestamp.isoformat()
            }
            self.client.table('problems').insert(data).execute()
            return True
        except Exception as e:
            logger.error(f"Failed to save problem: {e}")
            return False

    def save_responses(self, problem_id: str, responses: "List[ModelResponse]") -> bool:
        """Insert all ``responses`` into the ``conversations`` table.

        The rows are sent in one bulk insert rather than one request per
        response, so a failure cannot leave only some of them saved.

        Args:
            problem_id: Stored as each row's ``session_id``.
            responses: Agent responses to persist.

        Returns:
            True on success (including an empty list); False when offline or
            when the insert fails (the failure is logged).
        """
        if not self.connected:
            return False
        try:
            rows = [
                {
                    'session_id': problem_id,
                    'query': f"Analysis by {resp.model_name}",
                    'response': resp.response,
                    'context': json.dumps({
                        'confidence': resp.confidence,
                        'tokens': resp.tokens_used
                    }),
                    'timestamp': resp.timestamp.isoformat()
                }
                for resp in responses
            ]
            if rows:  # nothing to send for an empty list
                self.client.table('conversations').insert(rows).execute()
            return True
        except Exception as e:
            logger.error(f"Failed to save responses: {e}")
            return False
# CONSENSUS CALCULATION
class ConsensusCalculator:
    """Heuristic scoring of individual responses and of a whole round.

    Both scores are surface heuristics (length, keywords, length spread);
    neither inspects what the responses actually say.
    """

    # Words that raise the quality score (evidence / reasoning) or lower it
    # (uncertainty). Matched case-insensitively as whole words.
    EVIDENCE_MARKERS = ('research', 'studies', 'data', 'analysis', 'according')
    REASONING_MARKERS = ('because', 'therefore', 'however', 'furthermore')
    UNCERTAINTY_MARKERS = ('maybe', 'possibly', 'might', 'unclear')

    MIN_SCORE = 0.1              # floor returned for empty or very weak responses
    FULL_LENGTH_WORDS = 100      # word count that earns the full length score
    FULL_STRUCTURE_SENTENCES = 5 # sentence count that earns the full structure score
    VARIANCE_SCALE = 1000        # word-count variance at which consistency reaches 0

    @staticmethod
    def calculate_response_quality(response: str) -> float:
        """Score a response between 0.1 and 1.0.

        The score combines word count (weight 0.25), number of non-empty
        period-separated sentences (weight 0.15), up to 0.3 for evidence
        words, 0.05 per reasoning word, minus up to 0.2 for uncertainty
        words.

        Args:
            response: Response text; ``None`` or fewer than 10 non-blank
                characters yields the floor score.

        Returns:
            The clamped quality score.
        """
        cls = ConsensusCalculator
        if not response or len(response.strip()) < 10:
            return cls.MIN_SCORE

        # Whole-word vocabulary: substring tests would count "database" as
        # "data" and "mighty" as "might".
        vocabulary = set(re.findall(r"[a-z]+", response.lower()))

        length_score = min(1.0, len(response.split()) / cls.FULL_LENGTH_WORDS)

        # Ignore empty fragments, e.g. the one after a trailing period.
        sentence_count = sum(1 for part in response.split('.') if part.strip())
        structure_score = min(1.0, sentence_count / cls.FULL_STRUCTURE_SENTENCES)

        evidence_score = min(0.3, sum(
            0.1 for marker in cls.EVIDENCE_MARKERS if marker in vocabulary
        ))
        reasoning_score = min(0.3, sum(
            0.05 for marker in cls.REASONING_MARKERS if marker in vocabulary
        ))
        penalty = min(0.2, sum(
            0.05 for marker in cls.UNCERTAINTY_MARKERS if marker in vocabulary
        ))

        score = (
            length_score * 0.25 +
            structure_score * 0.15 +
            evidence_score +
            reasoning_score -
            penalty
        )
        return max(cls.MIN_SCORE, min(1.0, score))

    @staticmethod
    def calculate_consensus(responses: "List[ModelResponse]") -> float:
        """Score a round between 0.0 and 1.0.

        The result is 70% mean confidence plus 30% "consistency", where
        consistency falls linearly from 1 to 0 as the variance of the
        responses' word counts grows from 0 to ``VARIANCE_SCALE``.

        Args:
            responses: Objects exposing ``confidence`` and ``response``.

        Returns:
            0.0 for an empty list; 0.5 when the calculation fails (the
            error is logged); otherwise the clamped score.
        """
        if not responses:
            return 0.0
        try:
            count = len(responses)
            avg_conf = sum(r.confidence for r in responses) / count
            lengths = [len(r.response.split()) for r in responses]
            mean_len = sum(lengths) / count
            variance = sum((n - mean_len) ** 2 for n in lengths) / count
            consistency = max(0, 1 - (variance / ConsensusCalculator.VARIANCE_SCALE))
            return min(1.0, max(0.0, avg_conf * 0.7 + consistency * 0.3))
        except Exception as e:
            # Best effort: a malformed response must not abort the analysis.
            logger.error(f"Consensus calculation error: {e}")
            return 0.5
class AgentManager:
    """Build one agent per AI provider that has an API key configured.

    Successfully built agents are collected in ``self.agents``; a provider
    whose agent cannot be built is logged and skipped.
    """

    INSTRUCTIONS = (
        "You are an expert AI analyst. Provide thorough, evidence-based analysis.\n"
        "Focus on actionable insights, clear reasoning, and professional recommendations.\n"
        "Structure your response clearly without using markdown formatting."
    )

    def __init__(self, config: Config, knowledge_base: Optional[Knowledge]):
        self.config = config
        self.knowledge_base = knowledge_base
        self.agents = []
        self._initialize()

    def _initialize(self):
        """Try each provider in turn, skipping those without a key."""
        providers = (
            (self.config.ANTHROPIC_API_KEY, self._add_claude),
            (self.config.OPENAI_API_KEY, self._add_openai),
            (self.config.MISTRAL_API_KEY, self._add_mistral),
            (self.config.SAMBANOVA_API_KEY, self._add_sambanova),
        )
        for api_key, add_agent in providers:
            if api_key:
                add_agent()
        logger.info(f"Initialized {len(self.agents)} agents")

    def _register(self, label, name, role, build_model):
        """Build an agent and append it; log and swallow any failure.

        ``build_model`` is called inside the ``try`` so that a model
        constructor error is handled the same way as an agent error.
        """
        try:
            self.agents.append(Agent(
                name=name,
                role=role,
                model=build_model(),
                instructions=self.INSTRUCTIONS,
                knowledge=self.knowledge_base
            ))
        except Exception as e:
            logger.error(f"{label} agent failed: {e}")

    def _add_claude(self):
        """Register the Claude-backed agent."""
        self._register(
            "Claude",
            "Claude Analyst",
            "Critical Analysis",
            lambda: Claude(id="claude-3-5-sonnet-20240620"),
        )

    def _add_openai(self):
        """Register the OpenAI-backed agent."""
        self._register(
            "OpenAI",
            "GPT-4 Strategist",
            "Strategic Planning",
            lambda: OpenAIChat(id="gpt-4o"),
        )

    def _add_mistral(self):
        """Register the Mistral-backed agent (key passed explicitly)."""
        self._register(
            "Mistral",
            "Mistral Evaluator",
            "Solution Evaluation",
            lambda: MistralChat(
                id="mistral-large-latest",
                api_key=self.config.MISTRAL_API_KEY
            ),
        )

    def _add_sambanova(self):
        """Register the SambaNova-backed agent."""
        self._register(
            "SambaNova",
            "SambaNova Specialist",
            "Technical Implementation",
            lambda: Sambanova(),
        )
class Agora:
    """Main AI Democracy system.

    Wires together configuration, the optional knowledge base, the database
    manager, the agents, the consensus calculator and the output formatter.
    """

    def __init__(self):
        self.config = Config()
        self.knowledge_base = self._setup_knowledge()
        self.db = DatabaseManager(self.config)
        self.agent_manager = AgentManager(self.config, self.knowledge_base)
        self.calculator = ConsensusCalculator()
        self.formatter = OutputFormatter()

    def _setup_knowledge(self) -> Optional[Knowledge]:
        """Create the vector-backed knowledge base.

        Returns:
            The knowledge base, or None when no database is configured or
            when setup fails (the failure is logged).
        """
        # Check first: building the embedder is pointless without a database.
        if not self.config.has_database():
            return None
        try:
            embedder = HuggingfaceCustomEmbedder()
            if not hasattr(embedder, 'embedding_dimension'):
                embedder.embedding_dimension = 384
            return Knowledge(
                embedder=embedder,
                vector_db=PgVector(
                    # NOTE(review): this yields only the first label of the
                    # Supabase URL's host — confirm it is a resolvable
                    # Postgres host, and that these keyword arguments match
                    # the installed PgVector/Knowledge signatures.
                    host=self.config.SUPABASE_URL.replace("https://", "").split(".")[0],
                    port=5432,
                    user="postgres",
                    password=self.config.SUPABASE_DB_PASSWORD,
                    database="postgres",
                    table_name="conversations_w_llm",
                    embedding_dimension=384
                )
            )
        except Exception as e:
            logger.error(f"Knowledge base setup failed: {e}")
            return None

    @property
    def agents(self) -> List[Agent]:
        """Agents that were successfully initialised."""
        return self.agent_manager.agents

    @staticmethod
    def _build_prompt(problem: Problem) -> str:
        """Return the analysis prompt sent to every agent."""
        return "\n".join([
            "Problem Analysis Request",
            f"Title: {problem.title}",
            f"Description: {problem.description}",
            f"Domain: {problem.domain.value}",
            f"Context: {problem.context}",
            "Provide expert analysis including:",
            "1. Key considerations and challenges",
            "2. Potential solutions or approaches",
            "3. Risk and benefit assessment",
            "4. Specific recommendations",
            "Be thorough and provide actionable insights.",
        ])

    @staticmethod
    def _extract_text(result: Any) -> str:
        """Return the answer text carried by an ``agent.run`` result.

        The run result is an object whose answer lives in ``content``;
        stringifying the object itself would capture the run object's string
        form rather than the answer. Results without a ``content`` attribute
        (e.g. a plain string) are stringified directly.
        """
        content = getattr(result, "content", result)
        return "" if content is None else str(content).strip()

    def analyze(self, problem: Problem) -> DebateRound:
        """Ask every agent to analyse ``problem`` and score the round.

        Agents are queried one after another. An agent that raises, or whose
        answer is 20 characters or shorter, is skipped.

        Args:
            problem: The problem to analyse.

        Returns:
            A round-1 ``DebateRound`` holding the accepted responses.

        Raises:
            RuntimeError: If there are no agents, or no agent produced a
                usable answer.
        """
        if not self.agents:
            raise RuntimeError("No AI agents available")
        logger.info(f"Analyzing: {problem.title}")
        prompt = self._build_prompt(problem)
        responses = []
        for agent in self.agents:
            try:
                text = self._extract_text(agent.run(prompt))
                if len(text) <= 20:
                    continue  # too short to be a real analysis
                confidence = self.calculator.calculate_response_quality(text)
                responses.append(ModelResponse(
                    model_name=agent.name,
                    response=text,
                    confidence=confidence,
                    reasoning=f"Analysis by {agent.role}",
                    timestamp=datetime.now(),
                    # Rough estimate: about 1.3 tokens per word.
                    tokens_used=int(len(text.split()) * 1.3)
                ))
                logger.info(f"{agent.name} responded (conf: {confidence:.2f})")
            except Exception as e:
                # One failing provider must not abort the whole round.
                logger.error(f"Error from {agent.name}: {e}")
        if not responses:
            raise RuntimeError("No responses received from agents")
        consensus = self.calculator.calculate_consensus(responses)
        return DebateRound(
            round_number=1,
            responses=responses,
            consensus_score=consensus,
            timestamp=datetime.now()
        )

    def save_results(self, problem: Problem, debate: DebateRound) -> bool:
        """Persist the problem and its responses.

        Both saves are attempted even if the first one fails.

        Returns:
            True only when both saves succeeded.
        """
        try:
            prob_saved = self.db.save_problem(problem)
            resp_saved = self.db.save_responses(problem.id, debate.responses)
            return prob_saved and resp_saved
        except Exception as e:
            logger.error(f"Save failed: {e}")
            return False
def create_interface():
    """Build the Gradio UI for submitting a problem and viewing the analysis.

    An ``Agora`` instance is created once and shared by every request.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    agora = Agora()

    def analyze_problem(title, description, domain, user_id, context):
        """Validate the form, run the analysis and return the two reports.

        Returns:
            ``(main_report, summary)``; on any failure the first element is
            an ``Error: ...`` message instead.
        """
        try:
            # Normalise before validating so whitespace-only or missing
            # input is rejected instead of producing an empty problem.
            title = (title or "").strip()
            description = (description or "").strip()
            if not title or not description:
                return "Error: Title and description required", ""
            if not agora.agents:
                return "Error: No AI agents available", ""
            problem = Problem(
                id=str(uuid.uuid4()),
                title=title,
                description=description,
                # The dropdown shows title-cased names; the enum values are lowercase.
                domain=ProblemDomain((domain or ProblemDomain.GENERAL.value).lower()),
                context=(context or "").strip() or "None provided",
                user_id=(user_id or "").strip() or "anonymous",
                timestamp=datetime.now()
            )
            debate = agora.analyze(problem)
            saved = agora.save_results(problem, debate)
            main, summary = agora.formatter.format_results(problem, debate, saved)
            return main, summary
        except Exception as e:
            logger.error(f"Analysis failed: {e}\n{traceback.format_exc()}")
            return f"Error: {str(e)}", "Analysis failed"

    # title/theme must be constructor arguments; written as statements inside
    # the `with` body they only created unused tuples and were never applied.
    # NOTE(review): confirm the installed Gradio version accepts `theme` on
    # Blocks; if it expects it on launch(), move it there.
    with gr.Blocks(
        title="AI Democracy System",
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="slate",
            neutral_hue="slate"
        ),
    ) as demo:
        gr.Markdown("""
# AI Democracy - Multi-Model Consensus System
Professional platform for AI model deliberation and consensus building.
""")
        status_md = f"""
### System Status
**Active Agents:** {len(agora.agents)} | **Database:** {'Connected' if agora.db.connected else 'Offline'}
"""
        gr.Markdown(status_md)
        with gr.Row():
            with gr.Column(scale=2):
                title_input = gr.Textbox(
                    label="Problem Title",
                    placeholder="Enter problem title",
                    lines=1
                )
                description_input = gr.Textbox(
                    label="Description",
                    placeholder="Detailed problem description",
                    lines=5
                )
                domain_input = gr.Dropdown(
                    label="Domain",
                    choices=[d.value.title() for d in ProblemDomain],
                    value="General"
                )
                context_input = gr.Textbox(
                    label="Context (Optional)",
                    placeholder="Additional context",
                    lines=2
                )
                user_id_input = gr.Textbox(
                    label="User ID (Optional)",
                    placeholder="Your identifier",
                    lines=1
                )
                analyze_btn = gr.Button("Analyze", variant="primary", size="lg")
            with gr.Column(scale=1):
                gr.Markdown("""
### Analysis Process
1. Submit problem details
2. AI models analyze independently
3. Consensus calculated
4. Results formatted
5. Saved to database
### Features
- Real AI responses
- Quality metrics
- Consensus scoring
- Professional formatting
""")
        gr.Markdown("## Results")
        # NOTE(review): the formatter returns newline-laid-out plain text;
        # confirm gr.Markdown renders those line breaks as intended.
        with gr.Row():
            results = gr.Markdown(label="Analysis Results")
        with gr.Row():
            summary = gr.Markdown(label="Executive Summary")
        analyze_btn.click(
            fn=analyze_problem,
            inputs=[title_input, description_input, domain_input, user_id_input, context_input],
            outputs=[results, summary]
        )
        gr.Markdown("""
---
**System Information**
- Framework: Agno AI Agent Framework
- Models: Claude 3.5, GPT-4, Mistral, SambaNova
- Database: Supabase PostgreSQL
- Version: 2.0 Professional Edition
""")
    return demo
def main():
    """Build the interface and serve it on 127.0.0.1:7860.

    Any startup failure is logged with its traceback; nothing is re-raised.
    """
    launch_options = {
        "server_name": "127.0.0.1",
        "server_port": 7860,
        "share": False,
    }
    try:
        logger.info("Starting AI Democracy System")
        app = create_interface()
        app.launch(**launch_options)
    except Exception as exc:
        logger.error(f"Failed to start: {exc}\n{traceback.format_exc()}")


if __name__ == "__main__":
    main()