Spaces:
No application file
No application file
| import os | |
| import json | |
| import time | |
| import asyncio | |
| import logging | |
| import hashlib | |
| from datetime import datetime | |
| from typing import List, Dict, Optional, Union | |
| from concurrent.futures import ThreadPoolExecutor | |
| from fastapi import FastAPI, HTTPException, Request, UploadFile, File, WebSocket, WebSocketDisconnect, Depends | |
| from fastapi.responses import StreamingResponse, HTMLResponse, FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.security import APIKeyHeader | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| from langdetect import detect, DetectorFactory | |
| import numpy as np | |
| import pandas as pd | |
| import cv2 | |
| import torch | |
| from PIL import Image | |
| import moviepy.editor as mp | |
| # تحميل نماذج الذكاء الاصطناعي | |
| from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer | |
| from diffusers import StableDiffusionPipeline, EulerDiscreteScheduler | |
| from tensorflow.keras.models import load_model | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from transformers import BitsAndBytesConfig | |
| # التهيئة الأساسية | |
| DetectorFactory.seed = 0 | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("MarkAI") | |
| app = FastAPI( | |
| title="MarkAI - الذكاء الاصطناعي المتكامل", | |
| version="2.0", | |
| description="منصة متكاملة للذكاء الاصطناعي تدعم توليد النصوص، الأكواد، الصور والفيديوهات مع نظام ذاكرة متقدم", | |
| contact={ | |
| "name": "Ibrahim Lasfar", | |
| "email": "ibrahim@markai.com" | |
| }, | |
| license_info={ | |
| "name": "MIT License", | |
| } | |
| ) | |
| # إعدادات CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # تهيئة مجلدات التخزين | |
| os.makedirs("uploads", exist_ok=True) | |
| os.makedirs("memory/conversations", exist_ok=True) | |
| os.makedirs("memory/projects", exist_ok=True) | |
| os.makedirs("memory/code", exist_ok=True) | |
| os.makedirs("memory/backups", exist_ok=True) | |
| # 1. نماذج اللغات المدعومة (محدثة مع النماذج الكبيرة) | |
| LANGUAGE_MODELS = { | |
| # النماذج الصغيرة (افتراضية) | |
| "en": "gpt2-medium", | |
| "ar": "arbml/gpt2-arabic-poetry", | |
| "zh": "bert-base-chinese", | |
| "ja": "colorfulscoop/gpt2-small-ja", | |
| "fr": "dbmdz/gpt2-french", | |
| "de": "dbmdz/gpt2-german", | |
| "it": "LorenzoDeMattei/GePpeTto", | |
| "hi": "surajpai/GPT2-Hindi", | |
| "code": "codeparrot/codeparrot-small", | |
| # النماذج الكبيرة | |
| "en-large": "EleutherAI/gpt-j-6B", | |
| "ar-large": "bigscience/bloom-7b1", | |
| "code-large": "tiiuae/falcon-7b" | |
| } | |
| # 2. نظام الأمان والمفاتيح | |
| API_KEY_HEADER = APIKeyHeader(name="X-API-KEY") | |
| def load_api_keys(): | |
| try: | |
| with open("memory/api_keys.json", "r") as f: | |
| return json.load(f) | |
| except: | |
| return {"demo_key": "demo123"} # مفتاح تجريبي افتراضي | |
| def save_api_keys(keys): | |
| with open("memory/api_keys.json", "w") as f: | |
| json.dump(keys, f) | |
| def authenticate(api_key: str): | |
| keys = load_api_keys() | |
| return api_key in keys.values() | |
| # 3. نظام الذاكرة المتقدم | |
| class AIMemory: | |
| def __init__(self): | |
| self.conversations = {} | |
| self.projects = {} | |
| self.code_repository = {} | |
| self.load_all_data() | |
| def load_all_data(self): | |
| """تحميل جميع البيانات من الملفات""" | |
| try: | |
| # تحميل المحادثات | |
| for conv_file in os.listdir("memory/conversations"): | |
| if conv_file.endswith(".json"): | |
| conv_id = conv_file.split(".")[0] | |
| with open(f"memory/conversations/{conv_file}", "r", encoding="utf-8") as f: | |
| self.conversations[conv_id] = json.load(f) | |
| # تحميل المشاريع | |
| if os.path.exists("memory/projects/projects.json"): | |
| with open("memory/projects/projects.json", "r", encoding="utf-8") as f: | |
| self.projects = json.load(f) | |
| # تحميل مستودع الأكواد | |
| if os.path.exists("memory/code/code_repository.json"): | |
| with open("memory/code/code_repository.json", "r", encoding="utf-8") as f: | |
| self.code_repository = json.load(f) | |
| except Exception as e: | |
| logger.error(f"Error loading data: {str(e)}") | |
| def create_conversation(self, initial_prompt: str) -> str: | |
| """إنشاء محادثة جديدة مع تسمية تلقائية""" | |
| conv_id = hashlib.md5(f"{initial_prompt}{datetime.now()}".encode()).hexdigest()[:10] | |
| conv_name = initial_prompt[:30] + "..." if len(initial_prompt) > 30 else initial_prompt | |
| conversation = { | |
| "id": conv_id, | |
| "name": conv_name, | |
| "created_at": str(datetime.now()), | |
| "updated_at": str(datetime.now()), | |
| "messages": [], | |
| "context": [], | |
| "status": "active" | |
| } | |
| self.conversations[conv_id] = conversation | |
| self.save_conversation(conv_id) | |
| return conv_id | |
| def save_conversation(self, conv_id: str): | |
| """حفظ محادثة معينة""" | |
| if conv_id in self.conversations: | |
| try: | |
| with open(f"memory/conversations/{conv_id}.json", "w", encoding="utf-8") as f: | |
| json.dump(self.conversations[conv_id], f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| logger.error(f"Error saving conversation {conv_id}: {str(e)}") | |
| def add_message(self, conv_id: str, role: str, content: str, metadata: dict = {}): | |
| """إضافة رسالة إلى المحادثة""" | |
| if conv_id not in self.conversations: | |
| raise ValueError("المحادثة غير موجودة") | |
| message = { | |
| "role": role, | |
| "content": content, | |
| "timestamp": str(datetime.now()), | |
| "metadata": metadata | |
| } | |
| self.conversations[conv_id]["messages"].append(message) | |
| self.conversations[conv_id]["updated_at"] = str(datetime.now()) | |
| self.save_conversation(conv_id) | |
| def get_conversation_context(self, conv_id: str, max_messages: int = 10) -> List[dict]: | |
| """الحصول على سياق المحادثة""" | |
| if conv_id not in self.conversations: | |
| return [] | |
| return self.conversations[conv_id]["messages"][-max_messages:] | |
| def create_project(self, name: str, description: str, project_type: str) -> str: | |
| """إنشاء مشروع جديد""" | |
| project_id = hashlib.md5(f"{name}{datetime.now()}".encode()).hexdigest()[:8] | |
| project = { | |
| "id": project_id, | |
| "name": name, | |
| "description": description, | |
| "type": project_type, | |
| "created_at": str(datetime.now()), | |
| "updated_at": str(datetime.now()), | |
| "status": "active", | |
| "files": [], | |
| "conversations": [] | |
| } | |
| self.projects[project_id] = project | |
| self.save_projects() | |
| return project_id | |
| def save_projects(self): | |
| """حفظ جميع المشاريع""" | |
| try: | |
| with open("memory/projects/projects.json", "w", encoding="utf-8") as f: | |
| json.dump(self.projects, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| logger.error(f"Error saving projects: {str(e)}") | |
| def save_code_snippet(self, code: str, language: str, purpose: str, metadata: dict = {}): | |
| """حفظ جزء من الكود في المستودع""" | |
| code_id = hashlib.md5(f"{code}{datetime.now()}".encode()).hexdigest()[:8] | |
| snippet = { | |
| "id": code_id, | |
| "code": code, | |
| "language": language, | |
| "purpose": purpose, | |
| "metadata": metadata, | |
| "created_at": str(datetime.now()), | |
| "usage_count": 0 | |
| } | |
| self.code_repository[code_id] = snippet | |
| self.save_code_repository() | |
| return code_id | |
| def save_code_repository(self): | |
| """حفظ مستودع الأكواد""" | |
| try: | |
| with open("memory/code/code_repository.json", "w", encoding="utf-8") as f: | |
| json.dump(self.code_repository, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| logger.error(f"Error saving code repository: {str(e)}") | |
| def backup_data(self): | |
| """إنشاء نسخة احتياطية لجميع البيانات""" | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| backup_dir = f"memory/backups/{timestamp}" | |
| os.makedirs(backup_dir, exist_ok=True) | |
| try: | |
| # نسخ المحادثات | |
| os.makedirs(f"{backup_dir}/conversations", exist_ok=True) | |
| for conv_id, conv_data in self.conversations.items(): | |
| with open(f"{backup_dir}/conversations/{conv_id}.json", "w", encoding="utf-8") as f: | |
| json.dump(conv_data, f, ensure_ascii=False, indent=2) | |
| # نسخ المشاريع | |
| with open(f"{backup_dir}/projects.json", "w", encoding="utf-8") as f: | |
| json.dump(self.projects, f, ensure_ascii=False, indent=2) | |
| # نسخ الأكواد | |
| with open(f"{backup_dir}/code_repository.json", "w", encoding="utf-8") as f: | |
| json.dump(self.code_repository, f, ensure_ascii=False, indent=2) | |
| return backup_dir | |
| except Exception as e: | |
| logger.error(f"Error during backup: {str(e)}") | |
| return None | |
| memory = AIMemory() | |
| # 4. نظام التقييم والتحليل | |
| class AnalyticsEngine: | |
| def __init__(self): | |
| self.sentiment_model = pipeline("sentiment-analysis") | |
| self.tfidf = TfidfVectorizer() | |
| def analyze_sentiment(self, text: str) -> dict: | |
| """تحليل المشاعر للنص""" | |
| try: | |
| result = self.sentiment_model(text)[0] | |
| return { | |
| "sentiment": result["label"], | |
| "score": result["score"], | |
| "positive": result["label"] == "POSITIVE", | |
| "negative": result["label"] == "NEGATIVE" | |
| } | |
| except Exception as e: | |
| logger.warning(f"Sentiment analysis failed, using fallback: {str(e)}") | |
| # Fallback basic sentiment analysis | |
| positive_words = ["good", "great", "excellent", "happy", "جيد", "رائع", "ممتاز", "سعيد"] | |
| negative_words = ["bad", "terrible", "awful", "sad", "سيء", "فظيع", "مزعج", "حزين"] | |
| positive_count = sum(text.lower().count(word) for word in positive_words) | |
| negative_count = sum(text.lower().count(word) for word in negative_words) | |
| if positive_count > negative_count: | |
| return {"sentiment": "POSITIVE", "score": positive_count/(positive_count+negative_count+1)} | |
| elif negative_count > positive_count: | |
| return {"sentiment": "NEGATIVE", "score": negative_count/(positive_count+negative_count+1)} | |
| else: | |
| return {"sentiment": "NEUTRAL", "score": 0.5} | |
| def evaluate_response(self, prompt: str, response: str) -> dict: | |
| """تقييم جودة الرد""" | |
| # تحليل طول الرد | |
| length_score = min(len(response.split()) / 100, 1.0) | |
| # تحليل التنوع | |
| unique_words = len(set(response.split())) | |
| diversity_score = min(unique_words / 50, 1.0) | |
| # تحليل الصلة بالموضوع | |
| try: | |
| vectors = self.tfidf.fit_transform([prompt, response]) | |
| relevance_score = cosine_similarity(vectors[0:1], vectors[1:2])[0][0] | |
| except Exception as e: | |
| logger.warning(f"TF-IDF analysis failed: {str(e)}") | |
| relevance_score = 0.7 # قيمة افتراضية في حالة الخطأ | |
| # تحليل المشاعر | |
| sentiment = self.analyze_sentiment(response) | |
| return { | |
| "length_score": length_score, | |
| "diversity_score": diversity_score, | |
| "relevance_score": relevance_score, | |
| "sentiment": sentiment, | |
| "overall_score": (length_score + diversity_score + relevance_score + sentiment["score"]) / 4 | |
| } | |
| analytics = AnalyticsEngine() | |
| # 5. المحرك الأساسي للذكاء الاصطناعي | |
| class AIEngine: | |
| def __init__(self): | |
| self.executor = ThreadPoolExecutor(max_workers=8) | |
| self.models = {} | |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
| self.quantization_config = BitsAndBytesConfig( | |
| load_in_4bit=True, | |
| bnb_4bit_compute_dtype=torch.float16, | |
| bnb_4bit_quant_type="nf4" | |
| ) | |
| async def load_model(self, model_type: str, model_name: str = None, use_large: bool = False): | |
| """تحميل نموذج معين""" | |
| model_key = f"{model_type}-large" if use_large else model_type | |
| if model_key not in self.models: | |
| try: | |
| if model_type == "text": | |
| model_name = model_name or (LANGUAGE_MODELS.get(f"{model_type}-large") if use_large else LANGUAGE_MODELS.get("en")) | |
| if use_large: | |
| model = AutoModelForCausalLM.from_pretrained( | |
| model_name, | |
| quantization_config=self.quantization_config, | |
| device_map="auto", | |
| torch_dtype=torch.float16 | |
| ) | |
| else: | |
| model = AutoModelForCausalLM.from_pretrained(model_name).to(self.device) | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| self.models[model_key] = {"tokenizer": tokenizer, "model": model} | |
| elif model_type == "image": | |
| scheduler = EulerDiscreteScheduler.from_pretrained("stabilityai/stable-diffusion-2", subfolder="scheduler") | |
| model = StableDiffusionPipeline.from_pretrained( | |
| "stabilityai/stable-diffusion-2", | |
| scheduler=scheduler, | |
| torch_dtype=torch.float16 | |
| ).to(self.device) | |
| self.models[model_key] = model | |
| elif model_type == "code": | |
| if use_large: | |
| model = AutoModelForCausalLM.from_pretrained( | |
| LANGUAGE_MODELS["code-large"], | |
| quantization_config=self.quantization_config, | |
| device_map="auto", | |
| torch_dtype=torch.float16 | |
| ) | |
| else: | |
| model = AutoModelForCausalLM.from_pretrained(LANGUAGE_MODELS["code"]).to(self.device) | |
| tokenizer = AutoTokenizer.from_pretrained(LANGUAGE_MODELS["code-large"] if use_large else LANGUAGE_MODELS["code"]) | |
| self.models[model_key] = {"tokenizer": tokenizer, "model": model} | |
| logger.info(f"تم تحميل النموذج بنجاح: {model_key}") | |
| except Exception as e: | |
| logger.error(f"خطأ في تحميل النموذج: {str(e)}") | |
| raise | |
| return self.models[model_key] | |
| async def generate_text(self, prompt: str, lang: str = None, max_length: int = 300, use_large: bool = False) -> str: | |
| """توليد نص بناء على المطالبة""" | |
| if not lang: | |
| try: | |
| lang = detect(prompt) | |
| except: | |
| lang = "en" | |
| model_name = LANGUAGE_MODELS.get(f"{lang}-large" if use_large else lang, | |
| LANGUAGE_MODELS.get("en-large" if use_large else "en")) | |
| model = await self.load_model("text", model_name, use_large) | |
| inputs = model["tokenizer"](prompt, return_tensors="pt").to(self.device) | |
| outputs = model["model"].generate(**inputs, max_length=max_length, do_sample=True, top_k=50, top_p=0.95) | |
| return model["tokenizer"].decode(outputs[0], skip_special_tokens=True) | |
| async def generate_code(self, prompt: str, language: str = "python", max_length: int = 500, use_large: bool = False) -> str: | |
| """توليد كود برمجي""" | |
| model = await self.load_model("code", use_large=use_large) | |
| prompt = f"# Language: {language}\n# Description: {prompt}\n# Code:\n" | |
| inputs = model["tokenizer"](prompt, return_tensors="pt").to(self.device) | |
| outputs = model["model"].generate(**inputs, max_length=max_length, do_sample=True, top_k=50, top_p=0.95) | |
| generated_code = model["tokenizer"].decode(outputs[0], skip_special_tokens=True) | |
| # حفظ الكود في المستودع | |
| code_id = memory.save_code_snippet( | |
| code=generated_code, | |
| language=language, | |
| purpose=prompt[:100], | |
| metadata={ | |
| "generated_at": str(datetime.now()), | |
| "model_used": "large" if use_large else "base" | |
| } | |
| ) | |
| return generated_code | |
| async def generate_image(self, prompt: str, save_path: str = None) -> str: | |
| """توليد صورة من النص""" | |
| model = await self.load_model("image") | |
| if not save_path: | |
| save_path = f"uploads/generated_image_{int(time.time())}.png" | |
| image = model(prompt).images[0] | |
| image.save(save_path) | |
| return save_path | |
| async def generate_video(self, prompt: str, duration: int = 5, fps: int = 24) -> str: | |
| """توليد فيديو من النص (محاكاة)""" | |
| save_path = f"uploads/generated_video_{int(time.time())}.mp4" | |
| # إنشاء فيديو مع نص (استخدام صورة سوداء كخلفية) | |
| clip = mp.ColorClip(size=(640, 480), color=(0, 0, 0), duration=duration) | |
| txt_clip = mp.TextClip(prompt, fontsize=24, color='white', size=clip.size).set_position('center').set_duration(duration) | |
| video = mp.CompositeVideoClip([clip, txt_clip]) | |
| video.write_videofile(save_path, fps=fps) | |
| return save_path | |
| async def analyze_code(self, code: str, language: str = "python") -> dict: | |
| """تحليل الكود وإعطاء تقييم""" | |
| # تحليل أساسي للكود | |
| analysis = { | |
| "length": len(code.split("\n")), | |
| "complexity": "low", | |
| "quality": "medium", | |
| "issues": [], | |
| "suggestions": [] | |
| } | |
| # تحليل أولي | |
| if len(code.split("\n")) > 50: | |
| analysis["complexity"] = "high" | |
| analysis["suggestions"].append("Consider breaking this into smaller functions/modules") | |
| if "TODO" in code or "FIXME" in code: | |
| analysis["issues"].append("Contains unfinished tasks (TODO/FIXME)") | |
| analysis["quality"] = "low" | |
| if language == "python" and "print(" in code: | |
| analysis["suggestions"].append("Consider using logging instead of print statements for production code") | |
| return analysis | |
| async def improve_code(self, code: str, language: str, improvements: List[str]) -> str: | |
| """تحسين الكود بناء على طلبات محددة""" | |
| improved_code = code | |
| # تطبيق التحسينات الأساسية | |
| if "add_comments" in improvements: | |
| improved_code = f"# Improved by MarkAI at {datetime.now()}\n# Original code with enhancements\n\n{improved_code}" | |
| if "optimize" in improvements: | |
| improved_code = improved_code.replace("for i in range(len(", "for item in ") | |
| improved_code = improved_code.replace(".append(", " += [") | |
| if "add_error_handling" in improvements and language == "python": | |
| improved_code = f"try:\n {improved_code.replace('\n', '\n ')}\nexcept Exception as e:\n print(f\"An error occurred: {e}\")" | |
| return improved_code | |
| engine = AIEngine() | |
| # 6. نظام التفكير والتخطيط | |
| class ThinkingEngine: | |
| def __init__(self): | |
| self.planning_steps = { | |
| "text": { | |
| "ar": [ | |
| "🔍 تحليل الطلب والمتطلبات...", | |
| "🧠 معالجة البيانات والبحث...", | |
| "📚 استرجاع المعلومات ذات الصلة...", | |
| "✨ توليد الإجابة المثلى..." | |
| ], | |
| "en": [ | |
| "🔍 Analyzing request and requirements...", | |
| "🧠 Processing data and researching...", | |
| "📚 Retrieving relevant information...", | |
| "✨ Generating optimal response..." | |
| ] | |
| }, | |
| "code": { | |
| "ar": [ | |
| "🔍 تحليل متطلبات الكود...", | |
| "🧠 تصميم الخوارزمية...", | |
| "📚 البحث عن الحلول المثلى...", | |
| "✨ كتابة وتوليد الكود..." | |
| ], | |
| "en": [ | |
| "🔍 Analyzing code requirements...", | |
| "🧠 Designing algorithm...", | |
| "📚 Researching optimal solutions...", | |
| "✨ Writing and generating code..." | |
| ] | |
| }, | |
| "image": { | |
| "ar": [ | |
| "🔍 تحليل وصف الصورة...", | |
| "🧠 تكوين المفاهيم الفنية...", | |
| "🎨 رسم العناصر الأساسية...", | |
| "✨ إضافة اللمسات النهائية..." | |
| ], | |
| "en": [ | |
| "🔍 Analyzing image description...", | |
| "🧠 Composing artistic concepts...", | |
| "🎨 Sketching basic elements...", | |
| "✨ Adding final touches..." | |
| ] | |
| }, | |
| "video": { | |
| "ar": [ | |
| "🔍 تحليل السيناريو...", | |
| "🎬 إعداد القصة والمشاهد...", | |
| "🎞️ تركيب العناصر المرئية...", | |
| "✨ إضافة المؤثرات والصوت..." | |
| ], | |
| "en": [ | |
| "🔍 Analyzing scenario...", | |
| "🎬 Preparing storyboard and scenes...", | |
| "🎞️ Composing visual elements...", | |
| "✨ Adding effects and sound..." | |
| ] | |
| }, | |
| "project": { | |
| "ar": [ | |
| "🔍 تحليل متطلبات المشروع...", | |
| "📝 تحديد الهيكل الأساسي...", | |
| "🛠️ إعداد الملفات والموارد...", | |
| "✨ إنشاء المشروع الجديد..." | |
| ], | |
| "en": [ | |
| "🔍 Analyzing project requirements...", | |
| "📝 Defining basic structure...", | |
| "🛠️ Preparing files and resources...", | |
| "✨ Creating new project..." | |
| ] | |
| } | |
| } | |
| def get_thinking_steps(self, task_type: str, lang: str = "en") -> List[str]: | |
| """الحصول على خطوات التفكير حسب نوع المهمة واللغة""" | |
| return self.planning_steps.get(task_type, self.planning_steps["text"]).get(lang, self.planning_steps["text"]["en"]) | |
| async def generate_plan(self, prompt: str, task_type: str = "text") -> dict: | |
| """إنشاء خطة تنفيذية للمهمة""" | |
| try: | |
| lang = detect(prompt) | |
| except: | |
| lang = "en" | |
| steps = self.get_thinking_steps(task_type, lang) | |
| plan = { | |
| "task": prompt, | |
| "type": task_type, | |
| "language": lang, | |
| "steps": steps, | |
| "estimated_time": "30 seconds", # يمكن جعل هذا أكثر دقة | |
| "required_resources": ["CPU", "GPU"] if task_type in ["image", "video"] else ["CPU"], | |
| "created_at": str(datetime.now()) | |
| } | |
| return plan | |
| thinker = ThinkingEngine() | |
| # 7. نماذج طلبات API | |
| class GenerationRequest(BaseModel): | |
| prompt: str | |
| content_type: str = "text" # text, code, image, video, project | |
| language: Optional[str] = None | |
| conversation_id: Optional[str] = None | |
| improvements: Optional[List[str]] = None | |
| use_large_model: bool = False # إضافة خيار استخدام النماذج الكبيرة | |
| class ConversationRequest(BaseModel): | |
| initial_prompt: str | |
| project_id: Optional[str] = None | |
| use_large_model: bool = False | |
| class ProjectRequest(BaseModel): | |
| name: str | |
| description: str | |
| project_type: str # web, mobile, desktop, ai, other | |
| class CodeImprovementRequest(BaseModel): | |
| code: str | |
| language: str | |
| improvements: List[str] = Field(..., example=["add_comments", "optimize", "add_error_handling"]) | |
| use_large_model: bool = False | |
| # 8. نظام إدارة المحادثات عبر WebSocket | |
| class ConnectionManager: | |
| def __init__(self): | |
| self.active_connections: Dict[str, WebSocket] = {} | |
| async def connect(self, conversation_id: str, websocket: WebSocket): | |
| await websocket.accept() | |
| self.active_connections[conversation_id] = websocket | |
| def disconnect(self, conversation_id: str): | |
| if conversation_id in self.active_connections: | |
| del self.active_connections[conversation_id] | |
| async def send_message(self, conversation_id: str, message: str): | |
| if conversation_id in self.active_connections: | |
| await self.active_connections[conversation_id].send_text(message) | |
| manager = ConnectionManager() | |
| # 9. نقاط النهاية الأساسية | |
| async def start_conversation(request: ConversationRequest): | |
| """بدء محادثة جديدة""" | |
| conv_id = memory.create_conversation(request.initial_prompt) | |
| if request.project_id and request.project_id in memory.projects: | |
| memory.projects[request.project_id]["conversations"].append(conv_id) | |
| memory.save_projects() | |
| # إضافة الرسالة الأولى | |
| memory.add_message( | |
| conv_id=conv_id, | |
| role="user", | |
| content=request.initial_prompt, | |
| metadata={ | |
| "type": "text", | |
| "project_id": request.project_id, | |
| "use_large_model": request.use_large_model | |
| } | |
| ) | |
| return {"conversation_id": conv_id, "name": memory.conversations[conv_id]["name"]} | |
| async def websocket_conversation(websocket: WebSocket, conversation_id: str): | |
| """محادثة في الوقت الحقيقي عبر WebSocket""" | |
| await manager.connect(conversation_id, websocket) | |
| try: | |
| while True: | |
| data = await websocket.receive_text() | |
| message = json.loads(data) | |
| if message["type"] == "user_message": | |
| # حفظ رسالة المستخدم | |
| memory.add_message( | |
| conv_id=conversation_id, | |
| role="user", | |
| content=message["content"], | |
| metadata={ | |
| "type": message.get("content_type", "text"), | |
| "use_large_model": message.get("use_large_model", False) | |
| } | |
| ) | |
| # إنشاء خطة للرد | |
| content_type = message.get("content_type", "text") | |
| plan = await thinker.generate_plan(message["content"], content_type) | |
| # إرسال خطوات التفكير | |
| for step in plan["steps"]: | |
| await manager.send_message(conversation_id, json.dumps({ | |
| "type": "thinking", | |
| "content": step | |
| })) | |
| await asyncio.sleep(1) | |
| # توليد الرد | |
| use_large = message.get("use_large_model", False) | |
| if content_type == "text": | |
| response = await engine.generate_text( | |
| message["content"], | |
| use_large=use_large | |
| ) | |
| elif content_type == "code": | |
| response = await engine.generate_code( | |
| message["content"], | |
| message.get("language", "python"), | |
| use_large=use_large | |
| ) | |
| elif content_type == "image": | |
| image_path = await engine.generate_image(message["content"]) | |
| response = f"IMAGE_GENERATED:{image_path}" | |
| elif content_type == "video": | |
| video_path = await engine.generate_video(message["content"]) | |
| response = f"VIDEO_GENERATED:{video_path}" | |
| else: | |
| response = "نوع المحتوى غير مدعوم" | |
| # تحليل الرد | |
| evaluation = analytics.evaluate_response(message["content"], response) | |
| # حفظ الرد | |
| memory.add_message( | |
| conv_id=conversation_id, | |
| role="assistant", | |
| content=response, | |
| metadata={ | |
| "type": content_type, | |
| "evaluation": evaluation, | |
| "plan": plan, | |
| "model_used": "large" if use_large else "base" | |
| } | |
| ) | |
| # إرسال الرد النهائي | |
| await manager.send_message(conversation_id, json.dumps({ | |
| "type": "assistant_response", | |
| "content": response, | |
| "evaluation": evaluation | |
| })) | |
| except WebSocketDisconnect: | |
| manager.disconnect(conversation_id) | |
| except Exception as e: | |
| logger.error(f"WebSocket error: {str(e)}") | |
| await manager.send_message(conversation_id, json.dumps({ | |
| "type": "error", | |
| "content": f"حدث خطأ: {str(e)}" | |
| })) | |
| async def create_project(request: ProjectRequest, api_key: str = Depends(API_KEY_HEADER)): | |
| """إنشاء مشروع جديد""" | |
| if not authenticate(api_key): | |
| raise HTTPException(status_code=403, detail="غير مصرح به") | |
| project_id = memory.create_project(request.name, request.description, request.project_type) | |
| # إنشاء مجلد المشروع | |
| project_dir = f"projects/{project_id}" | |
| os.makedirs(project_dir, exist_ok=True) | |
| # إنشاء ملفات أساسية | |
| with open(f"{project_dir}/README.md", "w", encoding="utf-8") as f: | |
| f.write(f"# {request.name}\n\n{request.description}\n\nCreated by MarkAI at {datetime.now()}") | |
| return {"project_id": project_id, "path": project_dir} | |
| async def improve_code(request: CodeImprovementRequest): | |
| """تحسين الكود المقدم""" | |
| analysis = await engine.analyze_code(request.code, request.language) | |
| improved_code = await engine.improve_code(request.code, request.language, request.improvements) | |
| # حفظ الكود المحسن | |
| code_id = memory.save_code_snippet( | |
| code=improved_code, | |
| language=request.language, | |
| purpose="Improved code", | |
| metadata={ | |
| "original_code": request.code, | |
| "improvements": request.improvements, | |
| "analyzed_at": str(datetime.now()), | |
| "model_used": "large" if request.use_large_model else "base" | |
| } | |
| ) | |
| return { | |
| "improved_code": improved_code, | |
| "analysis": analysis, | |
| "code_id": code_id | |
| } | |
| async def list_conversations(project_id: Optional[str] = None): | |
| """الحصول على قائمة المحادثات""" | |
| if project_id and project_id in memory.projects: | |
| convs = [memory.conversations[cid] for cid in memory.projects[project_id]["conversations"] if cid in memory.conversations] | |
| else: | |
| convs = list(memory.conversations.values()) | |
| return {"conversations": convs} | |
| async def list_projects(): | |
| """الحصول على قائمة المشاريع""" | |
| return {"projects": list(memory.projects.values())} | |
| async def list_code_snippets(language: Optional[str] = None): | |
| """الحصول على قائمة الأكواد المحفوظة""" | |
| snippets = list(memory.code_repository.values()) | |
| if language: | |
| snippets = [s for s in snippets if s["language"].lower() == language.lower()] | |
| return {"snippets": snippets} | |
| # 10. نظام النسخ الاحتياطي التلقائي | |
| async def backup_scheduler(): | |
| while True: | |
| await asyncio.sleep(3600) # كل ساعة | |
| try: | |
| backup_dir = memory.backup_data() | |
| if backup_dir: | |
| logger.info(f"تم إنشاء نسخة احتياطية في: {backup_dir}") | |
| except Exception as e: | |
| logger.error(f"فشل النسخ الاحتياطي: {str(e)}") | |
| # 11. واجهة المستخدم | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads") | |
| templates = Jinja2Templates(directory="templates") | |
| async def read_root(request: Request): | |
| return templates.TemplateResponse("index.html", {"request": request}) | |
| async def chat_interface(request: Request, conversation_id: str): | |
| if conversation_id not in memory.conversations: | |
| raise HTTPException(status_code=404, detail="المحادثة غير موجودة") | |
| return templates.TemplateResponse("chat.html", { | |
| "request": request, | |
| "conversation": memory.conversations[conversation_id] | |
| }) | |
| # 12. بدء المهام الجانبية | |
| async def startup_event(): | |
| asyncio.create_task(backup_scheduler()) | |
| # تحميل النماذج الأساسية مسبقاً | |
| await engine.load_model("text") | |
| await engine.load_model("code") | |
| logger.info("تم بدء تشغيل MarkAI بنجاح") | |
| # 13. ملفات إضافية لتهيئة Hugging Face Spaces | |
| async def serve_app(): | |
| return FileResponse("static/index.html") | |
| async def favicon(): | |
| return FileResponse("static/favicon.ico") | |
| # 14. تشغيل التطبيق | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860, reload=True) |