""" file_analyzer.py - الإصدار المحسن والمصحح وصف: خدمة ويب بسيطة تستقبل تحميل ملفات (MS Office: .docx, .xlsx, .pptx), PDF, .txt, .json, .xml ثم تقوم بتحليلها وإرجاع نتيجة تحليل JSON تحتوي على: نوع الملف، حجم، نص مستخرج، إحصاءات بسيطة (عدد الكلمات، الكلمات الشائعة)، وملاحظات خاصة بكل نوع (عدد الصفحات للـ PDF، أوراق العمل في Excel ..) المتطلبات (ثبتها قبل التشغيل): pip install flask werkzeug python-docx openpyxl python-pptx pypdf2 langdetect chardet ملاحظات: - PyPDF2 يستخدم لاستخراج نص من PDF (بسيط). لنتائج أفضل مع PDF الممسوحة ضوئياً استخدم OCR مثل pytesseract (لم يُدرج هنا). - هذا سكربت تعليمي — تأكد من تشغيله في بيئة آمنة عند استقبال ملفات من الخارج. تشغيل: python file_analyzer.py ثم افتح http://127.0.0.1:4580 وادخل إلى endpoint /upload (POST) لرفع ملف (form field name = file) """ from flask import Flask, request, jsonify from werkzeug.utils import secure_filename import os import io import json import xml.etree.ElementTree as ET from collections import Counter import chardet import re import zipfile from datetime import datetime import hashlib # استيراد المكتبات مع معالجة الأخطاء try: from docx import Document DOCX_AVAILABLE = True except ImportError: DOCX_AVAILABLE = False print("⚠️ تحذير: python-docx غير مثبت - لن يتم دعم ملفات Word") try: from openpyxl import load_workbook EXCEL_AVAILABLE = True except ImportError: EXCEL_AVAILABLE = False print("⚠️ تحذير: openpyxl غير مثبت - لن يتم دعم ملفات Excel") try: from pptx import Presentation PPTX_AVAILABLE = True except ImportError: PPTX_AVAILABLE = False print("⚠️ تحذير: python-pptx غير مثبت - لن يتم دعم ملفات PowerPoint") try: from PyPDF2 import PdfReader PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False print("⚠️ تحذير: PyPDF2 غير مثبت - لن يتم دعم ملفات PDF") try: from langdetect import detect, DetectorFactory DetectorFactory.seed = 0 # ثابت للنتائج المتوقعة في الكشف عن اللغة LANGDETECT_AVAILABLE = True except ImportError: LANGDETECT_AVAILABLE = False print("⚠️ تحذير: langdetect غير مثبت - لن يتم دعم كشف اللغة") # تحديد الصيغ المدعومة بناءً على المكتبات المتوفرة ALLOWED_EXTENSIONS = set(['txt', 'json', 'xml']) if DOCX_AVAILABLE: ALLOWED_EXTENSIONS.add('docx') if EXCEL_AVAILABLE: ALLOWED_EXTENSIONS.add('xlsx') if PPTX_AVAILABLE: ALLOWED_EXTENSIONS.add('pptx') if PDF_AVAILABLE: ALLOWED_EXTENSIONS.add('pdf') UPLOAD_FOLDER = 'uploads' MAX_FILE_SIZE = 200 * 1024 * 1024 # 200MB MAX_MEMORY_CHUNK = 50 * 1024 * 1024 # 50MB للقراءة التدريجية os.makedirs(UPLOAD_FOLDER, exist_ok=True) app = Flask(__name__) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE # إعداد سجل الأخطاء import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def allowed_file(filename): """التحقق من أن امتداد الملف مسموح به""" return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def safe_read_file(file_stream, max_size=MAX_MEMORY_CHUNK): """قراءة الملف بشكل آمن مع تحديد حد أقصى للحجم""" content = b'' bytes_read = 0 while bytes_read < max_size: chunk = file_stream.read(8192) # قراءة قطع صغيرة if not chunk: break content += chunk bytes_read += len(chunk) if bytes_read >= max_size: logger.warning(f"File exceeded memory limit, truncated to {max_size} bytes") break return content def check_zip_bomb(file_path, max_ratio=100, max_files=10000): """الكشف عن هجمات Zip Bomb للملفات المضغوطة (docx, xlsx, pptx)""" if file_path.endswith(('.docx', '.xlsx', '.pptx')): try: with zipfile.ZipFile(file_path, 'r') as zf: total_size = 0 uncompressed_size = 0 file_count = 0 for info in zf.infolist(): file_count += 1 uncompressed_size += info.file_size total_size += info.compress_size if file_count > max_files: raise ValueError("Too many files in archive - possible zip bomb") if total_size > 0 and uncompressed_size / total_size > max_ratio: raise ValueError("Compression ratio too high - possible zip bomb") except Exception as e: logger.error(f"Zip bomb check failed: {str(e)}") raise def get_text_stats(text): """تحليل النص وإرجاع إحصاءات مفصلة""" # تنظيف نصي متقدم words = [w for w in ''.join(ch if (ch.isalnum() or ch.isspace()) else ' ' for ch in text).split() if w] total_words = len(words) total_chars = len(text) total_chars_no_spaces = len(text.replace(' ', '')) counter = Counter(w.lower() for w in words) top_words = counter.most_common(20) # كشف اللغة language = None try: if LANGDETECT_AVAILABLE and total_words >= 3: language = detect(' '.join(words[:1000])) except Exception: language = None # إحصاءات إضافية paragraphs = [p for p in text.split('\n') if p.strip()] sentences = [s for s in re.split(r'[.!?]+', text) if s.strip()] # تحليل التواريخ date_patterns = [ r'\d{1,2}/\d{1,2}/\d{4}', r'\d{4}-\d{2}-\d{2}', r'\d{1,2}-\d{1,2}-\d{4}' ] dates_found = [] for pattern in date_patterns: dates_found.extend(re.findall(pattern, text)) # تحليل عناوين البريد الإلكتروني emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text) # تحليل الأرقام الهاتفية phones = re.findall(r'[\+]?[1-9]?[0-9]{7,14}', text) return { 'total_words': total_words, 'total_characters': total_chars, 'total_characters_no_spaces': total_chars_no_spaces, 'paragraphs_count': len(paragraphs), 'sentences_count': len(sentences), 'top_words': top_words, 'language': language, 'dates_found': dates_found[:10], # أول 10 تواريخ فقط 'emails_found': emails[:10], # أول 10 عناوين بريد فقط 'phones_found': phones[:10], # أول 10 أرقام فقط 'estimated_reading_minutes': max(1, total_words // 200), # افتراض 200 كلمة/دقيقة 'unique_words_count': len(set(words)), 'average_word_length': sum(len(word) for word in words) / len(words) if words else 0 } def extract_metadata(file_path): """استخراج البيانات الوصفية للملف""" try: stat = os.stat(file_path) return { 'file_hash_md5': calculate_file_hash(file_path), 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'file_size_mb': round(stat.st_size / (1024 * 1024), 2), 'file_size_bytes': stat.st_size } except Exception as e: logger.error(f"Metadata extraction failed: {str(e)}") return {} def calculate_file_hash(file_path): """حساب بصمة الملف""" hasher = hashlib.md5() try: with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hasher.update(chunk) return hasher.hexdigest() except Exception as e: logger.error(f"Hash calculation failed: {str(e)}") return "error_calculating_hash" def analyze_txt(file_stream): """تحليل الملفات النصية""" raw = safe_read_file(file_stream) # كشف الترميز try: enc = chardet.detect(raw)['encoding'] or 'utf-8' text = raw.decode(enc, errors='replace') except Exception as e: logger.warning(f"Encoding detection failed, using utf-8: {str(e)}") text = raw.decode('utf-8', errors='replace') stats = get_text_stats(text) return { 'text_preview': text[:4580], # عرض أول 4580 حرف فقط 'stats': stats, 'metadata': {'encoding': enc}, 'analysis_type': 'text' } def analyze_json(file_stream): """تحليل ملفات JSON""" raw = safe_read_file(file_stream) try: text_content = raw.decode('utf-8', errors='replace') obj = json.loads(text_content) except Exception as e: return {'error': 'invalid_json', 'exception': str(e), 'analysis_type': 'json'} # استخراج النص من الهيكل def extract_strings(o): if isinstance(o, str): return [o] if isinstance(o, dict): res = [] for k, v in o.items(): res.append(k) # إضافة المفاتيح أيضاً res += extract_strings(v) return res if isinstance(o, list): res = [] for v in o: res += extract_strings(v) return res return [] all_text = ' '.join(extract_strings(obj)) stats = get_text_stats(all_text) return { 'json_preview': obj if isinstance(obj, (dict, list)) and len(str(obj)) < 10000 else str(type(obj)), 'structure_type': type(obj).__name__, 'stats': stats, 'analysis_type': 'json' } def analyze_xml(file_stream): """تحليل ملفات XML بشكل آمن""" raw = safe_read_file(file_stream) try: # منع هجمات XXE parser = ET.XMLParser(resolve_entities=False, no_network=True) root = ET.fromstring(raw, parser=parser) except Exception as e: return {'error': 'invalid_xml', 'exception': str(e), 'analysis_type': 'xml'} # استخراج نص من العناصر texts = [] for elem in root.iter(): if elem.text and elem.text.strip(): texts.append(elem.text.strip()) if elem.tail and elem.tail.strip(): texts.append(elem.tail.strip()) all_text = ' '.join(texts) stats = get_text_stats(all_text) return { 'root_tag': root.tag, 'stats': stats, 'elements_count': len(list(root.iter())), 'text_elements_count': len(texts), 'analysis_type': 'xml' } def analyze_docx(path_or_stream): """تحليل ملفات Word""" if not DOCX_AVAILABLE: return {'error': 'docx_support_not_available', 'analysis_type': 'docx'} try: doc = Document(path_or_stream) texts = [] # الفقرات for p in doc.paragraphs: if p.text and p.text.strip(): texts.append(p.text.strip()) # الجداول for table in doc.tables: for row in table.rows: for cell in row.cells: if cell.text and cell.text.strip(): texts.append(cell.text.strip()) # الرؤوس والتذييلات for section in doc.sections: if section.header: for p in section.header.paragraphs: if p.text and p.text.strip(): texts.append(p.text.strip()) if section.footer: for p in section.footer.paragraphs: if p.text and p.text.strip(): texts.append(p.text.strip()) all_text = '\n'.join(texts) stats = get_text_stats(all_text) return { 'stats': stats, 'text_preview': all_text[:10000], 'paragraphs_count': len(doc.paragraphs), 'tables_count': len(doc.tables), 'sections_count': len(doc.sections), 'analysis_type': 'docx' } except Exception as e: return {'error': 'cannot_read_docx', 'exception': str(e), 'analysis_type': 'docx'} def analyze_xlsx(path_or_stream): """تحليل ملفات Excel""" if not EXCEL_AVAILABLE: return {'error': 'xlsx_support_not_available', 'analysis_type': 'xlsx'} try: wb = load_workbook( filename=path_or_stream, read_only=True, data_only=True, keep_vba=False # منع تنفيذ الماكرو ) sheets = wb.sheetnames sheet_summaries = {} all_data = [] for name in sheets: ws = wb[name] rows = [] count = 0 for row in ws.iter_rows(values_only=True, max_row=200): # أول 200 صف فقط row_data = [str(c) if c is not None else '' for c in row] rows.append(row_data) all_data.extend([str(c) for c in row if c is not None]) count += 1 if count >= 200: break sheet_summaries[name] = { 'sample_rows': rows[:10], # أول 10 صفوف فقط للعرض 'sample_row_count': len(rows), 'max_column': ws.max_column, 'max_row': ws.max_row } # تحليل النص المجمع all_text = ' '.join(all_data) stats = get_text_stats(all_text) return { 'sheets': sheets, 'sheet_summaries': sheet_summaries, 'stats': stats, 'analysis_type': 'xlsx' } except Exception as e: return {'error': 'cannot_read_xlsx', 'exception': str(e), 'analysis_type': 'xlsx'} def analyze_pptx(path_or_stream): """تحليل ملفات PowerPoint""" if not PPTX_AVAILABLE: return {'error': 'pptx_support_not_available', 'analysis_type': 'pptx'} try: prs = Presentation(path_or_stream) slides = [] all_texts = [] for i, slide in enumerate(prs.slides): texts = [] for shape in slide.shapes: if hasattr(shape, "text") and shape.text and shape.text.strip(): text_content = shape.text.strip() texts.append(text_content) all_texts.append(text_content) slides.append({ 'slide_number': i + 1, 'slide_text': '\n'.join(texts), 'shapes_count': len([s for s in slide.shapes if hasattr(s, 'text') and s.text]) }) combined = '\n'.join(all_texts) stats = get_text_stats(combined) return { 'num_slides': len(slides), 'stats': stats, 'slides_preview': slides[:5], # أول 5 شرائح فقط للعرض 'total_shapes': sum(s['shapes_count'] for s in slides), 'analysis_type': 'pptx' } except Exception as e: return {'error': 'cannot_read_pptx', 'exception': str(e), 'analysis_type': 'pptx'} def analyze_pdf(path_or_stream): """تحليل ملفات PDF""" if not PDF_AVAILABLE: return {'error': 'pdf_support_not_available', 'analysis_type': 'pdf'} try: reader = PdfReader(path_or_stream) texts = [] metadata = {} # استخراج البيانات الوصفية if reader.metadata: metadata = { 'title': reader.metadata.get('/Title', ''), 'author': reader.metadata.get('/Author', ''), 'subject': reader.metadata.get('/Subject', ''), 'creator': reader.metadata.get('/Creator', ''), 'producer': reader.metadata.get('/Producer', ''), 'creation_date': reader.metadata.get('/CreationDate', '') } # استخراج النص من الصفحات for i, page in enumerate(reader.pages): try: page_text = page.extract_text() or '' texts.append(page_text) except Exception as e: logger.warning(f"Failed to extract text from PDF page {i+1}: {str(e)}") texts.append('') all_text = '\n'.join(texts) stats = get_text_stats(all_text) return { 'num_pages': len(reader.pages), 'stats': stats, 'text_preview': all_text[:10000], 'pdf_metadata': metadata, 'encrypted': reader.is_encrypted, 'analysis_type': 'pdf' } except Exception as e: return {'error': 'cannot_read_pdf', 'exception': str(e), 'analysis_type': 'pdf'} @app.route('/') def index(): """الصفحة الرئيسية""" supported_formats = ", ".join(sorted(ALLOWED_EXTENSIONS)) return f"""
Supported formats: {supported_formats}
Max file size: 200MB