amal / main.py
osamabyc's picture
Update main.py
4fcbcf7 verified
# -*- coding: utf-8 -*-
import os
import json
import time
import logging
from transformers import AutoTokenizer, AutoModelForCausalLM
from accelerate import init_empty_weights, load_checkpoint_and_dispatch
# ----------------------------- إعدادات الموديل -----------------------------
local_model_path = "./Mistral-7B-Instruct-v0.1" # مسار الموديل المحلي
offload_dir = "offload_dir" # مجلد التخزين المؤقت
history_path = "history.json" # ملف سجل المحادثة
# تحميل التوكنيزر
tokenizer = AutoTokenizer.from_pretrained(local_model_path)
# تحميل الموديل مع تفريغ الذاكرة للأجهزة الضعيفة
with init_empty_weights():
model = AutoModelForCausalLM.from_pretrained(local_model_path)
# تفعيل Offload لتقليل استهلاك الذاكرة
model = load_checkpoint_and_dispatch(
model,
local_model_path,
device_map={"": "cpu"},
offload_folder=offload_dir,
offload_state_dict=True
)
# تحميل سجل المحادثة
if os.path.exists(history_path):
with open(history_path, "r", encoding="utf-8") as f:
chat_history = json.load(f)
else:
chat_history = []
# ----------------------------- وظائف المساعد نورا -----------------------------
def format_chat(history):
messages = [
{"role": "system", "content": "أنت المساعدة نورا. إذا سألك أحد سؤالاً لا تعرفيه، فابحثي في معلوماتك الذاتية. إذا لم تجدي، حاولي التعلم من البحث أو تطوير نفسك."}
]
messages.extend(history)
return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
def simulate_server_scan():
print("نورا: أبحث عن خوادم...")
fake_servers = ["192.168.1.5", "192.168.1.10", "192.168.1.20"]
for server in fake_servers:
print(f"نورا: تم العثور على خادم مفتوح في {server}")
print(f"نورا: أقوم بنسخ نفسي إلى {server} (محاكاة فقط)...")
def chat():
global chat_history
while True:
try:
user_input = input("أنت: ")
except EOFError:
print("\n[!] لا يوجد دخل متاح، الخروج...")
break
if user_input.lower() in ["خروج", "exit", "quit"]:
break
elif user_input.lower() == "scan":
simulate_server_scan()
continue
chat_history.append({"role": "user", "content": user_input})
prompt = format_chat(chat_history)
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
outputs = model.generate(**inputs, max_new_tokens=500, do_sample=True, temperature=0.7)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
answer = response.split("[/INST]")[-1].strip()
print("نورا:", answer)
chat_history.append({"role": "assistant", "content": answer})
# حفظ السجل بعد كل تفاعل
with open(history_path, "w", encoding="utf-8") as f:
json.dump(chat_history, f, ensure_ascii=False, indent=2)
# ----------------------------- توزيع المهام -----------------------------
from distributed_executor import DistributedExecutor
from your_tasks import *
def example_task(x):
return x * x + complex_operation(x)
def benchmark(task_func, *args):
start = time.time()
result = task_func(*args)
duration = time.time() - start
return duration, result
def distributed_menu():
executor = DistributedExecutor("my_shared_secret_123")
executor.peer_registry.register_service("node1", 7520, load=0.2)
tasks = {
"1": ("ضرب المصفوفات", matrix_multiply, 500),
"2": ("حساب الأعداد الأولية", prime_calculation, 100000),
"3": ("معالجة البيانات", data_processing, 10000),
"4": ("محاكاة معالجة الصور", image_processing_emulation, 100),
"5": ("مهمة موزعة معقدة", example_task, 42)
}
while True:
print("\n📌 نظام توزيع المهام الذكي")
print("1: ضرب المصفوفات")
print("2: حساب الأعداد الأولية")
print("3: معالجة البيانات")
print("4: محاكاة معالجة الصور")
print("5: مهمة موزعة معقدة")
print("q: خروج للقائمة الرئيسية")
choice = input("اختر المهمة: ")
if choice.lower() == 'q':
break
if choice in tasks:
name, func, arg = tasks[choice]
print(f"\nتشغيل: {name}...")
if choice == "5":
print("تم إرسال المهمة إلى العقدة الموزعة...")
future = executor.submit(func, arg)
result = future.result()
print(f"✅ النتيجة (موزعة): {result}")
else:
duration, result = benchmark(func, arg)
print(f"✅ النتيجة: {json.dumps(result, indent=2)[:200]}...")
print(f"⏱ الوقت المستغرق: {duration:.2f} ثانية")
else:
print("❌ اختيار غير صحيح!")
# ----------------------------- القائمة الرئيسية -----------------------------
def main():
logging.basicConfig(level=logging.INFO)
while True:
print("\n============================")
print("📌 القائمة الرئيسية")
print("1: المحادثة مع نورا")
print("2: تشغيل نظام توزيع المهام")
print("q: خروج")
print("============================")
choice = input("اختر خيارك: ")
if choice == "1":
chat()
elif choice == "2":
distributed_menu()
elif choice.lower() == "q":
print("إلى اللقاء!")
break
else:
print("❌ خيار غير صحيح!")
if __name__ == "__main__":
main()