Spaces:
Paused
Paused
| """ | |
| خدمة التنبؤ بالأسعار | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import joblib | |
| import os | |
| from datetime import datetime, timedelta | |
| from sklearn.ensemble import RandomForestRegressor | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score | |
| import config | |
| class PricePrediction: | |
| """خدمة التنبؤ بالأسعار باستخدام التعلم الآلي""" | |
| def __init__(self): | |
| """تهيئة خدمة التنبؤ بالأسعار""" | |
| self.model_path = config.PRICE_PREDICTION_MODEL | |
| self.model = self._load_model() | |
| self.scaler = None | |
| self.materials_data = self._load_materials_data() | |
| self.market_indices = self._load_market_indices() | |
| def _load_model(self): | |
| """تحميل نموذج التنبؤ المدرب مسبقاً""" | |
| try: | |
| if os.path.exists(self.model_path): | |
| model = joblib.load(self.model_path) | |
| return model | |
| else: | |
| # إذا لم يكن النموذج موجوداً، قم بإنشاء نموذج جديد | |
| model = RandomForestRegressor( | |
| n_estimators=100, | |
| max_depth=15, | |
| min_samples_split=5, | |
| min_samples_leaf=2, | |
| random_state=42 | |
| ) | |
| return model | |
| except Exception as e: | |
| print(f"خطأ في تحميل نموذج التنبؤ: {str(e)}") | |
| return RandomForestRegressor(random_state=42) | |
| def _load_materials_data(self): | |
| """تحميل بيانات المواد وأسعارها التاريخية""" | |
| # محاكاة تحميل البيانات من مصدر بيانات | |
| materials_data = { | |
| 'خرسانة': { | |
| 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
| 'سعر': [750, 740, 735, 730, 720, 715, 710, 700, 695, 690, 685, 680], | |
| 'وحدة': 'م3' | |
| }, | |
| 'حديد تسليح': { | |
| 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
| 'سعر': [5500, 5450, 5400, 5350, 5300, 5250, 5200, 5150, 5100, 5050, 5000, 4950], | |
| 'وحدة': 'طن' | |
| }, | |
| 'إسمنت': { | |
| 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
| 'سعر': [25, 25, 24.5, 24.5, 24, 24, 23.5, 23.5, 23, 23, 22.5, 22.5], | |
| 'وحدة': 'كيس' | |
| }, | |
| 'رمل': { | |
| 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
| 'سعر': [140, 140, 135, 135, 130, 130, 125, 125, 120, 120, 115, 115], | |
| 'وحدة': 'م3' | |
| }, | |
| 'بلوك خرساني': { | |
| 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
| 'سعر': [11, 11, 10.5, 10.5, 10, 10, 9.5, 9.5, 9, 9, 8.5, 8.5], | |
| 'وحدة': 'قطعة' | |
| } | |
| } | |
| return materials_data | |
| def _load_market_indices(self): | |
| """تحميل مؤشرات السوق المؤثرة على الأسعار""" | |
| # محاكاة تحميل البيانات من مصدر بيانات | |
| market_indices = { | |
| 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
| 'مؤشر_البناء': [105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94], | |
| 'مؤشر_النفط': [80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69], | |
| 'مؤشر_سعر_الصرف': [3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75], | |
| 'مؤشر_التضخم': [2.5, 2.4, 2.3, 2.2, 2.1, 2.0, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4] | |
| } | |
| return market_indices | |
| def train(self, training_data=None): | |
| """ | |
| تدريب نموذج التنبؤ بالأسعار | |
| المعلمات: | |
| training_data: بيانات التدريب (اختياري)، إذا لم يتم توفيرها سيتم استخدام البيانات المتاحة | |
| إرجاع: | |
| مؤشرات أداء النموذج | |
| """ | |
| # تجهيز بيانات التدريب | |
| if training_data is None: | |
| # استخدام البيانات المتاحة لتوليد مجموعة تدريب | |
| X, y = self._prepare_training_data() | |
| else: | |
| X, y = self._extract_features_target(training_data) | |
| # تقسيم البيانات إلى تدريب واختبار | |
| X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42) | |
| # تطبيع البيانات | |
| self.scaler = StandardScaler() | |
| X_train_scaled = self.scaler.fit_transform(X_train) | |
| X_test_scaled = self.scaler.transform(X_test) | |
| # تدريب النموذج | |
| self.model.fit(X_train_scaled, y_train) | |
| # تقييم النموذج | |
| y_pred = self.model.predict(X_test_scaled) | |
| # حساب مؤشرات الأداء | |
| mae = mean_absolute_error(y_test, y_pred) | |
| rmse = np.sqrt(mean_squared_error(y_test, y_pred)) | |
| r2 = r2_score(y_test, y_pred) | |
| # حفظ النموذج | |
| try: | |
| joblib.dump(self.model, self.model_path) | |
| joblib.dump(self.scaler, os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl')) | |
| except Exception as e: | |
| print(f"خطأ في حفظ النموذج: {str(e)}") | |
| return { | |
| 'mae': mae, | |
| 'rmse': rmse, | |
| 'r2': r2 | |
| } | |
| def _prepare_training_data(self): | |
| """تجهيز بيانات التدريب من البيانات المتاحة""" | |
| # توليد بيانات تدريب افتراضية | |
| data = [] | |
| target = [] | |
| # استخدام بيانات المواد وأسعارها التاريخية | |
| for material_name, material_info in self.materials_data.items(): | |
| for i in range(len(material_info['تاريخ'])): | |
| # استخراج المؤشرات في التاريخ المقابل | |
| date_index = self.market_indices['تاريخ'].index(material_info['تاريخ'][i]) if material_info['تاريخ'][i] in self.market_indices['تاريخ'] else 0 | |
| # تكوين ميزات التدريب (المؤشرات السوقية والشهر) | |
| features = [ | |
| material_info['تاريخ'][i].month, # الشهر | |
| self.market_indices['مؤشر_البناء'][date_index], | |
| self.market_indices['مؤشر_النفط'][date_index], | |
| self.market_indices['مؤشر_سعر_الصرف'][date_index], | |
| self.market_indices['مؤشر_التضخم'][date_index] | |
| ] | |
| # إضافة معرّف للمادة (تمثيل رقمي) | |
| material_id = list(self.materials_data.keys()).index(material_name) | |
| features.append(material_id) | |
| data.append(features) | |
| target.append(material_info['سعر'][i]) | |
| # إضافة ضوضاء عشوائية لزيادة حجم البيانات | |
| for _ in range(5): | |
| noisy_features = features.copy() | |
| for j in range(1, 5): # إضافة ضوضاء للمؤشرات فقط | |
| noisy_features[j] += np.random.normal(0, 0.5) | |
| noisy_price = material_info['سعر'][i] * (1 + np.random.normal(0, 0.02)) # ضوضاء 2% | |
| data.append(noisy_features) | |
| target.append(noisy_price) | |
| return np.array(data), np.array(target) | |
| def _extract_features_target(self, training_data): | |
| """استخراج الميزات والأهداف من بيانات التدريب""" | |
| # استخراج الميزات والأهداف من البيانات المقدمة | |
| features = [] | |
| target = [] | |
| for item in training_data: | |
| features.append([ | |
| item['date'].month, # الشهر | |
| item['building_index'], | |
| item['oil_index'], | |
| item['exchange_rate'], | |
| item['inflation_rate'], | |
| item['material_id'] | |
| ]) | |
| target.append(item['price']) | |
| return np.array(features), np.array(target) | |
| def predict_prices(self, materials, prediction_date=None, market_conditions=None): | |
| """ | |
| التنبؤ بأسعار المواد | |
| المعلمات: | |
| materials: قائمة المواد المطلوب التنبؤ بأسعارها | |
| prediction_date: تاريخ التنبؤ (اختياري)، إذا لم يتم توفيره سيتم استخدام التاريخ الحالي | |
| market_conditions: ظروف السوق (اختياري)، إذا لم يتم توفيرها سيتم استخدام آخر قيم متاحة | |
| إرجاع: | |
| قاموس بأسعار المواد المتنبأ بها | |
| """ | |
| if prediction_date is None: | |
| prediction_date = datetime.now() | |
| if market_conditions is None: | |
| # استخدام آخر قيم متاحة للمؤشرات | |
| market_conditions = { | |
| 'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0], | |
| 'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0], | |
| 'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], | |
| 'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] | |
| } | |
| # التحقق من وجود المواد في البيانات | |
| material_names = list(self.materials_data.keys()) | |
| valid_materials = [m for m in materials if m in material_names] | |
| if not valid_materials: | |
| return {} | |
| # تحميل المعايير إذا كانت متوفرة | |
| scaler_path = os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl') | |
| if self.scaler is None and os.path.exists(scaler_path): | |
| try: | |
| self.scaler = joblib.load(scaler_path) | |
| except Exception as e: | |
| print(f"خطأ في تحميل المعايير: {str(e)}") | |
| # إنشاء معايير جديدة | |
| X, _ = self._prepare_training_data() | |
| self.scaler = StandardScaler() | |
| self.scaler.fit(X) | |
| # إعداد ميزات التنبؤ | |
| features = [] | |
| for material in valid_materials: | |
| material_id = material_names.index(material) | |
| material_features = [ | |
| prediction_date.month, # الشهر | |
| market_conditions['مؤشر_البناء'], | |
| market_conditions['مؤشر_النفط'], | |
| market_conditions['مؤشر_سعر_الصرف'], | |
| market_conditions['مؤشر_التضخم'], | |
| material_id | |
| ] | |
| features.append(material_features) | |
| # تطبيع الميزات | |
| if self.scaler is not None: | |
| features_scaled = self.scaler.transform(features) | |
| else: | |
| features_scaled = features | |
| # التنبؤ بالأسعار | |
| predicted_prices = self.model.predict(features_scaled) | |
| # إرجاع النتائج | |
| results = {} | |
| for i, material in enumerate(valid_materials): | |
| # تطبيق عامل تصحيح (2% عشوائية) | |
| correction_factor = 1.0 + np.random.uniform(-0.02, 0.02) | |
| price = max(0, predicted_prices[i] * correction_factor) | |
| results[material] = { | |
| 'سعر': price, | |
| 'وحدة': self.materials_data[material]['وحدة'], | |
| 'تاريخ_التنبؤ': prediction_date.strftime('%Y-%m-%d'), | |
| 'هامش_الخطأ': '±5%' # تقدير هامش الخطأ | |
| } | |
| return results | |
| def get_price_trends(self, material, periods=6): | |
| """ | |
| الحصول على اتجاهات الأسعار المستقبلية | |
| المعلمات: | |
| material: المادة المطلوب التنبؤ باتجاهات أسعارها | |
| periods: عدد الفترات المستقبلية (الشهور) | |
| إرجاع: | |
| قائمة بالأسعار المتوقعة للفترات المستقبلية | |
| """ | |
| if material not in self.materials_data: | |
| return [] | |
| # الحصول على التاريخ الحالي | |
| current_date = datetime.now() | |
| # التنبؤ بالأسعار للفترات المستقبلية | |
| price_trends = [] | |
| for i in range(periods): | |
| prediction_date = current_date + timedelta(days=30 * (i + 1)) | |
| # افتراض تغيرات طفيفة في المؤشرات مع مرور الوقت | |
| market_conditions = { | |
| 'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0] * (1 + 0.01 * i), # زيادة 1% شهرياً | |
| 'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0] * (1 + 0.005 * i), # زيادة 0.5% شهرياً | |
| 'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], # ثابت | |
| 'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] * (1 + 0.01 * i) # زيادة 1% شهرياً | |
| } | |
| # التنبؤ بالسعر | |
| predicted_price = self.predict_prices([material], prediction_date, market_conditions) | |
| price_trends.append({ | |
| 'تاريخ': prediction_date.strftime('%Y-%m'), | |
| 'سعر': predicted_price[material]['سعر'] if material in predicted_price else 0 | |
| }) | |
| return price_trends | |
| def analyze_factors(self, material): | |
| """ | |
| تحليل العوامل المؤثرة على سعر المادة | |
| المعلمات: | |
| material: المادة المطلوب تحليلها | |
| إرجاع: | |
| قاموس بالعوامل المؤثرة وأهميتها النسبية | |
| """ | |
| if material not in self.materials_data or not hasattr(self.model, 'feature_importances_'): | |
| return {} | |
| # الحصول على أهمية الميزات من النموذج | |
| feature_importances = self.model.feature_importances_ | |
| # أسماء الميزات | |
| feature_names = ['الشهر', 'مؤشر البناء', 'مؤشر النفط', 'سعر الصرف', 'معدل التضخم', 'نوع المادة'] | |
| # ترتيب الميزات حسب الأهمية | |
| importance_pairs = [(name, importance) for name, importance in zip(feature_names, feature_importances)] | |
| importance_pairs.sort(key=lambda x: x[1], reverse=True) | |
| # إرجاع العوامل المؤثرة وأهميتها | |
| factors = {} | |
| for name, importance in importance_pairs: | |
| factors[name] = round(importance * 100, 2) # تحويل إلى نسبة مئوية | |
| return { | |
| 'العوامل_المؤثرة': factors, | |
| 'المادة': material, | |
| 'وحدة': self.materials_data[material]['وحدة'], | |
| 'سعر_حالي': self.materials_data[material]['سعر'][0], | |
| 'اتجاه_السعر': self._get_price_trend(material) | |
| } | |
| def _get_price_trend(self, material): | |
| """تحديد اتجاه سعر المادة بناءً على البيانات التاريخية""" | |
| if material not in self.materials_data: | |
| return "غير معروف" | |
| prices = self.materials_data[material]['سعر'] | |
| if len(prices) < 2: | |
| return "غير معروف" | |
| # حساب متوسط التغير الشهري | |
| price_changes = [(prices[i] - prices[i+1]) / prices[i+1] * 100 for i in range(len(prices)-1)] | |
| avg_monthly_change = sum(price_changes) / len(price_changes) | |
| if avg_monthly_change > 1: | |
| return "ارتفاع حاد" | |
| elif avg_monthly_change > 0.2: | |
| return "ارتفاع معتدل" | |
| elif avg_monthly_change > -0.2: | |
| return "استقرار" | |
| elif avg_monthly_change > -1: | |
| return "انخفاض معتدل" | |
| else: | |
| return "انخفاض حاد" | |
| def export_price_forecast(self, materials, periods=6, output_file=None): | |
| """ | |
| تصدير توقعات الأسعار إلى ملف | |
| المعلمات: | |
| materials: قائمة المواد المطلوب التنبؤ بأسعارها | |
| periods: عدد الفترات المستقبلية (الشهور) | |
| output_file: مسار ملف الإخراج (اختياري) | |
| إرجاع: | |
| مسار الملف المصدر أو البيانات مباشرة إذا لم يتم تحديد ملف | |
| """ | |
| # التحقق من وجود المواد في البيانات | |
| valid_materials = [m for m in materials if m in self.materials_data] | |
| if not valid_materials: | |
| return None | |
| # إعداد بيانات التوقعات | |
| forecast_data = [] | |
| for material in valid_materials: | |
| # الحصول على اتجاهات الأسعار | |
| price_trends = self.get_price_trends(material, periods) | |
| for trend in price_trends: | |
| forecast_data.append({ | |
| 'المادة': material, | |
| 'الوحدة': self.materials_data[material]['وحدة'], | |
| 'التاريخ': trend['تاريخ'], | |
| 'السعر المتوقع': trend['سعر'], | |
| 'هامش الخطأ': '±5%' | |
| }) | |
| # تحويل البيانات إلى DataFrame | |
| forecast_df = pd.DataFrame(forecast_data) | |
| # تصدير البيانات إلى ملف إذا تم تحديده | |
| if output_file: | |
| try: | |
| ext = os.path.splitext(output_file)[1].lower() | |
| if ext == '.csv': | |
| forecast_df.to_csv(output_file, index=False, encoding='utf-8-sig') | |
| elif ext in ['.xlsx', '.xls']: | |
| forecast_df.to_excel(output_file, index=False) | |
| elif ext == '.json': | |
| forecast_df.to_json(output_file, orient='records', force_ascii=False) | |
| else: | |
| print(f"تنسيق غير مدعوم: {ext}") | |
| return None | |
| return output_file | |
| except Exception as e: | |
| print(f"خطأ في تصدير توقعات الأسعار: {str(e)}") | |
| return None | |
| return forecast_df |