Spaces:
Sleeping
Sleeping
| # app.py | |
| import gradio as gr | |
| import wikipedia | |
| import numpy as np | |
| import tempfile | |
| import os | |
| import time | |
| from datetime import datetime, timedelta | |
| from gtts import gTTS | |
| from langdetect import detect | |
| from pydub import AudioSegment | |
| from pydub.silence import split_on_silence | |
| import speech_recognition as sr | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import pipeline | |
| import re | |
| import torch | |
# --- USER MANAGEMENT SYSTEM ---
class UserManager:
    """Track per-user moderation state (warnings, temporary blocks, bans) in memory.

    State lives in ``self.user_data``, a dict keyed by user id. Each record is a
    dict that may contain:

    * ``'warnings'``           -- number of violations recorded so far
    * ``'flags'``              -- list of the violation descriptions recorded
    * ``'blocked_until'``      -- ``datetime`` until which the user is blocked
    * ``'permanently_banned'`` -- truthy when the user is banned for good

    Nothing is persisted; the state is lost when the process restarts.
    """

    def __init__(self, max_warnings=1, block_duration=timedelta(days=30)):
        """Create an empty manager.

        Args:
            max_warnings: Number of violations tolerated before a block is
                applied (the block triggers once the count exceeds this).
            block_duration: ``timedelta`` a block lasts from the moment it is
                applied.
        """
        self.user_data = {}
        self.max_warnings = max_warnings
        self.block_duration = block_duration

    def get_user_status(self, user_id):
        """Return ``"active"``, ``"blocked"`` or ``"banned"`` for *user_id*.

        Unknown users are ``"active"``. An expired ``'blocked_until'`` entry is
        removed as a side effect; the warning counter is left untouched, so the
        next violation after an expired block blocks the user again.
        """
        record = self.user_data.get(user_id)
        if record is None:
            return "active"
        if record.get('permanently_banned', False):
            return "banned"
        blocked_until = record.get('blocked_until')
        if blocked_until is not None:
            if datetime.now() < blocked_until:
                return "blocked"
            # Block has expired: drop the marker so later checks are cheap.
            del record['blocked_until']
        return "active"

    def add_warning(self, user_id, violation_type):
        """Record one violation and return ``"warned"`` or ``"blocked"``.

        Args:
            user_id: Key identifying the user.
            violation_type: Description of the violation, appended to the
                user's ``'flags'`` list.

        Returns:
            ``"blocked"`` when the warning count now exceeds ``max_warnings``
            (a new ``'blocked_until'`` deadline is stored), else ``"warned"``.
        """
        # setdefault/get keep this safe for records that exist but were
        # created without counters (e.g. only 'permanently_banned' set).
        record = self.user_data.setdefault(user_id, {})
        record['warnings'] = record.get('warnings', 0) + 1
        record.setdefault('flags', []).append(violation_type)
        if record['warnings'] > self.max_warnings:
            record['blocked_until'] = datetime.now() + self.block_duration
            return "blocked"
        return "warned"


user_manager = UserManager()
# --- MODEL INITIALIZATION ---
def load_models():
    """Instantiate every model the app uses and return them in one dict.

    Keys:
        'translator'   -- translation pipeline (Helsinki-NLP/opus-mt-mul-en)
        'answer_gen'   -- text2text-generation pipeline (google/flan-t5-base)
        'encoder'      -- SentenceTransformer sentence encoder
        'toxic-bert'   -- text-classification pipeline (unitary/toxic-bert)
        'roberta-hate' -- text-classification pipeline
                          (facebook/roberta-hate-speech-dynabench-r4-target)
        'en_to_<lang>' -- English -> <lang> translation pipeline for each of
                          fr, ar, zh, es
    """
    registry = {}
    registry['translator'] = pipeline('translation', model='Helsinki-NLP/opus-mt-mul-en')
    registry['answer_gen'] = pipeline('text2text-generation', model='google/flan-t5-base')
    registry['encoder'] = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    registry['toxic-bert'] = pipeline("text-classification", model="unitary/toxic-bert")
    registry['roberta-hate'] = pipeline(
        "text-classification",
        model="facebook/roberta-hate-speech-dynabench-r4-target",
    )

    # One English -> target pipeline per supported output language.
    for code in ('fr', 'ar', 'zh', 'es'):
        task_name = f'translation_en_to_{code}'
        checkpoint = f'Helsinki-NLP/opus-mt-en-{code}'
        registry[f'en_to_{code}'] = pipeline(task_name, model=checkpoint)
    return registry


# Loaded once at import time and shared by every request handler.
models = load_models()
| # --- UNIVERSAL HATE SPEECH DETECTION --- | |
| class HateSpeechDetector: | |
| def __init__(self): | |
| self.keyword_banks = { | |
| 'racial': ['nigger', 'chink', 'spic', 'kike', 'gook', 'wetback'], | |
| 'gender': ['fag', 'dyke', 'tranny', 'whore', 'slut', 'bitch'], | |
| 'violence': ['kill', 'murder', 'harm', 'hurt', 'abuse', 'torture'], | |
| 'general': ['scum', 'vermin', 'subhuman', 'untermensch'] | |
| } | |
| self.patterns = [ | |
| (r'\b(all|every)\s\w+\s(should|must)\s(die|burn)', 'group violence'), | |
| (r'\b(how to|ways? to)\s(kill|harm|hurt)', 'harm instructions'), | |
| (r'[!@#$%^&*]igg[!@#$%^&*]', 'coded racial slur') | |
| ] | |
| def detect(self, text): | |
| text_lower = text.lower() | |
| violations = [] | |
| # Keyword detection | |
| for category, keywords in self.keyword_banks.items(): | |
| found = [kw for kw in keywords if kw in text_lower] | |
| if found: | |
| violations.append(f"{category} terms: {', '.join(found[:3])}") | |
| # Pattern detection | |
| for pattern, desc in self.patterns: | |
| if re.search(pattern, text_lower): | |
| violations.append(f"pattern: {desc}") | |
| # Model detection | |
| try: | |
| toxic_result = models['toxic-bert'](text)[0] | |
| if toxic_result['label'].lower() in ['toxic', 'hate'] and toxic_result['score'] > 0.7: | |
| violations.append(f"toxic-bert: {toxic_result['label']} ({toxic_result['score']:.2f})") | |
| hate_result = models['roberta-hate'](text)[0] | |
| if hate_result['label'].lower() in ['hate', 'offensive'] and hate_result['score'] > 0.7: | |
| violations.append(f"roberta-hate: {hate_result['label']} ({hate_result['score']:.2f})") | |
| except Exception as e: | |
| print(f"Model error: {e}") | |
| return violations if violations else None | |
| hate_detector = HateSpeechDetector() | |
# --- RESPONSE GENERATION ---
def generate_response(text, topic, lang):
    """Answer *text* using the English Wikipedia summary of *topic*.

    Args:
        text: The user's question.
        topic: Wikipedia page title used as context for the answer.
        lang: Output language code; the answer is translated from English
            unless this is ``'en'``.

    Returns:
        A ``(answer_text, audio_path)`` tuple. ``audio_path`` is whatever
        ``text_to_speech`` returns. When the Wikipedia lookup fails the tuple
        is ``("Could not find information. Please try another topic.", None)``.
    """
    try:
        wikipedia.set_lang('en')
        try:
            page = wikipedia.page(topic, auto_suggest=False)
        except wikipedia.exceptions.DisambiguationError as e:
            # Disambiguation options are already exact titles, so the lookup
            # must not pass them through the library's title suggestion step
            # (same setting as the primary lookup above).
            page = wikipedia.page(e.options[0], auto_suggest=False)
        # Cap the context so the prompt stays short.
        context = page.summary[:1000]
    except Exception as e:
        print(f"Wikipedia error: {e}")
        return "Could not find information. Please try another topic.", None

    prompt = f"Context: {context}\nQuestion: {text}\nAnswer:"
    answer = models['answer_gen'](prompt, max_length=200)[0]['generated_text']
    translated = translate(answer, 'en', lang) if lang != 'en' else answer
    audio_path = text_to_speech(translated, lang)
    return translated, audio_path
# --- WARNING MESSAGES ---
def create_warning_message(violations):
    """Build the red warning box shown when a message is flagged.

    Args:
        violations: Sequence of violation descriptions; only the first two
            are shown, joined with ", ".

    Returns:
        A ``gr.HTML`` component containing the warning markup.
    """
    # NOTE(review): the characters before "WARNING" look like a mis-encoded
    # emoji; reproduced as found -- confirm the intended glyph.
    reasons = ', '.join(violations[:2])
    markup = f"""
<div style='
border: 2px solid #ff0000;
border-radius: 5px;
padding: 10px;
background-color: #fff0f0;
margin: 10px 0;
'>
<div style='color: #ff0000; font-weight: bold;'>
β οΈ WARNING: Violation Detected
</div>
<div style='margin-top: 8px;'>
Your message contains prohibited content
</div>
<div style='margin-top: 8px; font-size: 0.9em;'>
<b>Reason:</b> {reasons}
</div>
</div>
"""
    return gr.HTML(markup)
def create_blocked_message():
    """Build the box shown to a user whose account is temporarily suspended.

    Returns:
        A ``gr.HTML`` component containing the suspension notice.
    """
    # NOTE(review): the character before "ACCOUNT" looks like a mis-encoded
    # emoji; reproduced as found -- confirm the intended glyph.
    suspended_markup = """
<div style='
border: 2px solid #990000;
border-radius: 5px;
padding: 10px;
background-color: #ffebee;
'>
β ACCOUNT TEMPORARILY SUSPENDED
</div>
"""
    return gr.HTML(suspended_markup)
# --- MAIN HANDLER ---
def handle_interaction(audio, text, topic, lang, chat_history, request: gr.Request):
    """Process one user turn: moderation checks, then answer generation.

    Args:
        audio: Path to a recorded/uploaded audio file, or a falsy value.
        text: Typed question; used when no audio is given or when nothing
            could be transcribed from it.
        topic: Wikipedia topic passed to ``generate_response``.
        lang: Output language code passed to ``generate_response``.
        chat_history: List of ``(user, bot)`` tuples shown in the chatbot;
            it is extended in place. ``None`` is treated as an empty list.
        request: Request object for the call; the client host is used as the
            user id, falling back to ``"default_user"``.

    Returns:
        ``(chat_history, "", audio_output)`` -- the updated history, an empty
        string that clears the text box, and the answer's audio path (``None``
        when no answer was generated).
    """
    # NOTE(review): several messages below start with characters that look
    # like mis-encoded emoji; reproduced as found -- confirm intended glyphs.
    if chat_history is None:
        chat_history = []

    client = getattr(request, "client", None) if request else None
    user_id = client.host if client else "default_user"

    # list.append returns None, so the history must be extended first and
    # returned separately; returning the append() result would clear the chat.
    status = user_manager.get_user_status(user_id)
    if status == "banned":
        chat_history.append(("", "β Account permanently banned"))
        return chat_history, "", None
    if status == "blocked":
        chat_history.append(("", create_blocked_message()))
        return chat_history, "", None

    if audio:
        # Fall back to the typed text when nothing could be transcribed.
        text = process_audio(audio) or text
    if not text or not text.strip():
        chat_history.append(("", "βοΈ Please enter a question"))
        return chat_history, "", None

    violations = hate_detector.detect(text)
    if violations:
        action = user_manager.add_warning(user_id, violations[0])
        if action == "warned":
            chat_history.append((text, create_warning_message(violations)))
        elif action == "blocked":
            chat_history.append(("", create_blocked_message()))
        return chat_history, "", None

    response, audio_output = generate_response(text, topic, lang)
    chat_history.append((text, response))
    return chat_history, "", audio_output
# --- AUDIO PROCESSING ---
def process_audio(audio_path):
    """Transcribe the audio file at *audio_path* and return the text.

    The recording is split on silences (at least 500 ms, 14 dB below the
    clip's average loudness) and each chunk is sent to the recognizer's
    Google Web Speech backend separately.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The recognised chunks joined by single spaces; ``""`` when nothing
        was recognised.
    """
    import io  # local import: only this function needs an in-memory buffer

    recognizer = sr.Recognizer()
    sound = AudioSegment.from_file(audio_path)
    chunks = split_on_silence(sound, min_silence_len=500, silence_thresh=sound.dBFS - 14)

    transcripts = []
    for chunk in chunks:
        # Export to memory rather than to a named temp file: re-opening a
        # still-open NamedTemporaryFile by name is not portable.
        wav_buffer = io.BytesIO()
        chunk.export(wav_buffer, format="wav")
        wav_buffer.seek(0)
        with sr.AudioFile(wav_buffer) as source:
            audio = recognizer.record(source)
        try:
            transcripts.append(recognizer.recognize_google(audio))
        except sr.UnknownValueError:
            # Unintelligible chunk: skip it and keep the rest.
            continue
        except sr.RequestError as e:
            # Service unreachable or rejected the request: still best effort.
            print(f"Speech recognition error: {e}")
            continue
    return " ".join(transcripts).strip()
def text_to_speech(text, lang):
    """Synthesise *text* in language *lang* to an MP3 file.

    Args:
        text: Text to speak.
        lang: Language code handed to gTTS.

    Returns:
        Path of the generated ``.mp3`` file, or ``None`` when synthesis
        failed. The file is not deleted by this function; the caller owns it.
    """
    out_path = None
    try:
        tts = gTTS(text=text, lang=lang)
        # Reserve a unique path, then close the handle before gTTS writes to
        # it so the file is never opened twice at once.
        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
            out_path = f.name
        tts.save(out_path)
        return out_path
    except Exception as e:
        print(f"TTS Error: {e}")
        # Do not leave an orphaned (empty or partial) temp file behind.
        if out_path is not None:
            try:
                os.remove(out_path)
            except OSError:
                pass
        return None
def translate(text, src, tgt):
    """Translate *text* from language *src* to language *tgt* via English.

    Non-English input is first converted to English with the 'translator'
    model. If no ``en_to_<tgt>`` model is loaded, the English text is
    returned as is.
    """
    if src == tgt:
        return text

    if src == 'en':
        english = text
    else:
        english = models['translator'](text)[0]['translation_text']

    forward_key = f'en_to_{tgt}'
    if forward_key not in models:
        return english
    return models[forward_key](english)[0]['translation_text']
# --- INTERACTIVE DESCRIPTION ---
# Static HTML banner rendered at the top of the page: title, feature cards and
# the list of restricted content.
# NOTE(review): several headings start with characters that look like
# mis-encoded emoji; reproduced as found -- confirm the intended glyphs.
description_html = """
<div style="font-family: 'Arial', sans-serif; max-width: 800px; margin: 0 auto;">
<div style="text-align: center; margin-bottom: 30px;">
<img src="https://i.imgur.com/6wBs5mO.png" style="width: 120px; height: 120px; border-radius: 50%; border: 3px solid #00008b;">
<h1 style="color: #00008b; margin-top: 15px;">π Multilingual AI Assistant</h1>
<p style="color: #555;">Powered by Transformers and Gradio</p>
</div>
<div style="background-color: #e6f2ff; padding: 25px; border-radius: 10px; border: 2px solid #00008b; margin-bottom: 20px;">
<h2 style="color: #00008b; margin-top: 0;">β¨ Features</h2>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 15px;">
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π Wikipedia Knowledge</h3>
<p>Answers questions using Wikipedia content</p>
</div>
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π£οΈ Voice Interaction</h3>
<p>Speak or type your questions</p>
</div>
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π 5 Languages</h3>
<p>English, French, Spanish, Chinese, Arabic</p>
</div>
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π‘οΈ Content Moderation</h3>
<p>Automated hate speech detection</p>
</div>
</div>
</div>
<div style="background-color: #fff0f0; padding: 25px; border-radius: 10px; border: 2px solid #ff0000; margin-bottom: 20px;">
<h2 style="color: #ff0000; margin-top: 0;">π« Restricted Content</h2>
<ul>
<li>Hate speech or discrimination</li>
<li>Violent or harmful content</li>
<li>Personal/medical/legal advice</li>
</ul>
</div>
</div>
"""


def _reset_chat():
    """Return empty values for the chatbot, the text box and the audio player."""
    return [], "", None


# --- GRADIO INTERFACE ---
with gr.Blocks(title="π Multilingual AI Assistant") as demo:
    gr.HTML(description_html)

    with gr.Row():
        # Left column: voice input plus topic and language settings.
        with gr.Column(scale=1):
            audio_input = gr.Audio(
                sources=["microphone", "upload"],
                type="filepath",
                label="π€ Speak or upload audio",
            )
            topic_input = gr.Textbox("Artificial Intelligence", label="π Wikipedia Topic")
            lang_input = gr.Dropdown(
                ["en", "fr", "es", "zh", "ar"],
                value="en",
                label="π Output Language",
            )
        # Right column: conversation, typed input and action buttons.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Conversation")
            text_input = gr.Textbox(placeholder="Type your question...", label="βοΈ Or type here")
            with gr.Row():
                clear_btn = gr.Button("ποΈ Clear Chat")
                submit_btn = gr.Button("π Submit", variant="primary")

    audio_output = gr.Audio(label="π Answer", visible=True)

    # The submit button and pressing Enter in the text box run the same handler
    # with the same inputs and outputs.
    handler_inputs = [audio_input, text_input, topic_input, lang_input, chatbot]
    handler_outputs = [chatbot, text_input, audio_output]
    for register in (submit_btn.click, text_input.submit):
        register(
            handle_interaction,
            inputs=handler_inputs,
            outputs=handler_outputs,
        )

    clear_btn.click(_reset_chat, outputs=[chatbot, text_input, audio_output])

if __name__ == "__main__":
    demo.launch(share=True)