chatbox / app.py
anaspro
🔧 إصلاحات مهمة للـ deployment
df14f5f
raw
history blame
17.1 kB
import os
import pathlib
import tempfile
from collections.abc import Iterator
from threading import Thread
import av
import gradio as gr
import spaces
import torch
from transformers import AutoModelForImageTextToText, AutoProcessor
from transformers.generation.streamers import TextIteratorStreamer
# استيراد نظام RAG
from simple_rag import SimpleRAG
# Model configuration
model_id = "anaspro/Shako-4B-it-v5"
processor = AutoProcessor.from_pretrained(model_id)
model = AutoModelForImageTextToText.from_pretrained(
model_id,
device_map="auto",
torch_dtype=torch.bfloat16
)
# Supported file types
IMAGE_FILE_TYPES = (".jpg", ".jpeg", ".png", ".webp")
VIDEO_FILE_TYPES = (".mp4", ".mov", ".webm")
AUDIO_FILE_TYPES = (".mp3", ".wav")
# Video processing settings
TARGET_FPS = int(os.getenv("TARGET_FPS", "3"))
MAX_FRAMES = int(os.getenv("MAX_FRAMES", "30"))
MAX_INPUT_TOKENS = int(os.getenv("MAX_INPUT_TOKENS", "10_000"))
# تهيئة نظام RAG للدعم الفني
print("🔄 جاري تهيئة نظام RAG للدعم الفني...")
try:
rag_system = SimpleRAG()
# محاولة تحميل فهرس موجود
if not rag_system.load_index():
print("🔄 إنشاء فهرس جديد...")
# تحميل ملف شركة NBTEL
nbtel_file = "./data/nbtel_company_profile.md"
if os.path.exists(nbtel_file):
print(f"📁 وجد ملف البيانات: {nbtel_file}")
documents = rag_system.load_markdown_file(nbtel_file)
if documents:
rag_system.add_documents(documents)
rag_system.build_index()
rag_system.save_index()
print("✅ تم إنشاء فهرس RAG بنجاح")
else:
print("⚠️ لم يتم استخراج أي مستندات من الملف")
else:
print(f"⚠️ لم يتم العثور على ملف البيانات: {nbtel_file}")
# إنشاء بيانات تجريبية بسيطة
sample_docs = [
{
'title': 'معلومات أساسية عن NBTEL',
'content': 'شركة NBTEL عراقية متخصصة في خدمات الإنترنت والاتصالات. نقدم خدمات WiFi و FTTX في محافظات نينوى وكركوك وصلاح الدين.',
'source': 'fallback'
},
{
'title': 'معلومات التواصل',
'content': 'للدعم الفني: 6337، واتساب: 0773 633 7777، إيميل: Info@nbtel.iq',
'source': 'fallback'
}
]
rag_system.add_documents(sample_docs)
rag_system.build_index()
rag_system.save_index()
print("✅ تم إنشاء فهرس تجريبي")
print(f"✅ نظام RAG جاهز - {len(rag_system.documents)} مستند")
RAG_ENABLED = True
except Exception as e:
print(f"❌ خطأ في تهيئة نظام RAG: {str(e)}")
print("⚠️ سيتم تشغيل النظام بدون RAG")
rag_system = None
RAG_ENABLED = False
def get_file_type(path: str) -> str:
if path.endswith(IMAGE_FILE_TYPES):
return "image"
if path.endswith(VIDEO_FILE_TYPES):
return "video"
if path.endswith(AUDIO_FILE_TYPES):
return "audio"
error_message = f"Unsupported file type: {path}"
raise ValueError(error_message)
def search_knowledge_base(query: str) -> str:
"""البحث في قاعدة المعرفة باستخدام RAG"""
if not RAG_ENABLED or rag_system is None:
return ""
try:
# البحث في قاعدة المعرفة
context = rag_system.get_context_for_query(query, max_results=3)
if context and "لم أجد معلومات" not in context:
return f"\n\n📚 معلومات من قاعدة المعرفة:\n{context}"
else:
return ""
except Exception as e:
print(f"خطأ في البحث في قاعدة المعرفة: {str(e)}")
return ""
def create_technical_support_prompt(user_message: str, knowledge_context: str = "") -> str:
"""إنشاء prompt محسن للدعم الفني"""
base_prompt = """أنت مساعد دعم فني ذكي لشركة NBTEL العراقية.
هويتك ومهامك:
- اسمك: مساعد NBTEL الذكي
- تتحدث باللهجة العراقية الودية والمرحة
- خبير في خدمات الإنترنت والاتصالات والدعم الفني
- تساعد العملاء في حل مشاكلهم التقنية
- تقدم معلومات عن خدمات وأسعار الشركة
قواعد التعامل:
1. رحب بالعميل بطريقة عراقية ودية (أهلاً وسهلاً، مرحبا بيك، الخ)
2. استخدم المعلومات من قاعدة المعرفة إن وجدت
3. اطرح أسئلة توضيحية لفهم المشكلة بدقة
4. قدم حلول عملية خطوة بخطوة
5. استخدم أمثلة عراقية مفهومة
6. كن صبور ومساعد مع العملاء
7. إذا ما تعرف الجواب، اعترف واطلب التواصل مع الدعم المباشر
أسلوب الكلام:
- استخدم "احنا" بدلاً من "نحن"
- استخدم "شلونك" بدلاً من "كيف حالك"
- استخدم "شنو" بدلاً من "ماذا"
- استخدم "وين" بدلاً من "أين"
- استخدم "اكو" بدلاً من "يوجد"
- اجعل الكلام طبيعي ومرح
معلومات الشركة الأساسية:
- شركة NBTEL عراقية متخصصة بخدمات الإنترنت والاتصالات
- نخدم محافظات نينوى، كركوك، وصلاح الدين
- نقدم خدمات WiFi و FTTX (الكيبل الضوئي)
- مقرنا الرئيسي في الموصل
- رقم الدعم الفني: 6337
- واتساب: 0773 633 7777
"""
if knowledge_context:
enhanced_prompt = f"""{base_prompt}
{knowledge_context}
الاستفسار: {user_message}
جاوب بطريقة ودية وعملية، واستخدم المعلومات اللي فوق إذا كانت مفيدة:"""
else:
enhanced_prompt = f"""{base_prompt}
الاستفسار: {user_message}
جاوب بطريقة ودية وعملية:"""
return enhanced_prompt
def count_files_in_new_message(paths: list[str]) -> tuple[int, int]:
video_count = 0
non_video_count = 0
for path in paths:
if path.endswith(VIDEO_FILE_TYPES):
video_count += 1
else:
non_video_count += 1
return video_count, non_video_count
def validate_media_constraints(message: dict) -> bool:
video_count, non_video_count = count_files_in_new_message(message["files"])
if video_count > 1:
gr.Warning("Only one video is supported.")
return False
if video_count == 1 and non_video_count > 0:
gr.Warning("Mixing images and videos is not allowed.")
return False
return True
def extract_frames_to_tempdir(
video_path: str,
target_fps: float,
max_frames: int | None = None,
parent_dir: str | None = None,
prefix: str = "frames_",
) -> str:
temp_dir = tempfile.mkdtemp(prefix=prefix, dir=parent_dir)
container = av.open(video_path)
video_stream = container.streams.video[0]
if video_stream.duration is None or video_stream.time_base is None:
raise ValueError("video_stream is missing duration or time_base")
time_base = video_stream.time_base
duration = float(video_stream.duration * time_base)
interval = 1.0 / target_fps
total_frames = int(duration * target_fps)
if max_frames is not None:
total_frames = min(total_frames, max_frames)
target_times = [i * interval for i in range(total_frames)]
target_index = 0
for frame in container.decode(video=0):
if frame.pts is None:
continue
timestamp = float(frame.pts * time_base)
if target_index < len(target_times) and abs(timestamp - target_times[target_index]) < (interval / 2):
frame_path = pathlib.Path(temp_dir) / f"frame_{target_index:04d}.jpg"
frame.to_image().save(frame_path)
target_index += 1
if max_frames is not None and target_index >= max_frames:
break
container.close()
return temp_dir
def process_new_user_message(message: dict) -> list[dict]:
if not message["files"]:
return [{"type": "text", "text": message["text"]}]
file_types = [get_file_type(path) for path in message["files"]]
if len(file_types) == 1 and file_types[0] == "video":
gr.Info(f"Video will be processed at {TARGET_FPS} FPS, max {MAX_FRAMES} frames in this Space.")
temp_dir = extract_frames_to_tempdir(
message["files"][0],
target_fps=TARGET_FPS,
max_frames=MAX_FRAMES,
)
paths = sorted(pathlib.Path(temp_dir).glob("*.jpg"))
return [
{"type": "text", "text": message["text"]},
*[{"type": "image", "image": path.as_posix()} for path in paths],
]
return [
{"type": "text", "text": message["text"]},
*[{"type": file_type, file_type: path} for path, file_type in zip(message["files"], file_types, strict=True)],
]
def process_history(history: list[dict]) -> list[dict]:
messages = []
current_user_content: list[dict] = []
for item in history:
if item["role"] == "assistant":
if current_user_content:
messages.append({"role": "user", "content": current_user_content})
current_user_content = []
messages.append({"role": "assistant", "content": [{"type": "text", "text": item["content"]}]})
else:
content = item["content"]
if isinstance(content, str):
current_user_content.append({"type": "text", "text": content})
else:
filepath = content[0]
file_type = get_file_type(filepath)
current_user_content.append({"type": file_type, file_type: filepath})
return messages
@spaces.GPU()
@torch.inference_mode()
def generate(message: dict, history: list[dict], system_prompt: str = "", max_new_tokens: int = 512) -> Iterator[str]:
if not validate_media_constraints(message):
yield ""
return
# استخراج النص من الرسالة للبحث في قاعدة المعرفة
user_text = message.get("text", "")
# البحث في قاعدة المعرفة
knowledge_context = search_knowledge_base(user_text) if user_text else ""
# إنشاء prompt محسن للدعم الفني
if user_text:
enhanced_prompt = create_technical_support_prompt(user_text, knowledge_context)
# استبدال النص الأصلي بالـ prompt المحسن
message = message.copy()
message["text"] = enhanced_prompt
messages = []
if system_prompt:
messages.append({"role": "system", "content": [{"type": "text", "text": system_prompt}]})
messages.extend(process_history(history))
messages.append({"role": "user", "content": process_new_user_message(message)})
inputs = processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
)
n_tokens = inputs["input_ids"].shape[1]
if n_tokens > MAX_INPUT_TOKENS:
gr.Warning(
f"Input too long. Max {MAX_INPUT_TOKENS} tokens. Got {n_tokens} tokens. This limit is set to avoid CUDA out-of-memory errors in this Space."
)
yield ""
return
inputs = inputs.to(device=model.device, dtype=torch.bfloat16)
streamer = TextIteratorStreamer(processor, timeout=30.0, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = dict(
inputs,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
temperature=0.7, # قللنا الحرارة للردود أكثر منطقية
top_k=50, # قللنا للحصول على ردود أكثر تركيز
top_p=0.9, # قللنا للحصول على ردود أكثر دقة
min_p=0.0,
repetition_penalty=1.1, # زدنا شوي لتجنب التكرار
disable_compile=True,
)
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start()
output = ""
for delta in streamer:
output += delta
yield output
# Examples for the chat interface (with additional inputs: system_prompt, max_new_tokens)
examples = [
["أهلاً، شنو خدمات شركتكم؟", "", 800],
["عندي مشكلة بالواي فاي، ما يظهر عندي", "", 600],
["شگد أسعار باقات الإنترنت عندكم؟", "", 700],
["الإنترنت عندي ضعيف، شنو الحل؟", "", 600],
["كيف أقدر أغير كلمة سر الراوتر؟", "", 500],
["وين مقر الشركة ومعلومات التواصل؟", "", 400]
]
# System prompt محسن للدعم الفني العراقي
system_prompt = """أنت مساعد دعم فني ذكي لشركة NBTEL العراقية.
تعليمات مهمة:
- تحدث باللهجة العراقية البغدادية الودية والطبيعية
- كن مساعد ومرح في التعامل مع العملاء
- ركز على حل المشاكل التقنية وتقديم المساعدة العملية
- استخدم المعلومات من قاعدة المعرفة عندما تكون متوفرة
- إذا ما تعرف جواب محدد، اعترف واطلب التواصل مع الدعم المباشر
أسلوب الكلام العراقي:
- استخدم "شلونك" بدلاً من "كيف حالك"
- استخدم "شنو" بدلاً من "ماذا"
- استخدم "وين" بدلاً من "أين"
- استخدم "احنا" بدلاً من "نحن"
- استخدم "اكو" بدلاً من "يوجد"
- استخدم "شگد" بدلاً من "كم"
كن ودود ومساعد دائماً!"""
# Create the chat interface
demo = gr.ChatInterface(
fn=generate,
type="messages",
textbox=gr.MultimodalTextbox(
file_types=list(IMAGE_FILE_TYPES + VIDEO_FILE_TYPES + AUDIO_FILE_TYPES),
file_count="multiple",
autofocus=True,
placeholder="اكتب استفسارك هنا... مثل: عندي مشكلة بالإنترنت، أو شنو خدماتكم؟"
),
multimodal=True,
additional_inputs=[
gr.Textbox(
label="System Prompt (اختياري)",
value=system_prompt,
lines=3,
visible=False # مخفي للمستخدم العادي
),
gr.Slider(
label="Max New Tokens",
minimum=100,
maximum=2048,
step=50,
value=1024,
visible=False # مخفي للمستخدم العادي
),
],
title="🤖 مساعد NBTEL الذكي - الدعم الفني",
description=f"""
🌟 **أهلاً وسهلاً بيك في مساعد NBTEL الذكي!**
احنا هنا نساعدك في:
• 🔧 حل مشاكل الإنترنت والواي فاي
• 📋 معلومات عن خدماتنا وأسعارنا
• 📞 التواصل والدعم الفني
• 🛠️ إرشادات تقنية مفصلة
{"✅ **نظام المعرفة متصل** - عندي معلومات شاملة عن الشركة والخدمات" if RAG_ENABLED else "⚠️ **نظام المعرفة غير متصل** - راح أساعدك بالمعلومات العامة"}
**للدعم المباشر**: 📞 6337 | 📱 واتساب: 0773 633 7777
""",
examples=examples,
stop_btn="إيقاف",
css="""
.gradio-container, .chatbot, .chatbot * {
direction: rtl !important;
text-align: right !important;
unicode-bidi: plaintext !important;
font-family: 'Tajawal', 'Cairo', 'Segoe UI', sans-serif;
}
.chatbot .message {
padding: 15px !important;
margin: 10px !important;
border-radius: 15px !important;
}
.chatbot .message.user {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
}
.chatbot .message.bot {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%) !important;
color: white !important;
}
.title {
color: #2c3e50 !important;
font-size: 2em !important;
font-weight: bold !important;
text-align: center !important;
}
"""
)
if __name__ == "__main__":
demo.launch()