# FTCE-chatbot / app.py
# (Hugging Face Space application — revision 85241d2)
import gradio as gr
from sentence_transformers import SentenceTransformer, util
import torch
import logging
import re
import os
from typing import List, Tuple, Dict
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import json
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# Configure module-wide logging: timestamped records at INFO level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# Initialize the dense-retrieval embedding model.
try:
    logger.info("Loading Arabic language model...")
    # Multilingual MiniLM sentence encoder; handles Arabic among other languages.
    model = SentenceTransformer(
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        device="cuda" if torch.cuda.is_available() else "cpu"
    )
    logger.info(f"Model loaded on {model.device}")
except Exception as e:
    logger.error(f"Model loading failed: {str(e)}")
    # The app cannot retrieve anything without embeddings, so fail hard.
    raise RuntimeError("Failed to initialize the AI model")
# Initialize the (optional) Arabic LLM used to rephrase retrieved content.
try:
    logger.info("Loading Arabic LLM for text generation...")
    # ArabianGPT (~0.1B parameter causal LM) for Arabic text generation.
    llm_model_name = "riotu-lab/ArabianGPT-01B"
    # Load tokenizer and model; fp16 + auto device mapping only when CUDA exists.
    llm_tokenizer = AutoTokenizer.from_pretrained(llm_model_name)
    llm_model = AutoModelForCausalLM.from_pretrained(
        llm_model_name,
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        device_map="auto" if torch.cuda.is_available() else None
    )
    # Text-generation pipeline shared by ResponseGenerator.
    # NOTE(review): max_length here (512) is a total-sequence cap; the prompt
    # built later plus 400-token generation calls may hit it — confirm limits.
    text_generator = pipeline(
        "text-generation",
        model=llm_model,
        tokenizer=llm_tokenizer,
        max_length=512,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        pad_token_id=llm_tokenizer.eos_token_id
    )
    logger.info("Arabic LLM loaded successfully")
    llm_available = True
except Exception as e:
    # LLM is optional: on failure the app degrades to template-based responses.
    logger.warning(f"LLM loading failed: {str(e)}. Falling back to basic response generation.")
    text_generator = None
    llm_available = False
# Sparse (lexical) index for the hybrid search: word uni/bi-grams, top 1000 features.
tfidf_vectorizer = TfidfVectorizer(
    max_features=1000,
    stop_words=None,  # keep Arabic stop words — sklearn ships no Arabic stop list
    ngram_range=(1, 2),
    analyzer='word'
)
class KnowledgeBase:
    """Holds the chunked knowledge text plus dense and sparse indexes over it."""

    def __init__(self):
        self.chunks = []            # list[str] of "section: text" chunks
        self.embeddings = None      # dense embedding tensor, parallel to chunks
        self.tfidf_matrix = None    # sparse TF-IDF matrix, parallel to chunks
        self.section_mapping = {}   # chunk index -> source section title

    def load_and_process_knowledge(self) -> None:
        """Load knowledge.txt, split it into section-aware chunks, build indexes.

        Sections are delimited by lines starting with "## ". Short sections
        (<= 200 chars) become a single chunk; longer sections are packed
        sentence-by-sentence into ~180-char chunks, each prefixed with the
        section title for context. On any failure the knowledge base degrades
        to a single generic chunk so retrieval keeps working.
        """
        try:
            knowledge_file = "knowledge.txt"
            if not os.path.exists(knowledge_file):
                raise FileNotFoundError(f"{knowledge_file} file not found")
            with open(knowledge_file, "r", encoding="utf-8") as f:
                content = f.read().strip()
            if not content:
                raise ValueError(f"{knowledge_file} is empty")
            # Parse "## " headers into sections. Seed the default section so
            # content lines appearing BEFORE the first header no longer raise
            # KeyError (previously sections[current_section] did not exist for
            # them, silently discarding the whole file via the broad except).
            default_section = "معلومات عامة"
            current_section = default_section
            sections = {default_section: []}
            for line in content.splitlines():
                line = line.strip()
                if line.startswith("## "):
                    current_section = line[3:].strip()
                    sections.setdefault(current_section, [])
                elif line:
                    sections[current_section].append(line)
            # Drop the seeded default if nothing preceded the first header,
            # matching the original output for well-formed files.
            if not sections[default_section]:
                del sections[default_section]
            # Enhanced chunking strategy.
            self.chunks = []
            chunk_id = 0
            for section, content_list in sections.items():
                section_text = " ".join(content_list)
                if len(section_text) <= 200:
                    # Small sections: keep as a single chunk to preserve context.
                    self.chunks.append(f"{section}: {section_text}")
                    self.section_mapping[chunk_id] = section
                    chunk_id += 1
                else:
                    # Large sections: pack sentences into ~180-char chunks.
                    sentences = re.split(r'(?<=[.!?\n])\s+', section_text)
                    current_chunk = ""
                    for sent in sentences:
                        sent = sent.strip()
                        if not sent:
                            continue
                        # Grow the chunk while it stays under the target size.
                        if len(current_chunk) + len(sent) < 180:
                            current_chunk += " " + sent if current_chunk else sent
                        else:
                            if current_chunk:
                                self.chunks.append(f"{section}: {current_chunk}")
                                self.section_mapping[chunk_id] = section
                                chunk_id += 1
                            current_chunk = sent
                    # Flush the trailing partial chunk.
                    if current_chunk:
                        self.chunks.append(f"{section}: {current_chunk}")
                        self.section_mapping[chunk_id] = section
                        chunk_id += 1
            # Build the dense (embedding) and sparse (TF-IDF) indexes.
            self.embeddings = model.encode(self.chunks, convert_to_tensor=True)
            self.tfidf_matrix = tfidf_vectorizer.fit_transform(self.chunks)
            logger.info(f"Loaded {len(self.chunks)} knowledge chunks from {len(sections)} sections")
        except Exception as e:
            logger.error(f"Knowledge loading error: {str(e)}")
            # Degraded mode: a single generic chunk so the retriever still works.
            self.chunks = ["عام: النظام جاهز للرد على استفساراتك. يرجى طرح سؤالك."]
            self.embeddings = model.encode(self.chunks, convert_to_tensor=True)
            self.tfidf_matrix = tfidf_vectorizer.fit_transform(self.chunks)


# Build the knowledge base once at import time.
kb = KnowledgeBase()
kb.load_and_process_knowledge()
class ArabicQueryProcessor:
    """Normalizes Arabic queries, classifies question intent, extracts keywords."""

    def __init__(self):
        # Ordered regex replacements: letter-variant unification first, then
        # dialect/fused question-word rewrites, then common misspellings.
        # Insertion order matters — substitutions are applied sequentially.
        self.normalization_patterns = {
            # Unify Arabic letter variants.
            r'[أإآا]': 'ا',
            r'[ىي]': 'ي',
            r'[ؤو]': 'و',
            r'[ةه]': 'ه',
            # Rewrite dialectal / fused question words to MSA forms.
            r'\bماهي\b': 'ما هي',
            r'\bماهو\b': 'ما هو',
            r'\bكيفية\b': 'كيف',
            r'\bطريقة\b': 'كيف',
            r'\bاريد\b': 'كيف يمكن',
            r'\bعاوز\b': 'كيف يمكن',
            r'\bعايز\b': 'كيف يمكن',
            r'\bازاي\b': 'كيف',
            r'\bايه\b': 'ما',
            r'\bمين\b': 'من',
            r'\bفين\b': 'أين',
            r'\bامتى\b': 'متى',
            # Fix common spelling variants of domain terms.
            r'\bالموازنه\b': 'الموازنة',
            r'\bالشفافيه\b': 'الشفافية',
            r'\bالمشاركه\b': 'المشاركة',
        }
        # Intent patterns, checked in declaration order; first match wins.
        self.question_types = {
            'definition': [r'\bما هي\b', r'\bما هو\b', r'\bتعريف\b', r'\bمعنى\b'],
            'how': [r'\bكيف\b', r'\bكيفية\b', r'\bطريقة\b'],
            'why': [r'\bلماذا\b', r'\bليه\b', r'\bسبب\b'],
            'who': [r'\bمن\b', r'\bمين\b'],
            'when': [r'\bمتى\b', r'\bامتى\b'],
            'where': [r'\bأين\b', r'\bفين\b'],
            'list': [r'\bاذكر\b', r'\bعدد\b', r'\bقائمة\b', r'\bأنواع\b']
        }

    def normalize_text(self, text: str) -> str:
        """Return *text* with variants unified, punctuation stripped, spaces collapsed."""
        result = text.strip()
        for pattern, replacement in self.normalization_patterns.items():
            result = re.sub(pattern, replacement, result)
        # Strip question marks / commas / periods, then collapse whitespace runs.
        result = re.sub(r'\s+', ' ', re.sub(r'[؟\?،,\.]+', '', result))
        return result.strip()

    def classify_question_type(self, question: str) -> str:
        """Return the first matching intent label, or 'general' when none match."""
        lowered = question.lower()
        for q_type, patterns in self.question_types.items():
            if any(re.search(p, lowered) for p in patterns):
                return q_type
        return 'general'

    def extract_keywords(self, question: str) -> List[str]:
        """Return content words: drops common function/question words and short tokens."""
        stop_words = {
            'ما', 'هي', 'هو', 'كيف', 'لماذا', 'متى', 'أين', 'من', 'في', 'على', 'إلى',
            'عن', 'مع', 'هذا', 'هذه', 'ذلك', 'تلك', 'التي', 'الذي', 'يمكن', 'يجب'
        }
        return [w for w in question.split() if w not in stop_words and len(w) > 2]


# Shared query-processor instance.
query_processor = ArabicQueryProcessor()
class HybridRetriever:
    """Hybrid retrieval: blends dense (embedding) and sparse (TF-IDF) similarity."""

    def __init__(self, kb: KnowledgeBase, alpha: float = 0.7):
        self.kb = kb
        # Weight of the semantic score; (1 - alpha) goes to the lexical score.
        self.alpha = alpha

    def retrieve(self, question: str, top_k: int = 5) -> List[Tuple[str, float, str]]:
        """Return up to *top_k* (chunk, blended_score, section) tuples above 0.3."""
        try:
            # Dense similarity via the sentence-transformer encoder.
            query_vec = model.encode(question, convert_to_tensor=True)
            dense = util.cos_sim(query_vec, self.kb.embeddings)[0]
            # Sparse similarity via TF-IDF cosine.
            sparse = cosine_similarity(
                tfidf_vectorizer.transform([question]), self.kb.tfidf_matrix
            )[0]
            # Blend both signals per chunk: alpha*dense + (1-alpha)*sparse.
            scored = []
            for idx in range(len(self.kb.chunks)):
                dense_score = dense[idx].item()
                blended = self.alpha * dense_score + (1 - self.alpha) * sparse[idx]
                scored.append((idx, blended, dense_score))
            scored.sort(key=lambda item: item[1], reverse=True)
            # Keep the top candidates that clear the relevance threshold.
            results = []
            for idx, blended, _ in scored[:top_k]:
                if blended > 0.3:
                    results.append((
                        self.kb.chunks[idx],
                        blended,
                        self.kb.section_mapping.get(idx, "عام"),
                    ))
            logger.info(f"Retrieved {len(results)} relevant chunks (top score: {scored[0][1]:.3f})")
            return results
        except Exception as e:
            logger.error(f"Retrieval failed: {str(e)}")
            return []


# Shared retriever instance over the global knowledge base.
retriever = HybridRetriever(kb)
class ResponseGenerator:
    """Turns retrieved knowledge chunks into a formatted Arabic answer.

    Prefers an LLM-rephrased answer when the global ``text_generator`` pipeline
    is available; otherwise builds a deterministic template-based response.
    """

    def __init__(self):
        # Per-question-type presentation metadata: header label, title, and
        # a layout hint consumed by _format_section_content_professional.
        self.response_templates = {
            'definition': {
                'icon': 'التعريف',
                'title': 'التعريف والمفهوم',
                'structure': 'definition'
            },
            'how': {
                'icon': 'الآلية',
                'title': 'الآلية والطريقة',
                'structure': 'process'
            },
            'why': {
                'icon': 'الأسباب',
                'title': 'الأسباب والمبررات',
                'structure': 'reasons'
            },
            'who': {
                'icon': 'الأشخاص',
                'title': 'الأشخاص والجهات',
                'structure': 'entities'
            },
            'when': {
                'icon': 'التوقيت',
                'title': 'التوقيت والمراحل',
                'structure': 'timeline'
            },
            'list': {
                'icon': 'القائمة',
                'title': 'القائمة والعناصر',
                'structure': 'list'
            },
            'general': {
                'icon': 'معلومات',
                'title': 'معلومات عامة',
                'structure': 'general'
            }
        }

    def generate_response(self, question: str, retrieved_chunks: List[Tuple[str, float, str]], question_type: str) -> str:
        """Generate a professionally formatted Arabic response.

        ``retrieved_chunks`` is a list of (chunk_text, score, section) tuples
        as produced by HybridRetriever.retrieve; ``question_type`` comes from
        ArabicQueryProcessor.classify_question_type. Never raises: falls back
        to a canned fallback/error response on empty input or failure.
        """
        try:
            if not retrieved_chunks:
                return self._generate_fallback_response(question)
            # Group chunks by source section, preserving retrieval order.
            sections = {}
            for chunk, score, section in retrieved_chunks:
                if section not in sections:
                    sections[section] = []
                sections[section].append((chunk, score))
            # Presentation template for this question type ('general' default).
            template_info = self.response_templates.get(question_type, self.response_templates['general'])
            # Raw text fed to the LLM (section prefixes stripped).
            raw_content = self._extract_raw_content(sections)
            # Try the LLM path first; fall through when it returns None.
            if llm_available and raw_content:
                enhanced_response = self._generate_llm_enhanced_response(question, raw_content, template_info)
                if enhanced_response:
                    return enhanced_response
            # Deterministic template-based response.
            response = self._build_response_header(question, template_info)
            response += self._build_main_content(sections, template_info)
            response += self._build_additional_info(sections)
            response += self._build_suggestions(sections.keys(), question_type)
            response += self._build_footer()
            return response
        except Exception as e:
            logger.error(f"Response generation failed: {str(e)}")
            return self._generate_error_response()

    def _extract_raw_content(self, sections: Dict) -> str:
        """Concatenate up to 3 chunk bodies (section prefix removed) for the LLM."""
        content_parts = []
        for section, chunks in sections.items():
            for chunk, score in chunks[:2]:  # Take top 2 chunks per section
                if ":" in chunk:
                    content = chunk.split(":", 1)[1].strip()
                    content_parts.append(content)
        return " ".join(content_parts[:3])  # Limit to avoid token limits

    def _generate_llm_enhanced_response(self, question: str, raw_content: str, template_info: Dict) -> str:
        """Generate an LLM-rephrased response.

        Returns the formatted response string, or None when generation fails
        or the output lacks the expected answer marker (despite the ``str``
        annotation — callers check for falsiness).
        """
        try:
            # Prompt: question + retrieved facts + Arabic answer instructions.
            prompt = f"""بناءً على المعلومات التالية، أجب على السؤال بطريقة مهنية ومفصلة:
السؤال: {question}
المعلومات المتاحة: {raw_content}
الإجابة المطلوبة يجب أن تكون:
- مهنية ومنظمة
- باللغة العربية الفصحى
- تحتوي على تفاصيل مفيدة
- مناسبة لموضوع الموازنة التشاركية والشفافية المالية
الإجابة:"""
            # Generate with the shared pipeline (sampled, single sequence).
            generated = text_generator(
                prompt,
                max_length=400,
                num_return_sequences=1,
                temperature=0.7,
                do_sample=True,
                pad_token_id=llm_tokenizer.eos_token_id
            )
            if generated and len(generated) > 0:
                full_response = generated[0]['generated_text']
                # Keep only the text after the answer marker; the pipeline
                # echoes the prompt before the generated continuation.
                if "الإجابة:" in full_response:
                    answer = full_response.split("الإجابة:")[-1].strip()
                    formatted_response = f"""
╔══════════════════════════════════════════════════════════════╗
{template_info["icon"]} **{template_info["title"]}**
╚══════════════════════════════════════════════════════════════╝
**استعلامك:** {question}
## الإجابة المطورة
{answer}
---
**للمزيد من المعلومات:** تواصل مع وحدة الشفافية والمشاركة المجتمعية
**المصدر:** وزارة المالية - جمهورية مصر العربية
"""
                    return formatted_response
        except Exception as e:
            logger.error(f"LLM enhancement failed: {str(e)}")
            return None

    def _build_response_header(self, question: str, template_info: Dict) -> str:
        """Build the boxed response header echoing the user's question."""
        header = f"""
╔══════════════════════════════════════════════════════════════╗
{template_info["icon"]} **{template_info["title"]}**
╚══════════════════════════════════════════════════════════════╝
**استعلامك:** {question}
"""
        return header

    def _build_main_content(self, sections: Dict, template_info: Dict) -> str:
        """Render the single highest-scoring section as the main answer body."""
        if not sections:
            return ""
        # The section whose best chunk score is highest wins.
        main_section = max(sections.keys(),
                           key=lambda k: max(score for _, score in sections[k]))
        content = f"## {main_section}\n\n"
        # Layout follows the question-type's structure hint.
        main_content = self._format_section_content_professional(
            sections[main_section], template_info['structure']
        )
        content += main_content + "\n\n"
        content += "---\n\n"
        return content

    def _build_additional_info(self, sections: Dict) -> str:
        """Render up to two further sections as supplementary information.

        NOTE(review): picks sections by insertion order (indexes 1-3), which
        may duplicate the main section chosen by score in _build_main_content.
        """
        other_sections = list(sections.keys())[1:3]  # Take up to 2 additional sections
        if not other_sections:
            return ""
        content = "## معلومات إضافية ذات صلة\n\n"
        for i, section in enumerate(other_sections, 1):
            content += f"### {i}. **{section}**\n"
            section_content = self._format_section_content_professional(
                sections[section][:2], 'general'
            )
            content += section_content + "\n\n"
        content += "---\n\n"
        return content

    def _build_suggestions(self, available_sections: List[str], question_type: str) -> str:
        """Build follow-up question suggestions from sections and question type."""
        suggestions = []
        # Section-based suggestions (only short section titles).
        for section in list(available_sections)[:3]:
            if len(section.split()) <= 4:
                suggestions.append(f"المزيد حول {section}")
        # Question-type-based suggestions.
        type_suggestions = {
            'definition': ["الأهداف والفوائد", "التطبيق العملي"],
            'how': ["الخطوات التفصيلية", "المتطلبات والشروط"],
            'who': ["الأدوار والمسؤوليات", "التواصل والاتصال"],
            'when': ["الجدول الزمني", "المراحل القادمة"]
        }
        if question_type in type_suggestions:
            suggestions.extend(type_suggestions[question_type])
        if suggestions:
            content = "## اقتراحات للاستفسارات الإضافية\n\n"
            for i, suggestion in enumerate(suggestions[:4], 1):
                content += f"{i}. {suggestion}\n"
            content += "\n"
            return content
        return ""

    def _build_footer(self) -> str:
        """Build the fixed contact/source footer appended to every response."""
        footer = """
---
📞 **للمزيد من المعلومات:** تواصل مع وحدة الشفافية والمشاركة المجتمعية
🌐 **المصدر:** وزارة المالية - جمهورية مصر العربية
"""
        return footer

    def _format_section_content_professional(self, chunk_list: List[Tuple[str, float]], structure_type: str) -> str:
        """Format the top chunks (by score) as bullet lines per structure type."""
        content_parts = []
        for chunk, score in sorted(chunk_list, key=lambda x: x[1], reverse=True)[:3]:
            if ":" in chunk:
                content = chunk.split(":", 1)[1].strip()
                # Only 'definition' differs (bold); the rest share plain bullets.
                if structure_type == 'definition':
                    content_parts.append(f"- **{content}**")
                elif structure_type == 'process':
                    content_parts.append(f"- {content}")
                elif structure_type == 'list':
                    content_parts.append(f"- {content}")
                elif structure_type == 'entities':
                    content_parts.append(f"- {content}")
                elif structure_type == 'timeline':
                    content_parts.append(f"- {content}")
                else:  # general
                    content_parts.append(f"- {content}")
        return "\n\n".join(content_parts)

    def _extract_topic(self, question: str) -> str:
        """Return up to two keywords as the question's topic (fallback label otherwise)."""
        keywords = query_processor.extract_keywords(question)
        if keywords:
            return " ".join(keywords[:2])
        return "الموضوع المطلوب"

    def _generate_fallback_response(self, question: str) -> str:
        """Response used when retrieval finds nothing relevant."""
        return f"""
╔══════════════════════════════════════════════════════════════╗
║ البحث في قاعدة المعرفة
╚══════════════════════════════════════════════════════════════╝
**استعلامك:** {question}
## لم يتم العثور على نتائج مطابقة
لم أتمكن من العثور على معلومات محددة تجيب على استفسارك في قاعدة المعرفة الحالية.
## اقتراحات لتحسين البحث
1. **إعادة صياغة السؤال:** جرب استخدام كلمات مفتاحية مختلفة
2. **البحث في الموضوعات الرئيسية:**
- الموازنة التشاركية
- الشفافية المالية
- المشاركة المجتمعية
- وحدة الشفافية
3. **أمثلة على أسئلة مفيدة:**
- ما هي أهداف الموازنة التشاركية؟
- كيف يمكن للمواطن المشاركة؟
- من هم أعضاء فريق العمل؟
---
📞 **للمزيد من المعلومات:** تواصل مع وحدة الشفافية والمشاركة المجتمعية
"""

    def _generate_error_response(self) -> str:
        """Response used when generate_response itself raises."""
        return """
╔══════════════════════════════════════════════════════════════╗
║ خطأ في النظام
╚══════════════════════════════════════════════════════════════╝
حدث خطأ غير متوقع أثناء معالجة استفسارك.
## الخطوات المقترحة
1. تأكد من صحة صياغة السؤال
2. أعد المحاولة بعد قليل
3. تواصل مع الدعم الفني إذا استمر الخطأ
---
📞 **الدعم الفني:** وحدة الشفافية والمشاركة المجتمعية
"""


# Shared response-generator instance.
response_generator = ResponseGenerator()
def answer_question(question: str) -> str:
    """Answer an Arabic question end-to-end: validate, normalize, retrieve, format.

    Returns a formatted Arabic answer; on any internal failure a generic
    Arabic error message is returned instead of raising.
    """
    try:
        # Reject empty/too-short input before touching the models.
        # Fix: the old message claimed "3 words at least" while the code
        # checks 3 CHARACTERS — the message now matches the actual check.
        if not question or len(question.strip()) < 3:
            return "الرجاء إدخال سؤال واضح (3 أحرف على الأقل)"
        # Normalize dialect/spelling variants, then classify the intent.
        normalized_question = query_processor.normalize_text(question)
        question_type = query_processor.classify_question_type(normalized_question)
        logger.info(f"Processing question: '{normalized_question}' (type: {question_type})")
        # Hybrid retrieval over the knowledge base.
        retrieved_chunks = retriever.retrieve(normalized_question, top_k=6)
        # Format (and optionally LLM-enhance) the final answer.
        response = response_generator.generate_response(
            normalized_question, retrieved_chunks, question_type
        )
        return response
    except Exception as e:
        logger.error(f"Question processing failed: {str(e)}")
        return "حدث خطأ غير متوقع. يرجى المحاولة مرة أخرى."
# Custom CSS for the Gradio UI: RTL Arabic layout on a dark theme, with
# markdown/box-drawing support inside the answer box.
css = """
.arabic-ui {
direction: rtl;
text-align: right;
font-family: 'Tahoma', 'Arial', sans-serif;
line-height: 1.8;
background-color: #2c3e50; /* Dark background for overall consistency */
color: #ecf0f1; /* Light text for readability */
}
.header {
background: #34495e; /* Slightly lighter dark for header */
color: #ecf0f1;
padding: 25px;
border-radius: 12px;
margin-bottom: 25px;
box-shadow: 0 4px 6px rgba(0,0,0,0.3);
}
.footer {
margin-top: 25px;
font-size: 0.9em;
color: #bdc3c7;
text-align: center;
padding: 15px;
background: #34495e; /* Consistent dark background for footer */
border-radius: 8px;
}
.example-box {
border: 2px solid #34495e; /* Darker border */
border-radius: 12px;
padding: 20px;
margin-bottom: 20px;
background: #34495e; /* Dark background for example box */
color: #ecf0f1;
}
.answer-box {
min-height: 300px;
line-height: 1.8;
font-size: 14px;
font-family: 'Tahoma', 'Arial', monospace;
background: #2c3e50; /* Dark background for answer box */
border: 1px solid #34495e; /* Darker border for answer box */
border-radius: 8px;
padding: 15px;
white-space: pre-wrap;
overflow-y: auto;
color: #ecf0f1;
}
.question-input {
font-size: 16px;
padding: 12px;
border-radius: 8px;
font-family: 'Tahoma', 'Arial', sans-serif;
background-color: #34495e; /* Dark background for input */
border: 1px solid #2c3e50; /* Darker border */
color: #ecf0f1;
}
/* Enhanced markdown support for Arabic */
.answer-box h1, .answer-box h2, .answer-box h3 {
color: #ecf0f1;
margin-top: 20px;
margin-bottom: 10px;
}
.answer-box h2 {
border-bottom: 2px solid #3498db;
padding-bottom: 5px;
}
.answer-box h3 {
color: #bdc3c7;
}
.answer-box hr {
border: none;
border-top: 1px solid #7f8c8d;
margin: 20px 0;
}
.answer-box strong {
color: #ecf0f1;
font-weight: bold;
}
.answer-box ul, .answer-box ol {
margin: 10px 0;
padding-right: 20px;
}
.answer-box li {
margin: 5px 0;
}
/* Box drawing characters support */
.answer-box {
font-feature-settings: "liga" 1, "calt" 1;
}
"""
# Build the Gradio interface: RTL Arabic column with question input,
# submit/clear buttons, answer box, example questions, and footer.
with gr.Blocks(css=css, title="المساعد الآلي للموازنة التشاركية") as demo:
    with gr.Column(elem_classes="arabic-ui"):
        # Page header.
        gr.Markdown("""
<div class="header">
<h1>المساعد الآلي المطور للموازنة التشاركية مع الذكاء الاصطناعي</h1>
<p>نسخة محسّنة مع نموذج لغوي ذكي لإعادة صياغة الإجابات وتوليد محتوى أكثر دقة ومهنية</p>
</div>
""")
        # Question input row.
        with gr.Row():
            question = gr.Textbox(
                label="اكتب سؤالك هنا",
                placeholder="مثال: ما هي مراحل تطبيق الموازنة التشاركية في مصر؟",
                lines=3,
                elem_classes="question-input"
            )
        # Action buttons row.
        with gr.Row():
            submit_btn = gr.Button("إرسال السؤال", variant="primary", size="lg")
            clear_btn = gr.Button("مسح", variant="secondary")
        # Read-only answer display.
        answer = gr.Textbox(
            label="الإجابة المطورة",
            interactive=False,
            lines=12,
            elem_classes="answer-box"
        )
        # Clickable example questions that populate the input box.
        with gr.Column(elem_classes="example-box"):
            gr.Markdown("**أسئلة مقترحة للتجربة:**")
            gr.Examples(
                examples=[
                    ["ما هي أهداف الموازنة التشاركية؟"],
                    ["كيف يمكن للمواطن المشاركة في صنع القرار المالي؟"],
                    ["ما هي أهم إنجازات وحدة الشفافية والمشاركة المجتمعية؟"],
                    ["من هم أعضاء فريق عمل وحدة الشفافية؟"],
                    ["كيف تطور أداء مصر في مؤشرات الشفافية الدولية؟"],
                    ["ما هي الوثائق المتاحة للجمهور في الموازنة؟"]
                ],
                inputs=question,
                label=""
            )
        # Page footer.
        gr.Markdown("""
<div class="footer">
<p><strong>وحدة الشفافية والمشاركة المجتمعية - وزارة المالية</strong></p>
<p>نسخة محسّنة مع نموذج لغوي ذكي لإعادة صياغة الإجابات وتوليد محتوى أكثر دقة ومهنية</p>
</div>
""")
    # Event handlers (registered inside the Blocks context, as Gradio requires).
    submit_btn.click(answer_question, inputs=question, outputs=answer)
    clear_btn.click(lambda: ("", ""), outputs=[question, answer])
    question.submit(answer_question, inputs=question, outputs=answer)
if __name__ == "__main__":
    # Serve on all interfaces at the default Gradio port; no public share link.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True
    )