import gradio as gr
from sentence_transformers import SentenceTransformer, util
import torch
import logging
import re
import os
from typing import List, Tuple, Dict, Optional
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import json
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Model initialization (runs at import time).
# The sentence embedder is mandatory: failure aborts startup.
# ---------------------------------------------------------------------------
try:
    logger.info("Loading Arabic language model...")
    # Multilingual embedder used for the semantic half of hybrid retrieval.
    model = SentenceTransformer(
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        device="cuda" if torch.cuda.is_available() else "cpu"
    )
    logger.info(f"Model loaded on {model.device}")
except Exception as e:
    logger.error(f"Model loading failed: {str(e)}")
    raise RuntimeError("Failed to initialize the AI model")

# Optional Arabic causal LM used to rephrase answers. Failure here is
# non-fatal: the app falls back to template-based response generation.
try:
    logger.info("Loading Arabic LLM for text generation...")
    llm_model_name = "riotu-lab/ArabianGPT-01B"
    llm_tokenizer = AutoTokenizer.from_pretrained(llm_model_name)
    llm_model = AutoModelForCausalLM.from_pretrained(
        llm_model_name,
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        device_map="auto" if torch.cuda.is_available() else None
    )
    text_generator = pipeline(
        "text-generation",
        model=llm_model,
        tokenizer=llm_tokenizer,
        max_length=512,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        pad_token_id=llm_tokenizer.eos_token_id
    )
    logger.info("Arabic LLM loaded successfully")
    llm_available = True
except Exception as e:
    # BUGFIX: the original warning f-string contained a raw line break
    # (a SyntaxError in a single-quoted string); now a single line.
    logger.warning(f"LLM loading failed: {str(e)}. Falling back to basic response generation.")
    text_generator = None
    llm_available = False

# TF-IDF vectorizer for the lexical half of the hybrid search.
tfidf_vectorizer = TfidfVectorizer(
    max_features=1000,
    stop_words=None,  # keep Arabic stop words: they carry signal in bigrams
    ngram_range=(1, 2),
    analyzer='word'
)


class KnowledgeBase:
    """Loads knowledge.txt, splits it into section-labelled chunks and indexes
    them twice (dense embeddings + TF-IDF) for hybrid retrieval."""

    # Sections at or below this length are kept as a single chunk.
    SMALL_SECTION_LIMIT = 200
    # Larger sections are split into sentence-packed chunks near this size.
    CHUNK_TARGET_SIZE = 180

    def __init__(self):
        self.chunks: List[str] = []                 # "<section>: <text>" strings
        self.embeddings = None                      # tensor aligned with self.chunks
        self.tfidf_matrix = None                    # sparse matrix aligned with self.chunks
        self.section_mapping: Dict[int, str] = {}   # chunk index -> section name

    def load_and_process_knowledge(self) -> None:
        """Read knowledge.txt, chunk it per section and build both indexes.

        On any failure a single placeholder chunk is installed so that
        retrieval (and therefore the app) keeps working.
        """
        try:
            knowledge_file = "knowledge.txt"
            if not os.path.exists(knowledge_file):
                raise FileNotFoundError(f"{knowledge_file} file not found")

            # BUGFIX: read the file once (the original opened/read it twice).
            with open(knowledge_file, "r", encoding="utf-8") as f:
                content = f.read()
            if not content.strip():
                raise ValueError(f"{knowledge_file} is empty")

            sections = self._parse_sections(content)

            self.chunks = []
            self.section_mapping = {}
            chunk_id = 0
            for section, content_list in sections.items():
                if not content_list:
                    # e.g. the default section when the file starts with a header
                    continue
                section_text = " ".join(content_list)
                for chunk_text in self._chunk_section(section, section_text):
                    self.chunks.append(chunk_text)
                    self.section_mapping[chunk_id] = section
                    chunk_id += 1

            # Build both retrieval indexes over the final chunk list.
            self.embeddings = model.encode(self.chunks, convert_to_tensor=True)
            self.tfidf_matrix = tfidf_vectorizer.fit_transform(self.chunks)

            logger.info(f"Loaded {len(self.chunks)} knowledge chunks from {len(sections)} sections")
        except Exception as e:
            logger.error(f"Knowledge loading error: {str(e)}")
            # Minimal fallback corpus so the retriever still has one chunk.
            self.chunks = ["عام: النظام جاهز للرد على استفساراتك. يرجى طرح سؤالك."]
            self.embeddings = model.encode(self.chunks, convert_to_tensor=True)
            self.tfidf_matrix = tfidf_vectorizer.fit_transform(self.chunks)

    @staticmethod
    def _parse_sections(content: str) -> Dict[str, List[str]]:
        """Group non-empty lines under their '## ' headers.

        BUGFIX: the default section is pre-registered. The original code
        raised KeyError (silently swallowed by the caller's broad except,
        discarding the whole knowledge base) whenever any text appeared
        before the first '## ' header.
        """
        current_section = "معلومات عامة"
        sections: Dict[str, List[str]] = {current_section: []}
        for line in content.splitlines():
            line = line.strip()
            if line.startswith("## "):
                current_section = line[3:].strip()
                sections.setdefault(current_section, [])
            elif line:
                sections[current_section].append(line)
        return sections

    @classmethod
    def _chunk_section(cls, section: str, section_text: str) -> List[str]:
        """Split one section into '<section>: <text>' chunks.

        Small sections stay whole; large ones are packed sentence by
        sentence up to roughly CHUNK_TARGET_SIZE characters.
        """
        if len(section_text) <= cls.SMALL_SECTION_LIMIT:
            return [f"{section}: {section_text}"]

        chunks: List[str] = []
        sentences = re.split(r'(?<=[.!?\n])\s+', section_text)
        current_chunk = ""
        for sent in sentences:
            sent = sent.strip()
            if not sent:
                continue
            if len(current_chunk) + len(sent) < cls.CHUNK_TARGET_SIZE:
                current_chunk += " " + sent if current_chunk else sent
            else:
                if current_chunk:
                    chunks.append(f"{section}: {current_chunk}")
                current_chunk = sent
        # Flush the remaining partial chunk.
        if current_chunk:
            chunks.append(f"{section}: {current_chunk}")
        return chunks


# Build the knowledge base once at import time.
kb = KnowledgeBase()
kb.load_and_process_knowledge()


class ArabicQueryProcessor:
    """Normalizes orthographic/dialectal variation in Arabic questions and
    classifies the question type for template selection."""

    def __init__(self):
        # Applied in insertion order via re.sub. NOTE(review): the word-level
        # replacements run AFTER the character-class pass, so some of them
        # reintroduce characters the first pass removed (e.g. 'أين') — the
        # original behaved the same way; order preserved deliberately.
        self.normalization_patterns = {
            # Normalize Arabic characters
            r'[أإآا]': 'ا',
            r'[ىي]': 'ي',
            r'[ؤو]': 'و',
            r'[ةه]': 'ه',
            # Question word normalization
            r'\bماهي\b': 'ما هي',
            r'\bماهو\b': 'ما هو',
            r'\bكيفية\b': 'كيف',
            r'\bطريقة\b': 'كيف',
            r'\bاريد\b': 'كيف يمكن',
            r'\bعاوز\b': 'كيف يمكن',
            r'\bعايز\b': 'كيف يمكن',
            r'\bازاي\b': 'كيف',
            r'\bايه\b': 'ما',
            r'\bمين\b': 'من',
            r'\bفين\b': 'أين',
            r'\bامتى\b': 'متى',
            # Common variations
            r'\bالموازنه\b': 'الموازنة',
            r'\bالشفافيه\b': 'الشفافية',
            r'\bالمشاركه\b': 'المشاركة',
        }

        # Question-type classification: first pattern match wins.
        self.question_types = {
            'definition': [r'\bما هي\b', r'\bما هو\b', r'\bتعريف\b', r'\bمعنى\b'],
            'how': [r'\bكيف\b', r'\bكيفية\b', r'\bطريقة\b'],
            'why': [r'\bلماذا\b', r'\bليه\b', r'\bسبب\b'],
            'who': [r'\bمن\b', r'\bمين\b'],
            'when': [r'\bمتى\b', r'\bامتى\b'],
            'where': [r'\bأين\b', r'\bفين\b'],
            'list': [r'\bاذكر\b', r'\bعدد\b', r'\bقائمة\b', r'\bأنواع\b']
        }

    def normalize_text(self, text: str) -> str:
        """Apply comprehensive Arabic text normalization."""
        text = text.strip()
        for pattern, replacement in self.normalization_patterns.items():
            text = re.sub(pattern, replacement, text)
        # Strip question marks / punctuation, then collapse whitespace.
        text = re.sub(r'[؟\?،,\.]+', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def classify_question_type(self, question: str) -> str:
        """Return the first matching question type, or 'general'."""
        question_lower = question.lower()  # no-op for Arabic; kept for Latin text
        for q_type, patterns in self.question_types.items():
            for pattern in patterns:
                if re.search(pattern, question_lower):
                    return q_type
        return 'general'

    def extract_keywords(self, question: str) -> List[str]:
        """Return content words (length > 2, not a stop word)."""
        stop_words = {
            'ما', 'هي', 'هو', 'كيف', 'لماذا', 'متى', 'أين', 'من', 'في',
            'على', 'إلى', 'عن', 'مع', 'هذا', 'هذه', 'ذلك', 'تلك',
            'التي', 'الذي', 'يمكن', 'يجب'
        }
        words = question.split()
        return [word for word in words if word not in stop_words and len(word) > 2]


# Initialize query processor
query_processor = ArabicQueryProcessor()


class HybridRetriever:
    """Combines dense (sentence-transformer) and lexical (TF-IDF) similarity."""

    def __init__(self, kb: KnowledgeBase, alpha: float = 0.7):
        self.kb = kb
        self.alpha = alpha  # weight of semantic score; (1 - alpha) for lexical

    def retrieve(self, question: str, top_k: int = 5) -> List[Tuple[str, float, str]]:
        """Return up to top_k (chunk, combined_score, section) triples whose
        combined score clears the 0.3 threshold. Returns [] on any failure."""
        try:
            # Semantic similarity against every chunk embedding.
            question_embedding = model.encode(question, convert_to_tensor=True)
            semantic_scores = util.cos_sim(question_embedding, self.kb.embeddings)[0]

            # Lexical similarity via TF-IDF cosine.
            question_tfidf = tfidf_vectorizer.transform([question])
            lexical_scores = cosine_similarity(question_tfidf, self.kb.tfidf_matrix)[0]

            # Weighted combination per chunk.
            combined_scores = []
            for i in range(len(self.kb.chunks)):
                semantic_score = semantic_scores[i].item()
                lexical_score = lexical_scores[i]
                combined_score = self.alpha * semantic_score + (1 - self.alpha) * lexical_score
                combined_scores.append((i, combined_score, semantic_score))

            combined_scores.sort(key=lambda x: x[1], reverse=True)

            results = []
            for idx, combined_score, semantic_score in combined_scores[:top_k]:
                if combined_score > 0.3:  # minimum-relevance threshold
                    chunk = self.kb.chunks[idx]
                    section = self.kb.section_mapping.get(idx, "عام")
                    results.append((chunk, combined_score, section))

            # BUGFIX: guard the [0] index against an empty corpus.
            if combined_scores:
                logger.info(f"Retrieved {len(results)} relevant chunks (top score: {combined_scores[0][1]:.3f})")
            return results
        except Exception as e:
            logger.error(f"Retrieval failed: {str(e)}")
            return []


# Initialize retriever
retriever = HybridRetriever(kb)


class ResponseGenerator:
    """Formats retrieved chunks into a structured Arabic answer, optionally
    rephrased by the LLM when it is available."""

    def __init__(self):
        # Per-question-type presentation metadata.
        self.response_templates = {
            'definition': {'icon': 'التعريف', 'title': 'التعريف والمفهوم', 'structure': 'definition'},
            'how': {'icon': 'الآلية', 'title': 'الآلية والطريقة', 'structure': 'process'},
            'why': {'icon': 'الأسباب', 'title': 'الأسباب والمبررات', 'structure': 'reasons'},
            'who': {'icon': 'الأشخاص', 'title': 'الأشخاص والجهات', 'structure': 'entities'},
            'when': {'icon': 'التوقيت', 'title': 'التوقيت والمراحل', 'structure': 'timeline'},
            'list': {'icon': 'القائمة', 'title': 'القائمة والعناصر', 'structure': 'list'},
            'general': {'icon': 'معلومات', 'title': 'معلومات عامة', 'structure': 'general'}
        }

    def generate_response(self, question: str,
                          retrieved_chunks: List[Tuple[str, float, str]],
                          question_type: str) -> str:
        """Build the final answer: LLM-enhanced when possible, otherwise the
        deterministic template pipeline; a fallback/error message otherwise."""
        try:
            if not retrieved_chunks:
                return self._generate_fallback_response(question)

            # Group retrieved chunks by their source section.
            sections: Dict[str, List[Tuple[str, float]]] = {}
            for chunk, score, section in retrieved_chunks:
                sections.setdefault(section, []).append((chunk, score))

            template_info = self.response_templates.get(
                question_type, self.response_templates['general'])

            # Try the LLM path first when the model loaded successfully.
            raw_content = self._extract_raw_content(sections)
            if llm_available and raw_content:
                enhanced_response = self._generate_llm_enhanced_response(
                    question, raw_content, template_info)
                if enhanced_response:
                    return enhanced_response

            # Deterministic template-based response.
            response = self._build_response_header(question, template_info)
            response += self._build_main_content(sections, template_info)
            response += self._build_additional_info(sections)
            response += self._build_suggestions(sections.keys(), question_type)
            response += self._build_footer()
            return response
        except Exception as e:
            logger.error(f"Response generation failed: {str(e)}")
            return self._generate_error_response()

    def _extract_raw_content(self, sections: Dict) -> str:
        """Concatenate the bare text of the top chunks (2 per section, 3 total)
        to feed the LLM prompt without exceeding token limits."""
        content_parts = []
        for section, chunks in sections.items():
            for chunk, score in chunks[:2]:
                if ":" in chunk:
                    content_parts.append(chunk.split(":", 1)[1].strip())
        return " ".join(content_parts[:3])

    def _generate_llm_enhanced_response(self, question: str, raw_content: str,
                                        template_info: Dict) -> Optional[str]:
        """Ask the LLM to rewrite raw_content as a professional answer.

        Returns None (caller falls back to templates) when generation fails
        or the output lacks the expected 'الإجابة:' marker.
        """
        try:
            prompt = f"""بناءً على المعلومات التالية، أجب على السؤال بطريقة مهنية ومفصلة:

السؤال: {question}

المعلومات المتاحة: {raw_content}

الإجابة المطلوبة يجب أن تكون:
- مهنية ومنظمة
- باللغة العربية الفصحى
- تحتوي على تفاصيل مفيدة
- مناسبة لموضوع الموازنة التشاركية والشفافية المالية

الإجابة:"""

            generated = text_generator(
                prompt,
                max_length=400,
                num_return_sequences=1,
                temperature=0.7,
                do_sample=True,
                pad_token_id=llm_tokenizer.eos_token_id
            )

            if generated and len(generated) > 0:
                full_response = generated[0]['generated_text']
                # Keep only what the model produced after the answer marker.
                if "الإجابة:" in full_response:
                    answer = full_response.split("الإجابة:")[-1].strip()
                    formatted_response = f"""
╔══════════════════════════════════════════════════════════════╗
║ {template_info["icon"]} **{template_info["title"]}**
╚══════════════════════════════════════════════════════════════╝

**استعلامك:** {question}

## الإجابة المطورة

{answer}

---

**للمزيد من المعلومات:** تواصل مع وحدة الشفافية والمشاركة المجتمعية
**المصدر:** وزارة المالية - جمهورية مصر العربية
"""
                    return formatted_response
            return None
        except Exception as e:
            logger.error(f"LLM enhancement failed: {str(e)}")
            return None

    def _build_response_header(self, question: str, template_info: Dict) -> str:
        """Build the boxed response header echoing the user's question."""
        return f"""
╔══════════════════════════════════════════════════════════════╗
║ {template_info["icon"]} **{template_info["title"]}**
╚══════════════════════════════════════════════════════════════╝

**استعلامك:** {question}

"""

    def _build_main_content(self, sections: Dict, template_info: Dict) -> str:
        """Render the single most relevant section (by best chunk score)."""
        if not sections:
            return ""
        main_section = max(sections.keys(),
                           key=lambda k: max(score for _, score in sections[k]))
        content = f"## {main_section}\n\n"
        content += self._format_section_content_professional(
            sections[main_section], template_info['structure'])
        content += "\n\n"
        content += "---\n\n"
        return content

    def _build_additional_info(self, sections: Dict) -> str:
        """Render up to two additional sections (positions 2-3 in insertion
        order — NOTE(review): this may repeat the main section when it is not
        the first-inserted one; original behaved identically)."""
        other_sections = list(sections.keys())[1:3]
        if not other_sections:
            return ""
        content = "## معلومات إضافية ذات صلة\n\n"
        for i, section in enumerate(other_sections, 1):
            content += f"### {i}. **{section}**\n"
            content += self._format_section_content_professional(
                sections[section][:2], 'general')
            content += "\n\n"
        content += "---\n\n"
        return content

    def _build_suggestions(self, available_sections: List[str], question_type: str) -> str:
        """Suggest follow-up queries from the retrieved sections plus
        type-specific canned topics (max 4)."""
        suggestions = []
        for section in list(available_sections)[:3]:
            if len(section.split()) <= 4:  # skip unwieldy long section names
                suggestions.append(f"المزيد حول {section}")

        type_suggestions = {
            'definition': ["الأهداف والفوائد", "التطبيق العملي"],
            'how': ["الخطوات التفصيلية", "المتطلبات والشروط"],
            'who': ["الأدوار والمسؤوليات", "التواصل والاتصال"],
            'when': ["الجدول الزمني", "المراحل القادمة"]
        }
        if question_type in type_suggestions:
            suggestions.extend(type_suggestions[question_type])

        if suggestions:
            content = "## اقتراحات للاستفسارات الإضافية\n\n"
            for i, suggestion in enumerate(suggestions[:4], 1):
                content += f"{i}. {suggestion}\n"
            content += "\n"
            return content
        return ""

    def _build_footer(self) -> str:
        """Build the static contact/source footer."""
        return """
---
📞 **للمزيد من المعلومات:** تواصل مع وحدة الشفافية والمشاركة المجتمعية
🌐 **المصدر:** وزارة المالية - جمهورية مصر العربية
"""

    def _format_section_content_professional(self, chunk_list: List[Tuple[str, float]],
                                             structure_type: str) -> str:
        """Render the top 3 chunks (by score) as markdown bullets; only the
        'definition' structure bolds them. (The original's six branches all
        produced the identical plain bullet except 'definition' — collapsed.)"""
        content_parts = []
        for chunk, score in sorted(chunk_list, key=lambda x: x[1], reverse=True)[:3]:
            if ":" in chunk:
                content = chunk.split(":", 1)[1].strip()
                if structure_type == 'definition':
                    content_parts.append(f"- **{content}**")
                else:
                    content_parts.append(f"- {content}")
        return "\n\n".join(content_parts)

    def _extract_topic(self, question: str) -> str:
        """Extract up to two keywords as a short topic label."""
        keywords = query_processor.extract_keywords(question)
        if keywords:
            return " ".join(keywords[:2])
        return "الموضوع المطلوب"

    def _generate_fallback_response(self, question: str) -> str:
        """Professional 'no results' response with search suggestions."""
        return f"""
╔══════════════════════════════════════════════════════════════╗
║ البحث في قاعدة المعرفة
╚══════════════════════════════════════════════════════════════╝

**استعلامك:** {question}

## لم يتم العثور على نتائج مطابقة

لم أتمكن من العثور على معلومات محددة تجيب على استفسارك في قاعدة المعرفة الحالية.

## اقتراحات لتحسين البحث

1. **إعادة صياغة السؤال:** جرب استخدام كلمات مفتاحية مختلفة
2. **البحث في الموضوعات الرئيسية:**
   - الموازنة التشاركية
   - الشفافية المالية
   - المشاركة المجتمعية
   - وحدة الشفافية
3. **أمثلة على أسئلة مفيدة:**
   - ما هي أهداف الموازنة التشاركية؟
   - كيف يمكن للمواطن المشاركة؟
   - من هم أعضاء فريق العمل؟

---
📞 **للمزيد من المعلومات:** تواصل مع وحدة الشفافية والمشاركة المجتمعية
"""

    def _generate_error_response(self) -> str:
        """Professional generic error response."""
        return """
╔══════════════════════════════════════════════════════════════╗
║ خطأ في النظام
╚══════════════════════════════════════════════════════════════╝

حدث خطأ غير متوقع أثناء معالجة استفسارك.

## الخطوات المقترحة

1. تأكد من صحة صياغة السؤال
2. أعد المحاولة بعد قليل
3. تواصل مع الدعم الفني إذا استمر الخطأ

---
📞 **الدعم الفني:** وحدة الشفافية والمشاركة المجتمعية
"""


# Initialize response generator
response_generator = ResponseGenerator()


def answer_question(question: str) -> str:
    """End-to-end pipeline: validate -> normalize -> classify -> retrieve ->
    generate. Always returns a user-facing Arabic string, never raises."""
    try:
        # NOTE(review): this checks 3 CHARACTERS while the message says
        # 3 words; behavior preserved — confirm intended threshold.
        if not question or len(question.strip()) < 3:
            return "الرجاء إدخال سؤال واضح (3 كلمات على الأقل)"

        normalized_question = query_processor.normalize_text(question)
        question_type = query_processor.classify_question_type(normalized_question)
        logger.info(f"Processing question: '{normalized_question}' (type: {question_type})")

        retrieved_chunks = retriever.retrieve(normalized_question, top_k=6)

        return response_generator.generate_response(
            normalized_question, retrieved_chunks, question_type
        )
    except Exception as e:
        logger.error(f"Question processing failed: {str(e)}")
        return "حدث خطأ غير متوقع. يرجى المحاولة مرة أخرى."
# Enhanced UI with better styling for professional responses.
# Dark theme (#2c3e50 / #34495e) with light text; RTL layout for Arabic.
css = """
.arabic-ui {
    direction: rtl;
    text-align: right;
    font-family: 'Tahoma', 'Arial', sans-serif;
    line-height: 1.8;
    background-color: #2c3e50; /* Dark background for overall consistency */
    color: #ecf0f1; /* Light text for readability */
}
.header {
    background: #34495e; /* Slightly lighter dark for header */
    color: #ecf0f1;
    padding: 25px;
    border-radius: 12px;
    margin-bottom: 25px;
    box-shadow: 0 4px 6px rgba(0,0,0,0.3);
}
.footer {
    margin-top: 25px;
    font-size: 0.9em;
    color: #bdc3c7;
    text-align: center;
    padding: 15px;
    background: #34495e; /* Consistent dark background for footer */
    border-radius: 8px;
}
.example-box {
    border: 2px solid #34495e; /* Darker border */
    border-radius: 12px;
    padding: 20px;
    margin-bottom: 20px;
    background: #34495e; /* Dark background for example box */
    color: #ecf0f1;
}
.answer-box {
    min-height: 300px;
    line-height: 1.8;
    font-size: 14px;
    font-family: 'Tahoma', 'Arial', monospace;
    background: #2c3e50; /* Dark background for answer box */
    border: 1px solid #34495e; /* Darker border for answer box */
    border-radius: 8px;
    padding: 15px;
    white-space: pre-wrap;
    overflow-y: auto;
    color: #ecf0f1;
}
.question-input {
    font-size: 16px;
    padding: 12px;
    border-radius: 8px;
    font-family: 'Tahoma', 'Arial', sans-serif;
    background-color: #34495e; /* Dark background for input */
    border: 1px solid #2c3e50; /* Darker border */
    color: #ecf0f1;
}
/* Enhanced markdown support for Arabic */
.answer-box h1, .answer-box h2, .answer-box h3 {
    color: #ecf0f1;
    margin-top: 20px;
    margin-bottom: 10px;
}
.answer-box h2 {
    border-bottom: 2px solid #3498db;
    padding-bottom: 5px;
}
.answer-box h3 {
    color: #bdc3c7;
}
.answer-box hr {
    border: none;
    border-top: 1px solid #7f8c8d;
    margin: 20px 0;
}
.answer-box strong {
    color: #ecf0f1;
    font-weight: bold;
}
.answer-box ul, .answer-box ol {
    margin: 10px 0;
    padding-right: 20px;
}
.answer-box li {
    margin: 5px 0;
}
/* Box drawing characters support */
.answer-box {
    font-feature-settings: "liga" 1, "calt" 1;
}
"""

# Create Gradio interface
with gr.Blocks(css=css, title="المساعد الآلي للموازنة التشاركية") as demo:
    with gr.Column(elem_classes="arabic-ui"):
        # Page header (title + subtitle).
        gr.Markdown("""
المساعد الآلي المطور للموازنة التشاركية مع الذكاء الاصطناعي

نسخة محسّنة مع نموذج لغوي ذكي لإعادة صياغة الإجابات وتوليد محتوى أكثر دقة ومهنية
""")

        with gr.Row():
            question = gr.Textbox(
                label="اكتب سؤالك هنا",
                placeholder="مثال: ما هي مراحل تطبيق الموازنة التشاركية في مصر؟",
                lines=3,
                elem_classes="question-input"
            )

        with gr.Row():
            submit_btn = gr.Button("إرسال السؤال", variant="primary", size="lg")
            clear_btn = gr.Button("مسح", variant="secondary")

        answer = gr.Textbox(
            label="الإجابة المطورة",
            interactive=False,
            lines=12,
            elem_classes="answer-box"
        )

        # Clickable example questions that populate the input box.
        with gr.Column(elem_classes="example-box"):
            gr.Markdown("**أسئلة مقترحة للتجربة:**")
            gr.Examples(
                examples=[
                    ["ما هي أهداف الموازنة التشاركية؟"],
                    ["كيف يمكن للمواطن المشاركة في صنع القرار المالي؟"],
                    ["ما هي أهم إنجازات وحدة الشفافية والمشاركة المجتمعية؟"],
                    ["من هم أعضاء فريق عمل وحدة الشفافية؟"],
                    ["كيف تطور أداء مصر في مؤشرات الشفافية الدولية؟"],
                    ["ما هي الوثائق المتاحة للجمهور في الموازنة؟"]
                ],
                inputs=question,
                label=""
            )

        gr.Markdown(""" """)

    # Event handlers: button click and Enter both submit; clear empties both boxes.
    submit_btn.click(answer_question, inputs=question, outputs=answer)
    clear_btn.click(lambda: ("", ""), outputs=[question, answer])
    question.submit(answer_question, inputs=question, outputs=answer)

if __name__ == "__main__":
    # NOTE(review): 0.0.0.0 binds all interfaces — confirm this is intended
    # for the deployment environment.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True
    )