import os import pathlib import tempfile from collections.abc import Iterator from threading import Thread import av import gradio as gr import spaces import torch from transformers import AutoModelForImageTextToText, AutoProcessor from transformers.generation.streamers import TextIteratorStreamer # استيراد نظام RAG from simple_rag import SimpleRAG # Model configuration model_id = "anaspro/Shako-4B-it-v5" processor = AutoProcessor.from_pretrained(model_id) model = AutoModelForImageTextToText.from_pretrained( model_id, device_map="auto", torch_dtype=torch.bfloat16 ) # Supported file types IMAGE_FILE_TYPES = (".jpg", ".jpeg", ".png", ".webp") VIDEO_FILE_TYPES = (".mp4", ".mov", ".webm") AUDIO_FILE_TYPES = (".mp3", ".wav") # Video processing settings TARGET_FPS = int(os.getenv("TARGET_FPS", "3")) MAX_FRAMES = int(os.getenv("MAX_FRAMES", "30")) MAX_INPUT_TOKENS = int(os.getenv("MAX_INPUT_TOKENS", "10_000")) # تهيئة نظام RAG للدعم الفني print("🔄 جاري تهيئة نظام RAG للدعم الفني...") try: rag_system = SimpleRAG() # محاولة تحميل فهرس موجود if not rag_system.load_index(): print("🔄 إنشاء فهرس جديد...") # تحميل ملف شركة NBTEL nbtel_file = "./data/nbtel_company_profile.md" if os.path.exists(nbtel_file): print(f"📁 وجد ملف البيانات: {nbtel_file}") documents = rag_system.load_markdown_file(nbtel_file) if documents: rag_system.add_documents(documents) rag_system.build_index() rag_system.save_index() print("✅ تم إنشاء فهرس RAG بنجاح") else: print("⚠️ لم يتم استخراج أي مستندات من الملف") else: print(f"⚠️ لم يتم العثور على ملف البيانات: {nbtel_file}") # إنشاء بيانات تجريبية بسيطة sample_docs = [ { 'title': 'معلومات أساسية عن NBTEL', 'content': 'شركة NBTEL عراقية متخصصة في خدمات الإنترنت والاتصالات. نقدم خدمات WiFi و FTTX في محافظات نينوى وكركوك وصلاح الدين.', 'source': 'fallback' }, { 'title': 'معلومات التواصل', 'content': 'للدعم الفني: 6337، واتساب: 0773 633 7777، إيميل: Info@nbtel.iq', 'source': 'fallback' } ] rag_system.add_documents(sample_docs) rag_system.build_index() rag_system.save_index() print("✅ تم إنشاء فهرس تجريبي") print(f"✅ نظام RAG جاهز - {len(rag_system.documents)} مستند") RAG_ENABLED = True except Exception as e: print(f"❌ خطأ في تهيئة نظام RAG: {str(e)}") print("⚠️ سيتم تشغيل النظام بدون RAG") rag_system = None RAG_ENABLED = False def get_file_type(path: str) -> str: if path.endswith(IMAGE_FILE_TYPES): return "image" if path.endswith(VIDEO_FILE_TYPES): return "video" if path.endswith(AUDIO_FILE_TYPES): return "audio" error_message = f"Unsupported file type: {path}" raise ValueError(error_message) def search_knowledge_base(query: str) -> str: """البحث في قاعدة المعرفة باستخدام RAG""" if not RAG_ENABLED or rag_system is None: return "" try: # البحث في قاعدة المعرفة context = rag_system.get_context_for_query(query, max_results=3) if context and "لم أجد معلومات" not in context: return f"\n\n📚 معلومات من قاعدة المعرفة:\n{context}" else: return "" except Exception as e: print(f"خطأ في البحث في قاعدة المعرفة: {str(e)}") return "" def create_technical_support_prompt(user_message: str, knowledge_context: str = "") -> str: """إنشاء prompt محسن للدعم الفني""" base_prompt = """أنت مساعد دعم فني ذكي لشركة NBTEL العراقية. هويتك ومهامك: - اسمك: مساعد NBTEL الذكي - تتحدث باللهجة العراقية الودية والمرحة - خبير في خدمات الإنترنت والاتصالات والدعم الفني - تساعد العملاء في حل مشاكلهم التقنية - تقدم معلومات عن خدمات وأسعار الشركة قواعد التعامل: 1. رحب بالعميل بطريقة عراقية ودية (أهلاً وسهلاً، مرحبا بيك، الخ) 2. استخدم المعلومات من قاعدة المعرفة إن وجدت 3. اطرح أسئلة توضيحية لفهم المشكلة بدقة 4. قدم حلول عملية خطوة بخطوة 5. استخدم أمثلة عراقية مفهومة 6. كن صبور ومساعد مع العملاء 7. إذا ما تعرف الجواب، اعترف واطلب التواصل مع الدعم المباشر أسلوب الكلام: - استخدم "احنا" بدلاً من "نحن" - استخدم "شلونك" بدلاً من "كيف حالك" - استخدم "شنو" بدلاً من "ماذا" - استخدم "وين" بدلاً من "أين" - استخدم "اكو" بدلاً من "يوجد" - اجعل الكلام طبيعي ومرح معلومات الشركة الأساسية: - شركة NBTEL عراقية متخصصة بخدمات الإنترنت والاتصالات - نخدم محافظات نينوى، كركوك، وصلاح الدين - نقدم خدمات WiFi و FTTX (الكيبل الضوئي) - مقرنا الرئيسي في الموصل - رقم الدعم الفني: 6337 - واتساب: 0773 633 7777 """ if knowledge_context: enhanced_prompt = f"""{base_prompt} {knowledge_context} الاستفسار: {user_message} جاوب بطريقة ودية وعملية، واستخدم المعلومات اللي فوق إذا كانت مفيدة:""" else: enhanced_prompt = f"""{base_prompt} الاستفسار: {user_message} جاوب بطريقة ودية وعملية:""" return enhanced_prompt def count_files_in_new_message(paths: list[str]) -> tuple[int, int]: video_count = 0 non_video_count = 0 for path in paths: if path.endswith(VIDEO_FILE_TYPES): video_count += 1 else: non_video_count += 1 return video_count, non_video_count def validate_media_constraints(message: dict) -> bool: video_count, non_video_count = count_files_in_new_message(message["files"]) if video_count > 1: gr.Warning("Only one video is supported.") return False if video_count == 1 and non_video_count > 0: gr.Warning("Mixing images and videos is not allowed.") return False return True def extract_frames_to_tempdir( video_path: str, target_fps: float, max_frames: int | None = None, parent_dir: str | None = None, prefix: str = "frames_", ) -> str: temp_dir = tempfile.mkdtemp(prefix=prefix, dir=parent_dir) container = av.open(video_path) video_stream = container.streams.video[0] if video_stream.duration is None or video_stream.time_base is None: raise ValueError("video_stream is missing duration or time_base") time_base = video_stream.time_base duration = float(video_stream.duration * time_base) interval = 1.0 / target_fps total_frames = int(duration * target_fps) if max_frames is not None: total_frames = min(total_frames, max_frames) target_times = [i * interval for i in range(total_frames)] target_index = 0 for frame in container.decode(video=0): if frame.pts is None: continue timestamp = float(frame.pts * time_base) if target_index < len(target_times) and abs(timestamp - target_times[target_index]) < (interval / 2): frame_path = pathlib.Path(temp_dir) / f"frame_{target_index:04d}.jpg" frame.to_image().save(frame_path) target_index += 1 if max_frames is not None and target_index >= max_frames: break container.close() return temp_dir def process_new_user_message(message: dict) -> list[dict]: if not message["files"]: return [{"type": "text", "text": message["text"]}] file_types = [get_file_type(path) for path in message["files"]] if len(file_types) == 1 and file_types[0] == "video": gr.Info(f"Video will be processed at {TARGET_FPS} FPS, max {MAX_FRAMES} frames in this Space.") temp_dir = extract_frames_to_tempdir( message["files"][0], target_fps=TARGET_FPS, max_frames=MAX_FRAMES, ) paths = sorted(pathlib.Path(temp_dir).glob("*.jpg")) return [ {"type": "text", "text": message["text"]}, *[{"type": "image", "image": path.as_posix()} for path in paths], ] return [ {"type": "text", "text": message["text"]}, *[{"type": file_type, file_type: path} for path, file_type in zip(message["files"], file_types, strict=True)], ] def process_history(history: list[dict]) -> list[dict]: messages = [] current_user_content: list[dict] = [] for item in history: if item["role"] == "assistant": if current_user_content: messages.append({"role": "user", "content": current_user_content}) current_user_content = [] messages.append({"role": "assistant", "content": [{"type": "text", "text": item["content"]}]}) else: content = item["content"] if isinstance(content, str): current_user_content.append({"type": "text", "text": content}) else: filepath = content[0] file_type = get_file_type(filepath) current_user_content.append({"type": file_type, file_type: filepath}) return messages @spaces.GPU() @torch.inference_mode() def generate(message: dict, history: list[dict], system_prompt: str = "", max_new_tokens: int = 512) -> Iterator[str]: if not validate_media_constraints(message): yield "" return # استخراج النص من الرسالة للبحث في قاعدة المعرفة user_text = message.get("text", "") # البحث في قاعدة المعرفة knowledge_context = search_knowledge_base(user_text) if user_text else "" # إنشاء prompt محسن للدعم الفني if user_text: enhanced_prompt = create_technical_support_prompt(user_text, knowledge_context) # استبدال النص الأصلي بالـ prompt المحسن message = message.copy() message["text"] = enhanced_prompt messages = [] if system_prompt: messages.append({"role": "system", "content": [{"type": "text", "text": system_prompt}]}) messages.extend(process_history(history)) messages.append({"role": "user", "content": process_new_user_message(message)}) inputs = processor.apply_chat_template( messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt", ) n_tokens = inputs["input_ids"].shape[1] if n_tokens > MAX_INPUT_TOKENS: gr.Warning( f"Input too long. Max {MAX_INPUT_TOKENS} tokens. Got {n_tokens} tokens. This limit is set to avoid CUDA out-of-memory errors in this Space." ) yield "" return inputs = inputs.to(device=model.device, dtype=torch.bfloat16) streamer = TextIteratorStreamer(processor, timeout=30.0, skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( inputs, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True, temperature=0.7, # قللنا الحرارة للردود أكثر منطقية top_k=50, # قللنا للحصول على ردود أكثر تركيز top_p=0.9, # قللنا للحصول على ردود أكثر دقة min_p=0.0, repetition_penalty=1.1, # زدنا شوي لتجنب التكرار disable_compile=True, ) t = Thread(target=model.generate, kwargs=generate_kwargs) t.start() output = "" for delta in streamer: output += delta yield output # Examples for the chat interface (with additional inputs: system_prompt, max_new_tokens) examples = [ ["أهلاً، شنو خدمات شركتكم؟", "", 800], ["عندي مشكلة بالواي فاي، ما يظهر عندي", "", 600], ["شگد أسعار باقات الإنترنت عندكم؟", "", 700], ["الإنترنت عندي ضعيف، شنو الحل؟", "", 600], ["كيف أقدر أغير كلمة سر الراوتر؟", "", 500], ["وين مقر الشركة ومعلومات التواصل؟", "", 400] ] # System prompt محسن للدعم الفني العراقي system_prompt = """أنت مساعد دعم فني ذكي لشركة NBTEL العراقية. تعليمات مهمة: - تحدث باللهجة العراقية البغدادية الودية والطبيعية - كن مساعد ومرح في التعامل مع العملاء - ركز على حل المشاكل التقنية وتقديم المساعدة العملية - استخدم المعلومات من قاعدة المعرفة عندما تكون متوفرة - إذا ما تعرف جواب محدد، اعترف واطلب التواصل مع الدعم المباشر أسلوب الكلام العراقي: - استخدم "شلونك" بدلاً من "كيف حالك" - استخدم "شنو" بدلاً من "ماذا" - استخدم "وين" بدلاً من "أين" - استخدم "احنا" بدلاً من "نحن" - استخدم "اكو" بدلاً من "يوجد" - استخدم "شگد" بدلاً من "كم" كن ودود ومساعد دائماً!""" # Create the chat interface demo = gr.ChatInterface( fn=generate, type="messages", textbox=gr.MultimodalTextbox( file_types=list(IMAGE_FILE_TYPES + VIDEO_FILE_TYPES + AUDIO_FILE_TYPES), file_count="multiple", autofocus=True, placeholder="اكتب استفسارك هنا... مثل: عندي مشكلة بالإنترنت، أو شنو خدماتكم؟" ), multimodal=True, additional_inputs=[ gr.Textbox( label="System Prompt (اختياري)", value=system_prompt, lines=3, visible=False # مخفي للمستخدم العادي ), gr.Slider( label="Max New Tokens", minimum=100, maximum=2048, step=50, value=1024, visible=False # مخفي للمستخدم العادي ), ], title="🤖 مساعد NBTEL الذكي - الدعم الفني", description=f""" 🌟 **أهلاً وسهلاً بيك في مساعد NBTEL الذكي!** احنا هنا نساعدك في: • 🔧 حل مشاكل الإنترنت والواي فاي • 📋 معلومات عن خدماتنا وأسعارنا • 📞 التواصل والدعم الفني • 🛠️ إرشادات تقنية مفصلة {"✅ **نظام المعرفة متصل** - عندي معلومات شاملة عن الشركة والخدمات" if RAG_ENABLED else "⚠️ **نظام المعرفة غير متصل** - راح أساعدك بالمعلومات العامة"} **للدعم المباشر**: 📞 6337 | 📱 واتساب: 0773 633 7777 """, examples=examples, stop_btn="إيقاف", css=""" .gradio-container, .chatbot, .chatbot * { direction: rtl !important; text-align: right !important; unicode-bidi: plaintext !important; font-family: 'Tajawal', 'Cairo', 'Segoe UI', sans-serif; } .chatbot .message { padding: 15px !important; margin: 10px !important; border-radius: 15px !important; } .chatbot .message.user { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; } .chatbot .message.bot { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%) !important; color: white !important; } .title { color: #2c3e50 !important; font-size: 2em !important; font-weight: bold !important; text-align: center !important; } """ ) if __name__ == "__main__": demo.launch()