Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| import random | |
| from datetime import datetime | |
| import pandas as pd | |
| from typing import Dict, List, Tuple, Optional, Generator | |
| import sqlite3 | |
| from dataclasses import dataclass, asdict | |
| import hashlib | |
| import time | |
| from enum import Enum | |
| import numpy as np | |
| import threading | |
| import queue | |
| import re | |
| # For LLM API integration | |
| try: | |
| from openai import OpenAI | |
| except ImportError: | |
| print("OpenAI library not installed. Install with: pip install openai") | |
| OpenAI = None | |
| try: | |
| from gradio_client import Client as GradioClient | |
| except ImportError: | |
| print("Gradio client not installed. Install with: pip install gradio_client") | |
| GradioClient = None | |
| # For Gemini API | |
| try: | |
| from google import genai | |
| from google.genai import types | |
| except ImportError: | |
| print("Google GenAI library not installed. Install with: pip install google-genai") | |
| genai = None | |
| types = None | |
| # For Claude API | |
| try: | |
| import anthropic | |
| except ImportError: | |
| print("Anthropic library not installed. Install with: pip install anthropic") | |
| anthropic = None | |
| # For Hugging Face Dataset integration | |
| try: | |
| from huggingface_hub import HfApi, login, create_repo, upload_file, hf_hub_download | |
| from datasets import Dataset, load_dataset | |
| import pyarrow.parquet as pq | |
| import pyarrow as pa | |
| except ImportError: | |
| print("Hugging Face libraries not installed. Install with: pip install huggingface_hub datasets pyarrow") | |
| HfApi = None | |
| Dataset = None | |
| # ==================== Configuration ==================== | |
class Category(Enum):
    """Creative-battle challenge categories supported by the arena."""

    STORYTELLING = "storytelling"
    INNOVATION = "innovation"
    BUSINESS = "business"
@dataclass
class Battle:
    """One head-to-head creative battle between two anonymized models.

    NOTE(review): the original declared only bare annotations — the
    ``@dataclass`` decorator was missing, so no ``__init__``/fields were
    generated even though ``asdict`` is imported and instances are built
    and read attribute-by-attribute elsewhere in this file.
    """

    id: str                      # unique battle identifier (primary key in SQLite)
    prompt_id: str               # identifier of the challenge prompt used
    prompt_text: str             # full text of the challenge shown to both models
    model_a: str                 # name of the model shown as "Model A"
    model_b: str                 # name of the model shown as "Model B"
    response_a: str              # model A's generated answer
    response_b: str              # model B's generated answer
    winner: Optional[str]        # winning model's name, or None before the vote
    voter_id: str                # opaque id of the user who voted
    timestamp: datetime          # when the battle was created/voted
    category: "Category"         # forward ref: Category enum defined earlier in this module
    custom_prompt: bool = False  # True when the prompt was typed by the user
    language: str = "en"         # UI language code ("en" or "ko")
| # ==================== Language Configurations ==================== | |
# Supported UI languages: ISO 639-1 code -> native display name.
LANGUAGES = {
    "en": "English",
    "ko": "한국어",
}
# Localized UI strings, keyed by language code. Both languages carry the
# same set of keys; nested "categories"/"filter_categories" maps feed the
# dropdown widgets.
UI_TEXT = {
    "en": {
        "title": "🎨 AI Models Creativity Battle Arena",
        "subtitle": "Test cutting-edge AI models in creative challenges",
        "battle_tab": "⚔️ Battle Arena",
        "leaderboard_tab": "🏆 Leaderboard",
        "category_label": "Select Category",
        "custom_prompt_label": "✏️ Custom Challenge (Optional)",
        "custom_prompt_placeholder": "Enter your creative challenge for the models...",
        "new_battle_btn": "🎲 Start New Battle",
        "model_a": "### 🅰️ Model A",
        "model_b": "### 🅱️ Model B",
        "vote_a": "🅰️ Model A is more creative",
        "vote_b": "🅱️ Model B is more creative",
        "vote_complete": "### 🎉 Vote Complete!",
        "winner": "Winner",
        "leaderboard_title": "## 🏆 AI Models Leaderboard",
        "category_filter": "Category Filter",
        "refresh_btn": "🔄 Refresh",
        "language_label": "Language",
        "contact": "Contact: arxivgpt@gmail.com",
        "challenge_task": "### 📝 Challenge Task",
        "category": "Category",
        "prompt": "Challenge",
        "model_identity": "Model Identity",
        "elo_updated": "Scores have been updated!",
        "generating": "🔄 Generating response...",
        "categories": {
            "random": "🎲 Random",
            "storytelling": "📚 Storytelling",
            "innovation": "💡 Innovation",
            "business": "💼 Business",
        },
        "filter_categories": {
            "overall": "Overall",
            "storytelling": "Storytelling",
            "innovation": "Innovation",
            "business": "Business",
        },
    },
    "ko": {
        "title": "🎨 AI 모델 창의성 배틀 아레나",
        "subtitle": "최첨단 AI 모델들의 창의력 대결",
        "battle_tab": "⚔️ 배틀 아레나",
        "leaderboard_tab": "🏆 리더보드",
        "category_label": "카테고리 선택",
        "custom_prompt_label": "✏️ 커스텀 도전과제 (선택사항)",
        "custom_prompt_placeholder": "모델들에게 도전할 창의적인 과제를 입력하세요...",
        "new_battle_btn": "🎲 새로운 배틀 시작",
        "model_a": "### 🅰️ 모델 A",
        "model_b": "### 🅱️ 모델 B",
        "vote_a": "🅰️ 모델 A가 더 창의적이다",
        "vote_b": "🅱️ 모델 B가 더 창의적이다",
        "vote_complete": "### 🎉 투표 완료!",
        "winner": "승자",
        "leaderboard_title": "## 🏆 AI 모델 리더보드",
        "category_filter": "카테고리 필터",
        "refresh_btn": "🔄 새로고침",
        "language_label": "언어",
        "contact": "문의: arxivgpt@gmail.com",
        "challenge_task": "### 📝 도전 과제",
        "category": "카테고리",
        "prompt": "도전과제",
        "model_identity": "모델 정체",
        "elo_updated": "점수가 업데이트되었습니다!",
        "generating": "🔄 응답 생성 중...",
        "categories": {
            "random": "🎲 랜덤",
            "storytelling": "📚 스토리텔링",
            "innovation": "💡 혁신/발명",
            "business": "💼 비즈니스",
        },
        "filter_categories": {
            "overall": "전체",
            "storytelling": "스토리텔링",
            "innovation": "혁신/발명",
            "business": "비즈니스",
        },
    },
}
| # ==================== Simplified Prompt Database ==================== | |
# Built-in challenge prompts: Category -> language code -> list of prompt
# records ({"text", "difficulty"}). All shipped prompts are "high" difficulty.
PROMPTS = {
    Category.STORYTELLING: {
        "en": [
            {"text": "Write a sci-fi movie proposal with a never-before-explored concept", "difficulty": "high"},
            {"text": "Create a story where the protagonists never meet but fall deeply in love", "difficulty": "high"},
            {"text": "Design a thriller where the twist is revealed in the first scene but still surprises at the end", "difficulty": "high"},
        ],
        "ko": [
            {"text": "한 번도 다뤄지지 않은 소재로 SF 영화 기획안을 작성하세요", "difficulty": "high"},
            {"text": "주인공들이 한 번도 만나지 않지만 깊은 사랑에 빠지는 스토리를 창작하세요", "difficulty": "high"},
            {"text": "첫 장면에서 반전을 공개하지만 마지막에 여전히 충격적인 스릴러를 설계하세요", "difficulty": "high"},
        ],
    },
    Category.INNOVATION: {
        "en": [
            {"text": "Present 5 innovative ideas that could revolutionize the bicycle", "difficulty": "high"},
            {"text": "Propose 5 breakthrough innovations that could transform email communication", "difficulty": "high"},
            {"text": "Design 5 inventions that could make elevators obsolete", "difficulty": "high"},
        ],
        "ko": [
            {"text": "자전거를 혁신할 수 있는 획기적인 발명 아이디어를 5개 제시하세요", "difficulty": "high"},
            {"text": "이메일 커뮤니케이션을 완전히 변화시킬 혁신 아이디어를 5개 제시하세요", "difficulty": "high"},
            {"text": "엘리베이터를 대체할 수 있는 5가지 혁신적 발명을 설계하세요", "difficulty": "high"},
        ],
    },
    Category.BUSINESS: {
        "en": [
            {"text": "Design a business model in robotics/drone sector that could become a unicorn startup", "difficulty": "high"},
            {"text": "Create a one-person SaaS business that could scale to $1M ARR", "difficulty": "high"},
            {"text": "Develop a subscription model that people would happily pay $1000/month for", "difficulty": "high"},
        ],
        "ko": [
            {"text": "로봇/드론 분야에서 유니콘 기업이 될 수 있는 비즈니스 모델을 설계하세요", "difficulty": "high"},
            {"text": "연 매출 10억원을 달성할 수 있는 1인 SaaS 창업 아이템을 기획하세요", "difficulty": "high"},
            {"text": "사람들이 기꺼이 월 100만원을 지불할 만한 구독 비즈니스를 개발하세요", "difficulty": "high"},
        ],
    },
}
| # ==================== Database Management ==================== | |
class ArenaDatabase:
    """Persistence layer for the creativity arena.

    Battles and per-model statistics live in a local SQLite file. When a
    Hugging Face token is configured, the same tables are mirrored to a
    private HF dataset repo so state survives Space restarts: on boot we
    first try to restore from HF and only create a fresh database when no
    remote data exists.
    """

    # The four competitors tracked by the arena.
    _MODELS = ("GPT-5", "jetXA", "Gemini-2.5-Pro", "Claude-Opus-4.1")

    # DDL shared by init_database() and _restore_from_hf().
    _BATTLES_DDL = '''
        CREATE TABLE IF NOT EXISTS battles (
            id TEXT PRIMARY KEY,
            prompt_id TEXT,
            prompt_text TEXT,
            category TEXT,
            model_a TEXT,
            model_b TEXT,
            response_a TEXT,
            response_b TEXT,
            winner TEXT,
            voter_id TEXT,
            timestamp DATETIME,
            custom_prompt INTEGER DEFAULT 0,
            language TEXT DEFAULT 'en'
        )
    '''

    _STATS_DDL = '''
        CREATE TABLE IF NOT EXISTS model_stats (
            model_name TEXT PRIMARY KEY,
            overall_score REAL DEFAULT 5.0,
            storytelling_score REAL DEFAULT 5.0,
            innovation_score REAL DEFAULT 5.0,
            business_score REAL DEFAULT 5.0,
            total_battles INTEGER DEFAULT 0,
            wins INTEGER DEFAULT 0,
            losses INTEGER DEFAULT 0,
            elo_rating INTEGER DEFAULT 1500
        )
    '''

    def __init__(self, db_path="ai_models_arena.db", use_hf=True):
        """Open (or create) the arena database.

        Args:
            db_path: path of the local SQLite file.
            use_hf: mirror to Hugging Face when the libraries and HF_TOKEN
                are available; silently degrades to local-only otherwise.
        """
        self.db_path = db_path
        self.use_hf = use_hf and HfApi is not None
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_dataset_name = os.getenv("HF_DATASET_NAME", "ai_models_arena")
        self.hf_username = None
        if self.use_hf and self.hf_token:
            try:
                login(token=self.hf_token)
                self.api = HfApi()
                user_info = self.api.whoami()
                self.hf_username = user_info["name"]
                self.hf_repo_id = f"{self.hf_username}/{self.hf_dataset_name}"
                self._init_hf_dataset()
                print(f"✅ Connected to Hugging Face Dataset: {self.hf_repo_id}")
                # Restore FIRST: if remote data exists we must not clobber it
                # with a freshly initialized local database.
                if self._restore_from_hf():
                    print("✅ Successfully restored data from Hugging Face Dataset")
                    return
                else:
                    print("📝 No existing data in HF Dataset, will create new database")
            except Exception as e:
                print(f"❌ Failed to connect to Hugging Face: {e}")
                self.use_hf = False
        # Reached only when HF is unavailable or held no data.
        print("📝 Initializing new local database")
        self.init_database()

    def _init_hf_dataset(self):
        """Ensure the private HF dataset repository exists (idempotent)."""
        try:
            create_repo(
                repo_id=self.hf_repo_id,
                repo_type="dataset",
                private=True,
                exist_ok=True
            )
            print(f"✅ HF Dataset repository ready: {self.hf_repo_id}")
        except Exception as e:
            print(f"Dataset repo creation note: {e}")

    def _restore_from_hf(self):
        """Rebuild the local SQLite file from the HF dataset.

        Returns:
            True when battle data was actually restored, False otherwise.
        """
        try:
            print("🔄 Attempting to restore data from Hugging Face...")
            try:
                dataset = load_dataset(self.hf_repo_id, split="train", token=self.hf_token)
            except Exception as e:
                print(f"No existing battles data found: {e}")
                return False
            if not dataset or len(dataset) == 0:
                print("Dataset exists but is empty")
                return False
            print(f"Found {len(dataset)} battles in HF Dataset")
            conn = sqlite3.connect(self.db_path)
            cur = conn.cursor()
            cur.execute(self._BATTLES_DDL)
            cur.execute(self._STATS_DDL)
            # Battles: overwrite whatever the local file held.
            battles_df = dataset.to_pandas()
            battles_df.to_sql('battles', conn, if_exists='replace', index=False)
            print(f"✅ Restored {len(battles_df)} battles")
            # Stats live in an optional "stats" split; recompute when absent.
            stats_restored = False
            try:
                stats_dataset = load_dataset(self.hf_repo_id, split="stats", token=self.hf_token)
                if stats_dataset and len(stats_dataset) > 0:
                    stats_dataset.to_pandas().to_sql('model_stats', conn, if_exists='replace', index=False)
                    print("✅ Restored model stats")
                    stats_restored = True
            except Exception as e:
                print(f"Could not restore stats: {e}")
            if not stats_restored:
                print("📊 Recalculating stats from battle history...")
                self._recalculate_stats_from_battles(cur)
            conn.commit()
            conn.close()
            return True
        except Exception as e:
            print(f"Failed to restore from HF: {e}")
            return False

    def _recalculate_stats_from_battles(self, cursor):
        """Rebuild the model_stats table from recorded battle outcomes."""
        # Reset every model to its baseline row.
        for model in self._MODELS:
            cursor.execute('''
                INSERT OR REPLACE INTO model_stats
                (model_name, overall_score, storytelling_score, innovation_score,
                business_score, total_battles, wins, losses, elo_rating)
                VALUES (?, 5.0, 5.0, 5.0, 5.0, 0, 0, 0, 1500)
            ''', (model,))
        cursor.execute('''
            SELECT model_a, model_b, winner, category FROM battles WHERE winner IS NOT NULL
        ''')
        decided = cursor.fetchall()
        for model_a, model_b, winner, category in decided:
            # Anything other than model_a winning is credited to model_b,
            # matching the original recording logic.
            victor, loser = (model_a, model_b) if winner == model_a else (model_b, model_a)
            cursor.execute('UPDATE model_stats SET wins = wins + 1, total_battles = total_battles + 1 WHERE model_name = ?', (victor,))
            cursor.execute('UPDATE model_stats SET losses = losses + 1, total_battles = total_battles + 1 WHERE model_name = ?', (loser,))
            self._update_category_scores(cursor, victor, Category(category), True)
            self._update_category_scores(cursor, loser, Category(category), False)
        self._recalculate_elo_from_battles(cursor)
        print(f"✅ Recalculated stats from {len(decided)} battles")

    def _recalculate_elo_from_battles(self, cursor):
        """Replay decided battles in timestamp order to rebuild ELO ratings."""
        cursor.execute('UPDATE model_stats SET elo_rating = 1500')
        cursor.execute('''
            SELECT model_a, model_b, winner FROM battles
            WHERE winner IS NOT NULL
            ORDER BY timestamp
        ''')
        for model_a, model_b, winner in cursor.fetchall():
            cursor.execute('SELECT elo_rating FROM model_stats WHERE model_name = ?', (model_a,))
            elo_a = cursor.fetchone()[0]
            cursor.execute('SELECT elo_rating FROM model_stats WHERE model_name = ?', (model_b,))
            elo_b = cursor.fetchone()[0]
            k_factor = 32
            if winner == model_a:
                exp_a = 1 / (1 + 10 ** ((elo_b - elo_a) / 400))
                next_a = int(elo_a + k_factor * (1 - exp_a))
                next_b = int(elo_b + k_factor * (0 - (1 - exp_a)))
            else:
                exp_b = 1 / (1 + 10 ** ((elo_a - elo_b) / 400))
                next_a = int(elo_a + k_factor * (0 - (1 - exp_b)))
                next_b = int(elo_b + k_factor * (1 - exp_b))
            cursor.execute('UPDATE model_stats SET elo_rating = ? WHERE model_name = ?', (next_a, model_a))
            cursor.execute('UPDATE model_stats SET elo_rating = ? WHERE model_name = ?', (next_b, model_b))

    def _sync_to_hf(self):
        """Best-effort push of the battles/stats tables to the HF dataset."""
        if not self.use_hf:
            print("HF sync disabled")
            return
        try:
            conn = sqlite3.connect(self.db_path)
            battles_df = pd.read_sql_query("SELECT * FROM battles", conn)
            if len(battles_df) > 0:
                print(f"📤 Syncing {len(battles_df)} battles to HF...")
                battles_dataset = Dataset.from_pandas(battles_df)
                max_retries = 3
                for attempt in range(max_retries):
                    try:
                        battles_dataset.push_to_hub(
                            self.hf_repo_id,
                            split="train",
                            token=self.hf_token,
                            private=True
                        )
                        print(f"✅ Successfully pushed {len(battles_df)} battles to HF")
                        break
                    except Exception as push_error:
                        if attempt < max_retries - 1:
                            print(f"⚠️ Push attempt {attempt + 1} failed, retrying...")
                            time.sleep(2)  # brief back-off before retrying
                        else:
                            print(f"❌ Failed to push to HF after {max_retries} attempts: {push_error}")
            # Stats are pushed as a secondary split; failures are non-fatal.
            stats_df = pd.read_sql_query("SELECT * FROM model_stats", conn)
            if len(stats_df) > 0:
                try:
                    Dataset.from_pandas(stats_df).push_to_hub(
                        self.hf_repo_id,
                        split="stats",
                        token=self.hf_token,
                        private=True
                    )
                    print("✅ Model stats synced to HF")
                except Exception as e:
                    print(f"⚠️ Could not sync stats: {e}")
            conn.close()
        except Exception as e:
            print(f"❌ Critical error in HF sync: {e}")

    def init_database(self):
        """Create tables in a fresh SQLite file and seed the model rows.

        Only called when no existing data could be restored from HF.
        """
        conn = sqlite3.connect(self.db_path)
        cur = conn.cursor()
        cur.execute(self._BATTLES_DDL)
        cur.execute(self._STATS_DDL)
        conn.commit()
        conn.close()
        self._init_models()

    def _init_models(self):
        """Insert a default stats row for each competitor (idempotent)."""
        conn = sqlite3.connect(self.db_path)
        cur = conn.cursor()
        for model in self._MODELS:
            cur.execute('''
                INSERT OR IGNORE INTO model_stats (model_name) VALUES (?)
            ''', (model,))
        conn.commit()
        conn.close()

    def save_battle(self, battle: "Battle"):
        """Persist a battle and, for a fresh vote, update the score tables.

        A battle that already has a recorded winner is left untouched so a
        repeated vote cannot be counted twice. After every write the local
        database is mirrored to Hugging Face.
        """
        conn = sqlite3.connect(self.db_path)
        cur = conn.cursor()
        try:
            cur.execute('SELECT id, winner FROM battles WHERE id = ?', (battle.id,))
            existing = cur.fetchone()
            if existing and existing[1]:
                print(f"⚠️ Battle {battle.id} already has a winner: {existing[1]}")
                conn.close()
                return
            cur.execute('''
                INSERT OR REPLACE INTO battles VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                battle.id,
                battle.prompt_id,
                battle.prompt_text,
                battle.category.value,
                battle.model_a,
                battle.model_b,
                battle.response_a,
                battle.response_b,
                battle.winner,
                battle.voter_id,
                battle.timestamp.isoformat(),
                1 if battle.custom_prompt else 0,
                battle.language
            ))
            if battle.winner:
                victor = battle.winner
                defeated = battle.model_b if victor == battle.model_a else battle.model_a
                # Stats are touched only when this vote is new.
                if not existing or not existing[1]:
                    print(f"📊 Updating stats: {victor} wins, {defeated} loses")
                    cur.execute('''
                        UPDATE model_stats
                        SET total_battles = total_battles + 1,
                        wins = wins + 1
                        WHERE model_name = ?
                    ''', (victor,))
                    cur.execute('''
                        UPDATE model_stats
                        SET total_battles = total_battles + 1,
                        losses = losses + 1
                        WHERE model_name = ?
                    ''', (defeated,))
                    self._update_category_scores(cur, victor, battle.category, True)
                    self._update_category_scores(cur, defeated, battle.category, False)
                    self._update_elo_ratings(cur, victor, defeated)
                    print(f"✅ Stats updated for battle {battle.id}")
            conn.commit()
            print(f"💾 Battle {battle.id} saved to local database")
        except Exception as e:
            print(f"❌ Error saving battle: {e}")
            conn.rollback()
        finally:
            # Closing twice after the early return above is a no-op in sqlite3.
            conn.close()
        self._sync_to_hf()

    def _update_category_scores(self, cursor, model, category, is_winner):
        """Nudge a model's per-category score and refresh its overall score."""
        column_map = {
            Category.STORYTELLING: "storytelling_score",
            Category.INNOVATION: "innovation_score",
            Category.BUSINESS: "business_score"
        }
        score_column = column_map.get(category, "overall_score")
        # score_column comes from the fixed map above, never from user input,
        # so f-string interpolation into the SQL is safe.
        cursor.execute(f'SELECT {score_column} FROM model_stats WHERE model_name = ?', (model,))
        row = cursor.fetchone()
        current = row[0] if row else 5.0
        # Winners gain 0.2 (capped at 10); losers drop 0.1 (floored at 0).
        updated = min(10, current + 0.2) if is_winner else max(0, current - 0.1)
        cursor.execute(f'UPDATE model_stats SET {score_column} = ? WHERE model_name = ?',
        (updated, model))
        # Overall score is always the mean of the three category scores.
        cursor.execute('''
            UPDATE model_stats
            SET overall_score = (storytelling_score + innovation_score + business_score) / 3.0
            WHERE model_name = ?
        ''', (model,))

    def _update_elo_ratings(self, cursor, winner, loser):
        """Apply a standard K=32 ELO exchange between winner and loser."""
        k_factor = 32
        cursor.execute('SELECT elo_rating FROM model_stats WHERE model_name = ?', (winner,))
        winner_elo = cursor.fetchone()[0]
        cursor.execute('SELECT elo_rating FROM model_stats WHERE model_name = ?', (loser,))
        loser_elo = cursor.fetchone()[0]
        expected_win = 1 / (1 + 10 ** ((loser_elo - winner_elo) / 400))
        expected_lose = 1 / (1 + 10 ** ((winner_elo - loser_elo) / 400))
        cursor.execute('UPDATE model_stats SET elo_rating = ? WHERE model_name = ?',
        (int(winner_elo + k_factor * (1 - expected_win)), winner))
        cursor.execute('UPDATE model_stats SET elo_rating = ? WHERE model_name = ?',
        (int(loser_elo + k_factor * (0 - expected_lose)), loser))

    def get_leaderboard(self, category: "Optional[Category]" = None) -> pd.DataFrame:
        """Return ranked model stats as a DataFrame.

        Args:
            category: sort by this category's score; overall score when None.
        """
        conn = sqlite3.connect(self.db_path)
        if category:
            column_map = {
                Category.STORYTELLING: "storytelling_score",
                Category.INNOVATION: "innovation_score",
                Category.BUSINESS: "business_score"
            }
            sort_column = column_map.get(category, "overall_score")
        else:
            sort_column = "overall_score"
        # sort_column is drawn from the fixed map above — safe to interpolate.
        query = f'''
            SELECT
            model_name,
            ROUND(overall_score, 1) as overall_score,
            ROUND(storytelling_score, 1) as storytelling_score,
            ROUND(innovation_score, 1) as innovation_score,
            ROUND(business_score, 1) as business_score,
            total_battles,
            wins,
            CASE
            WHEN total_battles > 0
            THEN ROUND(100.0 * wins / total_battles, 1)
            ELSE 0
            END as win_rate,
            elo_rating
            FROM model_stats
            ORDER BY {sort_column} DESC, elo_rating DESC
        '''
        board = pd.read_sql_query(query, conn)
        conn.close()
        board.insert(0, 'rank', range(1, len(board) + 1))
        return board

    def debug_database_state(self):
        """Print and return a snapshot of battle counts and model stats."""
        conn = sqlite3.connect(self.db_path)
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM battles")
        total_battles = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM battles WHERE winner IS NOT NULL")
        voted_battles = cur.fetchone()[0]
        cur.execute("SELECT * FROM model_stats ORDER BY elo_rating DESC")
        stats = cur.fetchall()
        conn.close()
        print("\n" + "="*50)
        print("📊 DATABASE STATE DEBUG")
        print("="*50)
        print(f"Total battles: {total_battles}")
        print(f"Voted battles: {voted_battles}")
        print("\nModel Stats:")
        print("-"*50)
        for stat in stats:
            # Row columns: 0=name, 5=total_battles, 6=wins, 8=elo_rating.
            print(f"{stat[0]:20} | Battles: {stat[5]:3} | Wins: {stat[6]:3} | ELO: {stat[8]:4}")
        print("="*50 + "\n")
        return {
            "total_battles": total_battles,
            "voted_battles": voted_battles,
            "model_stats": stats
        }
| # ==================== Fixed LLM Interface with 4 Models ==================== | |
| class LLMInterface: | |
| """Interface for GPT-5, jetXA, Gemini 2.5 Pro, and Claude Opus 4.1 models""" | |
| def __init__(self): | |
| self.models = ["GPT-5", "jetXA", "Gemini-2.5-Pro", "Claude-Opus-4.1"] | |
| self.response_cache = {} | |
| self.cache_enabled = False # Disable caching by default | |
| # Initialize OpenAI client for GPT-5 | |
| self.openai_client = None | |
| openai_key = os.getenv("OPENAI_API_KEY") | |
| if openai_key and OpenAI: | |
| try: | |
| self.openai_client = OpenAI(api_key=openai_key) | |
| print("✅ GPT-5 client initialized") | |
| except Exception as e: | |
| print(f"❌ GPT-5 initialization failed: {e}") | |
| else: | |
| print("⚠️ GPT-5: No API key or OpenAI library not installed") | |
| # Initialize Gradio client for jetXA | |
| self.gradio_client = None | |
| jetxa_space = os.getenv("jetXA_API", "aiqtech/tests") | |
| hf_token = os.getenv("HF_TOKEN") | |
| if GradioClient: | |
| connection_attempts = [ | |
| lambda: GradioClient(jetxa_space, hf_token=hf_token) if hf_token else GradioClient(jetxa_space), | |
| lambda: GradioClient(f"https://huggingface.co/spaces/{jetxa_space}"), | |
| lambda: GradioClient(f"https://{jetxa_space.replace('/', '-')}.hf.space"), | |
| lambda: GradioClient(src=jetxa_space), | |
| lambda: GradioClient("aiqtech/tests") | |
| ] | |
| for i, attempt in enumerate(connection_attempts, 1): | |
| try: | |
| self.gradio_client = attempt() | |
| if hasattr(self.gradio_client, 'view_api'): | |
| api_info = self.gradio_client.view_api() | |
| print(f"✅ jetXA client initialized successfully using method {i}!") | |
| break | |
| except Exception as e: | |
| if i == len(connection_attempts): | |
| print(f"⚠️ jetXA: All connection attempts failed. Last error: {e}") | |
| print("Will use fallback responses for jetXA") | |
| else: | |
| continue | |
| else: | |
| print("⚠️ jetXA: Gradio client not installed") | |
| # Initialize Gemini client | |
| self.gemini_client = None | |
| gemini_key = os.getenv("GEMINI_API_KEY") | |
| if gemini_key and genai: | |
| try: | |
| self.gemini_client = genai.Client(api_key=gemini_key) | |
| print("✅ Gemini 2.5 Pro client initialized") | |
| except Exception as e: | |
| print(f"❌ Gemini initialization failed: {e}") | |
| else: | |
| print("⚠️ Gemini: No API key or google-genai library not installed") | |
| # Initialize Claude client | |
| self.claude_client = None | |
| claude_key = os.getenv("ANTHROPIC_API_KEY") | |
| if claude_key and anthropic: | |
| try: | |
| self.claude_client = anthropic.Anthropic(api_key=claude_key) | |
| print("✅ Claude Opus 4.1 client initialized") | |
| except Exception as e: | |
| print(f"❌ Claude initialization failed: {e}") | |
| else: | |
| print("⚠️ Claude: No API key or anthropic library not installed") | |
| def clear_cache(self): | |
| """Clear all cached responses""" | |
| self.response_cache = {} | |
| print("✅ Cache cleared") | |
| def generate_response_stream(self, model: str, prompt: str, language: str = "en") -> Generator[str, None, None]: | |
| """Generate streaming response with proper accumulation""" | |
| # Add language and creativity instructions | |
| if language == "ko": | |
| instruction = "창의적이고 혁신적인 한국어 답변을 작성해주세요. 독창적이고 상세한 아이디어를 제시하세요." | |
| else: | |
| instruction = "Provide a highly creative and innovative response. Be original and detailed." | |
| full_prompt = f"{instruction}\n\n{prompt}" | |
| try: | |
| if model == "GPT-5": | |
| # Stream GPT-5 with proper accumulation | |
| accumulated = "" | |
| for chunk in self._stream_gpt5(full_prompt): | |
| accumulated += chunk | |
| yield accumulated # Always yield the accumulated text | |
| elif model == "jetXA": | |
| # Get full response and simulate streaming | |
| full_response = self._get_jetxa_response(full_prompt) | |
| if full_response: | |
| # Format jetXA response with proper spacing | |
| formatted_response = self._format_jetxa_response(full_response) | |
| # Simulate streaming word by word for jetXA for smoother effect | |
| words = formatted_response.split() | |
| accumulated = "" | |
| # Stream words in small batches for natural effect | |
| batch_size = 2 # Stream 2 words at a time | |
| for i in range(0, len(words), batch_size): | |
| batch = words[i:i+batch_size] | |
| for word in batch: | |
| if accumulated: | |
| accumulated += " " | |
| accumulated += word | |
| yield accumulated # Yield accumulated text after each batch | |
| time.sleep(0.03) # Small delay between batches | |
| else: | |
| # Use fallback if jetXA fails | |
| fallback = self._generate_fallback(model, prompt, language) | |
| # Stream fallback with accumulation | |
| words = fallback.split() | |
| accumulated = "" | |
| for word in words: | |
| if accumulated: | |
| accumulated += " " | |
| accumulated += word | |
| yield accumulated | |
| time.sleep(0.02) | |
| elif model == "Gemini-2.5-Pro": | |
| # Stream Gemini with proper accumulation | |
| accumulated = "" | |
| for chunk in self._stream_gemini(full_prompt): | |
| accumulated += chunk | |
| yield accumulated | |
| elif model == "Claude-Opus-4.1": | |
| # Stream Claude with proper accumulation | |
| accumulated = "" | |
| for chunk in self._stream_claude(full_prompt): | |
| accumulated += chunk | |
| yield accumulated | |
| else: | |
| # Unknown model - use fallback | |
| fallback = self._generate_fallback(model, prompt, language) | |
| # Stream fallback with accumulation | |
| words = fallback.split() | |
| accumulated = "" | |
| for word in words: | |
| if accumulated: | |
| accumulated += " " | |
| accumulated += word | |
| yield accumulated | |
| time.sleep(0.02) | |
| except Exception as e: | |
| print(f"Error streaming {model}: {e}") | |
| fallback = self._generate_fallback(model, prompt, language) | |
| yield fallback | |
| def _stream_gemini(self, prompt: str) -> Generator[str, None, None]: | |
| """Stream Gemini 2.5 Pro response""" | |
| if not self.gemini_client: | |
| fallback = self._generate_fallback("Gemini-2.5-Pro", prompt, "en") | |
| words = fallback.split() | |
| for word in words: | |
| yield word + " " | |
| time.sleep(0.02) | |
| return | |
| try: | |
| contents = [ | |
| types.Content( | |
| role="user", | |
| parts=[types.Part.from_text(text=prompt)], | |
| ), | |
| ] | |
| # 수정된 설정 - max_output_tokens 증가 및 thinking_config 제거 | |
| generate_content_config = types.GenerateContentConfig( | |
| response_mime_type="text/plain", | |
| temperature=0.9, # 창의성을 위해 온도 상승 | |
| max_output_tokens=2048, # 토큰 수 증가 | |
| top_p=0.95, | |
| top_k=40, | |
| ) | |
| # 전체 응답을 수집 | |
| full_response = "" | |
| for chunk in self.gemini_client.models.generate_content_stream( | |
| model="gemini-2.0-flash-exp", # 또는 "gemini-2.0-flash-thinking-exp-1219" | |
| contents=contents, | |
| config=generate_content_config, | |
| ): | |
| if chunk.text: | |
| full_response += chunk.text | |
| yield chunk.text | |
| # 응답이 너무 짧으면 재시도 | |
| if len(full_response) < 100: | |
| print(f"⚠️ Gemini response too short ({len(full_response)} chars), using fallback") | |
| fallback = self._generate_fallback("Gemini-2.5-Pro", prompt, "en") | |
| yield fallback | |
| except Exception as e: | |
| print(f"Gemini streaming error: {e}") | |
| fallback = self._generate_fallback("Gemini-2.5-Pro", prompt, "en") | |
| yield fallback | |
def _stream_claude(self, prompt: str) -> Generator[str, None, None]:
    """Stream a Claude Opus 4.1 completion as incremental text chunks.

    When no Anthropic client is configured, or when the API call fails,
    a canned fallback response is emitted instead (word-by-word in the
    no-client case, as one chunk in the error case).
    """
    if not self.claude_client:
        # No API client: simulate streaming from the static fallback text.
        for word in self._generate_fallback("Claude-Opus-4.1", prompt, "en").split():
            yield word + " "
            time.sleep(0.02)
        return
    request_messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": prompt
                }
            ]
        }
    ]
    try:
        with self.claude_client.messages.stream(
            model="claude-opus-4-1-20250805",
            max_tokens=1500,
            temperature=0.8,
            messages=request_messages
        ) as stream:
            # Forward every text delta straight to the caller.
            yield from stream.text_stream
    except Exception as e:
        print(f"Claude streaming error: {e}")
        yield self._generate_fallback("Claude-Opus-4.1", prompt, "en")
def _format_jetxa_response(self, text: str) -> str:
    """Format jetXA response with proper spacing and line breaks for better readability.

    Pipeline: normalize the markdown via _clean_markdown_response, re-space
    headers and list blocks, then collapse runs of blank lines at the end.
    """
    # Clean up the response first (header / table normalization).
    text = self._clean_markdown_response(text)
    # Split into lines
    lines = text.split('\n')
    formatted_lines = []
    for i, line in enumerate(lines):
        line = line.strip()
        if not line:
            # Keep empty lines for spacing
            formatted_lines.append('')
            continue
        # Add extra spacing around headers
        if line.startswith('#'):
            # Add double blank line before headers (except first line)
            if i > 0 and formatted_lines and formatted_lines[-1].strip():
                formatted_lines.append('')
                formatted_lines.append('')
            formatted_lines.append(line)
            # Add blank line after major headers
            if line.startswith('# ') or line.startswith('## '):
                formatted_lines.append('')
        # Add spacing around lists
        elif line.startswith('- ') or line.startswith('* ') or re.match(r'^\d+\. ', line):
            # Add blank line before first list item (i.e. only when the
            # previous emitted line is non-blank and not itself a list item).
            if i > 0 and formatted_lines and formatted_lines[-1].strip() and not (
                formatted_lines[-1].startswith('- ') or
                formatted_lines[-1].startswith('* ') or
                re.match(r'^\d+\. ', formatted_lines[-1])
            ):
                formatted_lines.append('')
            formatted_lines.append(line)
        else:
            formatted_lines.append(line)
    # Join with newlines
    result = '\n'.join(formatted_lines)
    # Clean up excessive blank lines (max 2 consecutive)
    # NOTE: the second loop reduces every '\n\n\n' to '\n\n', so the double
    # blanks inserted before headers above end up as a single blank line.
    while '\n\n\n\n' in result:
        result = result.replace('\n\n\n\n', '\n\n')
    while '\n\n\n' in result:
        result = result.replace('\n\n\n', '\n\n')
    return result.strip()
def _stream_gpt5(self, prompt: str) -> Generator[str, None, None]:
    """Stream GPT-5 API response - returns chunks only (not accumulated).

    Each yielded value is a single text delta from the completions stream.
    Without an OpenAI client, a canned fallback is streamed word-by-word;
    on API error the whole fallback is yielded as one chunk.
    """
    if not self.openai_client:
        # No API client: simulate streaming from the static fallback text.
        for word in self._generate_fallback("GPT-5", prompt, "en").split():
            yield word + " "
            time.sleep(0.02)
        return
    try:
        completion_stream = self.openai_client.chat.completions.create(
            model="gpt-4",  # Use gpt-4 as fallback if gpt-5 not available
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1500,
            temperature=0.8,
            stream=True
        )
        for event in completion_stream:
            delta = event.choices[0].delta.content
            if delta is not None:
                # Yield only the incremental piece, never accumulated text.
                yield delta
    except Exception as e:
        print(f"GPT-5 streaming error: {e}")
        yield self._generate_fallback("GPT-5", prompt, "en")
def _get_jetxa_response(self, prompt: str) -> str:
    """Get complete response from jetXA (non-streaming).

    Calls the remote Gradio Space endpoint and extracts the assistant's
    most recent message from the returned chat history. Returns "" when
    no client is configured, nothing usable comes back, or the call fails.
    """
    if not self.gradio_client:
        return ""
    try:
        result = self.gradio_client.predict(
            message=prompt,
            history=[],
            use_search=False,
            show_agent_thoughts=False,
            search_count=5,
            api_name="/process_query_optimized"
        )
        response_text = ""
        # The endpoint is expected to return a tuple whose first element is
        # the chat history — TODO confirm against the Space's API signature.
        if result and isinstance(result, (tuple, list)) and len(result) >= 1:
            chat_history = result[0]
            if isinstance(chat_history, list) and len(chat_history) > 0:
                # Walk backwards to find the latest non-empty message.
                for msg in reversed(chat_history):
                    if isinstance(msg, dict):
                        # "messages"-style history: {"role": ..., "content": ...}
                        content = msg.get('content', '')
                        if content:
                            response_text = str(content)
                            break
                    elif isinstance(msg, (list, tuple)) and len(msg) >= 2:
                        # Legacy pair-style history: (user_text, bot_text)
                        if msg[1]:
                            response_text = str(msg[1])
                            break
        if not response_text:
            # Fallback: scan the next couple of tuple slots for any string.
            for i in range(1, min(3, len(result))):
                if result[i] and isinstance(result[i], str) and result[i].strip():
                    response_text = result[i]
                    break
        if response_text:
            # Clean up any potential formatting issues
            response_text = self._clean_markdown_response(response_text)
            return response_text
    except Exception as e:
        print(f"jetXA response error: {e}")
    return ""
| def _clean_markdown_response(self, text: str) -> str: | |
| """Clean and fix common markdown formatting issues""" | |
| # Remove any duplicate markers or broken formatting | |
| text = text.replace('| ---', '|---') # Fix table separators | |
| text = text.replace('---\n---', '---') # Remove duplicate horizontal rules | |
| # Ensure proper spacing around headers | |
| lines = text.split('\n') | |
| cleaned_lines = [] | |
| for i, line in enumerate(lines): | |
| # Fix header formatting | |
| if line.strip().startswith('#'): | |
| # Ensure space after # symbols | |
| if '#' in line and not line.startswith('# '): | |
| parts = line.split('#', 1) | |
| if len(parts) > 1: | |
| hash_count = len(line) - len(line.lstrip('#')) | |
| line = '#' * hash_count + ' ' + parts[-1].strip() | |
| # Add blank line before headers (except first line) | |
| if i > 0 and cleaned_lines and cleaned_lines[-1].strip(): | |
| cleaned_lines.append('') | |
| # Fix table formatting | |
| if '|' in line: | |
| # Ensure proper table separator | |
| if all(c in ['-', '|', ' '] for c in line.strip()): | |
| line = line.replace(' ', '').replace('|-', '|---').replace('-|', '---|') | |
| if not line.startswith('|'): | |
| line = '|' + line | |
| if not line.endswith('|'): | |
| line = line + '|' | |
| cleaned_lines.append(line) | |
| return '\n'.join(cleaned_lines) | |
def _generate_fallback(self, model: str, prompt: str, language: str) -> str:
    """Generate high-quality fallback response with language support and proper markdown.

    Used whenever a model's API client is missing or its call fails.
    The category is inferred from keywords in the prompt (English and
    Korean), then a canned, markdown-formatted response is returned for
    the requested model (defaulting to the GPT-5 text for unknown models).
    """
    # Determine category from prompt keywords (checked in order:
    # story -> innovation -> business as the catch-all default).
    if any(word in prompt.lower() for word in ["story", "movie", "novel", "plot", "스토리", "영화", "소설"]):
        category = "story"
    elif any(word in prompt.lower() for word in ["innovate", "invent", "revolution", "혁신", "발명", "개발"]):
        category = "innovation"
    else:
        category = "business"
    # Korean responses with better markdown formatting
    if language == "ko":
        responses = {
            "story": {
                "GPT-5": """# 양자 거울
## 시놉시스
한 형사가 도시의 모든 거울이 실제로 **범죄가 예방된 다른 타임라인**으로 통하는 포털임을 발견한다.""",
                "jetXA": """# 감정 고고학
## 기획 의도
2045년, 고고학자들은 유물을 발굴하지 않는다—그들은 **비극의 장소에 남겨진 압축된 인간 감정**을 발굴한다.""",
                "Gemini-2.5-Pro": """# 기억의 도서관
## 줄거리
죽은 사람들의 마지막 기억이 책으로 변하는 **사후 도서관**을 발견한 사서의 이야기.""",
                "Claude-Opus-4.1": """# 시간의 정원사
## 개요
매일 밤 다른 시대로 이동하는 정원을 관리하며 **역사의 순간들을 가꾸는** 정원사의 모험."""
            },
            "innovation": {
                "GPT-5": """# 🚲 자전거 혁신 5가지
## 1. **중력 무시 바퀴** (Gravity Defiance Wheels)
- **기술**: 전자기 림이 오르막길에서 무게를 거의 0으로 감소""",
                "jetXA": """# 📧 이메일 혁명 5가지
## 1. **시간 메시징** (Temporal Messaging)
### 핵심 기능
- ⏰ 과거/미래로 이메일 전송""",
                "Gemini-2.5-Pro": """# 🚲 자전거 미래 혁신
## 1. **AI 균형 시스템**
- 자이로스코프와 AI가 결합되어 절대 넘어지지 않는 자전거""",
                "Claude-Opus-4.1": """# 📧 이메일 진화
## 1. **감정 전송 시스템**
- 텍스트와 함께 작성자의 감정 상태를 전달하는 기술"""
            },
            "business": {
                "GPT-5": """# 🚁 NeuralNest - 10억달러 드론 심리 플랫폼
## 사업 개요
### 비전
> **"위기 지역에서 실시간 정신 건강 지원을 제공하는 세계 최초 AI 드론 플랫폼"**""",
                "jetXA": """# 💾 MemoryBank - 월 100만원 구독 서비스
## 서비스 개요
### 핵심 가치
> **"당신의 모든 기억을 영원히 보존하고 다시 경험하세요"**""",
                "Gemini-2.5-Pro": """# 🤖 RoboChef - 로봇 요리사 플랫폼
## 비즈니스 모델
### 목표
> **"미슐랭 스타 셰프의 요리를 집에서 재현하는 AI 로봇"**""",
                "Claude-Opus-4.1": """# 🏢 VirtualOffice - 메타버스 사무실
## 서비스 컨셉
### 미션
> **"물리적 사무실이 필요 없는 완벽한 가상 근무 환경"**"""
            }
        }
    else:
        # English responses
        responses = {
            "story": {
                "GPT-5": """# The Quantum Mirror
## Synopsis
A detective discovers that every mirror in the city is actually a portal to **alternate timelines where crimes were prevented**.""",
                "jetXA": """# Emotional Archaeology
## Concept
In 2045, archaeologists don't dig for artifacts—they excavate **compressed human emotions left in places of tragedy**.""",
                "Gemini-2.5-Pro": """# The Memory Library
## Plot
A librarian discovers a **posthumous library** where dead people's last memories transform into books.""",
                "Claude-Opus-4.1": """# The Time Gardener
## Overview
Adventures of a gardener who tends to a garden that **shifts to different historical eras** each night."""
            },
            "innovation": {
                "GPT-5": """# 🚲 5 Bicycle Innovations
## 1. **Gravity Defiance Wheels**
- **Tech**: Electromagnetic rims reduce weight to near-zero when pedaling uphill""",
                "jetXA": """# 📧 5 Email Revolutionaries
## 1. **Temporal Messaging**
### Core Features
- ⏰ Send emails to past/future""",
                "Gemini-2.5-Pro": """# 🚲 Future Bicycle Tech
## 1. **AI Balance System**
- Gyroscope + AI creates a bicycle that never falls over""",
                "Claude-Opus-4.1": """# 📧 Email Evolution
## 1. **Emotion Transfer System**
- Technology that transmits the sender's emotional state with text"""
            },
            "business": {
                "GPT-5": """# 🚁 NeuralNest - $1B Drone Psychology Platform
## Business Overview
### Vision
> **"World's first AI drone platform providing real-time mental health support in crisis zones"**""",
                "jetXA": """# 💾 MemoryBank - $1000/month Subscription
## Service Overview
### Core Value
> **"Preserve and re-experience all your memories forever"**""",
                "Gemini-2.5-Pro": """# 🤖 RoboChef - Robot Chef Platform
## Business Model
### Goal
> **"AI robots that recreate Michelin star chef dishes at home"**""",
                "Claude-Opus-4.1": """# 🏢 VirtualOffice - Metaverse Workspace
## Service Concept
### Mission
> **"Perfect virtual work environment eliminating need for physical offices"**"""
            }
        }
    # Unknown model names fall back to the GPT-5 text for the category.
    return responses[category].get(model, responses[category]["GPT-5"])
| # ==================== Main Arena Class ==================== | |
class CreativityArena:
    """Coordinates anonymous battles between AI models and records outcomes."""

    def __init__(self):
        # Persistent storage, LLM access, and the battle currently in flight.
        self.db = ArenaDatabase()
        self.llm = LLMInterface()
        self.current_battle = None

    def get_random_prompt(self, category: Category, language: str = "en") -> dict:
        """Pick a random prompt for *category*, falling back to English."""
        pool = PROMPTS[category].get(language, PROMPTS[category]["en"])
        return random.choice(pool)

    def start_new_battle_stream(self, category: str, custom_prompt: str = None, language: str = "en"):
        """Create a fresh battle between two randomly drawn models.

        Returns a dict with the prompt, resolved category value, the two
        contender names, and the Battle record itself.
        """
        # Resolve the category; "random" draws one of the enum members.
        if category == "random":
            chosen_category = random.choice(list(Category))
        else:
            chosen_category = Category(category)
        # A non-empty custom prompt overrides the built-in prompt pool.
        cleaned_prompt = custom_prompt.strip() if custom_prompt else ""
        if cleaned_prompt:
            prompt_text = cleaned_prompt
            is_custom = True
        else:
            prompt_text = self.get_random_prompt(chosen_category, language)["text"]
            is_custom = False
        # Draw two distinct contenders out of the four available models.
        models = random.sample(["GPT-5", "jetXA", "Gemini-2.5-Pro", "Claude-Opus-4.1"], 2)
        battle = Battle(
            id=hashlib.md5(f"{datetime.now().isoformat()}-{random.randint(0,999999)}".encode()).hexdigest(),
            prompt_id=hashlib.md5(prompt_text.encode()).hexdigest(),
            prompt_text=prompt_text,
            model_a=models[0],
            model_b=models[1],
            response_a="",
            response_b="",
            winner=None,
            voter_id="",
            timestamp=datetime.now(),
            category=chosen_category,
            custom_prompt=is_custom,
            language=language,
        )
        self.current_battle = battle
        return {
            "prompt": prompt_text,
            "category": chosen_category.value,
            "models": models,
            "battle": battle,
        }

    def vote(self, choice: str, voter_id: str = None):
        """Record the voter's pick ('A' or 'B'), persist it, and sync to HF."""
        battle = self.current_battle
        if not battle:
            print("❌ No active battle to vote on")
            return {"error": "No active battle"}
        # Refuse to score a battle whose responses are still streaming.
        if not battle.response_a or not battle.response_b:
            print("⚠️ Battle responses not complete")
            return {"error": "Battle responses not complete"}
        battle.winner = battle.model_a if choice == "A" else battle.model_b
        battle.voter_id = voter_id or f"anonymous_{datetime.now().timestamp()}"
        print(f"🗳️ Vote recorded: {choice} -> {battle.winner}")
        self.db.save_battle(battle)
        # Push immediately so a server restart cannot lose this vote.
        self.db._sync_to_hf()
        return {
            "model_a": battle.model_a,
            "model_b": battle.model_b,
            "winner": battle.winner,
        }

    def get_leaderboard(self, category: Optional[Category] = None):
        """Delegate leaderboard computation to the database layer."""
        return self.db.get_leaderboard(category)
| # ==================== Periodic Sync Function ==================== | |
def periodic_sync(arena):
    """Background loop: sync the arena database to the HF Hub every 30 seconds."""
    while True:
        time.sleep(30)
        try:
            arena.db._sync_to_hf()
        except Exception as e:
            # Swallow failures so the daemon thread keeps running.
            print(f"⏰ Periodic sync failed: {e}")
        else:
            print(f"⏰ Periodic sync completed at {datetime.now()}")
| # ==================== Gradio Interface ==================== | |
def create_app():
    """Build the Gradio Blocks UI: battle tab, leaderboard tab, and event wiring."""
    arena = CreativityArena()
    # Updated CSS with pastel colors and proper markdown rendering
    css = """
    .gradio-container {
        background: linear-gradient(135deg, #f5e6ff 0%, #e6f3ff 50%, #ffeef5 100%);
        font-family: 'Inter', sans-serif;
    }
    .main-header {
        background: rgba(255, 255, 255, 0.98);
        border-radius: 20px;
        padding: 2rem;
        text-align: center;
        margin-bottom: 2rem;
        box-shadow: 0 4px 20px rgba(150, 100, 200, 0.15);
        border: 1px solid rgba(200, 180, 220, 0.3);
    }
    .response-container {
        background: rgba(255, 255, 255, 0.95);
        border-radius: 15px;
        padding: 1.5rem;
        min-height: 400px;
        max-height: 800px;
        overflow-y: auto;
        box-shadow: 0 3px 15px rgba(150, 100, 200, 0.1);
        transition: transform 0.3s ease;
        border: 1px solid rgba(200, 180, 220, 0.2);
    }
    .response-container:hover {
        transform: translateY(-3px);
        box-shadow: 0 6px 20px rgba(150, 100, 200, 0.2);
    }
    /* Markdown specific styles */
    .markdown-text {
        line-height: 1.6;
        color: #2d3748;
    }
    .markdown-text h1 {
        font-size: 2.5em !important;
        font-weight: bold;
        color: #6b46c1;
        margin-top: 1em;
        margin-bottom: 0.5em;
        border-bottom: 2px solid #e9d8fd;
        padding-bottom: 0.3em;
    }
    .markdown-text h2 {
        font-size: 2em !important;
        font-weight: bold;
        color: #805ad5;
        margin-top: 0.8em;
        margin-bottom: 0.4em;
    }
    .markdown-text h3 {
        font-size: 1.5em !important;
        font-weight: bold;
        color: #9f7aea;
        margin-top: 0.6em;
        margin-bottom: 0.3em;
    }
    """
    with gr.Blocks(title="AI Models Battle Arena", theme=gr.themes.Soft(), css=css) as app:
        # Current UI language ("en" or "ko"), shared by every handler.
        current_lang = gr.State(value="en")

        # Language change handler
        def update_language(lang_value):
            """Pass the dropdown value through into the language State."""
            return lang_value

        def update_ui_text(lang):
            """Return refreshed labels/choices for every localized component."""
            ui = UI_TEXT[lang]
            return (
                f"""
                <div class="main-header">
                    <h1 style="color: #6b46c1; font-size: 2.5rem;">{ui['title']}</h1>
                    <p style="color: #805ad5; font-size: 1.2rem;">{ui['subtitle']}</p>
                </div>
                """,
                ui['leaderboard_title'],
                gr.update(label=ui['category_label']),
                gr.update(label=ui['custom_prompt_label']),
                gr.update(placeholder=ui['custom_prompt_placeholder']),
                gr.update(value=ui['new_battle_btn']),
                ui['model_a'],
                ui['model_b'],
                gr.update(value=ui['vote_a']),
                gr.update(value=ui['vote_b']),
                gr.update(label=ui['category_filter']),
                gr.update(value=ui['refresh_btn']),
                gr.update(choices=[
                    (ui['categories']['random'], "random"),
                    (ui['categories']['storytelling'], "storytelling"),
                    (ui['categories']['innovation'], "innovation"),
                    (ui['categories']['business'], "business")
                ]),
                gr.update(choices=[
                    (ui['filter_categories']['overall'], "overall"),
                    (ui['filter_categories']['storytelling'], "storytelling"),
                    (ui['filter_categories']['innovation'], "innovation"),
                    (ui['filter_categories']['business'], "business")
                ])
            )

        # Header
        with gr.Row():
            with gr.Column(scale=10):
                header_html = gr.HTML(f"""
                <div class="main-header">
                    <h1 style="color: #6b46c1; font-size: 2.5rem;">🎨 AI Models Creativity Battle Arena</h1>
                    <p style="color: #805ad5; font-size: 1.2rem;">Test cutting-edge AI models in creative challenges</p>
                    <p style="color: #9f7aea; font-size: 1rem;">GPT-5 vs jetXA vs Gemini 2.5 Pro vs Claude Opus 4.1</p>
                </div>
                """)
            with gr.Column(scale=1):
                language_select = gr.Dropdown(
                    choices=[("English", "en"), ("한국어", "ko")],
                    value="en",
                    label="Language",
                    interactive=True,
                    elem_classes="category-select"
                )
        with gr.Tabs(elem_classes="tab-nav") as tabs:
            # Battle Arena Tab
            with gr.TabItem("⚔️ Battle Arena", id="battle_tab") as battle_tab:
                with gr.Row():
                    with gr.Column(scale=1):
                        category_select = gr.Dropdown(
                            choices=[
                                ("🎲 Random", "random"),
                                ("📚 Storytelling", "storytelling"),
                                ("💡 Innovation", "innovation"),
                                ("💼 Business", "business")
                            ],
                            value="random",
                            label="Select Category",
                            interactive=True,
                            elem_classes="category-select"
                        )
                        custom_prompt_accordion = gr.Accordion("✏️ Custom Challenge (Optional)", open=False)
                        with custom_prompt_accordion:
                            custom_prompt_input = gr.Textbox(
                                label="",
                                placeholder="Enter your creative challenge...",
                                lines=3
                            )
                        new_battle_btn = gr.Button(
                            "🎲 Start New Battle",
                            variant="primary",
                            size="lg",
                            elem_classes="vote-button"
                        )
                    with gr.Column(scale=3):
                        prompt_display = gr.Markdown("")
                with gr.Row():
                    with gr.Column():
                        model_a_label = gr.Markdown("### 🅰️ Model A")
                        # NOTE(review): sanitize_html / line_breaks / latex_delimiters
                        # are version-dependent gr.Markdown kwargs — confirm against
                        # the pinned Gradio version. Both $-delimiter entries are
                        # identical except for `display`; verify that is intended.
                        response_a = gr.Markdown(
                            "",
                            elem_classes=["response-container", "markdown-text"],
                            sanitize_html=False,
                            line_breaks=True,
                            latex_delimiters=[
                                {"left": "$", "right": "$", "display": True},
                                {"left": "$", "right": "$", "display": False}
                            ]
                        )
                        model_a_reveal = gr.Textbox(label="Model Identity", visible=False)
                    with gr.Column():
                        model_b_label = gr.Markdown("### 🅱️ Model B")
                        response_b = gr.Markdown(
                            "",
                            elem_classes=["response-container", "markdown-text"],
                            sanitize_html=False,
                            line_breaks=True,
                            latex_delimiters=[
                                {"left": "$", "right": "$", "display": True},
                                {"left": "$", "right": "$", "display": False}
                            ]
                        )
                        model_b_reveal = gr.Textbox(label="Model Identity", visible=False)
                with gr.Row():
                    vote_a_btn = gr.Button("🅰️ Model A is more creative", size="lg", variant="primary", elem_classes="vote-button")
                    vote_b_btn = gr.Button("🅱️ Model B is more creative", size="lg", variant="primary", elem_classes="vote-button")
                vote_result = gr.Markdown("")
                # Carries the battle dict between the start handler and the vote handlers.
                battle_state = gr.State({})
            # Leaderboard Tab
            with gr.TabItem("🏆 Leaderboard", id="leaderboard_tab") as leaderboard_tab:
                leaderboard_title = gr.Markdown("## 🏆 AI Models Leaderboard")
                category_filter = gr.Radio(
                    choices=[
                        ("Overall", "overall"),
                        ("Storytelling", "storytelling"),
                        ("Innovation", "innovation"),
                        ("Business", "business")
                    ],
                    value="overall",
                    label="Category Filter",
                    elem_classes="category-select"
                )
                leaderboard_display = gr.Dataframe(
                    headers=["Rank", "Model", "Overall", "Story", "Innovation", "Business", "Battles", "Win%", "ELO"],
                    datatype=["number", "str", "number", "number", "number", "number", "number", "number", "number"]
                )
                refresh_btn = gr.Button("🔄 Refresh", variant="secondary")
        # Footer
        footer_html = gr.HTML("""
        <div class="footer">
            <p>Testing GPT-5, jetXA, Gemini 2.5 Pro, and Claude Opus 4.1 in creative challenges | Contact: arxivgpt@gmail.com</p>
        </div>
        """)

        # Event handlers with streaming support
        def start_battle_stream(category, custom_prompt, lang):
            """Generator handler: start a battle and stream both model responses.

            Runs each model in its own thread, pushing accumulated text through
            a queue; the main loop drains both queues and yields UI updates
            until both streams signal completion.
            """
            # Clear cache for fresh responses if needed
            arena.llm.clear_cache()
            battle_info = arena.start_new_battle_stream(category, custom_prompt, lang)
            ui = UI_TEXT[lang]
            category_display = ui["categories"].get(battle_info['category'], battle_info['category'])
            prompt_text = f"""
{ui['challenge_task']}
**{ui['category']}**: {category_display}
**{ui['prompt']}**:
> {battle_info['prompt']}
"""
            # Initialize with loading state
            initial_response = ui['generating']
            # Start streaming in separate threads
            response_a_queue = queue.Queue()
            response_b_queue = queue.Queue()
            response_a_final = ""
            response_b_final = ""
            # NOTE(review): done_a/done_b are set but never read by the consumer
            # loop below — the ('done', None) queue sentinel is what ends it.
            done_a = False
            done_b = False

            def stream_model_a():
                """Worker: stream model A, pushing accumulated text to its queue."""
                nonlocal response_a_final, done_a
                try:
                    for chunk in arena.llm.generate_response_stream(
                        battle_info['models'][0],
                        battle_info['prompt'],
                        lang
                    ):
                        # chunk is already accumulated text
                        response_a_queue.put(('update', chunk))  # Add type marker
                        response_a_final = chunk
                        battle_info['battle'].response_a = response_a_final
                except Exception as e:
                    print(f"Error in stream_model_a: {e}")
                    response_a_final = arena.llm._generate_fallback(
                        battle_info['models'][0],
                        battle_info['prompt'],
                        lang
                    )
                    response_a_queue.put(('update', response_a_final))
                    battle_info['battle'].response_a = response_a_final
                finally:
                    response_a_queue.put(('done', None))  # Signal completion
                    done_a = True

            def stream_model_b():
                """Worker: stream model B, pushing accumulated text to its queue."""
                nonlocal response_b_final, done_b
                try:
                    for chunk in arena.llm.generate_response_stream(
                        battle_info['models'][1],
                        battle_info['prompt'],
                        lang
                    ):
                        # chunk is already accumulated text
                        response_b_queue.put(('update', chunk))  # Add type marker
                        response_b_final = chunk
                        battle_info['battle'].response_b = response_b_final
                except Exception as e:
                    print(f"Error in stream_model_b: {e}")
                    response_b_final = arena.llm._generate_fallback(
                        battle_info['models'][1],
                        battle_info['prompt'],
                        lang
                    )
                    response_b_queue.put(('update', response_b_final))
                    battle_info['battle'].response_b = response_b_final
                finally:
                    response_b_queue.put(('done', None))  # Signal completion
                    done_b = True

            thread_a = threading.Thread(target=stream_model_a)
            thread_b = threading.Thread(target=stream_model_b)
            thread_a.start()
            thread_b.start()
            # Yield updates for both responses
            response_a_text = initial_response
            response_b_text = initial_response
            last_update_time = time.time()
            stream_a_done = False
            stream_b_done = False
            while not (stream_a_done and stream_b_done):
                updated = False
                current_time = time.time()
                # Process all updates from model A (drain queue until empty)
                try:
                    while True:
                        msg_type, content = response_a_queue.get_nowait()
                        if msg_type == 'done':
                            stream_a_done = True
                        elif msg_type == 'update':
                            response_a_text = content
                            updated = True
                except queue.Empty:
                    pass
                # Process all updates from model B
                try:
                    while True:
                        msg_type, content = response_b_queue.get_nowait()
                        if msg_type == 'done':
                            stream_b_done = True
                        elif msg_type == 'update':
                            response_b_text = content
                            updated = True
                except queue.Empty:
                    pass
                # Always yield updates more frequently for better streaming effect
                if updated or (current_time - last_update_time) > 0.05:  # Update every 50ms
                    yield (
                        prompt_text,
                        response_a_text,
                        response_b_text,
                        gr.update(visible=False),
                        gr.update(visible=False),
                        "",
                        battle_info
                    )
                    last_update_time = current_time
                time.sleep(0.02)  # Smaller sleep for more responsive updates
            # Final update with complete responses
            yield (
                prompt_text,
                response_a_final if response_a_final else initial_response,
                response_b_final if response_b_final else initial_response,
                gr.update(visible=False),
                gr.update(visible=False),
                "",
                battle_info
            )

        def process_vote(choice, state, lang):
            """Record a vote ('A' or 'B') and reveal both model identities."""
            if not state or 'battle' not in state:
                print("❌ No battle in state")
                return (
                    gr.update(),
                    gr.update(),
                    "Error: No active battle"
                )
            # Ensure the battle object is properly set on the shared arena
            # (the streamed battle lives in this session's State, not in arena).
            battle_obj = state['battle']
            arena.current_battle = battle_obj
            print(f"🎯 Processing vote: Choice={choice}, Battle ID={battle_obj.id}")
            # Process the vote
            result = arena.vote(choice)
            if "error" in result:
                return (
                    gr.update(),
                    gr.update(),
                    f"Error: {result['error']}"
                )
            ui = UI_TEXT[lang]
            winner_emoji = "🏆" if result['winner'] == result['model_a'] else "🥈"
            loser_emoji = "🥈" if winner_emoji == "🏆" else "🏆"
            result_text = f"""
{ui['vote_complete']}
**{ui['winner']}**: {winner_emoji} **{result['winner']}**
**Model A**: {result['model_a']} {winner_emoji if choice == "A" else loser_emoji}
**Model B**: {result['model_b']} {winner_emoji if choice == "B" else loser_emoji}
{ui['elo_updated']}
"""
            # Debug: Check database state after vote
            arena.db.debug_database_state()
            return (
                gr.update(value=result['model_a'], visible=True),
                gr.update(value=result['model_b'], visible=True),
                result_text
            )

        def update_leaderboard(category):
            """Fetch the leaderboard DataFrame filtered by category."""
            df = arena.get_leaderboard(
                Category(category) if category != "overall" else None
            )
            return df[['rank', 'model_name', 'overall_score', 'storytelling_score',
                       'innovation_score', 'business_score', 'total_battles', 'win_rate', 'elo_rating']]

        # Update UI when language changes
        # NOTE(review): category_select and category_filter each appear twice in
        # this outputs list (label update + choices update) — confirm the pinned
        # Gradio version accepts duplicate output components.
        language_select.change(
            fn=update_language,
            inputs=[language_select],
            outputs=[current_lang]
        ).then(
            fn=update_ui_text,
            inputs=[current_lang],
            outputs=[
                header_html,
                leaderboard_title,
                category_select,
                custom_prompt_accordion,
                custom_prompt_input,
                new_battle_btn,
                model_a_label,
                model_b_label,
                vote_a_btn,
                vote_b_btn,
                category_filter,
                refresh_btn,
                category_select,
                category_filter
            ]
        )
        # Connect events with streaming
        new_battle_btn.click(
            fn=start_battle_stream,
            inputs=[category_select, custom_prompt_input, current_lang],
            outputs=[prompt_display, response_a, response_b, model_a_reveal, model_b_reveal, vote_result, battle_state]
        )
        vote_a_btn.click(
            fn=lambda s, l: process_vote("A", s, l),
            inputs=[battle_state, current_lang],
            outputs=[model_a_reveal, model_b_reveal, vote_result]
        )
        vote_b_btn.click(
            fn=lambda s, l: process_vote("B", s, l),
            inputs=[battle_state, current_lang],
            outputs=[model_a_reveal, model_b_reveal, vote_result]
        )
        category_filter.change(
            fn=update_leaderboard,
            inputs=[category_filter],
            outputs=[leaderboard_display]
        )
        refresh_btn.click(
            fn=update_leaderboard,
            inputs=[category_filter],
            outputs=[leaderboard_display]
        )
        # Initialize on load
        app.load(
            fn=lambda: update_leaderboard("overall"),
            outputs=[leaderboard_display]
        )
    return app
| # ==================== Main ==================== | |
if __name__ == "__main__":
    # Startup banner with environment-variable setup instructions.
    print("="*50)
    print("🚀 AI Models Creativity Battle Arena")
    print("="*50)
    print("\n📋 Environment Setup:")
    print("1. Set OPENAI_API_KEY for GPT-5")
    print("2. Set GEMINI_API_KEY for Gemini 2.5 Pro")
    print("3. Set ANTHROPIC_API_KEY for Claude Opus 4.1")
    print("4. jetXA will use 'aiqtech/tests' by default")
    print("5. Set HF_TOKEN for persistent data storage (REQUIRED)")
    print("6. Optional: Set HF_DATASET_NAME (default: ai_models_arena)")
    print("\n⚠️ Without HF_TOKEN, data will be lost on server restart!")
    print("\n" + "="*50 + "\n")
    # Check for required API keys — missing keys fall back to canned responses.
    if not os.getenv("HF_TOKEN"):
        print("⚠️ WARNING: HF_TOKEN not set - data will not persist!")
        print("Set it with: export HF_TOKEN='your_token_here'")
        print("")
    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️ GPT-5: No API key found - will use fallback responses")
    if not os.getenv("GEMINI_API_KEY"):
        print("⚠️ Gemini: No API key found - will use fallback responses")
    if not os.getenv("ANTHROPIC_API_KEY"):
        print("⚠️ Claude: No API key found - will use fallback responses")
    print("\n🎯 Starting arena with 4 models: GPT-5, jetXA, Gemini 2.5 Pro, Claude Opus 4.1")
    print("="*50 + "\n")
    # Create app
    app = create_app()
    # Start periodic sync in background (optional)
    # NOTE(review): this CreativityArena is a *separate* instance from the one
    # created inside create_app(), so periodic_sync syncs a different
    # ArenaDatabase object — confirm both point at the same backing storage.
    arena = CreativityArena()
    sync_thread = threading.Thread(target=lambda: periodic_sync(arena), daemon=True)
    sync_thread.start()
    print("✅ Background sync thread started (every 30 seconds)")
    # Launch app
    app.launch()