import gradio as gr import json import requests import random import time from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple import threading from dataclasses import dataclass, field import hashlib import sqlite3 import base64 from io import BytesIO import os # Hugging Face Imports from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline from diffusers import StableDiffusionPipeline import torch from huggingface_hub import InferenceApi # Model configurations TEXT_GENERATION_MODELS = { "Qwen2.5-3B-Instruct": { "model_id": "Qwen/Qwen2.5-3B-Instruct", "description": "Efficient instruction-following model", "max_tokens": 512, "temperature": 0.7 }, "Mistral-7B-Instruct": { "model_id": "mistralai/Mistral-7B-Instruct-v0.2", "description": "Popular conversational model", "max_tokens": 1024, "temperature": 0.8 }, "Llama-3.2-1B-Instruct": { "model_id": "meta-llama/Llama-3.2-1B-Instruct", "description": "Meta's small efficient model", "max_tokens": 512, "temperature": 0.6 }, "GPT2": { "model_id": "openai-community/gpt2", "description": "Classic text generation", "max_tokens": 256, "temperature": 0.9 } } IMAGE_GENERATION_MODELS = { "FLUX.1-dev": { "model_id": "black-forest-labs/FLUX.1-dev", "description": "High quality image generation" }, "Stable-Diffusion-XL": { "model_id": "stabilityai/stable-diffusion-xl-base-1.0", "description": "Reliable stable diffusion" }, "Z-Image-Turbo": { "model_id": "Tongyi-MAI/Z-Image-Turbo", "description": "Fast image generation" } } # Database Setup class DatabaseManager: def __init__(self, db_path="cult_simulator.db"): self.db_path = db_path self.init_database() def init_database(self): """Initialize SQLite database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create tables cursor.execute(''' CREATE TABLE IF NOT EXISTS personalities ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, personality_type TEXT, avatar_prompt TEXT, avatar_image BLOB, traits TEXT, -- JSON background_story TEXT, system_prompt TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, personality_id INTEGER, message_content TEXT, context TEXT, -- JSON response_model TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (personality_id) REFERENCES personalities (id) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS webhooks ( id INTEGER PRIMARY KEY AUTOINCREMENT, personality_id INTEGER, webhook_url TEXT, discord_channel_id TEXT, is_active BOOLEAN DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (personality_id) REFERENCES personalities (id) ) ''') conn.commit() conn.close() def save_personality(self, name, personality_type, avatar_prompt, avatar_blob, traits, background_story, system_prompt): """Save personality to database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO personalities (name, personality_type, avatar_prompt, avatar_image, traits, background_story, system_prompt) VALUES (?, ?, ?, ?, ?, ?, ?) ''', (name, personality_type, avatar_prompt, avatar_blob, json.dumps(traits), background_story, system_prompt)) personality_id = cursor.lastrowid conn.commit() conn.close() return personality_id def save_conversation(self, personality_id, message_content, context, response_model): """Save conversation to database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO conversations (personality_id, message_content, context, response_model) VALUES (?, ?, ?, ?) ''', (personality_id, message_content, json.dumps(context), response_model)) conn.commit() conn.close() def get_personalities(self): """Get all personalities""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('SELECT * FROM personalities ORDER BY created_at DESC') rows = cursor.fetchall() conn.close() return rows def get_personalities_with_webhooks(self): """Get personalities with their webhooks""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT p.*, w.webhook_url, w.discord_channel_id, w.is_active FROM personalities p LEFT JOIN webhooks w ON p.id = w.personality_id ORDER BY p.created_at DESC ''') rows = cursor.fetchall() conn.close() return rows class HuggingFaceModelManager: """Manage Hugging Face models for text and image generation""" def __init__(self): self.text_models = {} self.image_models = {} self.loaded_models = {} def load_text_model(self, model_key): """Load a text generation model""" if model_key not in self.loaded_models: config = TEXT_GENERATION_MODELS[model_key] try: self.loaded_models[model_key] = pipeline( "text-generation", model=config["model_id"], torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device_map="auto" if torch.cuda.is_available() else None ) print(f"โ Loaded {model_key}") except Exception as e: print(f"โ Error loading {model_key}: {e}") return None return self.loaded_models[model_key] def generate_text(self, model_key, prompt, max_length=200): """Generate text using specified model""" model = self.load_text_model(model_key) if not model: return f"Error: Could not load model {model_key}" try: config = TEXT_GENERATION_MODELS[model_key] result = model( prompt, max_new_tokens=max_length, temperature=config["temperature"], do_sample=True, pad_token_id=model.tokenizer.eos_token_id ) generated_text = result[0]['generated_text'] # Remove the prompt from the response if generated_text.startswith(prompt): generated_text = generated_text[len(prompt):].strip() return generated_text except Exception as e: return f"Error generating text: {e}" def generate_avatar(self, model_key, personality_description): """Generate avatar using image model""" config = IMAGE_GENERATION_MODELS[model_key] # Create prompt for avatar avatar_prompt = f""" Portrait of {personality_description}, professional headshot, realistic style, soft lighting, detailed facial features, professional attire, clean background, high quality """ try: # Use Hugging Face Inference API for image generation # (Simpler than loading local models) inference = InferenceApi(repo_id=config["model_id"], token=os.getenv("HF_TOKEN")) # For demonstration, we'll return a placeholder URL # In production, you'd call the actual inference avatar_url = f"https://image.pollinations.ai/prompt/{avatar_prompt.replace(' ', '%20')}" return avatar_url except Exception as e: return f"https://api.dicebear.com/7.x/avataaars/svg?seed={hash(personality_description)}" class PersonalityGenerator: """Generate AI personalities using language models""" def __init__(self, model_manager): self.model_manager = model_manager def generate_personality_traits(self, model_key, context=""): """Generate personality traits using AI""" prompt = f""" Generate a detailed personality profile for a fictional character. Include specific personality traits, communication style, background story. {context} Format as JSON: {{ "name": "Character name", "traits": {{ "welcoming": 0.8, "empathetic": 0.7, "cautious": 0.3, "enthusiastic": 0.6, "mysterious": 0.4 }}, "background_story": "Brief background", "communication_style": "How they talk", "description": "Physical appearance description" }} """ response = self.model_manager.generate_text(model_key, prompt, max_length=300) try: # Extract JSON from response import re json_match = re.search(r'\\{.*\\}', response, re.DOTALL) if json_match: return json.loads(json_match.group()) except: pass # Fallback to basic personality return { "name": f"AI_Personality_{random.randint(1000, 9999)}", "traits": { "welcoming": random.uniform(0.5, 1.0), "empathetic": random.uniform(0.3, 0.9), "cautious": random.uniform(0.2, 0.8), "enthusiastic": random.uniform(0.4, 1.0), "mysterious": random.uniform(0.1, 0.7) }, "background_story": "An AI-generated personality created for social simulation.", "communication_style": "Friendly and engaging", "description": "A unique AI-generated character" } def generate_system_prompt(self, personality_data, context=""): """Generate dynamic system prompt for AI personality""" traits_desc = [] for trait, value in personality_data["traits"].items(): if value > 0.7: traits_desc.append(f"very {trait}") elif value > 0.4: traits_desc.append(f"somewhat {trait}") prompt = f""" You are {personality_data["name"]}, a character in a social simulation. Your personality: {', '.join(traits_desc)}. Background: {personality_data["background_story"]} Communication style: {personality_data["communication_style"]} {context} Respond naturally as this character would, maintaining your personality traits. Be engaging but authentic to your character. """ return prompt.strip() class CultSimulatorApp: """Main application class""" def __init__(self): self.db = DatabaseManager() self.model_manager = HuggingFaceModelManager() self.personality_generator = PersonalityGenerator(self.model_manager) self.active_personalities = [] self.simulation_running = False def create_personality(self, name, model_key, context=""): """Create a new AI personality""" if not name: name = f"AI_Character_{random.randint(1000, 9999)}" # Generate personality traits personality_data = self.personality_generator.generate_personality_traits(model_key, context) personality_data["name"] = name # Generate system prompt system_prompt = self.personality_generator.generate_system_prompt(personality_data) # Generate avatar avatar_url = self.model_manager.generate_avatar("FLUX.1-dev", personality_data["description"]) # Save to database personality_id = self.db.save_personality( name=name, personality_type=model_key, avatar_prompt=personality_data["description"], avatar_blob=avatar_url.encode(), # Store URL as bytes for demo traits=personality_data["traits"], background_story=personality_data["background_story"], system_prompt=system_prompt ) personality_data["id"] = personality_id personality_data["avatar_url"] = avatar_url personality_data["system_prompt"] = system_prompt self.active_personalities.append(personality_data) return personality_data def generate_response(self, personality_id, message, model_key, context=""): """Generate response from AI personality""" # Get personality data personality = next((p for p in self.active_personalities if p.get("id") == personality_id), None) if not personality: return "Personality not found" # Create full prompt full_prompt = f"{personality['system_prompt']}\n\nUser message: {message}\n\nResponse as {personality['name']}:" # Generate response response = self.model_manager.generate_text(model_key, full_prompt, max_length=150) # Save conversation self.db.save_conversation(personality_id, message, {"context": context}, model_key) return response def send_webhook_message(self, webhook_url, content, username, avatar_url): """Send message via Discord webhook""" try: data = { "content": content, "username": username, "avatar_url": avatar_url } response = requests.post(webhook_url, json=data, timeout=10) return response.status_code == 204 except Exception as e: print(f"Webhook error: {e}") return False def simulate_conversation(self, trigger_message="", model_key="Qwen2.5-3B-Instruct", participants=None): """Simulate conversation between AI personalities""" if not participants: participants = random.sample(self.active_personalities, min(3, len(self.active_personalities))) if len(participants) < 2: return ["Need at least 2 personalities for conversation"] conversation_log = [] # Start conversation starter = random.choice(participants) starter_response = self.generate_response(starter["id"], trigger_message or "Start a conversation", model_key) conversation_log.append(f"**{starter['name']}**: {starter_response}") # Send webhook if available if hasattr(starter, 'webhook_url'): self.send_webhook_message(starter['webhook_url'], starter_response, starter['name'], starter['avatar_url']) time.sleep(1) # Other participants respond for participant in participants: if participant != starter and random.random() > 0.3: context = f"Responding to: {starter_response}" response = self.generate_response(participant["id"], "What do you think about that?", model_key, context) conversation_log.append(f"**{participant['name']}**: {response}") # Send webhook if available if hasattr(participant, 'webhook_url'): self.send_webhook_message(participant['webhook_url'], response, participant['name'], participant['avatar_url']) time.sleep(random.uniform(0.5, 2)) return conversation_log def get_personalities_display(self): """Get formatted display of all personalities""" personalities = self.db.get_personalities() if not personalities: return "No personalities created yet" display = "## AI Personalities Database\n\n" for personality in personalities: id, name, personality_type, avatar_prompt, _, traits_json, background, system_prompt, created_at = personality try: traits = json.loads(traits_json) if traits_json else {} display += f"### {name}\n" display += f"- **Type**: {personality_type}\n" display += f"- **Created**: {created_at}\n" display += f"- **Traits**: {', '.join([f'{k}: {v:.2f}' for k, v in traits.items()])}\n" display += f"- **Background**: {background[:100]}...\n\n" except Exception as e: display += f"### {name}\nError loading personality data\n\n" return display # Initialize application app = CultSimulatorApp() def create_gradio_interface(): """Create the Gradio interface""" with gr.Blocks(title="๐ค Hugging Face Cult Simulator", theme=gr.themes.Soft()) as interface: gr.HTML("""
Generate unique AI personalities using state-of-the-art models and watch them interact