import os
import shutil
import uuid
import zipfile
import json
import sys
import traceback
from typing import Dict, List
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, WebSocket, WebSocketDisconnect, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import psutil
import platform
# محاولة استيراد CrewAI
try:
from crew_manager import ProjectManagerCrew
CREWAI_AVAILABLE = True
print("✅ CrewAI متوفر")
except ImportError as e:
print(f"⚠️ CrewAI غير متوفر: {e}")
CREWAI_AVAILABLE = False
app = FastAPI(title="مركز الوكلاء الذكي", description="نظام إدارة المشاريع بالذكاء الاصطناعي")
# إعداد CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
UPLOAD_DIR = "projects"
os.makedirs(UPLOAD_DIR, exist_ok=True)
# تخزين الجلسات
sessions: Dict[str, Dict] = {}
class UserRequest(BaseModel):
project_id: str
message: str
# WebSocket للدردشة المباشرة
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, project_id: str):
await websocket.accept()
self.active_connections[project_id] = websocket
def disconnect(self, project_id: str):
if project_id in self.active_connections:
del self.active_connections[project_id]
async def send_message(self, message: str, project_id: str):
if project_id in self.active_connections:
await self.active_connections[project_id].send_text(json.dumps({
"type": "manager",
"message": message
}))
manager = ConnectionManager()
# ============ وظائف المساعدة ============
def create_simple_project_manager(project_path: str):
"""إنشاء مدير مشروع بسيط إذا كان CrewAI غير متوفر"""
class SimpleManager:
def __init__(self, path):
self.path = path
self.conversation = []
def chat_with_manager(self, message, conversation_history=None):
return f"👋 مرحباً! أنا المدير البسيط. تم تحميل مشروعك في {self.path}. رسالتك: {message}"
def create_development_plan(self, user_request):
return f"📋 خطة تطوير بسيطة للمشروع في {self.path}. الطلب: {user_request}"
return SimpleManager(project_path)
def get_simple_response(project_name: str, message: str) -> str:
"""إنشاء رد بسيط بدون نماذج ذكاء اصطناعي"""
responses = [
f"👋 مرحباً! أنا مدير مشروع {project_name}.",
f"💬 رسالتك: '{message}'",
"📊 أقوم بتحليل مشروعك...",
"💡 اقتراح: رفع ملف ZIP لمشروعك للحصول على تحليل مفصل.",
"🛠️ يمكنني مساعدتك في: تحليل الكود، خطط التطوير، النصائح التقنية.",
"🚀 الميزات القادمة: تحليل ذكي، اقتراحات تلقائية، دعم متعدد اللغات.",
"📝 لمزيد من المساعدة، ارفع مشروعك واطرح أسئلة محددة."
]
return "\n\n".join(responses)
def get_dir_size(path):
"""حساب حجم المجلد بالبايت"""
if not os.path.exists(path):
return 0
total = 0
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
if os.path.exists(fp):
total += os.path.getsize(fp)
return total / (1024**2) # تحويل إلى ميجابايت
def count_files(path):
"""عد عدد الملفات في المجلد"""
if not os.path.exists(path):
return 0
count = 0
for _, _, filenames in os.walk(path):
count += len(filenames)
return count
def check_for_issues(upload_dir_info, sessions, env_vars):
"""فحص المشاكل المحتملة"""
issues = []
# فحص مجلد الرفع
if not upload_dir_info["exists"]:
issues.append({
"level": "critical",
"title": "مجلد الرفع غير موجود",
"description": f"المجلد {UPLOAD_DIR} غير موجود. لن يتمكن المستخدمون من رفع المشاريع.",
"solution": "إنشاء المجلد تلقائياً أو تعديل مساره في الإعدادات."
})
elif not os.access(upload_dir_info["path"], os.W_OK):
issues.append({
"level": "critical",
"title": "صلاحيات كتابة غير كافية",
"description": f"لا يمكن الكتابة في المجلد {UPLOAD_DIR}.",
"solution": "تعديل صلاحيات المجلد إلى 755 أو 777."
})
# فحص توكن Hugging Face
if "HF_TOKEN" not in env_vars or "غير موجود" in env_vars.get("HF_TOKEN", ""):
issues.append({
"level": "high",
"title": "HF_TOKEN غير موجود",
"description": "مفتاح Hugging Face غير موجود. لن تعمل النماذج اللغوية.",
"solution": "إضافة HF_TOKEN إلى متغيرات البيئة أو إعدادات السرية."
})
# فحص المساحة الحرة
try:
disk_usage = psutil.disk_usage('/')
if disk_usage.percent > 90:
issues.append({
"level": "high",
"title": "مساحة التخزين منخفضة",
"description": f"تبقى {100 - disk_usage.percent}% فقط من المساحة الحرة.",
"solution": "حذف المشاريع القديمة أو توسيع مساحة التخزين."
})
except:
pass
# فحص الذاكرة
try:
memory = psutil.virtual_memory()
if memory.percent > 85:
issues.append({
"level": "medium",
"title": "استخدام عالي للذاكرة",
"description": f"الذاكرة المستخدمة: {memory.percent}%",
"solution": "تحسين كفاءة الكود أو إضافة المزيد من الذاكرة."
})
except:
pass
# فحص الجلسات القديمة
for pid, session in sessions.items():
if session.get("status") == "error":
issues.append({
"level": "low",
"title": "جلسة معطلة",
"description": f"المشروع {pid} ({session.get('name')}) في حالة خطأ.",
"solution": "حذف الجلسة أو إعادة معالجتها."
})
return issues
def generate_recommendations(upload_dir_info, system_info, env_vars):
"""توليد توصيات تحسين"""
recommendations = []
# توصيات المساحة
if upload_dir_info["size_mb"] > 100:
recommendations.append({
"priority": "high",
"title": "تنظيف المشاريع القديمة",
"description": f"حجم مجلد المشاريع: {upload_dir_info['size_mb']:.2f} ميجابايت",
"action": "تفعيل حذف تلقائي للمشاريع القديمة بعد فترة زمنية."
})
# توصيات الذاكرة
if system_info["memory_percent"] > 70:
recommendations.append({
"priority": "medium",
"title": "تحسين استخدام الذاكرة",
"description": f"الذاكرة المستخدمة: {system_info['memory_percent']}%",
"action": "تفعيل ضغط الذاكرة أو تقليل حجم التخزين المؤقت."
})
# توصيات CPU
if system_info["cpu_percent"] > 80:
recommendations.append({
"priority": "medium",
"title": "تحسين استخدام المعالج",
"description": f"استخدام المعالج: {system_info['cpu_percent']}%",
"action": "تفعيل معالجة غير متزامنة أو تقليل التعقيد الحسابي."
})
# توصيات الأمان
if "HF_TOKEN" in env_vars and "✅ موجود" in env_vars["HF_TOKEN"]:
recommendations.append({
"priority": "low",
"title": "تدوير المفاتيح الأمنية",
"description": "HF_TOKEN قيد الاستخدام منذ فترة",
"action": "تغيير مفتاح Hugging Face دورياً للأمان."
})
# توصيات النسخ الاحتياطي
if upload_dir_info["project_count"] > 0:
recommendations.append({
"priority": "low",
"title": "تفعيل النسخ الاحتياطي",
"description": f"يوجد {upload_dir_info['project_count']} مشروع",
"action": "تنفيذ نظام نسخ احتياطي تلقائي للمشاريع."
})
return recommendations
# تعريف وقت بدء التطبيق
app_start_time = datetime.now()
# ============ HTML الرئيسي ============
@app.get("/")
async def root():
html_content = """
مركز الوكلاء الذكي
مرحباً بك في مركز الوكلاء الذكي
نظام متكامل لتحليل وتطوير المشاريع البرمجية باستخدام الذكاء الاصطناعي
رفع المشاريع
ارفع ملف ZIP لمشروعك ليقوم النظام بتحليله
دردشة ذكية
تحدث مع المدير الذكي للحصول على نصائح وتحليلات
تحليل متقدم
احصل على خطط تطوير وتحسينات لمشروعك
المشاريع المرفوعة
0
رفع مشروع جديد
اسحب وأفلت ملف ZIP هنا
أو انقر لاختيار ملف
دردشة مع المدير الذكي
"""
return HTMLResponse(content=html_content)
# ============ API Endpoints ============
@app.get("/api/test")
async def test_api():
"""اختبار API"""
return {
"status": "success",
"message": "✅ نظام مركز الوكلاء يعمل بنجاح!",
"timestamp": datetime.now().isoformat(),
"crewai_available": CREWAI_AVAILABLE,
"sessions_count": len(sessions)
}
@app.get("/api/test-connection")
async def test_connection():
"""اختبار الاتصال مع الخادم"""
return {
"status": "success",
"message": "✅ الاتصال ناجح",
"time": datetime.now().isoformat(),
"total_projects": len(sessions),
"upload_dir": UPLOAD_DIR,
"upload_dir_exists": os.path.exists(UPLOAD_DIR),
"crewai_available": CREWAI_AVAILABLE
}
@app.post("/api/upload-zip")
async def upload_zip(file: UploadFile = File(...)):
"""رفع ملف ZIP لمشروع جديد"""
try:
if not file.filename.endswith('.zip'):
raise HTTPException(status_code=400, detail="يجب أن يكون الملف بصيغة ZIP")
# إنشاء معرف فريد للمشروع
project_id = str(uuid.uuid4())
project_dir = os.path.join(UPLOAD_DIR, project_id)
os.makedirs(project_dir, exist_ok=True)
# حفظ الملف المرفوع
zip_path = os.path.join(project_dir, file.filename)
with open(zip_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# فك ضغط الملف
extract_dir = os.path.join(project_dir, "extracted")
os.makedirs(extract_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
# إنشاء جلسة للمشروع
project_name = file.filename.replace('.zip', '')
sessions[project_id] = {
"name": project_name,
"path": extract_dir,
"status": "active",
"conversation": [
{
"role": "system",
"content": f"👋 تم إنشاء المشروع '{project_name}' بنجاح. جاهز للتحليل والدردشة.",
"timestamp": datetime.now().isoformat()
}
],
"crew": None,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat()
}
print(f"✅ تم رفع المشروع: {project_name} (ID: {project_id})")
print(f"📁 المسار: {extract_dir}")
print(f"📊 عدد الجلسات النشطة: {len(sessions)}")
return {
"status": "success",
"message": f"تم رفع المشروع '{project_name}' بنجاح",
"project_id": project_id,
"project_name": project_name,
"extracted_path": extract_dir,
"session_created": True,
"total_projects": len(sessions)
}
except Exception as e:
print(f"❌ خطأ في رفع الملف: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"فشل رفع الملف: {str(e)}")
@app.get("/api/projects")
async def get_projects():
"""الحصول على قائمة المشاريع"""
projects = []
for project_id, session in sessions.items():
projects.append({
"id": project_id,
"name": session.get("name", "غير معروف"),
"status": session.get("status", "unknown"),
"conversation_count": len(session.get("conversation", [])),
"created_at": session.get("created_at", ""),
"has_crew": session.get("crew") is not None
})
print(f"📋 إرجاع {len(projects)} مشروع")
# ترتيب المشاريع من الأحدث إلى الأقدم
projects.sort(key=lambda x: x.get("created_at", ""), reverse=True)
return {"projects": projects}
@app.get("/api/simple-chat")
async def simple_chat(project_id: str, message: str):
"""دردشة مبسطة بدون CrewAI"""
if project_id not in sessions:
raise HTTPException(status_code=404, detail="المشروع غير موجود")
project_name = sessions[project_id].get("name", "غير معروف")
# إنشاء رد مبسط
simple_response = f"""
👋 مرحباً! أنا مدير المشروع المبسط.
**مشروعك:** {project_name}
**رسالتك:** {message}
## 💡 كيف يمكنني مساعدتك:
1. **تحليل المشروع**: أستطيع تحليل ملفات مشروعك
2. **خطط التطوير**: تقديم اقتراحات للتحسين
3. **نصائح تقنية**: إرشادات برمجية
4. **هيكلة المشروع**: تنظيم الملفات والمجلدات
## 📋 للمساعدة الأفضل:
- ارفع ملف ZIP لمشروعك
- اطرح أسئلة محددة
- حدد التقنيات المستخدمة
*ملاحظة: النظام قيد التطوير، الميزات الكاملة قريباً!*
"""
# تحديث المحادثة
sessions[project_id]["conversation"].append({
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
})
sessions[project_id]["conversation"].append({
"role": "manager",
"content": simple_response,
"timestamp": datetime.now().isoformat()
})
return {
"status": "success",
"response": simple_response,
"conversation_length": len(sessions[project_id]["conversation"])
}
@app.post("/api/start-discussion")
async def start_discussion(request: UserRequest):
"""بدء نقاش حول المشروع"""
project_id = request.project_id
if project_id not in sessions:
raise HTTPException(status_code=404, detail="المشروع غير موجود")
try:
# إذا كان CrewAI غير متوفر، استخدم الرد المبسط
if not CREWAI_AVAILABLE:
return await simple_chat(project_id, request.message)
project_path = sessions[project_id]["path"]
crew = sessions[project_id]["crew"]
if not crew:
try:
crew = ProjectManagerCrew(project_path)
sessions[project_id]["crew"] = crew
except Exception as e:
print(f"⚠️ خطأ في إنشاء Crew: {e}")
return await simple_chat(project_id, request.message)
# بدء عملية التطوير
try:
result = crew.create_development_plan(request.message)
except Exception as e:
print(f"⚠️ خطأ في create_development_plan: {e}")
return await simple_chat(project_id, request.message)
# حفظ في المحادثة
sessions[project_id]["conversation"].append({
"role": "user",
"content": request.message,
"timestamp": datetime.now().isoformat()
})
sessions[project_id]["conversation"].append({
"role": "system",
"content": str(result),
"timestamp": datetime.now().isoformat()
})
# تحديث الحالة والوقت
sessions[project_id]["status"] = "processing_complete"
sessions[project_id]["updated_at"] = datetime.now().isoformat()
return {
"status": "success",
"result": str(result),
"message": "تمت معالجة طلبك بنجاح"
}
except Exception as e:
print(f"❌ خطأ في start_discussion: {e}")
traceback.print_exc()
# استخدم الرد المبسط في حالة الخطأ
return await simple_chat(project_id, request.message)
@app.post("/api/chat")
async def chat_with_manager(request: UserRequest):
"""الدردشة مع المدير"""
project_id = request.project_id
if project_id not in sessions:
raise HTTPException(status_code=404, detail="المشروع غير موجود")
try:
# إذا كان CrewAI غير متوفر، استخدم الرد المبسط
if not CREWAI_AVAILABLE:
return await simple_chat(project_id, request.message)
if sessions[project_id]["crew"] is None:
project_path = sessions[project_id]["path"]
try:
crew = ProjectManagerCrew(project_path)
sessions[project_id]["crew"] = crew
except Exception as e:
print(f"⚠️ خطأ في إنشاء Crew: {e}")
return await simple_chat(project_id, request.message)
else:
crew = sessions[project_id]["crew"]
conversation = sessions[project_id]["conversation"]
# الحصول على رد من المدير
try:
response = crew.chat_with_manager(request.message, conversation)
except Exception as e:
print(f"⚠️ خطأ في الدردشة مع Crew: {e}")
return await simple_chat(project_id, request.message)
# تحديث المحادثة
sessions[project_id]["conversation"].append({
"role": "user",
"content": request.message,
"timestamp": datetime.now().isoformat()
})
sessions[project_id]["conversation"].append({
"role": "manager",
"content": str(response),
"timestamp": datetime.now().isoformat()
})
# تحديث وقت التعديل
sessions[project_id]["updated_at"] = datetime.now().isoformat()
return {
"status": "success",
"response": str(response),
"conversation_length": len(sessions[project_id]["conversation"])
}
except Exception as e:
print(f"❌ خطأ في chat_with_manager: {e}")
# استخدم الرد المبسط في حالة الخطأ
return await simple_chat(project_id, request.message)
@app.get("/api/conversation/{project_id}")
async def get_conversation(project_id: str):
"""الحصول على محادثة المشروع"""
if project_id not in sessions:
raise HTTPException(status_code=404, detail="المشروع غير موجود")
return {
"status": "success",
"project_id": project_id,
"project_name": sessions[project_id].get("name", ""),
"conversation": sessions[project_id]["conversation"],
"total_messages": len(sessions[project_id]["conversation"])
}
@app.get("/api/project/{project_id}/files")
async def get_project_files(project_id: str):
"""الحصول على ملفات المشروع"""
if project_id not in sessions:
raise HTTPException(status_code=404, detail="المشروع غير موجود")
project_path = sessions[project_id]["path"]
files = []
try:
for root, dirs, filenames in os.walk(project_path):
for filename in filenames:
file_path = os.path.join(root, filename)
rel_path = os.path.relpath(file_path, project_path)
# الحصول على حجم الملف
size = os.path.getsize(file_path)
size_mb = size / (1024 * 1024)
# الحصول على امتداد الملف
_, ext = os.path.splitext(filename)
files.append({
"name": filename,
"path": rel_path,
"size_mb": round(size_mb, 2),
"extension": ext,
"directory": root.replace(project_path, '').lstrip('/')
})
return {
"status": "success",
"project_id": project_id,
"total_files": len(files),
"files": files[:50] # إرجاع أول 50 ملف فقط
}
except Exception as e:
print(f"❌ خطأ في قراءة الملفات: {e}")
raise HTTPException(status_code=500, detail=f"فشل قراءة الملفات: {str(e)}")
@app.get("/api/project/{project_id}/analyze")
async def analyze_project(project_id: str):
"""تحليل المشروع"""
if project_id not in sessions:
raise HTTPException(status_code=404, detail="المشروع غير موجود")
project_path = sessions[project_id]["path"]
try:
# تحليل بسيط للمشروع
file_count = 0
file_types = {}
total_size = 0
for root, dirs, filenames in os.walk(project_path):
file_count += len(filenames)
for filename in filenames:
file_path = os.path.join(root, filename)
total_size += os.path.getsize(file_path)
# تحليل أنواع الملفات
_, ext = os.path.splitext(filename)
file_types[ext] = file_types.get(ext, 0) + 1
# تحليل هيكل المجلدات
dirs = []
for item in os.listdir(project_path):
item_path = os.path.join(project_path, item)
if os.path.isdir(item_path):
dirs.append(item)
return {
"status": "success",
"project_id": project_id,
"analysis": {
"file_count": file_count,
"total_size_mb": round(total_size / (1024 * 1024), 2),
"file_types": file_types,
"top_directories": dirs[:10],
"has_readme": os.path.exists(os.path.join(project_path, "README.md")),
"has_requirements": os.path.exists(os.path.join(project_path, "requirements.txt")),
"has_package_json": os.path.exists(os.path.join(project_path, "package.json")),
"has_dockerfile": any(f.lower().startswith('dockerfile') for f in os.listdir(project_path))
}
}
except Exception as e:
print(f"❌ خطأ في تحليل المشروع: {e}")
raise HTTPException(status_code=500, detail=f"فشل تحليل المشروع: {str(e)}")
@app.websocket("/ws/{project_id}")
async def websocket_endpoint(websocket: WebSocket, project_id: str):
"""WebSocket للدردشة المباشرة"""
await manager.connect(websocket, project_id)
if project_id not in sessions:
await websocket.send_text(json.dumps({
"type": "error",
"message": "المشروع غير موجود"
}))
await websocket.close()
return
try:
# إرسال رسالة ترحيبية
welcome_message = "👋 مرحباً! أنا مدير المشروع الذكي. كيف يمكنني مساعدتك اليوم؟"
await websocket.send_text(json.dumps({
"type": "manager",
"message": welcome_message
}))
while True:
data = await websocket.receive_text()
message_data = json.loads(data)
if message_data["type"] == "user_message":
user_message = message_data["message"]
# إرسال رد مؤقت
await websocket.send_text(json.dumps({
"type": "status",
"message": "⚙️ المدير يفكر في رد..."
}))
# الحصول على الرد باستخدام API العادي
try:
# استخدام API العادي للحصول على الرد
import aiohttp
import asyncio
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://localhost:7860/api/chat",
json={"project_id": project_id, "message": user_message},
timeout=30
) as response:
if response.status == 200:
data = await response.json()
response_text = data.get("response", "❌ لم أستطع فهم سؤالك.")
else:
response_text = "❌ حدث خطأ في الخادم."
except:
# استخدام رد بديل إذا فشل الاتصال
response_text = f"💬 رسالتك: '{user_message}'\n\nأنا هنا للمساعدة! يمكنك استخدام واجهة الدردشة العادية."
# إرسال الرد
await websocket.send_text(json.dumps({
"type": "manager",
"message": response_text
}))
except WebSocketDisconnect:
manager.disconnect(project_id)
except Exception as e:
print(f"❌ خطأ في WebSocket: {e}")
await websocket.send_text(json.dumps({
"type": "error",
"message": f"حدث خطأ: {str(e)}"
}))
manager.disconnect(project_id)
@app.get("/debug")
async def debug():
"""صفحة تصحيح"""
# جمع معلومات النظام
system_info = {
"system": platform.system(),
"release": platform.release(),
"python_version": platform.python_version(),
"processor": platform.processor(),
"memory_total_gb": round(psutil.virtual_memory().total / (1024**3), 2),
"memory_used_gb": round(psutil.virtual_memory().used / (1024**3), 2),
"memory_percent": psutil.virtual_memory().percent,
"cpu_count": psutil.cpu_count(),
"cpu_percent": psutil.cpu_percent(interval=1),
"disk_usage": {
"total_gb": round(psutil.disk_usage('/').total / (1024**3), 2),
"used_gb": round(psutil.disk_usage('/').used / (1024**3), 2),
"free_gb": round(psutil.disk_usage('/').free / (1024**3), 2),
"percent": psutil.disk_usage('/').percent
}
}
# تحليل مجلد المشاريع
upload_dir_info = {
"path": UPLOAD_DIR,
"exists": os.path.exists(UPLOAD_DIR),
"absolute_path": os.path.abspath(UPLOAD_DIR) if os.path.exists(UPLOAD_DIR) else None,
"permissions": oct(os.stat(UPLOAD_DIR).st_mode)[-3:] if os.path.exists(UPLOAD_DIR) else None,
"size_mb": get_dir_size(UPLOAD_DIR) if os.path.exists(UPLOAD_DIR) else 0,
"project_count": len([d for d in os.listdir(UPLOAD_DIR) if os.path.isdir(os.path.join(UPLOAD_DIR, d))]) if os.path.exists(UPLOAD_DIR) else 0,
"projects": []
}
if os.path.exists(UPLOAD_DIR):
try:
projects = []
for project_id in os.listdir(UPLOAD_DIR):
project_path = os.path.join(UPLOAD_DIR, project_id)
if os.path.isdir(project_path):
project_info = {
"id": project_id,
"size_mb": round(get_dir_size(project_path), 2),
"file_count": count_files(project_path),
"created": datetime.fromtimestamp(os.path.getctime(project_path)).isoformat() if os.path.exists(project_path) else None,
"modified": datetime.fromtimestamp(os.path.getmtime(project_path)).isoformat() if os.path.exists(project_path) else None
}
projects.append(project_info)
upload_dir_info["projects"] = sorted(projects, key=lambda x: x["size_mb"], reverse=True)[:10] # أول 10 مشاريع
except Exception as e:
upload_dir_info["error"] = str(e)
# تحليل الجلسات النشطة
active_sessions = []
for pid, session in sessions.items():
session_info = {
"id": pid,
"name": session.get("name", "غير معروف"),
"status": session.get("status", "غير معروف"),
"conversation_count": len(session.get("conversation", [])),
"has_crew": session.get("crew") is not None,
"path_exists": os.path.exists(session.get("path", "")) if session.get("path") else False,
"size_mb": round(get_dir_size(session.get("path", "")), 2) if session.get("path") and os.path.exists(session.get("path")) else 0
}
active_sessions.append(session_info)
# تحليل المتغيرات البيئية
env_vars = {}
important_vars = ["HF_TOKEN", "OPENAI_API_KEY", "SERPER_API_KEY", "DEBUG", "PYTHONPATH"]
for var in important_vars:
value = os.getenv(var)
if value:
if var.endswith("_TOKEN") or var.endswith("_KEY"):
env_vars[var] = f"✅ موجود ({len(value)} حرف)" if value else "❌ غير موجود"
else:
env_vars[var] = value
# تحليل الموديولات المثبتة
installed_modules = {}
important_modules = ["fastapi", "uvicorn", "crewai", "langchain", "huggingface_hub", "requests", "websockets"]
for module in important_modules:
try:
import importlib
importlib.import_module(module)
installed_modules[module] = "✅ مثبت"
except ImportError:
installed_modules[module] = "❌ غير مثبت"
# إحصائيات API
api_stats = {
"total_projects": len(sessions),
"active_websockets": len(manager.active_connections),
"total_conversations": sum(len(s.get("conversation", [])) for s in sessions.values()),
"avg_conversation_length": round(sum(len(s.get("conversation", [])) for s in sessions.values()) / len(sessions), 2) if sessions else 0
}
# تحليل الذاكرة للمشاريع
memory_info = {
"process_memory_mb": round(psutil.Process().memory_info().rss / (1024**2), 2),
"process_memory_percent": psutil.Process().memory_percent(),
"open_files": len(psutil.Process().open_files()),
"threads": psutil.Process().num_threads(),
"cpu_times": str(psutil.Process().cpu_times())
}
# تحليل الشبكة
try:
net_info = psutil.net_io_counters()
network_info = {
"bytes_sent_mb": round(net_info.bytes_sent / (1024**2), 2),
"bytes_recv_mb": round(net_info.bytes_recv / (1024**2), 2),
"packets_sent": net_info.packets_sent,
"packets_recv": net_info.packets_recv
}
except:
network_info = {"error": "لا يمكن قراءة معلومات الشبكة"}
# معلومات التطبيق
app_info = {
"start_time": datetime.now().isoformat(),
"uptime_seconds": (datetime.now() - app_start_time).total_seconds() if 'app_start_time' in globals() else 0,
"version": "1.0.0",
"endpoints": [
{"path": "/", "method": "GET", "description": "الواجهة الرئيسية"},
{"path": "/api/upload-zip", "method": "POST", "description": "رفع مشروع"},
{"path": "/api/projects", "method": "GET", "description": "قائمة المشاريع"},
{"path": "/api/chat", "method": "POST", "description": "الدردشة مع المدير"},
{"path": "/ws/{project_id}", "method": "WebSocket", "description": "دردشة مباشرة"},
{"path": "/debug", "method": "GET", "description": "صفحة التصحيح"}
]
}
# تحقق من المشاكل
issues = check_for_issues(upload_dir_info, sessions, env_vars)
# توليد التوصيات
recommendations = generate_recommendations(upload_dir_info, system_info, env_vars)
return {
"status": "active",
"timestamp": datetime.now().isoformat(),
# معلومات النظام
"system": system_info,
# معلومات التطبيق
"application": app_info,
# معلومات التخزين
"storage": {
"upload_directory": upload_dir_info,
"active_sessions": active_sessions
},
# إحصائيات API
"api_statistics": api_stats,
# معلومات الموديولات
"modules": {
"crewai_available": CREWAI_AVAILABLE,
"important_modules": installed_modules
},
# المتغيرات البيئية
"environment": {
"important_variables": env_vars,
"python_path": os.getenv("PYTHONPATH", "غير محدد"),
"current_working_dir": os.getcwd(),
"script_location": os.path.abspath(__file__)
},
# معلومات الذاكرة والأداء
"performance": {
"memory": memory_info,
"network": network_info
},
# المشاكل المحتملة
"issues": issues,
# التوصيات
"recommendations": recommendations
}