diff --git "a/modules/cost_risk_analyzer.py" "b/modules/cost_risk_analyzer.py" --- "a/modules/cost_risk_analyzer.py" +++ "b/modules/cost_risk_analyzer.py" @@ -1,939 +1,1279 @@ -""" -فئة للتعامل مع نماذج اللغة الكبيرة (LLM) لتحليل المناقصات -""" - -import os -import json -import re -import requests -import numpy as np -import logging -from typing import Dict, List, Any, Union, Tuple, Optional -from datetime import datetime - -logger = logging.getLogger(__name__) - -class LLMProcessor: - """ - فئة للتعامل مع نماذج اللغة الكبيرة (LLM) لتحليل المناقصات - """ +"deviation_percentage": deviation_percentage, + "risk_level": risk_level, + "impact": "تأثير على هامش الربح وزيادة التكاليف الإجمالية", + "mitigation": self._generate_overrun_mitigation(category, deviation_percentage) + }) + + # ترتيب المخاطر حسب نسبة التجاوز + return sorted(overrun_risks, key=lambda x: x["deviation_percentage"], reverse=True) - def __init__(self, config: Dict[str, Any] = None): + def _analyze_cashflow_risks(self, costs: Dict[str, Any]) -> List[Dict[str, Any]]: """ - تهيئة معالج نماذج اللغة الكبيرة + تحليل مخاطر التدفق النقدي المعاملات: ---------- - config : Dict[str, Any], optional - إعدادات معالج نماذج اللغة الكبيرة + costs : Dict[str, Any] + بيانات التكاليف + + المخرجات: + -------- + List[Dict[str, Any]] + قائمة مخاطر التدفق النقدي """ - self.config = config or {} - self.model_name = self.config.get('model_name', "claude-3-haiku-20240307") - self.use_rag = self.config.get('use_rag', True) - self.temperature = self.config.get('temperature', 0.7) - self.max_tokens = self.config.get('max_tokens', 4096) - - # الحصول على مفتاح واجهة برمجة التطبيقات من متغيرات البيئة أو الإعدادات - self.api_key = self.config.get('api_key') or os.getenv("ANTHROPIC_API_KEY") - - if not self.api_key: - logger.warning("لم يتم تحديد مفتاح واجهة برمجة التطبيقات لـ Anthropic. لن تعمل وظائف LLM.") - - # تهيئة قاعدة بيانات المتجهات إذا كان استخدام RAG مفعلاً - if self.use_rag: - try: - from utils.vector_db import VectorDB - self.vector_db = VectorDB(self.config.get('vector_db_config', {})) - logger.info("تم تهيئة قاعدة بيانات المتجهات بنجاح.") - except ImportError as e: - logger.error(f"فشل في تهيئة قاعدة بيانات المتجهات: {str(e)}") - self.use_rag = False - - logger.info(f"تم تهيئة معالج نماذج اللغة الكبيرة: {self.model_name}, RAG: {self.use_rag}") + cashflow_risks = [] + + # استخراج بيانات التدفق النقدي + cashflow = costs.get("cashflow", []) + + if not cashflow: + # إنشاء مخاطر افتراضية للتدفق النقدي + cashflow_risks = [ + { + "type": "تأخر الدفعات", + "probability": 0.4, + "impact": 0.35, + "risk_score": 0.14, + "description": "تأخر في استلام الدفعات من العميل", + "mitigation": "وضع شروط دفع واضحة في العقد وتوفير تمويل احتياطي" + }, + { + "type": "فجوة تمويلية", + "probability": 0.3, + "impact": 0.4, + "risk_score": 0.12, + "description": "عدم كفاية التدفق النقدي لتغطية النفقات في بعض الفترات", + "mitigation": "تأمين تسهيلات ائتمانية وتحسين إدارة التدفق النقدي" + }, + { + "type": "تغير أسعار الفائدة", + "probability": 0.2, + "impact": 0.25, + "risk_score": 0.05, + "description": "زيادة تكاليف التمويل بسبب ارتفاع أسعار الفائدة", + "mitigation": "تأمين قروض بأسعار فائدة ثابتة والتفاوض على شروط تمويل مناسبة" + } + ] + else: + # تحليل بيانات التدفق النقدي لاستخراج المخاطر + negative_periods = [] + consecutive_negative = 0 + total_periods = len(cashflow) + + for i, period in enumerate(cashflow): + net_flow = period.get("net_cashflow", 0) + + if net_flow < 0: + consecutive_negative += 1 + negative_periods.append(i + 1) # رقم الفترة (مبتدئًا من 1) + else: + consecutive_negative = 0 + + # تقييم المخاطر بناءً على التحليل + if len(negative_periods) > 0: + risk = { + "type": "تدفق نقدي سلبي", + "probability": min(0.7, len(negative_periods) / total_periods), + "impact": min(0.8, 0.2 + (len(negative_periods) / total_periods) * 0.6), + "affected_periods": negative_periods, + "description": f"تدفق نقدي سلبي في {len(negative_periods)} فترات من أصل {total_periods}", + "mitigation": "تحسين جدولة الدفعات وترشيد النفقات في الفترات الحرجة" + } + risk["risk_score"] = risk["probability"] * risk["impact"] + cashflow_risks.append(risk) + + if consecutive_negative >= 2: + risk = { + "type": "تدفق نقدي سلبي متتالي", + "probability": min(0.8, 0.3 + (consecutive_negative / total_periods) * 0.5), + "impact": min(0.9, 0.4 + (consecutive_negative / total_periods) * 0.5), + "consecutive_periods": consecutive_negative, + "description": f"تدفق نقدي سلبي متتالي لمدة {consecutive_negative} فترات", + "mitigation": "تأمين تمويل احتياطي وإعادة التفاوض على شروط الدفع" + } + risk["risk_score"] = risk["probability"] * risk["impact"] + cashflow_risks.append(risk) + + # ترتيب المخاطر حسب درجة المخاطرة + return sorted(cashflow_risks, key=lambda x: x.get("risk_score", 0), reverse=True) - def analyze_requirements(self, requirements: List[Dict[str, Any]], context: Dict[str, Any] = None) -> Dict[str, Any]: + def _analyze_profit_impact(self, financial_risks: List[Dict[str, Any]], costs: Dict[str, Any]) -> Dict[str, Any]: """ - تحليل المتطلبات باستخدام نموذج اللغة الكبيرة + تحليل تأثير المخاطر على الأرباح المعاملات: ---------- - requirements : List[Dict[str, Any]] - قائمة المتطلبات المستخرجة من المستندات - context : Dict[str, Any], optional - معلومات السياق الإضافية + financial_risks : List[Dict[str, Any]] + قائمة المخاطر المالية + costs : Dict[str, Any] + بيانات التكاليف المخرجات: -------- Dict[str, Any] - نتائج تحليل المتطلبات + تحليل تأثير المخاطر على الأرباح """ - context = context or {} + # استخراج القيمة التعاقدية وإجمالي التكاليف + contract_value = costs.get("contract_value", 0) + direct_costs = costs.get("direct", {}) + indirect_costs = costs.get("indirect", {}) + total_cost = sum(direct_costs.values()) + sum(indirect_costs.values()) - try: - # إعداد الاستعلام بناءً على المتطلبات والسياق - prompt = self._prepare_requirements_prompt(requirements, context) - - # استرجاع معلومات إضافية من قاعدة البيانات إذا كان استخدام RAG مفعلاً - if self.use_rag: - retrieval_results = self.vector_db.search( - query=self._create_search_query(requirements), - collection="requirements", - limit=5 - ) - prompt += "\n\nمعلومات إضافية مسترجعة:\n" + self._format_retrieval_results(retrieval_results) + if contract_value == 0 or total_cost == 0: + return { + "expected_profit": 0, + "profit_margin": 0, + "risk_adjusted_profit": 0, + "risk_adjusted_margin": 0, + "total_risk_impact": 0, + "probability_of_negative_profit": 0 + } + + # حساب الربح المتوقع وهامش الربح + expected_profit = contract_value - total_cost + profit_margin = (expected_profit / contract_value) * 100 + + # حساب إجمالي تأثير المخاطر + total_risk_impact = sum(risk.get("financial_impact", 0) * risk.get("probability", 0.5) for risk in financial_risks) + + # حساب الربح المعدل بالمخاطر + risk_adjusted_profit = expected_profit - total_risk_impact + risk_adjusted_margin = (risk_adjusted_profit / contract_value) * 100 + + # حساب احتمالية الربح السلبي + if expected_profit > 0: + probability_of_negative_profit = min(0.95, total_risk_impact / expected_profit) + else: + probability_of_negative_profit = 0.95 + + return { + "expected_profit": expected_profit, + "profit_margin": profit_margin, + "risk_adjusted_profit": risk_adjusted_profit, + "risk_adjusted_margin": risk_adjusted_margin, + "total_risk_impact": total_risk_impact, + "probability_of_negative_profit": probability_of_negative_profit + } + + def _generate_risk_management_plan(self, financial_risks: List[Dict[str, Any]], + cost_overrun_risks: List[Dict[str, Any]], + cashflow_risks: List[Dict[str, Any]]) -> List[str]: + """ + إعداد خطة إدارة المخاطر المالية + + المعاملات: + ---------- + financial_risks : List[Dict[str, Any]] + قائمة المخاطر المالية + cost_overrun_risks : List[Dict[str, Any]] + قائمة مخاطر تجاوز التكاليف + cashflow_risks : List[Dict[str, Any]] + قائمة مخاطر التدفق النقدي + + المخرجات: + -------- + List[str] + خطة إدارة المخاطر المالية + """ + management_plan = ["خطة إدارة المخاطر المالية:"] + + # إضافة استراتيجيات للمخاطر المالية العامة + if financial_risks: + management_plan.append("\n1. إدارة المخاطر المالية العامة:") + for i, risk in enumerate(financial_risks[:3]): # أعلى 3 مخاطر + management_plan.append(f" - {risk.get('name', risk.get('type', 'مخاطرة'))}: {risk.get('mitigation', '')}") + + # إضافة استراتيجيات لمخاطر تجاوز التكاليف + if cost_overrun_risks: + management_plan.append("\n2. إدارة مخاطر تجاوز التكاليف:") + for i, risk in enumerate(cost_overrun_risks[:3]): # أعلى 3 مخاطر + management_plan.append(f" - {risk.get('category', 'فئة تكلفة')}: {risk.get('mitigation', '')}") + + # إضافة استراتيجيات لمخاطر التدفق النقدي + if cashflow_risks: + management_plan.append("\n3. إدارة مخاطر التدفق النقدي:") + for i, risk in enumerate(cashflow_risks): + management_plan.append(f" - {risk.get('type', 'مخاطرة')}: {risk.get('mitigation', '')}") + + # إضافة إجراءات عامة + management_plan.extend([ + "\n4. إجراءات عامة لإدارة المخاطر المالية:", + " - إنشاء احتياطي مالي للطوارئ بنسبة 5-10% من قيمة المشروع", + " - مراقبة التكاليف بشكل أسبوعي وتحديث توقعات التدفق النقدي شهريًا", + " - تأمين تسهيلات ائتمانية احتياطية للتعامل مع فجوات التدفق النقدي", + " - مراجعة عقود التوريد ووضع آليات لمواجهة تقلبات الأسعار", + " - إعداد سيناريوهات بديلة وخطط طوارئ للمخاطر المالية العالية" + ]) + + return management_plan + + def _forecast_monthly_costs(self, costs: Dict[str, Any], forecast_months: int) -> List[Dict[str, Any]]: + """ + التنبؤ بالتكاليف الشهرية + + المعاملات: + ---------- + costs : Dict[str, Any] + بيانات التكاليف + forecast_months : int + عدد الأشهر المراد التنبؤ بها + + المخرجات: + -------- + List[Dict[str, Any]] + التنبؤ بالتكاليف الشهرية + """ + monthly_forecast = [] + + # استخراج بيانات التكاليف الشهرية السابقة + historical_monthly = costs.get("monthly", []) + + if not historical_monthly and len(historical_monthly) < 3: + # إذا لم تكن هناك بيانات تاريخية كافية، قم بإنشاء توقعات افتراضية + direct_costs = costs.get("direct", {}) + indirect_costs = costs.get("indirect", {}) + total_cost = sum(direct_costs.values()) + sum(indirect_costs.values()) - # استدعاء النموذج - response = self._call_llm(prompt) + # توزيع التكاليف على الأشهر بشكل افتراضي + monthly_cost = total_cost / forecast_months - # معالجة الاستجابة - analysis = self._parse_requirements_response(response) + for month in range(1, forecast_months + 1): + # إضافة تذبذب عشوائي للواقعية + random_factor = 1 + (np.random.random() * 0.2 - 0.1) # بين 0.9 و 1.1 + + forecast = { + "month": month, + "total_cost": monthly_cost * random_factor, + "direct_cost": monthly_cost * 0.7 * random_factor, + "indirect_cost": monthly_cost * 0.3 * random_factor, + "variance": (random_factor - 1) * 100, + "cumulative_cost": monthly_cost * random_factor * month + } + + monthly_forecast.append(forecast) + else: + # استخدام البيانات التاريخية للتنبؤ + # حساب متوسط النمو الشهري + growth_rates = [] + for i in range(1, len(historical_monthly)): + prev_cost = historical_monthly[i-1].get("total_cost", 0) + curr_cost = historical_monthly[i].get("total_cost", 0) + if prev_cost > 0: + growth_rate = (curr_cost - prev_cost) / prev_cost + growth_rates.append(growth_rate) - logger.info("تم تحليل المتطلبات بنجاح.") - return analysis + # حساب متوسط معدل النمو + avg_growth_rate = np.mean(growth_rates) if growth_rates else 0.02 - except Exception as e: - logger.error(f"فشل في تحليل المتطلبات: {str(e)}") + # الحصول على آخر تكلفة شهرية + last_month = historical_monthly[-1] + last_month_cost = last_month.get("total_cost", 0) + last_month_direct = last_month.get("direct_cost", last_month_cost * 0.7) + last_month_indirect = last_month.get("indirect_cost", last_month_cost * 0.3) + last_month_number = last_month.get("month", 0) + last_month_cumulative = last_month.get("cumulative_cost", last_month_cost) + + # التنبؤ بالأشهر القادمة + for i in range(1, forecast_months + 1): + month_number = last_month_number + i + + # حساب التكلفة المتوقعة مع إضافة تذبذب عشوائي + random_factor = 1 + (np.random.random() * 0.1 - 0.05) # بين 0.95 و 1.05 + growth_factor = (1 + avg_growth_rate) * random_factor + + total_cost = last_month_cost * growth_factor ** i + direct_cost = last_month_direct * growth_factor ** i + indirect_cost = last_month_indirect * growth_factor ** i + + forecast = { + "month": month_number, + "total_cost": total_cost, + "direct_cost": direct_cost, + "indirect_cost": indirect_cost, + "variance": (growth_factor - 1) * 100, + "cumulative_cost": last_month_cumulative + sum(monthly_forecast[j]["total_cost"] for j in range(i-1)) + total_cost + } + + monthly_forecast.append(forecast) + + return monthly_forecast + + def _forecast_cost_distribution(self, costs: Dict[str, Any], forecast_months: int) -> Dict[str, Any]: + """ + التنبؤ بتوزيع التكاليف + + المعاملات: + ---------- + costs : Dict[str, Any] + بيانات التكاليف + forecast_months : int + عدد الأشهر المراد التنبؤ بها + + المخرجات: + -------- + Dict[str, Any] + التنبؤ بتوزيع التكاليف + """ + # استخراج بيانات توزيع التكاليف + direct_costs = costs.get("direct", {}) + indirect_costs = costs.get("indirect", {}) + + if not direct_costs and not indirect_costs: + # إنشاء توزيع افتراضي للتكاليف return { - "summary": "حدث خطأ أثناء تحليل المتطلبات", - "error": str(e), - "technical": [], - "financial": [], - "legal": [], - "local_content": [], - "total_count": len(requirements), - "mandatory_count": 0, - "avg_difficulty": 0, - "local_content_percentage": 0 + "direct": { + "مواد": 40, + "عمالة": 30, + "معدات": 20, + "مقاولين": 10 + }, + "indirect": { + "إدارة": 40, + "منشآت": 25, + "تأمين": 20, + "أخرى": 15 + }, + "overall": { + "مواد": 30, + "عمالة": 25, + "معدات": 15, + "مقاولين": 5, + "إدارة": 10, + "منشآت": 5, + "تأمين": 5, + "أخرى": 5 + } } + + # حساب إجمالي التكاليف + total_direct = sum(direct_costs.values()) + total_indirect = sum(indirect_costs.values()) + total_cost = total_direct + total_indirect + + # حساب توزيع التكاليف المباشرة + direct_distribution = {} + for category, amount in direct_costs.items(): + direct_distribution[category] = (amount / total_direct * 100) if total_direct > 0 else 0 + + # حساب توزيع التكاليف غير المباشرة + indirect_distribution = {} + for category, amount in indirect_costs.items(): + indirect_distribution[category] = (amount / total_indirect * 100) if total_indirect > 0 else 0 + + # حساب التوزيع الإجمالي + overall_distribution = {} + for category, amount in direct_costs.items(): + overall_distribution[category] = (amount / total_cost * 100) if total_cost > 0 else 0 + + for category, amount in indirect_costs.items(): + overall_distribution[category] = (amount / total_cost * 100) if total_cost > 0 else 0 + + return { + "direct": direct_distribution, + "indirect": indirect_distribution, + "overall": overall_distribution + } - def analyze_local_content(self, local_content_data: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]: + def _forecast_deviations(self, costs: Dict[str, Any], forecast_months: int) -> List[Dict[str, Any]]: """ - تحليل بيانات المحتوى المحلي باستخدام نموذج اللغة الكبيرة + التنبؤ بالانحرافات المحتملة المعاملات: ---------- - local_content_data : Dict[str, Any] - بيانات المحتوى المحلي المستخرجة - context : Dict[str, Any], optional - معلومات السياق الإضافية + costs : Dict[str, Any] + بيانات التكاليف + forecast_months : int + عدد الأشهر المراد التنبؤ بها المخرجات: -------- - Dict[str, Any] - نتائج تحليل المحتوى المحلي + List[Dict[str, Any]] + التنبؤ بالانحرافات المحتملة """ - context = context or {} + deviation_forecast = [] - try: - # إعداد الاستعلام بناءً على بيانات المحتوى المحلي والسياق - prompt = self._prepare_local_content_prompt(local_content_data, context) - - # استرجاع معلومات إضافية من قاعدة البيانات إذا كان استخدام RAG مفعلاً - if self.use_rag: - retrieval_results = self.vector_db.search( - query=self._create_search_query(local_content_data), - collection="local_content", - limit=5 - ) - prompt += "\n\nمعلومات إضافية مسترجعة:\n" + self._format_retrieval_results(retrieval_results) + # استخراج التكاليف الفعلية والمخططة + actual_costs = costs.get("actual", {}) + planned_costs = costs.get("planned", {}) + + # إذا لم تكن هناك بيانات كافية، قم بإنشاء توقعات افتراضية + if not actual_costs or not planned_costs: + # القوائم الافتراضية للفئات + categories = ["مواد", "عمالة", "معدات", "مقاولين", "إدارة", "منشآت", "تأمين"] - # استدعاء النموذج - response = self._call_llm(prompt) + for category in categories: + # إنشاء انحراف افتراضي + random_deviation = np.random.normal(0, 5) # انحراف عشوائي بمتوسط 0 وانحراف معياري 5 + + deviation_forecast.append({ + "category": category, + "expected_deviation_percentage": random_deviation, + "confidence_level": "متوسط", + "impact_level": "منخفض" if abs(random_deviation) < 5 else "متوسط" if abs(random_deviation) < 10 else "مرتفع", + "recommendation": self._generate_deviation_recommendation(category, random_deviation) + }) + else: + # تحليل الانحرافات السابقة واستخدامها للتنبؤ + for category in set(list(actual_costs.keys()) + list(planned_costs.keys())): + actual = actual_costs.get(category, 0) + planned = planned_costs.get(category, 0) + + if planned > 0: + historical_deviation = ((actual - planned) / planned) * 100 + + # استخدام الانحراف التاريخي للتنبؤ بالانحراف المستقبلي + # يمكن إضافة عوامل أخرى هنا للتحسين + expected_deviation = historical_deviation * (1 + np.random.normal(0, 0.2)) + + confidence_level = "مرتفع" if abs(historical_deviation) < 5 else "متوسط" if abs(historical_deviation) < 15 else "منخفض" + impact_level = "منخفض" if abs(expected_deviation) < 5 else "متوسط" if abs(expected_deviation) < 10 else "مرتفع" + + deviation_forecast.append({ + "category": category, + "historical_deviation_percentage": historical_deviation, + "expected_deviation_percentage": expected_deviation, + "confidence_level": confidence_level, + "impact_level": impact_level, + "recommendation": self._generate_deviation_recommendation(category, expected_deviation) + }) + + # ترتيب الانحرافات حسب القيمة المطلقة للانحراف المتوقع + return sorted(deviation_forecast, key=lambda x: abs(x["expected_deviation_percentage"]), reverse=True) + + def _analyze_procurement_costs(self, procurement_data: Dict[str, Any], costs: Dict[str, Any]) -> Dict[str, Any]: + """ + تحليل تكاليف المشتريات + + المعاملات: + ---------- + procurement_data : Dict[str, Any] + بيانات المشتريات + costs : Dict[str, Any] + بيانات التكاليف - # معالجة الاستجابة - analysis = self._parse_local_content_response(response) + المخرجات: + -------- + Dict[str, Any] + نتائج تحليل تكاليف المشتريات + """ + # استخراج بيانات المشتريات + items = procurement_data.get("items", []) + + if not items: + return { + "total_procurement_cost": 0, + "local_procurement_cost": 0, + "import_procurement_cost": 0, + "local_percentage": 0, + "categories": {}, + "cost_efficiency": 0 + } + + # حساب إجمالي تكاليف المشتريات + total_cost = sum(item.get("cost", 0) for item in items) + local_cost = sum(item.get("cost", 0) for item in items if item.get("source_type") == "local") + import_cost = sum(item.get("cost", 0) for item in items if item.get("source_type") == "import") + + # حساب توزيع التكاليف حسب الفئة + categories = {} + for item in items: + category = item.get("category", "أخرى") + cost = item.get("cost", 0) - logger.info("تم تحليل بيانات المحتوى المحلي بنجاح.") - return analysis + if category in categories: + categories[category] += cost + else: + categories[category] = cost + + # حساب كفاءة التكلفة + reference_cost = self.reference_data.get("avg_procurement_cost", total_cost * 1.1) + cost_efficiency = ((reference_cost - total_cost) / reference_cost) * 100 if reference_cost > 0 else 0 + + return { + "total_procurement_cost": total_cost, + "local_procurement_cost": local_cost, + "import_procurement_cost": import_cost, + "local_percentage": (local_cost / total_cost * 100) if total_cost > 0 else 0, + "categories": categories, + "cost_efficiency": cost_efficiency + } + + def _analyze_suppliers(self, procurement_data: Dict[str, Any]) -> Dict[str, Any]: + """ + تحليل الموردين + + المعاملات: + ---------- + procurement_data : Dict[str, Any] + بيانات المشتريات - except Exception as e: - logger.error(f"فشل في تحليل بيانات المحتوى المحلي: {str(e)}") + المخرجات: + -------- + Dict[str, Any] + نتائج تحليل الموردين + """ + # استخراج بيانات الموردين + suppliers = procurement_data.get("suppliers", []) + items = procurement_data.get("items", []) + + if not suppliers or not items: return { - "estimated_local_content": 0, - "required_local_content": 0, - "required_materials": [], - "improvement_strategies": [] + "total_suppliers": 0, + "local_suppliers": 0, + "supplier_diversity": 0, + "supplier_concentration": {}, + "supplier_performance": [] + } + + # حساب إحصائيات الموردين + total_suppliers = len(suppliers) + local_suppliers = len([s for s in suppliers if s.get("location", "").startswith("SA")]) + + # حساب تنوع الموردين + categories = set(s.get("category", "") for s in suppliers) + supplier_diversity = len(categories) / max(1, total_suppliers) * 100 + + # حساب تركيز الموردين + supplier_items = {} + for item in items: + supplier_id = item.get("supplier_id", "") + cost = item.get("cost", 0) + + if supplier_id in supplier_items: + supplier_items[supplier_id] += cost + else: + supplier_items[supplier_id] = cost + + total_cost = sum(supplier_items.values()) + supplier_concentration = {} + + for supplier_id, cost in supplier_items.items(): + supplier_name = next((s.get("name", supplier_id) for s in suppliers if s.get("id") == supplier_id), supplier_id) + supplier_concentration[supplier_name] = (cost / total_cost * 100) if total_cost > 0 else 0 + + # تقييم أداء الموردين + supplier_performance = [] + for supplier in suppliers: + performance = { + "id": supplier.get("id", ""), + "name": supplier.get("name", ""), + "reliability": supplier.get("reliability", 3), + "delivery_time": supplier.get("avg_delivery_time", 30), + "quality": supplier.get("quality_rating", 3), + "price_competitiveness": supplier.get("price_rating", 3), + "overall_rating": (supplier.get("reliability", 3) + supplier.get("quality_rating", 3) + supplier.get("price_rating", 3)) / 3 } + supplier_performance.append(performance) + + # ترتيب الموردين حسب التقييم العام + supplier_performance.sort(key=lambda x: x["overall_rating"], reverse=True) + + return { + "total_suppliers": total_suppliers, + "local_suppliers": local_suppliers, + "local_percentage": (local_suppliers / total_suppliers * 100) if total_suppliers > 0 else 0, + "supplier_diversity": supplier_diversity, + "supplier_concentration": supplier_concentration, + "supplier_performance": supplier_performance + } - def analyze_supply_chain(self, supply_chain_data: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]: + def _analyze_local_content(self, procurement_data: Dict[str, Any]) -> Dict[str, Any]: """ - تحليل بيانات سلسلة الإمداد باستخدام نموذج اللغة الكبيرة + تحليل المحتوى المحلي المعاملات: ---------- - supply_chain_data : Dict[str, Any] - بيانات سلسلة الإمداد المستخرجة - context : Dict[str, Any], optional - معلومات السياق الإضافية + procurement_data : Dict[str, Any] + بيانات المشتريات المخرجات: -------- Dict[str, Any] - نتائج تحليل سلسلة الإمداد + نتائج تحليل المحتوى المحلي + """ + # استخراج بيانات المشتريات + items = procurement_data.get("items", []) + + if not items: + return { + "local_content_percentage": 0, + "local_content_by_category": {}, + "local_content_trend": [], + "local_content_gap": 0, + "improvement_potential": 0 + } + + # حساب نسبة المحتوى المحلي + total_cost = sum(item.get("cost", 0) for item in items) + local_content = sum(item.get("cost", 0) * item.get("local_content_contribution", 0) / 100 for item in items) + + local_content_percentage = (local""" +محلل تكاليف ومخاطر المشاريع +يقوم بتحليل التكاليف وتقييم المخاطر المالية للمشاريع +""" + +import logging +import json +import os +from typing import Dict, List, Any, Tuple, Optional, Union +import numpy as np +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + +class CostRiskAnalyzer: + """ + محلل تكاليف ومخاطر المشاريع + """ + + def __init__(self, config=None): + """ + تهيئة محلل التكاليف والمخاطر + + المعاملات: + ---------- + config : Dict, optional + إعدادات المحلل """ - context = context or {} + self.config = config or {} + # تحميل بيانات المخاطر والتكاليف المرجعية + self.reference_data = self._load_reference_data() + + logger.info("تم تهيئة محلل تكاليف ومخاطر المشاريع") + + def analyze_costs(self, project_data: Dict[str, Any]) -> Dict[str, Any]: + """ + تحليل تكاليف المشروع + + المعاملات: + ---------- + project_data : Dict[str, Any] + بيانات المشروع + + المخرجات: + -------- + Dict[str, Any] + نتائج تحليل التكاليف + """ try: - # إعداد الاستعلام بناءً على بيانات سلسلة الإمداد والسياق - prompt = self._prepare_supply_chain_prompt(supply_chain_data, context) - - # استرجاع معلومات إضافية من قاعدة البيانات إذا كان استخدام RAG مفعلاً - if self.use_rag: - retrieval_results = self.vector_db.search( - query=self._create_search_query(supply_chain_data), - collection="supply_chain", - limit=5 - ) - prompt += "\n\nمعلومات إضافية مسترجعة:\n" + self._format_retrieval_results(retrieval_results) + logger.info("بدء تحليل تكاليف المشروع") + + # استخراج بيانات التكاليف + costs = project_data.get("costs", {}) - # استدعاء النموذج - response = self._call_llm(prompt) + # تحليل هيكل التكاليف + cost_structure = self._analyze_cost_structure(costs) - # معالجة الاستجابة - analysis = self._parse_supply_chain_response(response) + # تحليل المؤشرات المالية + financial_indicators = self._analyze_financial_indicators(costs, project_data) + + # تحليل الانحرافات + deviations = self._analyze_cost_deviations(costs) + + # تحديد نقاط القوة والضعف + strengths_weaknesses = self._identify_cost_strengths_weaknesses(cost_structure, financial_indicators) + + # إعداد التوصيات + recommendations = self._generate_cost_recommendations(strengths_weaknesses, deviations) + + # إعداد النتائج + results = { + "cost_structure": cost_structure, + "financial_indicators": financial_indicators, + "deviations": deviations, + "strengths_weaknesses": strengths_weaknesses, + "recommendations": recommendations + } - logger.info("تم تحليل بيانات سلسلة الإمداد بنجاح.") - return analysis + logger.info("اكتمل تحليل تكاليف المشروع") + return results except Exception as e: - logger.error(f"فشل في تحليل بيانات سلسلة الإمداد: {str(e)}") + logger.error(f"فشل في تحليل تكاليف المشروع: {str(e)}") return { - "supply_chain_risks": [], - "optimizations": [], - "local_suppliers_availability": 0 + "error": str(e), + "cost_structure": {}, + "financial_indicators": {}, + "deviations": [], + "strengths_weaknesses": {"strengths": [], "weaknesses": []}, + "recommendations": [] } - def analyze_risks(self, extracted_text: str, context: Dict[str, Any] = None) -> Dict[str, Any]: + def analyze_risks(self, project_data: Dict[str, Any]) -> Dict[str, Any]: """ - تحليل المخاطر من نص المناقصة + تحليل مخاطر المشروع المعاملات: ---------- - extracted_text : str - النص المستخرج من المناقصة - context : Dict[str, Any], optional - معلومات السياق الإضافية + project_data : Dict[str, Any] + بيانات المشروع المخرجات: -------- Dict[str, Any] نتائج تحليل المخاطر """ - context = context or {} - try: - # إعداد الاستعلام - prompt = self._prepare_risk_analysis_prompt(extracted_text, context) - - # استرجاع معلومات إضافية من قاعدة البيانات إذا كان استخدام RAG مفعلاً - if self.use_rag: - # استخراج كلمات مفتاحية من النص - keywords = self._extract_keywords(extracted_text) - query = " ".join(keywords[:10]) # استخدام أهم 10 كلمات مفتاحية - - retrieval_results = self.vector_db.search( - query=query, - collection="risks", - limit=5 - ) - prompt += "\n\nمعلومات إضافية مسترجعة:\n" + self._format_retrieval_results(retrieval_results) + logger.info("بدء تحليل مخاطر المشروع") + + # استخراج بيانات المخاطر + risks = project_data.get("risks", []) + costs = project_data.get("costs", {}) + + # تحليل المخاطر المالية + financial_risks = self._analyze_financial_risks(risks, costs) - # استدعاء النموذج - response = self._call_llm(prompt) + # تحليل مخاطر تجاوز التكاليف + cost_overrun_risks = self._analyze_cost_overrun_risks(costs) - # معالجة الاستجابة - analysis = self._parse_risk_response(response) + # تحليل مخاطر التدفق النقدي + cashflow_risks = self._analyze_cashflow_risks(costs) + + # تحليل تأثير المخاطر على الأرباح + profit_impact = self._analyze_profit_impact(financial_risks, costs) + + # إعداد خطة إدارة المخاطر المالية + risk_management_plan = self._generate_risk_management_plan( + financial_risks, cost_overrun_risks, cashflow_risks + ) + + # إعداد النتائج + results = { + "financial_risks": financial_risks, + "cost_overrun_risks": cost_overrun_risks, + "cashflow_risks": cashflow_risks, + "profit_impact": profit_impact, + "risk_management_plan": risk_management_plan + } - logger.info("تم تحليل المخاطر بنجاح.") - return analysis + logger.info("اكتمل تحليل مخاطر المشروع") + return results except Exception as e: - logger.error(f"فشل في تحليل المخاطر: {str(e)}") + logger.error(f"فشل في تحليل مخاطر المشروع: {str(e)}") return { - "all_risks": [], - "technical_risks": [], + "error": str(e), "financial_risks": [], - "supply_chain_risks": [], - "legal_risks": [], - "mitigation_plan": [], - "avg_severity": 0 + "cost_overrun_risks": [], + "cashflow_risks": [], + "profit_impact": {}, + "risk_management_plan": [] } - def generate_summary(self, extracted_data: Dict[str, Any], analysis_results: Dict[str, Any]) -> Dict[str, Any]: + def forecast_costs(self, project_data: Dict[str, Any], forecast_months: int = 12) -> Dict[str, Any]: """ - إعداد ملخص شامل للمناقصة ونتائج التحليل + التنبؤ بتكاليف المشروع المعاملات: ---------- - extracted_data : Dict[str, Any] - البيانات المستخرجة من المستندات - analysis_results : Dict[str, Any] - نتائج التحليلات المختلفة + project_data : Dict[str, Any] + بيانات المشروع + forecast_months : int, optional + عدد الأشهر المراد التنبؤ بها (افتراضي: 12) المخرجات: -------- Dict[str, Any] - الملخص والتوصيات + نتائج التنبؤ بالتكاليف """ try: - # إعداد الاستعلام بناءً على البيانات المستخرجة ونتائج التحليل - prompt = self._prepare_summary_prompt(extracted_data, analysis_results) + logger.info(f"بدء التنبؤ بتكاليف المشروع لمدة {forecast_months} أشهر") - # استرجاع معلومات إضافية من قاعدة البيانات إذا كان استخدام RAG مفعلاً - if self.use_rag: - # إنشاء استعلام متجه من البيانات المستخرجة - query = self._create_search_query(extracted_data) - - retrieval_results = self.vector_db.search( - query=query, - collection="summaries", - limit=3 - ) - prompt += "\n\nأمثلة مشابهة من المناقصات السابقة:\n" + self._format_retrieval_results(retrieval_results) + # استخراج بيانات التكاليف + costs = project_data.get("costs", {}) - # استدعاء النموذج - response = self._call_llm(prompt) + # التنبؤ بالتكاليف الشهرية + monthly_forecast = self._forecast_monthly_costs(costs, forecast_months) - # معالجة الاستجابة - summary = self._parse_summary_response(response) + # التنبؤ بتوزيع التكاليف + cost_distribution_forecast = self._forecast_cost_distribution(costs, forecast_months) - logger.info("تم إعداد الملخص بنجاح.") - return summary + # التنبؤ بالانحرافات المحتملة + deviation_forecast = self._forecast_deviations(costs, forecast_months) + + # إعداد النتائج + results = { + "monthly_forecast": monthly_forecast, + "cost_distribution_forecast": cost_distribution_forecast, + "deviation_forecast": deviation_forecast, + "forecast_period": { + "months": forecast_months, + "start_date": datetime.now().strftime("%Y-%m-%d"), + "end_date": (datetime.now() + timedelta(days=30 * forecast_months)).strftime("%Y-%m-%d") + } + } + + logger.info("اكتمل التنبؤ بتكاليف المشروع") + return results except Exception as e: - logger.error(f"فشل في إعداد الملخص: {str(e)}") + logger.error(f"فشل في التنبؤ بتكاليف المشروع: {str(e)}") return { - "executive_summary": "حدث خطأ أثناء إعداد الملخص", - "recommendations": [], - "bid_strategy": {}, - "error": str(e) + "error": str(e), + "monthly_forecast": [], + "cost_distribution_forecast": {}, + "deviation_forecast": [] } - def _call_llm(self, prompt: str) -> str: + def generate_procurement_report(self, project_data: Dict[str, Any]) -> Dict[str, Any]: """ - استدعاء نموذج اللغة الكبيرة + إنشاء تقرير المشتريات المعاملات: ---------- - prompt : str - الاستعلام المُرسل للنموذج + project_data : Dict[str, Any] + بيانات المشروع المخرجات: -------- - str - استجابة النموذج + Dict[str, Any] + تقرير المشتريات """ - if not self.api_key: - logger.warning("لم يتم تحديد مفتاح واجهة برمجة التطبيقات.") - return "لم يتم تحديد مفتاح واجهة برمجة التطبيقات. يرجى تكوين مفتاح API في الإعدادات." - try: - headers = { - "x-api-key": self.api_key, - "content-type": "application/json", - "anthropic-version": "2023-06-01" - } + logger.info("بدء إنشاء تقرير المشتريات") - data = { - "model": self.model_name, - "messages": [{"role": "user", "content": prompt}], - "temperature": self.temperature, - "max_tokens": self.max_tokens - } + # استخراج بيانات المشتريات + procurement_data = project_data.get("procurement", {}) + costs = project_data.get("costs", {}) + + # تحليل تكاليف المشتريات + procurement_costs = self._analyze_procurement_costs(procurement_data, costs) + + # تحليل الموردين + suppliers_analysis = self._analyze_suppliers(procurement_data) - response = requests.post( - "https://api.anthropic.com/v1/messages", - headers=headers, - json=data + # تحليل المحتوى المحلي + local_content_analysis = self._analyze_local_content(procurement_data) + + # تحديد فرص التحسين + improvement_opportunities = self._identify_procurement_improvements( + procurement_costs, suppliers_analysis, local_content_analysis ) - if response.status_code == 200: - return response.json()["content"][0]["text"] - else: - logger.error(f"فشل في استدعاء النموذج: {response.status_code} - {response.text}") - return f"فشل في استدعاء النموذج: {response.status_code}" - + # إعداد النتائج + results = { + "procurement_costs": procurement_costs, + "suppliers_analysis": suppliers_analysis, + "local_content_analysis": local_content_analysis, + "improvement_opportunities": improvement_opportunities, + "summary": self._generate_procurement_summary( + procurement_costs, suppliers_analysis, local_content_analysis + ) + } + + logger.info("اكتمل إنشاء تقرير المشتريات") + return results + except Exception as e: - logger.error(f"خطأ في استدعاء النموذج: {str(e)}") - return f"خطأ في استدعاء النموذج: {str(e)}" - - def _prepare_requirements_prompt(self, requirements: List[Dict[str, Any]], context: Dict[str, Any]) -> str: - """ - إعداد استعلام لتحليل المتطلبات - """ - prompt = """ - أنت خبير في تحليل المناقصات والعقود. يرجى تحليل المتطلبات التالية وتقديم نظرة ثاقبة حول جودتها واكتمالها ووضوحها وأي فجوات أو مخاطر محتملة. - - المتطلبات: - """ - - for i, req in enumerate(requirements): - prompt += f"\n{i+1}. {req.get('title', 'متطلب')}: {req.get('description', '')}" - prompt += f"\n الفئة: {req.get('category', 'عامة')}, الأهمية: {req.get('importance', 'عادية')}" - - prompt += """ - - يرجى تقديم التحليل التالي: - 1. ملخص شامل للمتطلبات وجودتها العامة - 2. تصنيف المتطلبات إلى الفئات التالية: - - متطلبات فنية - - متطلبات مالية - - متطلبات قانونية - - متطلبات المحتوى المحلي - 3. تحديد عدد المتطلبات الإلزامية - 4. تقييم متوسط صعوبة التنفيذ (على مقياس من 1 إلى 5) - 5. تقدير نسبة متطلبات المحتوى المحلي من إجمالي المتطلبات - 6. تحديد أي فجوات أو تناقضات في المتطلبات - 7. اقتراح تحسينات للمتطلبات غير الواضحة - - قدم الإجابة بتنسيق JSON التالي: - { - "summary": "ملخص شامل للمتطلبات", - "technical": [{"title": "عنوان المتطلب", "description": "وصف المتطلب", "analysis": "تحليل المتطلب", "importance": "الأهمية", "difficulty": 3, "gaps": "الفجوات", "improvements": "التحسينات المقترحة"}], - "financial": [...], - "legal": [...], - "local_content": [...], - "total_count": 10, - "mandatory_count": 5, - "avg_difficulty": 3.5, - "local_content_percentage": 25 - } - """ - - if context: - prompt += "\n\nمعلومات سياقية إضافية:\n" - for key, value in context.items(): - prompt += f"{key}: {value}\n" - - return prompt + logger.error(f"فشل في إنشاء تقرير المشتريات: {str(e)}") + return { + "error": str(e), + "procurement_costs": {}, + "suppliers_analysis": {}, + "local_content_analysis": {}, + "improvement_opportunities": [], + "summary": "" + } - def _prepare_local_content_prompt(self, local_content_data: Dict[str, Any], context: Dict[str, Any]) -> str: - """ - إعداد استعلام لتحليل المحتوى المحلي + def _analyze_cost_structure(self, costs: Dict[str, Any]) -> Dict[str, Any]: """ - prompt = """ - أنت خبير في تحليل المحتوى المحلي وسلاسل الإمداد في المملكة العربية السعودية. يرجى تحليل البيانات التالية وتقديم توصيات لتحسين نسبة المحتوى المحلي. + تحليل هيكل التكاليف - بيانات المحتوى المحلي: + المعاملات: + ---------- + costs : Dict[str, Any] + بيانات التكاليف + + المخرجات: + -------- + Dict[str, Any] + نتائج تحليل هيكل التكاليف """ + # استخراج تفاصيل التكاليف + direct_costs = costs.get("direct", {}) + indirect_costs = costs.get("indirect", {}) - prompt += f"\nنسبة المحتوى المحلي المطلوبة: {local_content_data.get('required_percentage', 'غير محدد')}" - prompt += f"\nالفئة: {local_content_data.get('category', 'غير محدد')}" - - if 'materials' in local_content_data: - prompt += "\n\nالمواد والمنتجات المطلوبة:" - for material in local_content_data['materials']: - prompt += f"\n- {material.get('name', '')} - الكمية: {material.get('quantity', '')}, وحدة القياس: {material.get('unit', '')}" + # حساب إجمالي التكاليف + total_direct = sum(direct_costs.values()) + total_indirect = sum(indirect_costs.values()) + total_cost = total_direct + total_indirect - if 'services' in local_content_data: - prompt += "\n\nالخدمات المطلوبة:" - for service in local_content_data['services']: - prompt += f"\n- {service.get('name', '')} - {service.get('description', '')}" + if total_cost == 0: + return { + "total_cost": 0, + "direct_percentage": 0, + "indirect_percentage": 0, + "direct_breakdown": {}, + "indirect_breakdown": {}, + "cost_per_category": {} + } - prompt += """ + # حساب النسب المئوية + direct_percentage = (total_direct / total_cost) * 100 + indirect_percentage = (total_indirect / total_cost) * 100 + + # تحليل تفاصيل التكاليف المباشرة + direct_breakdown = {} + for category, amount in direct_costs.items(): + direct_breakdown[category] = { + "amount": amount, + "percentage_of_direct": (amount / total_direct) * 100 if total_direct > 0 else 0, + "percentage_of_total": (amount / total_cost) * 100 + } - يرجى تقديم التحليل التالي: - 1. تقدير نسبة المحتوى المحلي المتوقعة بناءً على البيانات المقدمة - 2. تحليل مدى توافق النسبة المتوقعة مع النسبة المطلوبة - 3. تحديد المواد والمنتجات التي يمكن توريدها محلياً - 4. اقتراح استراتيجيات لزيادة نسبة المحتوى المحلي - 5. تحديد الموردين المحليين المحتملين (إن وجدت معلومات) + # تحليل تفاصيل التكاليف غير المباشرة + indirect_breakdown = {} + for category, amount in indirect_costs.items(): + indirect_breakdown[category] = { + "amount": amount, + "percentage_of_indirect": (amount / total_indirect) * 100 if total_indirect > 0 else 0, + "percentage_of_total": (amount / total_cost) * 100 + } - قدم الإجابة بتنسيق JSON التالي: - { - "estimated_local_content": 35.5, - "required_local_content": 40, - "required_materials": [ - {"name": "اسم المادة", "quantity": "الكمية", "unit": "وحدة القياس", "local_availability": "متوفر محلياً؟", "potential_suppliers": ["مورد 1", "مورد 2"]} - ], - "improvement_strategies": ["استراتيجية 1", "استراتيجية 2"] + # تجميع التكاليف حسب الفئة + cost_per_category = {} + for category, amount in direct_costs.items(): + cost_per_category[f"مباشرة - {category}"] = amount + + for category, amount in indirect_costs.items(): + cost_per_category[f"غير مباشرة - {category}"] = amount + + return { + "total_cost": total_cost, + "direct_percentage": direct_percentage, + "indirect_percentage": indirect_percentage, + "direct_breakdown": direct_breakdown, + "indirect_breakdown": indirect_breakdown, + "cost_per_category": cost_per_category } - """ - - if context: - prompt += "\n\nمعلومات سياقية إضافية:\n" - for key, value in context.items(): - prompt += f"{key}: {value}\n" - - return prompt - def _prepare_supply_chain_prompt(self, supply_chain_data: Dict[str, Any], context: Dict[str, Any]) -> str: - """ - إعداد استعلام لتحليل سلسلة الإمداد - """ - prompt = """ - أنت خبير في سلاسل الإمداد والمشتريات. يرجى تحليل بيانات سلسلة الإمداد التالية وتقديم توصيات لتحسين الكفاءة وتقليل المخاطر. - - بيانات سلسلة الإمداد: - """ - - if 'materials' in supply_chain_data: - prompt += "\n\nالمواد والمنتجات:" - for material in supply_chain_data['materials']: - prompt += f"\n- {material.get('name', '')} - المصدر: {material.get('source', '')}, وقت التوريد: {material.get('lead_time', '')}" - - if 'suppliers' in supply_chain_data: - prompt += "\n\nالموردين:" - for supplier in supply_chain_data['suppliers']: - prompt += f"\n- {supplier.get('name', '')} - الموقع: {supplier.get('location', '')}, التصنيف: {supplier.get('rating', '')}" - - if 'logistics' in supply_chain_data: - prompt += "\n\nالخدمات اللوجستية:" - for logistic in supply_chain_data['logistics']: - prompt += f"\n- {logistic.get('type', '')} - المدة: {logistic.get('duration', '')}, التكلفة: {logistic.get('cost', '')}" - - prompt += """ - - يرجى تقديم التحليل التالي: - 1. تحليل المخاطر المحتملة في سلسلة الإمداد - 2. اقتراح تحسينات لتقليل المخاطر وزيادة الكفاءة - 3. تقييم مدى توفر الموردين المحليين للمواد المطلوبة - 4. اقتراح استراتيجية مشتريات فعالة - - قدم الإجابة بتنسيق JSON التالي: - { - "supply_chain_risks": [ - {"risk": "وصف المخاطرة", "severity": 4, "probability": 3, "impact": "تأثير المخاطرة", "mitigation": "إجراءات التخفيف"} - ], - "optimizations": ["تحسين 1", "تحسين 2"], - "local_suppliers_availability": 65.5, - "procurement_strategy": { - "approach": "نهج المشتريات", - "timeline": "الجدول الزمني", - "key_considerations": ["اعتبار 1", "اعتبار 2"] - } - } + def _analyze_financial_indicators(self, costs: Dict[str, Any], project_data: Dict[str, Any]) -> Dict[str, Any]: """ + تحليل المؤشرات المالية - if context: - prompt += "\n\nمعلومات سياقية إضافية:\n" - for key, value in context.items(): - prompt += f"{key}: {value}\n" - - return prompt - - def _prepare_risk_analysis_prompt(self, extracted_text: str, context: Dict[str, Any]) -> str: - """ - إعداد استعلام لتحليل المخاطر + المعاملات: + ---------- + costs : Dict[str, Any] + بيانات التكاليف + project_data : Dict[str, Any] + بيانات المشروع + + المخرجات: + -------- + Dict[str, Any] + نتائج تحليل المؤشرات المالية """ - # اختصار النص المستخرج إذا كان طويلاً جدًا - max_chars = 15000 # الحد الأقصى لطول النص - if len(extracted_text) > max_chars: - extracted_text = extracted_text[:max_chars] + "... (تم اختصار النص)" + # استخراج البيانات المالية + contract_value = project_data.get("contract_value", 0) + direct_costs = costs.get("direct", {}) + indirect_costs = costs.get("indirect", {}) - prompt = """ - أنت خبير في تحليل المخاطر في المناقصات والمشاريع. يرجى تحليل النص التالي من وثيقة المناقصة وتحديد المخاطر المحتملة. + # حساب إجمالي التكاليف + total_direct = sum(direct_costs.values()) + total_indirect = sum(indirect_costs.values()) + total_cost = total_direct + total_indirect - نص المناقصة: - """ + if total_cost == 0 or contract_value == 0: + return { + "profit_margin": 0, + "cost_revenue_ratio": 0, + "breakeven_point": 0, + "roi": 0, + "cost_efficiency": 0 + } - prompt += f"\n{extracted_text}" + # حساب هامش الربح + profit = contract_value - total_cost + profit_margin = (profit / contract_value) * 100 - prompt += """ + # حساب نسبة التكلفة إلى الإيرادات + cost_revenue_ratio = (total_cost / contract_value) * 100 - يرجى تحديد وتحليل المخاطر التالية: - 1. المخاطر التقنية - 2. المخاطر المالية - 3. مخاطر سلسلة الإمداد - 4. المخاطر القانونية + # حساب نقطة التعادل + breakeven_point = total_indirect / (1 - (total_direct / contract_value)) - لكل مخاطرة، يرجى تقديم: - - وصف المخاطرة - - درجة الخطورة (على مقياس من 1 إلى 5) - - احتمالية الحدوث (على مقياس من 1 إلى 5) - - التأثير المحتمل - - إجراءات مقترحة للتخفيف + # حساب العائد على الاستثمار + roi = (profit / total_cost) * 100 - يرجى أيضًا تقديم خطة شاملة للتخفيف من المخاطر العالية. + # حساب كفاءة التكلفة + reference_cost_ratio = self.reference_data.get("avg_cost_revenue_ratio", 85) + cost_efficiency = ((reference_cost_ratio - cost_revenue_ratio) / reference_cost_ratio) * 100 - قدم الإجابة بتنسيق JSON التالي: - { - "all_risks": [ - {"risk": "وصف المخاطرة", "type": "نوع المخاطرة", "severity": 4, "probability": 3, "impact": "تأثير المخاطرة", "mitigation": "إجراءات التخفيف"} - ], - "technical_risks": [...], - "financial_risks": [...], - "supply_chain_risks": [...], - "legal_risks": [...], - "mitigation_plan": ["خطوة 1", "خطوة 2"], - "avg_severity": 3.5 + return { + "profit_margin": profit_margin, + "cost_revenue_ratio": cost_revenue_ratio, + "breakeven_point": breakeven_point, + "roi": roi, + "cost_efficiency": cost_efficiency } - """ - - if context: - prompt += "\n\nمعلومات سياقية إضافية:\n" - for key, value in context.items(): - prompt += f"{key}: {value}\n" - - return prompt - def _prepare_summary_prompt(self, extracted_data: Dict[str, Any], analysis_results: Dict[str, Any]) -> str: - """ - إعداد استعلام لإعداد ملخص شامل + def _analyze_cost_deviations(self, costs: Dict[str, Any]) -> List[Dict[str, Any]]: """ - prompt = """ - أنت مستشار استراتيجي في مجال المناقصات والمشاريع. يرجى إعداد ملخص تنفيذي شامل وتوصيات استراتيجية بناءً على البيانات والتحليلات التالية. + تحليل انحرافات التكاليف - البيانات المستخرجة: + المعاملات: + ---------- + costs : Dict[str, Any] + بيانات التكاليف + + المخرجات: + -------- + List[Dict[str, Any]] + قائمة انحرافات التكاليف """ + deviations = [] - # إضافة البيانات المستخرجة - for key, value in extracted_data.items(): - if isinstance(value, dict) or isinstance(value, list): - prompt += f"\n{key}: {json.dumps(value, ensure_ascii=False)[:500]}..." - else: - prompt += f"\n{key}: {str(value)[:500]}..." - - prompt += "\n\nنتائج التحليلات:" + # استخراج التكاليف الفعلية والمخططة + actual_costs = costs.get("actual", {}) + planned_costs = costs.get("planned", {}) - # إضافة نتائج التحليلات - for key, value in analysis_results.items(): - if isinstance(value, dict) or isinstance(value, list): - prompt += f"\n{key}: {json.dumps(value, ensure_ascii=False)[:500]}..." - else: - prompt += f"\n{key}: {str(value)[:500]}..." - - prompt += """ - - يرجى إعداد: - 1. ملخص تنفيذي شامل للمناقصة - 2. توصيات استراتيجية للتقدم للمناقصة - 3. استراتيجية تقديم العطاء، بما في ذلك النقاط التنافسية والأسعار المقترحة - 4. خطة عمل لتحسين فرص الفوز بالمناقصة - - قدم الإجابة بتنسيق JSON التالي: - { - "executive_summary": "ملخص تنفيذي شامل للمناقصة", - "recommendations": ["توصية 1", "توصية 2", "توصية 3"], - "bid_strategy": { - "competitive_points": ["نقطة 1", "نقطة 2"], - "proposed_price": "السعر المقترح", - "profit_margin": "هامش الربح المقترح" - }, - "action_plan": ["خطوة 1", "خطوة 2", "خطوة 3"] - } - """ + if not actual_costs or not planned_costs: + return deviations - return prompt - - def _parse_requirements_response(self, response: str) -> Dict[str, Any]: - """ - تحليل استجابة المتطلبات من النموذج - """ - try: - # البحث عن كتلة JSON في الاستجابة - json_match = re.search(r'```json\s*(.*?)```', response, re.DOTALL) - if not json_match: - json_match = re.search(r'{.*}', response, re.DOTALL) - - if json_match: - json_str = json_match.group(1) if json_match.groups() else json_match.group(0) - return json.loads(json_str) - else: - logger.warning("لم يتم العثور على JSON في استجابة تحليل المتطلبات. استخدام قيم افتراضية.") - return { - "summary": "لم يتم تحليل المتطلبات بشكل صحيح", - "technical": [], - "financial": [], - "legal": [], - "local_content": [], - "total_count": 0, - "mandatory_count": 0, - "avg_difficulty": 0, - "local_content_percentage": 0 - } - except json.JSONDecodeError as e: - logger.error(f"خطأ في تحليل استجابة JSON للمتطلبات: {str(e)}") - return { - "summary": "حدث خطأ في تحليل استجابة JSON", - "error": str(e), - "raw_response": response[:1000] + "..." if len(response) > 1000 else response, - "technical": [], - "financial": [], - "legal": [], - "local_content": [], - "total_count": 0, - "mandatory_count": 0, - "avg_difficulty": 0, - "local_content_percentage": 0 - } - - def _parse_local_content_response(self, response: str) -> Dict[str, Any]: - """ - تحليل استجابة المحتوى المحلي من النموذج - """ - try: - # البحث عن كتلة JSON في الاستجابة - json_match = re.search(r'```json\s*(.*?)```', response, re.DOTALL) - if not json_match: - json_match = re.search(r'{.*}', response, re.DOTALL) - - if json_match: - json_str = json_match.group(1) if json_match.groups() else json_match.group(0) - return json.loads(json_str) - else: - logger.warning("لم يتم العثور على JSON في استجابة تحليل المحتوى المحلي. استخدام قيم افتراضية.") - return { - "estimated_local_content": 0, - "required_local_content": 0, - "required_materials": [], - "improvement_strategies": [] - } - except json.JSONDecodeError as e: - logger.error(f"خطأ في تحليل استجابة JSON للمحتوى المحلي: {str(e)}") - return { - "estimated_local_content": 0, - "required_local_content": 0, - "required_materials": [], - "improvement_strategies": [], - "error": str(e), - "raw_response": response[:1000] + "..." if len(response) > 1000 else response - } - - def _parse_supply_chain_response(self, response: str) -> Dict[str, Any]: - """ - تحليل استجابة سلسلة الإمداد من النموذج - """ - try: - # البحث عن كتلة JSON في الاستجابة - json_match = re.search(r'```json\s*(.*?)```', response, re.DOTALL) - if not json_match: - json_match = re.search(r'{.*}', response, re.DOTALL) - - if json_match: - json_str = json_match.group(1) if json_match.groups() else json_match.group(0) - return json.loads(json_str) - else: - logger.warning("لم يتم العثور على JSON في استجابة تحليل سلسلة الإمداد. استخدام قيم افتراضية.") - return { - "supply_chain_risks": [], - "optimizations": [], - "local_suppliers_availability": 0, - "procurement_strategy": {} - } - except json.JSONDecodeError as e: - logger.error(f"خطأ في تحليل استجابة JSON لسلسلة الإمداد: {str(e)}") - return { - "supply_chain_risks": [], - "optimizations": [], - "local_suppliers_availability": 0, - "procurement_strategy": {}, - "error": str(e), - "raw_response": response[:1000] + "..." if len(response) > 1000 else response - } - - def _parse_risk_response(self, response: str) -> Dict[str, Any]: - """ - تحليل استجابة تحليل المخاطر من النموذج - """ - try: - # البحث عن كتلة JSON في الاستجابة - json_match = re.search(r'```json\s*(.*?)```', response, re.DOTALL) - if not json_match: - json_match = re.search(r'{.*}', response, re.DOTALL) - - if json_match: - json_str = json_match.group(1) if json_match.groups() else json_match.group(0) - return json.loads(json_str) - else: - logger.warning("لم يتم العثور على JSON في استجابة تحليل المخاطر. استخدام قيم افتراضية.") - return { - "all_risks": [], - "technical_risks": [], - "financial_risks": [], - "supply_chain_risks": [], - "legal_risks": [], - "mitigation_plan": [], - "avg_severity": 0 - } - except json.JSONDecodeError as e: - logger.error(f"خطأ في تحليل استجابة JSON للمخاطر: {str(e)}") - return { - "all_risks": [], - "technical_risks": [], - "financial_risks": [], - "supply_chain_risks": [], - "legal_risks": [], - "mitigation_plan": [], - "avg_severity": 0, - "error": str(e), - "raw_response": response[:1000] + "..." if len(response) > 1000 else response - } - - def _parse_summary_response(self, response: str) -> Dict[str, Any]: - """ - تحليل استجابة الملخص من النموذج - """ - try: - # البحث عن كتلة JSON في الاستجابة - json_match = re.search(r'```json\s*(.*?)```', response, re.DOTALL) - if not json_match: - json_match = re.search(r'{.*}', response, re.DOTALL) - - if json_match: - json_str = json_match.group(1) if json_match.groups() else json_match.group(0) - return json.loads(json_str) - else: - logger.warning("لم يتم العثور على JSON في استجابة الملخص. استخدام قيم افتراضية.") - return { - "executive_summary": "لم يتم إعداد الملخص بشكل صحيح", - "recommendations": [], - "bid_strategy": {}, - "action_plan": [] - } - except json.JSONDecodeError as e: - logger.error(f"خطأ في تحليل استجابة JSON للملخص: {str(e)}") - return { - "executive_summary": "حدث خطأ في تحليل استجابة JSON", - "recommendations": [], - "bid_strategy": {}, - "action_plan": [], - "error": str(e), - "raw_response": response[:1000] + "..." if len(response) > 1000 else response - } + # تحليل الانحرافات لكل فئة + for category in set(list(actual_costs.keys()) + list(planned_costs.keys())): + actual = actual_costs.get(category, 0) + planned = planned_costs.get(category, 0) + + if planned > 0: + deviation_amount = actual - planned + deviation_percentage = (deviation_amount / planned) * 100 + + deviations.append({ + "category": category, + "planned": planned, + "actual": actual, + "deviation_amount": deviation_amount, + "deviation_percentage": deviation_percentage, + "status": "تجاوز" if deviation_amount > 0 else "وفر" if deviation_amount < 0 else "مطابق" + }) + + # ترتيب الانحرافات حسب القيمة المطلقة للانحراف + return sorted(deviations, key=lambda x: abs(x["deviation_amount"]), reverse=True) - def _create_search_query(self, data) -> str: + def _identify_cost_strengths_weaknesses(self, cost_structure: Dict[str, Any], + financial_indicators: Dict[str, Any]) -> Dict[str, List[str]]: """ - إنشاء استعلام بحث من البيانات + تحديد نقاط القوة والضعف في التكاليف + + المعاملات: + ---------- + cost_structure : Dict[str, Any] + هيكل التكاليف + financial_indicators : Dict[str, Any] + المؤشرات المالية + + المخرجات: + -------- + Dict[str, List[str]] + نقاط القوة والضعف """ - if isinstance(data, list): - # إذا كانت البيانات قائمة، جمع النصوص من العناصر - texts = [] - for item in data: - if isinstance(item, dict): - texts.extend([str(v) for k, v in item.items() if isinstance(v, (str, int, float))]) - else: - texts.append(str(item)) - return " ".join(texts[:20]) # استخدام أول 20 عنصر فقط + strengths = [] + weaknesses = [] - elif isinstance(data, dict): - # إذا كانت البيانات قاموس، جمع القيم النصية - texts = [str(v) for k, v in data.items() if isinstance(v, (str, int, float))] - return " ".join(texts[:20]) # استخدام أول 20 عنصر فقط + # تحليل هيكل التكاليف + if cost_structure["direct_percentage"] < self.reference_data.get("avg_direct_percentage", 75): + strengths.append("نسبة التكاليف المباشرة أقل من المتوسط، مما يشير إلى كفاءة في إدارة الموارد المباشرة.") + else: + weaknesses.append("ارتفاع نسبة التكاليف المباشرة عن المتوسط، مما قد يشير إلى الحاجة لتحسين إدارة الموارد المباشرة.") + # تحليل المؤشرات المالية + if financial_indicators["profit_margin"] > self.reference_data.get("avg_profit_margin", 15): + strengths.append("هامش ربح أعلى من المتوسط، مما يدل على كفاءة التسعير وإدارة التكاليف.") else: - # إذا كانت البيانات نص أو قيمة أخرى - return str(data) - - def _format_retrieval_results(self, results: List[Dict[str, Any]]) -> str: - """ - تنسيق نتائج الاسترجاع من قاعدة البيانات - """ - if not results: - return "لا توجد نتائج مسترجعة." - - formatted = "" - for i, result in enumerate(results): - formatted += f"\n{i+1}. " - if 'title' in result: - formatted += f"{result['title']}: " - if 'content' in result: - content = result['content'] - # اقتصار المحتوى إذا كان طويلاً - if len(content) > 300: - content = content[:300] + "..." - formatted += content - if 'metadata' in result and isinstance(result['metadata'], dict): - formatted += f"\n معلومات إضافية: {', '.join([f'{k}: {v}' for k, v in result['metadata'].items() if k != 'text'])}" - formatted += "\n" - - return formatted + weaknesses.append("انخفاض هامش الربح عن المتوسط، مما يستدعي مراجعة استراتيجية التسعير وضبط التكاليف.") + + if financial_indicators["cost_revenue_ratio"] < self.reference_data.get("avg_cost_revenue_ratio", 85): + strengths.append("نسبة التكلفة إلى الإيرادات أقل من المتوسط، مما يشير إلى كفاءة في إدارة التكاليف.") + else: + weaknesses.append("ارتفاع نسبة التكلفة إلى الإيرادات عن المتوسط، مما يستدعي ضبط التكاليف.") + + if financial_indicators["roi"] > self.reference_data.get("avg_roi", 20): + strengths.append("العائد على الاستثمار أعلى من المتوسط، مما يدل على كفاءة استغلال الموارد المالية.") + else: + weaknesses.append("انخفاض العائد على الاستثمار عن المتوسط، مما يستدعي مراجعة توزيع الموارد المالية.") + + return { + "strengths": strengths, + "weaknesses": weaknesses + } - def _extract_keywords(self, text: str, top_n: int = 20) -> List[str]: + def _generate_cost_recommendations(self, strengths_weaknesses: Dict[str, List[str]], + deviations: List[Dict[str, Any]]) -> List[str]: """ - استخراج الكلمات المفتاحية من النص + إعداد توصيات لتحسين التكاليف المعاملات: ---------- - text : str - النص المراد استخراج الكلمات المفتاحية منه - top_n : int, optional - عدد الكلمات المفتاحية المراد استخراجها + strengths_weaknesses : Dict[str, List[str]] + نقاط القوة والضعف + deviations : List[Dict[str, Any]] + انحرافات التكاليف المخرجات: -------- List[str] - قائمة بالكلمات المفتاحية + قائمة التوصيات """ - # قائمة الكلمات الغير مهمة (stop words) باللغة العربية - arabic_stop_words = ['من', 'الى', 'إلى', 'عن', 'على', 'في', 'و', 'ا', 'ان', 'أن', 'لا', 'ما', 'هذا', 'هذه', 'ذلك', 'تلك', 'هناك', 'هنالك', 'هو', 'هي', 'هم'] + recommendations = [] + + # توصيات بناءً على نقاط الضعف + for weakness in strengths_weaknesses.get("weaknesses", []): + if "هامش الربح" in weakness: + recommendations.append("مراجعة استراتيجية التسعير وتحديد فرص زيادة الإيرادات.") + + if "التكاليف المباشرة" in weakness: + recommendations.append("تحسين إدارة الموارد المباشرة وتقليل الهدر في المواد والعمالة.") + + if "نسبة التكلفة إلى الإيرادات" in weakness: + recommendations.append("تطبيق استراتيجيات ضبط التكاليف وتحسين كفاءة العمليات.") + + if "العائد على الاستثمار" in weakness: + recommendations.append("إعادة توزيع الموارد المالية وتوجيهها نحو الأنشطة ذات العائد الأعلى.") - # تنظيف النص وتقسيمه إلى كلمات - words = re.findall(r'[\u0600-\u06FF]+|[a-zA-Z]+', text) + # توصيات بناءً على انحرافات التكاليف + high_deviations = [dev for dev in deviations if dev["status"] == "تجاوز" and dev["deviation_percentage"] > 10] - # تحويل الكلمات إلى أحرف صغيرة وإزالة الكلمات الغير مهمة - filtered_words = [word.lower() for word in words if word.lower() not in arabic_stop_words and len(word) > 2] + if high_deviations: + recommendations.append("معالجة فورية للتجاوزات الكبيرة في التكاليف، خاصة في الفئات التالية:") + for i, deviation in enumerate(high_deviations[:3]): + recommendations.append(f" - {deviation['category']}: تجاوز بنسبة {deviation['deviation_percentage']:.1f}%") - # حساب تكرار الكلمات - word_counts = {} - for word in filtered_words: - word_counts[word] = word_counts.get(word, 0) + 1 + # توصيات عامة + recommendations.extend([ + "تنفيذ نظام متكامل لمراقبة التكاليف ومتابعتها بشكل دوري.", + "تحليل سلسلة القيمة لتحديد الأنشطة غير الضرورية وتقليل تكاليفها.", + "تطبيق مبدأ التحسين المستمر في إدارة التكاليف ومراجعتها دوريًا." + ]) - # ترتيب الكلمات حسب التكرار والحصول على أهم الكلمات - sorted_words = [word for word, count in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)] + return recommendations + + def _analyze_financial_risks(self, risks: List[Dict[str, Any]], costs: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + تحليل المخاطر المالية - return sorted_words[:top_n] + المعاملات: + ---------- + risks : List[Dict[str, Any]] + قائمة المخاطر + costs : Dict[str, Any] + بيانات التكاليف + + المخرجات: + -------- + List[Dict[str, Any]] + قائمة المخاطر المالية + """ + financial_risks = [] + + # فلترة المخاطر المالية + for risk in risks: + if risk.get("type") == "financial" or risk.get("category") == "financial": + financial_risks.append(risk) + + # إذا لم تكن هناك مخاطر مالية محددة، قم بإنشاء مخاطر افتراضية + if not financial_risks: + # حساب إجمالي التكاليف + direct_costs = costs.get("direct", {}) + indirect_costs = costs.get("indirect", {}) + total_cost = sum(direct_costs.values()) + sum(indirect_costs.values()) + + # إنشاء مخاطر افتراضية + financial_risks = [ + { + "name": "ارتفاع أسعار المواد", + "type": "financial", + "probability": 0.4, + "impact": 0.3, + "risk_score": 0.12, + "financial_impact": total_cost * 0.05, + "description": "ارتفاع غير متوقع في أسعار المواد الأساسية", + "mitigation": "تأمين عقود توريد بأسعار ثابتة لفترات طويلة" + }, + { + "name": "تأخر الدفعات", + "type": "financial", + "probability": 0.3, + "impact": 0.4, + "risk_score": 0.12, + "financial_impact": total_cost * 0.03, + "description": "تأخر في استلام الدفعات من العميل", + "mitigation": "وضع شروط دفع واضحة في العقد وتوفير تمويل احتياطي" + }, + { + "name": "تقلبات أسعار العملات", + "type": "financial", + "probability": 0.25, + "impact": 0.25, + "risk_score": 0.0625, + "financial_impact": total_cost * 0.02, + "description": "تقلبات في أسعار صرف العملات للمشتريات الدولية", + "mitigation": "استخدام آليات التحوط وتفضيل المشتريات المحلية" + } + ] + + # إثراء بيانات المخاطر المالية + for risk in financial_risks: + # حساب درجة المخاطرة إذا لم تكن موجودة + if "risk_score" not in risk: + probability = risk.get("probability", 0.5) + impact = risk.get("impact", 0.5) + risk["risk_score"] = probability * impact + + # تقدير التأثير المالي إذا لم يكن موجودًا + if "financial_impact" not in risk: + # حساب إجمالي التكاليف + direct_costs = costs.get("direct", {}) + indirect_costs = costs.get("indirect", {}) + total_cost = sum(direct_costs.values()) + sum(indirect_costs.values()) + + risk["financial_impact"] = total_cost * risk["impact"] * 0.1 + + # إضافة استراتيجية تخفيف إذا لم تكن موجودة + if "mitigation" not in risk: + risk["mitigation"] = self._generate_default_mitigation(risk) + + # ترتيب المخاطر حسب درجة المخاطرة + return sorted(financial_risks, key=lambda x: x.get("risk_score", 0), reverse=True) - def _split_text_into_chunks(self, text: str, max_length: int = 4000) -> List[str]: + def _analyze_cost_overrun_risks(self, costs: Dict[str, Any]) -> List[Dict[str, Any]]: """ - تقسيم النص إلى أجزاء أصغر لمعالجتها بشكل منفصل + تحليل مخاطر تجاوز التكاليف المعاملات: ---------- - text : str - النص المراد تقسيمه - max_length : int, optional - الحد الأقصى لطول كل جزء + costs : Dict[str, Any] + بيانات التكاليف المخرجات: -------- - List[str] - قائمة بأجزاء النص + List[Dict[str, Any]] + قائمة مخاطر تجاوز التكاليف """ - # إذا كان النص قصيراً، إرجاعه كما هو - if len(text) <= max_length: - return [text] + # استخراج التكاليف الفعلية والمخططة + actual_costs = costs.get("actual", {}) + planned_costs = costs.get("planned", {}) - # تقسيم النص إلى فقرات - paragraphs = text.split('\n') + overrun_risks = [] - chunks = [] - current_chunk = "" + if not actual_costs or not planned_costs: + return overrun_risks - for paragraph in paragraphs: - # إذا كانت إضافة الفقرة ستجعل الجزء الحالي أطول من الحد الأقصى - if len(current_chunk) + len(paragraph) > max_length: - # إذا كان الجزء الحالي غير فارغ، إضافته إلى القائمة - if current_chunk: - chunks.append(current_chunk) + # تحليل الانحرافات لكل فئة وتحديد المخاطر + for category in set(list(actual_costs.keys()) + list(planned_costs.keys())): + actual = actual_costs.get(category, 0) + planned = planned_costs.get(category, 0) + + if planned > 0: + deviation_percentage = ((actual - planned) / planned) * 100 - # إذا كانت الفقرة نفسها أطول من الحد الأقصى، تقسيمها - if len(paragraph) > max_length: - # تقسيم الفقرة إلى جمل - sentences = re.split(r'(?<=[.!?])\s+', paragraph) + # إذا كان هناك تجاوز، أضف المخاطرة + if deviation_percentage > 5: + risk_level = "مرتفع" if deviation_percentage > 20 else "متوسط" if deviation_percentage > 10 else "منخفض" - current_chunk = "" - for sentence in sentences: - if len(current_chunk) + len(sentence) > max_length: - if current_chunk: - chunks.append(current_chunk) - - # إذا كانت الجملة نفسها أطول من الحد الأقصى، تقسيمها - if len(sentence) > max_length: - # تقسيم الجملة إلى أجزاء بطول الحد الأقصى - for i in range(0, len(sentence), max_length): - chunks.append(sentence[i:i+max_length]) - else: - current_chunk = sentence - else: - current_chunk += " " + sentence if current_chunk else sentence - else: - current_chunk = paragraph - else: - current_chunk += "\n" + paragraph if current_chunk else paragraph - - # إضافة الجزء الأخير إذا لم يكن فارغاً - if current_chunk: - chunks.append(current_chunk) - - return chunks \ No newline at end of file + overrun_risks.append({ + "category": category, + "planned": planned, + "actual": actual, + "deviation_percentage": deviation_percentage, \ No newline at end of file