""" file_analyzer.py - الإصدار المحسن والمصحح وصف: خدمة ويب بسيطة تستقبل تحميل ملفات (MS Office: .docx, .xlsx, .pptx), PDF, .txt, .json, .xml ثم تقوم بتحليلها وإرجاع نتيجة تحليل JSON تحتوي على: نوع الملف، حجم، نص مستخرج، إحصاءات بسيطة (عدد الكلمات، الكلمات الشائعة)، وملاحظات خاصة بكل نوع (عدد الصفحات للـ PDF، أوراق العمل في Excel ..) المتطلبات (ثبتها قبل التشغيل): pip install flask werkzeug python-docx openpyxl python-pptx pypdf2 langdetect chardet ملاحظات: - PyPDF2 يستخدم لاستخراج نص من PDF (بسيط). لنتائج أفضل مع PDF الممسوحة ضوئياً استخدم OCR مثل pytesseract (لم يُدرج هنا). - هذا سكربت تعليمي — تأكد من تشغيله في بيئة آمنة عند استقبال ملفات من الخارج. تشغيل: python file_analyzer.py ثم افتح http://127.0.0.1:4580 وادخل إلى endpoint /upload (POST) لرفع ملف (form field name = file) """ from flask import Flask, request, jsonify from werkzeug.utils import secure_filename import os import io import json import xml.etree.ElementTree as ET from collections import Counter import chardet import re import zipfile from datetime import datetime import hashlib # استيراد المكتبات مع معالجة الأخطاء try: from docx import Document DOCX_AVAILABLE = True except ImportError: DOCX_AVAILABLE = False print("⚠️ تحذير: python-docx غير مثبت - لن يتم دعم ملفات Word") try: from openpyxl import load_workbook EXCEL_AVAILABLE = True except ImportError: EXCEL_AVAILABLE = False print("⚠️ تحذير: openpyxl غير مثبت - لن يتم دعم ملفات Excel") try: from pptx import Presentation PPTX_AVAILABLE = True except ImportError: PPTX_AVAILABLE = False print("⚠️ تحذير: python-pptx غير مثبت - لن يتم دعم ملفات PowerPoint") try: from PyPDF2 import PdfReader PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False print("⚠️ تحذير: PyPDF2 غير مثبت - لن يتم دعم ملفات PDF") try: from langdetect import detect, DetectorFactory DetectorFactory.seed = 0 # ثابت للنتائج المتوقعة في الكشف عن اللغة LANGDETECT_AVAILABLE = True except ImportError: LANGDETECT_AVAILABLE = False print("⚠️ تحذير: langdetect غير مثبت - لن يتم دعم كشف اللغة") # تحديد الصيغ المدعومة بناءً على المكتبات المتوفرة ALLOWED_EXTENSIONS = set(['txt', 'json', 'xml']) if DOCX_AVAILABLE: ALLOWED_EXTENSIONS.add('docx') if EXCEL_AVAILABLE: ALLOWED_EXTENSIONS.add('xlsx') if PPTX_AVAILABLE: ALLOWED_EXTENSIONS.add('pptx') if PDF_AVAILABLE: ALLOWED_EXTENSIONS.add('pdf') UPLOAD_FOLDER = 'uploads' MAX_FILE_SIZE = 200 * 1024 * 1024 # 200MB MAX_MEMORY_CHUNK = 50 * 1024 * 1024 # 50MB للقراءة التدريجية os.makedirs(UPLOAD_FOLDER, exist_ok=True) app = Flask(__name__) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE # إعداد سجل الأخطاء import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def allowed_file(filename): """التحقق من أن امتداد الملف مسموح به""" return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def safe_read_file(file_stream, max_size=MAX_MEMORY_CHUNK): """قراءة الملف بشكل آمن مع تحديد حد أقصى للحجم""" content = b'' bytes_read = 0 while bytes_read < max_size: chunk = file_stream.read(8192) # قراءة قطع صغيرة if not chunk: break content += chunk bytes_read += len(chunk) if bytes_read >= max_size: logger.warning(f"File exceeded memory limit, truncated to {max_size} bytes") break return content def check_zip_bomb(file_path, max_ratio=100, max_files=10000): """الكشف عن هجمات Zip Bomb للملفات المضغوطة (docx, xlsx, pptx)""" if file_path.endswith(('.docx', '.xlsx', '.pptx')): try: with zipfile.ZipFile(file_path, 'r') as zf: total_size = 0 uncompressed_size = 0 file_count = 0 for info in zf.infolist(): file_count += 1 uncompressed_size += info.file_size total_size += info.compress_size if file_count > max_files: raise ValueError("Too many files in archive - possible zip bomb") if total_size > 0 and uncompressed_size / total_size > max_ratio: raise ValueError("Compression ratio too high - possible zip bomb") except Exception as e: logger.error(f"Zip bomb check failed: {str(e)}") raise def get_text_stats(text): """تحليل النص وإرجاع إحصاءات مفصلة""" # تنظيف نصي متقدم words = [w for w in ''.join(ch if (ch.isalnum() or ch.isspace()) else ' ' for ch in text).split() if w] total_words = len(words) total_chars = len(text) total_chars_no_spaces = len(text.replace(' ', '')) counter = Counter(w.lower() for w in words) top_words = counter.most_common(20) # كشف اللغة language = None try: if LANGDETECT_AVAILABLE and total_words >= 3: language = detect(' '.join(words[:1000])) except Exception: language = None # إحصاءات إضافية paragraphs = [p for p in text.split('\n') if p.strip()] sentences = [s for s in re.split(r'[.!?]+', text) if s.strip()] # تحليل التواريخ date_patterns = [ r'\d{1,2}/\d{1,2}/\d{4}', r'\d{4}-\d{2}-\d{2}', r'\d{1,2}-\d{1,2}-\d{4}' ] dates_found = [] for pattern in date_patterns: dates_found.extend(re.findall(pattern, text)) # تحليل عناوين البريد الإلكتروني emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text) # تحليل الأرقام الهاتفية phones = re.findall(r'[\+]?[1-9]?[0-9]{7,14}', text) return { 'total_words': total_words, 'total_characters': total_chars, 'total_characters_no_spaces': total_chars_no_spaces, 'paragraphs_count': len(paragraphs), 'sentences_count': len(sentences), 'top_words': top_words, 'language': language, 'dates_found': dates_found[:10], # أول 10 تواريخ فقط 'emails_found': emails[:10], # أول 10 عناوين بريد فقط 'phones_found': phones[:10], # أول 10 أرقام فقط 'estimated_reading_minutes': max(1, total_words // 200), # افتراض 200 كلمة/دقيقة 'unique_words_count': len(set(words)), 'average_word_length': sum(len(word) for word in words) / len(words) if words else 0 } def extract_metadata(file_path): """استخراج البيانات الوصفية للملف""" try: stat = os.stat(file_path) return { 'file_hash_md5': calculate_file_hash(file_path), 'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'file_size_mb': round(stat.st_size / (1024 * 1024), 2), 'file_size_bytes': stat.st_size } except Exception as e: logger.error(f"Metadata extraction failed: {str(e)}") return {} def calculate_file_hash(file_path): """حساب بصمة الملف""" hasher = hashlib.md5() try: with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hasher.update(chunk) return hasher.hexdigest() except Exception as e: logger.error(f"Hash calculation failed: {str(e)}") return "error_calculating_hash" def analyze_txt(file_stream): """تحليل الملفات النصية""" raw = safe_read_file(file_stream) # كشف الترميز try: enc = chardet.detect(raw)['encoding'] or 'utf-8' text = raw.decode(enc, errors='replace') except Exception as e: logger.warning(f"Encoding detection failed, using utf-8: {str(e)}") text = raw.decode('utf-8', errors='replace') stats = get_text_stats(text) return { 'text_preview': text[:4580], # عرض أول 4580 حرف فقط 'stats': stats, 'metadata': {'encoding': enc}, 'analysis_type': 'text' } def analyze_json(file_stream): """تحليل ملفات JSON""" raw = safe_read_file(file_stream) try: text_content = raw.decode('utf-8', errors='replace') obj = json.loads(text_content) except Exception as e: return {'error': 'invalid_json', 'exception': str(e), 'analysis_type': 'json'} # استخراج النص من الهيكل def extract_strings(o): if isinstance(o, str): return [o] if isinstance(o, dict): res = [] for k, v in o.items(): res.append(k) # إضافة المفاتيح أيضاً res += extract_strings(v) return res if isinstance(o, list): res = [] for v in o: res += extract_strings(v) return res return [] all_text = ' '.join(extract_strings(obj)) stats = get_text_stats(all_text) return { 'json_preview': obj if isinstance(obj, (dict, list)) and len(str(obj)) < 10000 else str(type(obj)), 'structure_type': type(obj).__name__, 'stats': stats, 'analysis_type': 'json' } def analyze_xml(file_stream): """تحليل ملفات XML بشكل آمن""" raw = safe_read_file(file_stream) try: # منع هجمات XXE parser = ET.XMLParser(resolve_entities=False, no_network=True) root = ET.fromstring(raw, parser=parser) except Exception as e: return {'error': 'invalid_xml', 'exception': str(e), 'analysis_type': 'xml'} # استخراج نص من العناصر texts = [] for elem in root.iter(): if elem.text and elem.text.strip(): texts.append(elem.text.strip()) if elem.tail and elem.tail.strip(): texts.append(elem.tail.strip()) all_text = ' '.join(texts) stats = get_text_stats(all_text) return { 'root_tag': root.tag, 'stats': stats, 'elements_count': len(list(root.iter())), 'text_elements_count': len(texts), 'analysis_type': 'xml' } def analyze_docx(path_or_stream): """تحليل ملفات Word""" if not DOCX_AVAILABLE: return {'error': 'docx_support_not_available', 'analysis_type': 'docx'} try: doc = Document(path_or_stream) texts = [] # الفقرات for p in doc.paragraphs: if p.text and p.text.strip(): texts.append(p.text.strip()) # الجداول for table in doc.tables: for row in table.rows: for cell in row.cells: if cell.text and cell.text.strip(): texts.append(cell.text.strip()) # الرؤوس والتذييلات for section in doc.sections: if section.header: for p in section.header.paragraphs: if p.text and p.text.strip(): texts.append(p.text.strip()) if section.footer: for p in section.footer.paragraphs: if p.text and p.text.strip(): texts.append(p.text.strip()) all_text = '\n'.join(texts) stats = get_text_stats(all_text) return { 'stats': stats, 'text_preview': all_text[:10000], 'paragraphs_count': len(doc.paragraphs), 'tables_count': len(doc.tables), 'sections_count': len(doc.sections), 'analysis_type': 'docx' } except Exception as e: return {'error': 'cannot_read_docx', 'exception': str(e), 'analysis_type': 'docx'} def analyze_xlsx(path_or_stream): """تحليل ملفات Excel""" if not EXCEL_AVAILABLE: return {'error': 'xlsx_support_not_available', 'analysis_type': 'xlsx'} try: wb = load_workbook( filename=path_or_stream, read_only=True, data_only=True, keep_vba=False # منع تنفيذ الماكرو ) sheets = wb.sheetnames sheet_summaries = {} all_data = [] for name in sheets: ws = wb[name] rows = [] count = 0 for row in ws.iter_rows(values_only=True, max_row=200): # أول 200 صف فقط row_data = [str(c) if c is not None else '' for c in row] rows.append(row_data) all_data.extend([str(c) for c in row if c is not None]) count += 1 if count >= 200: break sheet_summaries[name] = { 'sample_rows': rows[:10], # أول 10 صفوف فقط للعرض 'sample_row_count': len(rows), 'max_column': ws.max_column, 'max_row': ws.max_row } # تحليل النص المجمع all_text = ' '.join(all_data) stats = get_text_stats(all_text) return { 'sheets': sheets, 'sheet_summaries': sheet_summaries, 'stats': stats, 'analysis_type': 'xlsx' } except Exception as e: return {'error': 'cannot_read_xlsx', 'exception': str(e), 'analysis_type': 'xlsx'} def analyze_pptx(path_or_stream): """تحليل ملفات PowerPoint""" if not PPTX_AVAILABLE: return {'error': 'pptx_support_not_available', 'analysis_type': 'pptx'} try: prs = Presentation(path_or_stream) slides = [] all_texts = [] for i, slide in enumerate(prs.slides): texts = [] for shape in slide.shapes: if hasattr(shape, "text") and shape.text and shape.text.strip(): text_content = shape.text.strip() texts.append(text_content) all_texts.append(text_content) slides.append({ 'slide_number': i + 1, 'slide_text': '\n'.join(texts), 'shapes_count': len([s for s in slide.shapes if hasattr(s, 'text') and s.text]) }) combined = '\n'.join(all_texts) stats = get_text_stats(combined) return { 'num_slides': len(slides), 'stats': stats, 'slides_preview': slides[:5], # أول 5 شرائح فقط للعرض 'total_shapes': sum(s['shapes_count'] for s in slides), 'analysis_type': 'pptx' } except Exception as e: return {'error': 'cannot_read_pptx', 'exception': str(e), 'analysis_type': 'pptx'} def analyze_pdf(path_or_stream): """تحليل ملفات PDF""" if not PDF_AVAILABLE: return {'error': 'pdf_support_not_available', 'analysis_type': 'pdf'} try: reader = PdfReader(path_or_stream) texts = [] metadata = {} # استخراج البيانات الوصفية if reader.metadata: metadata = { 'title': reader.metadata.get('/Title', ''), 'author': reader.metadata.get('/Author', ''), 'subject': reader.metadata.get('/Subject', ''), 'creator': reader.metadata.get('/Creator', ''), 'producer': reader.metadata.get('/Producer', ''), 'creation_date': reader.metadata.get('/CreationDate', '') } # استخراج النص من الصفحات for i, page in enumerate(reader.pages): try: page_text = page.extract_text() or '' texts.append(page_text) except Exception as e: logger.warning(f"Failed to extract text from PDF page {i+1}: {str(e)}") texts.append('') all_text = '\n'.join(texts) stats = get_text_stats(all_text) return { 'num_pages': len(reader.pages), 'stats': stats, 'text_preview': all_text[:10000], 'pdf_metadata': metadata, 'encrypted': reader.is_encrypted, 'analysis_type': 'pdf' } except Exception as e: return {'error': 'cannot_read_pdf', 'exception': str(e), 'analysis_type': 'pdf'} @app.route('/') def index(): """الصفحة الرئيسية""" supported_formats = ", ".join(sorted(ALLOWED_EXTENSIONS)) return f""" Simple File Analyzer

📁 Simple File Analyzer

Supported formats: {supported_formats}

Max file size: 200MB

{"
Note: Some file types may not be available due to missing dependencies
" if len(ALLOWED_EXTENSIONS) < 7 else ""}

Upload a File for Analysis



Health Check | Supported Formats

""" @app.route('/supported') def supported_formats(): """عرض الصيغ المدعومة والمكتبات المثبتة""" libraries_status = { 'python-docx (Word)': DOCX_AVAILABLE, 'openpyxl (Excel)': EXCEL_AVAILABLE, 'python-pptx (PowerPoint)': PPTX_AVAILABLE, 'PyPDF2 (PDF)': PDF_AVAILABLE, 'langdetect (Language Detection)': LANGDETECT_AVAILABLE } return jsonify({ 'supported_extensions': sorted(list(ALLOWED_EXTENSIONS)), 'libraries_status': libraries_status, 'max_file_size_mb': MAX_FILE_SIZE / (1024 * 1024) }) @app.route('/health') def health_check(): """فحص صحة الخدمة""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.now().isoformat(), 'service': 'File Analyzer', 'supported_formats_count': len(ALLOWED_EXTENSIONS) }) @app.route('/upload', methods=['POST']) def upload_file(): """معالجة رفع الملفات""" try: if 'file' not in request.files: return jsonify({'error': 'no_file_part'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'no_selected_file'}), 400 if not file or not allowed_file(file.filename): return jsonify({ 'error': 'file_type_not_allowed', 'allowed_extensions': list(ALLOWED_EXTENSIONS), 'your_file_extension': file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else 'unknown' }), 400 # تأمين اسم الملف filename = secure_filename(file.filename) ext = filename.rsplit('.', 1)[1].lower() save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) # حفظ الملف file.stream.seek(0) file.save(save_path) # التحقق من حجم الملف size = os.path.getsize(save_path) if size > MAX_FILE_SIZE: os.remove(save_path) return jsonify({'error': 'file_too_large', 'max_size_mb': MAX_FILE_SIZE / (1024 * 1024)}), 413 # التحقق من هجمات Zip Bomb للملفات المضغوطة if ext in ['docx', 'xlsx', 'pptx']: try: check_zip_bomb(save_path) except ValueError as e: os.remove(save_path) return jsonify({'error': 'security_risk', 'message': str(e)}), 400 # استخراج البيانات الوصفية metadata = extract_metadata(save_path) # تحليل الملف بناءً على الامتداد analysis_functions = { 'txt': analyze_txt, 'json': analyze_json, 'xml': analyze_xml } # إضافة الدوال المتوفرة فقط if DOCX_AVAILABLE: analysis_functions['docx'] = analyze_docx if EXCEL_AVAILABLE: analysis_functions['xlsx'] = analyze_xlsx if PPTX_AVAILABLE: analysis_functions['pptx'] = analyze_pptx if PDF_AVAILABLE: analysis_functions['pdf'] = analyze_pdf with open(save_path, 'rb') as f: if ext in analysis_functions: result = analysis_functions[ext](f) else: result = {'error': 'unsupported_extension', 'extension': ext} # تنظيف الملف المؤقت (اختياري - يمكن الاحتفاظ به للتحليل المستقبلي) try: os.remove(save_path) except Exception as e: logger.warning(f"Could not remove temporary file: {str(e)}") response = { 'filename': filename, 'extension': ext, 'size_bytes': size, 'upload_time': datetime.now().isoformat(), 'metadata': metadata, 'analysis': result } logger.info(f"File analyzed successfully: {filename} ({size} bytes)") return jsonify(response) except MemoryError: logger.error("Memory error during file processing") return jsonify({'error': 'file_too_large_memory'}), 413 except Exception as e: logger.error(f"Upload error: {str(e)}") return jsonify({'error': 'internal_server_error', 'message': str(e)}), 500 @app.errorhandler(413) def too_large(e): """معالجة أخطاء حجم الملف الكبير""" return jsonify({'error': 'file_too_large', 'max_size_mb': MAX_FILE_SIZE / (1024 * 1024)}), 413 @app.errorhandler(500) def internal_error(e): """معالجة الأخطاء الداخلية""" return jsonify({'error': 'internal_server_error'}), 500 if __name__ == '__main__': logger.info("Starting File Analyzer Service...") logger.info(f"Upload folder: {os.path.abspath(UPLOAD_FOLDER)}") logger.info(f"Allowed extensions: {ALLOWED_EXTENSIONS}") logger.info(f"Available libraries: DOCX={DOCX_AVAILABLE}, EXCEL={EXCEL_AVAILABLE}, PPTX={PPTX_AVAILABLE}, PDF={PDF_AVAILABLE}") print(f"\n🎯 File Analyzer Service Ready!") print(f"📁 Supported formats: {', '.join(sorted(ALLOWED_EXTENSIONS))}") print(f"🌐 Access at: http://127.0.0.1:4580") print(f"💾 Upload folder: {os.path.abspath(UPLOAD_FOLDER)}") app.run( debug=True, host='0.0.0.0', # السماح بالوصول من أي عنوان port=8490 )