Because / File_analyzer.py
mrwabnalas40's picture
Update File_analyzer.py
6a500c8 verified
"""
file_analyzer.py - الإصدار المحسن والمصحح
وصف: خدمة ويب بسيطة تستقبل تحميل ملفات (MS Office: .docx, .xlsx, .pptx), PDF, .txt, .json, .xml
ثم تقوم بتحليلها وإرجاع نتيجة تحليل JSON تحتوي على: نوع الملف، حجم، نص مستخرج، إحصاءات بسيطة (عدد الكلمات، الكلمات الشائعة)، وملاحظات خاصة بكل نوع (عدد الصفحات للـ PDF، أوراق العمل في Excel ..)
المتطلبات (ثبتها قبل التشغيل):
pip install flask werkzeug python-docx openpyxl python-pptx pypdf2 langdetect chardet
ملاحظات:
- PyPDF2 يستخدم لاستخراج نص من PDF (بسيط). لنتائج أفضل مع PDF الممسوحة ضوئياً استخدم OCR مثل pytesseract (لم يُدرج هنا).
- هذا سكربت تعليمي — تأكد من تشغيله في بيئة آمنة عند استقبال ملفات من الخارج.
تشغيل:
python file_analyzer.py
ثم افتح http://127.0.0.1:4580 وادخل إلى endpoint /upload (POST) لرفع ملف (form field name = file)
"""
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
import os
import io
import json
import xml.etree.ElementTree as ET
from collections import Counter
import chardet
import re
import zipfile
from datetime import datetime
import hashlib
# استيراد المكتبات مع معالجة الأخطاء
try:
from docx import Document
DOCX_AVAILABLE = True
except ImportError:
DOCX_AVAILABLE = False
print("⚠️ تحذير: python-docx غير مثبت - لن يتم دعم ملفات Word")
try:
from openpyxl import load_workbook
EXCEL_AVAILABLE = True
except ImportError:
EXCEL_AVAILABLE = False
print("⚠️ تحذير: openpyxl غير مثبت - لن يتم دعم ملفات Excel")
try:
from pptx import Presentation
PPTX_AVAILABLE = True
except ImportError:
PPTX_AVAILABLE = False
print("⚠️ تحذير: python-pptx غير مثبت - لن يتم دعم ملفات PowerPoint")
try:
from PyPDF2 import PdfReader
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
print("⚠️ تحذير: PyPDF2 غير مثبت - لن يتم دعم ملفات PDF")
try:
from langdetect import detect, DetectorFactory
DetectorFactory.seed = 0 # ثابت للنتائج المتوقعة في الكشف عن اللغة
LANGDETECT_AVAILABLE = True
except ImportError:
LANGDETECT_AVAILABLE = False
print("⚠️ تحذير: langdetect غير مثبت - لن يتم دعم كشف اللغة")
# تحديد الصيغ المدعومة بناءً على المكتبات المتوفرة
ALLOWED_EXTENSIONS = set(['txt', 'json', 'xml'])
if DOCX_AVAILABLE:
ALLOWED_EXTENSIONS.add('docx')
if EXCEL_AVAILABLE:
ALLOWED_EXTENSIONS.add('xlsx')
if PPTX_AVAILABLE:
ALLOWED_EXTENSIONS.add('pptx')
if PDF_AVAILABLE:
ALLOWED_EXTENSIONS.add('pdf')
UPLOAD_FOLDER = 'uploads'
MAX_FILE_SIZE = 200 * 1024 * 1024 # 200MB
MAX_MEMORY_CHUNK = 50 * 1024 * 1024 # 50MB للقراءة التدريجية
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
# إعداد سجل الأخطاء
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def allowed_file(filename):
"""التحقق من أن امتداد الملف مسموح به"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def safe_read_file(file_stream, max_size=MAX_MEMORY_CHUNK):
"""قراءة الملف بشكل آمن مع تحديد حد أقصى للحجم"""
content = b''
bytes_read = 0
while bytes_read < max_size:
chunk = file_stream.read(8192) # قراءة قطع صغيرة
if not chunk:
break
content += chunk
bytes_read += len(chunk)
if bytes_read >= max_size:
logger.warning(f"File exceeded memory limit, truncated to {max_size} bytes")
break
return content
def check_zip_bomb(file_path, max_ratio=100, max_files=10000):
"""الكشف عن هجمات Zip Bomb للملفات المضغوطة (docx, xlsx, pptx)"""
if file_path.endswith(('.docx', '.xlsx', '.pptx')):
try:
with zipfile.ZipFile(file_path, 'r') as zf:
total_size = 0
uncompressed_size = 0
file_count = 0
for info in zf.infolist():
file_count += 1
uncompressed_size += info.file_size
total_size += info.compress_size
if file_count > max_files:
raise ValueError("Too many files in archive - possible zip bomb")
if total_size > 0 and uncompressed_size / total_size > max_ratio:
raise ValueError("Compression ratio too high - possible zip bomb")
except Exception as e:
logger.error(f"Zip bomb check failed: {str(e)}")
raise
def get_text_stats(text):
"""تحليل النص وإرجاع إحصاءات مفصلة"""
# تنظيف نصي متقدم
words = [w for w in ''.join(ch if (ch.isalnum() or ch.isspace()) else ' ' for ch in text).split() if w]
total_words = len(words)
total_chars = len(text)
total_chars_no_spaces = len(text.replace(' ', ''))
counter = Counter(w.lower() for w in words)
top_words = counter.most_common(20)
# كشف اللغة
language = None
try:
if LANGDETECT_AVAILABLE and total_words >= 3:
language = detect(' '.join(words[:1000]))
except Exception:
language = None
# إحصاءات إضافية
paragraphs = [p for p in text.split('\n') if p.strip()]
sentences = [s for s in re.split(r'[.!?]+', text) if s.strip()]
# تحليل التواريخ
date_patterns = [
r'\d{1,2}/\d{1,2}/\d{4}',
r'\d{4}-\d{2}-\d{2}',
r'\d{1,2}-\d{1,2}-\d{4}'
]
dates_found = []
for pattern in date_patterns:
dates_found.extend(re.findall(pattern, text))
# تحليل عناوين البريد الإلكتروني
emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text)
# تحليل الأرقام الهاتفية
phones = re.findall(r'[\+]?[1-9]?[0-9]{7,14}', text)
return {
'total_words': total_words,
'total_characters': total_chars,
'total_characters_no_spaces': total_chars_no_spaces,
'paragraphs_count': len(paragraphs),
'sentences_count': len(sentences),
'top_words': top_words,
'language': language,
'dates_found': dates_found[:10], # أول 10 تواريخ فقط
'emails_found': emails[:10], # أول 10 عناوين بريد فقط
'phones_found': phones[:10], # أول 10 أرقام فقط
'estimated_reading_minutes': max(1, total_words // 200), # افتراض 200 كلمة/دقيقة
'unique_words_count': len(set(words)),
'average_word_length': sum(len(word) for word in words) / len(words) if words else 0
}
def extract_metadata(file_path):
"""استخراج البيانات الوصفية للملف"""
try:
stat = os.stat(file_path)
return {
'file_hash_md5': calculate_file_hash(file_path),
'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(),
'file_size_mb': round(stat.st_size / (1024 * 1024), 2),
'file_size_bytes': stat.st_size
}
except Exception as e:
logger.error(f"Metadata extraction failed: {str(e)}")
return {}
def calculate_file_hash(file_path):
"""حساب بصمة الملف"""
hasher = hashlib.md5()
try:
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
return hasher.hexdigest()
except Exception as e:
logger.error(f"Hash calculation failed: {str(e)}")
return "error_calculating_hash"
def analyze_txt(file_stream):
"""تحليل الملفات النصية"""
raw = safe_read_file(file_stream)
# كشف الترميز
try:
enc = chardet.detect(raw)['encoding'] or 'utf-8'
text = raw.decode(enc, errors='replace')
except Exception as e:
logger.warning(f"Encoding detection failed, using utf-8: {str(e)}")
text = raw.decode('utf-8', errors='replace')
stats = get_text_stats(text)
return {
'text_preview': text[:4580], # عرض أول 4580 حرف فقط
'stats': stats,
'metadata': {'encoding': enc},
'analysis_type': 'text'
}
def analyze_json(file_stream):
"""تحليل ملفات JSON"""
raw = safe_read_file(file_stream)
try:
text_content = raw.decode('utf-8', errors='replace')
obj = json.loads(text_content)
except Exception as e:
return {'error': 'invalid_json', 'exception': str(e), 'analysis_type': 'json'}
# استخراج النص من الهيكل
def extract_strings(o):
if isinstance(o, str):
return [o]
if isinstance(o, dict):
res = []
for k, v in o.items():
res.append(k) # إضافة المفاتيح أيضاً
res += extract_strings(v)
return res
if isinstance(o, list):
res = []
for v in o:
res += extract_strings(v)
return res
return []
all_text = ' '.join(extract_strings(obj))
stats = get_text_stats(all_text)
return {
'json_preview': obj if isinstance(obj, (dict, list)) and len(str(obj)) < 10000 else str(type(obj)),
'structure_type': type(obj).__name__,
'stats': stats,
'analysis_type': 'json'
}
def analyze_xml(file_stream):
"""تحليل ملفات XML بشكل آمن"""
raw = safe_read_file(file_stream)
try:
# منع هجمات XXE
parser = ET.XMLParser(resolve_entities=False, no_network=True)
root = ET.fromstring(raw, parser=parser)
except Exception as e:
return {'error': 'invalid_xml', 'exception': str(e), 'analysis_type': 'xml'}
# استخراج نص من العناصر
texts = []
for elem in root.iter():
if elem.text and elem.text.strip():
texts.append(elem.text.strip())
if elem.tail and elem.tail.strip():
texts.append(elem.tail.strip())
all_text = ' '.join(texts)
stats = get_text_stats(all_text)
return {
'root_tag': root.tag,
'stats': stats,
'elements_count': len(list(root.iter())),
'text_elements_count': len(texts),
'analysis_type': 'xml'
}
def analyze_docx(path_or_stream):
"""تحليل ملفات Word"""
if not DOCX_AVAILABLE:
return {'error': 'docx_support_not_available', 'analysis_type': 'docx'}
try:
doc = Document(path_or_stream)
texts = []
# الفقرات
for p in doc.paragraphs:
if p.text and p.text.strip():
texts.append(p.text.strip())
# الجداول
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
if cell.text and cell.text.strip():
texts.append(cell.text.strip())
# الرؤوس والتذييلات
for section in doc.sections:
if section.header:
for p in section.header.paragraphs:
if p.text and p.text.strip():
texts.append(p.text.strip())
if section.footer:
for p in section.footer.paragraphs:
if p.text and p.text.strip():
texts.append(p.text.strip())
all_text = '\n'.join(texts)
stats = get_text_stats(all_text)
return {
'stats': stats,
'text_preview': all_text[:10000],
'paragraphs_count': len(doc.paragraphs),
'tables_count': len(doc.tables),
'sections_count': len(doc.sections),
'analysis_type': 'docx'
}
except Exception as e:
return {'error': 'cannot_read_docx', 'exception': str(e), 'analysis_type': 'docx'}
def analyze_xlsx(path_or_stream):
"""تحليل ملفات Excel"""
if not EXCEL_AVAILABLE:
return {'error': 'xlsx_support_not_available', 'analysis_type': 'xlsx'}
try:
wb = load_workbook(
filename=path_or_stream,
read_only=True,
data_only=True,
keep_vba=False # منع تنفيذ الماكرو
)
sheets = wb.sheetnames
sheet_summaries = {}
all_data = []
for name in sheets:
ws = wb[name]
rows = []
count = 0
for row in ws.iter_rows(values_only=True, max_row=200): # أول 200 صف فقط
row_data = [str(c) if c is not None else '' for c in row]
rows.append(row_data)
all_data.extend([str(c) for c in row if c is not None])
count += 1
if count >= 200:
break
sheet_summaries[name] = {
'sample_rows': rows[:10], # أول 10 صفوف فقط للعرض
'sample_row_count': len(rows),
'max_column': ws.max_column,
'max_row': ws.max_row
}
# تحليل النص المجمع
all_text = ' '.join(all_data)
stats = get_text_stats(all_text)
return {
'sheets': sheets,
'sheet_summaries': sheet_summaries,
'stats': stats,
'analysis_type': 'xlsx'
}
except Exception as e:
return {'error': 'cannot_read_xlsx', 'exception': str(e), 'analysis_type': 'xlsx'}
def analyze_pptx(path_or_stream):
"""تحليل ملفات PowerPoint"""
if not PPTX_AVAILABLE:
return {'error': 'pptx_support_not_available', 'analysis_type': 'pptx'}
try:
prs = Presentation(path_or_stream)
slides = []
all_texts = []
for i, slide in enumerate(prs.slides):
texts = []
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text and shape.text.strip():
text_content = shape.text.strip()
texts.append(text_content)
all_texts.append(text_content)
slides.append({
'slide_number': i + 1,
'slide_text': '\n'.join(texts),
'shapes_count': len([s for s in slide.shapes if hasattr(s, 'text') and s.text])
})
combined = '\n'.join(all_texts)
stats = get_text_stats(combined)
return {
'num_slides': len(slides),
'stats': stats,
'slides_preview': slides[:5], # أول 5 شرائح فقط للعرض
'total_shapes': sum(s['shapes_count'] for s in slides),
'analysis_type': 'pptx'
}
except Exception as e:
return {'error': 'cannot_read_pptx', 'exception': str(e), 'analysis_type': 'pptx'}
def analyze_pdf(path_or_stream):
"""تحليل ملفات PDF"""
if not PDF_AVAILABLE:
return {'error': 'pdf_support_not_available', 'analysis_type': 'pdf'}
try:
reader = PdfReader(path_or_stream)
texts = []
metadata = {}
# استخراج البيانات الوصفية
if reader.metadata:
metadata = {
'title': reader.metadata.get('/Title', ''),
'author': reader.metadata.get('/Author', ''),
'subject': reader.metadata.get('/Subject', ''),
'creator': reader.metadata.get('/Creator', ''),
'producer': reader.metadata.get('/Producer', ''),
'creation_date': reader.metadata.get('/CreationDate', '')
}
# استخراج النص من الصفحات
for i, page in enumerate(reader.pages):
try:
page_text = page.extract_text() or ''
texts.append(page_text)
except Exception as e:
logger.warning(f"Failed to extract text from PDF page {i+1}: {str(e)}")
texts.append('')
all_text = '\n'.join(texts)
stats = get_text_stats(all_text)
return {
'num_pages': len(reader.pages),
'stats': stats,
'text_preview': all_text[:10000],
'pdf_metadata': metadata,
'encrypted': reader.is_encrypted,
'analysis_type': 'pdf'
}
except Exception as e:
return {'error': 'cannot_read_pdf', 'exception': str(e), 'analysis_type': 'pdf'}
@app.route('/')
def index():
"""الصفحة الرئيسية"""
supported_formats = ", ".join(sorted(ALLOWED_EXTENSIONS))
return f"""
<html>
<head>
<title>Simple File Analyzer</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
.container {{ max-width: 800px; margin: 0 auto; }}
.upload-form {{ border: 2px dashed #ccc; padding: 20px; text-align: center; }}
.info {{ background: #f0f8ff; padding: 15px; border-radius: 5px; }}
.warning {{ background: #fffacd; padding: 10px; border-radius: 5px; margin: 10px 0; }}
</style>
</head>
<body>
<div class="container">
<h1>📁 Simple File Analyzer</h1>
<div class="info">
<p><strong>Supported formats:</strong> {supported_formats}</p>
<p><strong>Max file size:</strong> 200MB</p>
</div>
{"<div class='warning'><strong>Note:</strong> Some file types may not be available due to missing dependencies</div>" if len(ALLOWED_EXTENSIONS) < 7 else ""}
<div class="upload-form">
<h3>Upload a File for Analysis</h3>
<form action="/upload" method="post" enctype="multipart/form-data">
<input type="file" name="file" required>
<br><br>
<input type="submit" value="Analyze File" style="padding: 10px 20px;">
</form>
</div>
<p><a href="/health">Health Check</a> | <a href="/supported">Supported Formats</a></p>
</div>
</body>
</html>
"""
@app.route('/supported')
def supported_formats():
"""عرض الصيغ المدعومة والمكتبات المثبتة"""
libraries_status = {
'python-docx (Word)': DOCX_AVAILABLE,
'openpyxl (Excel)': EXCEL_AVAILABLE,
'python-pptx (PowerPoint)': PPTX_AVAILABLE,
'PyPDF2 (PDF)': PDF_AVAILABLE,
'langdetect (Language Detection)': LANGDETECT_AVAILABLE
}
return jsonify({
'supported_extensions': sorted(list(ALLOWED_EXTENSIONS)),
'libraries_status': libraries_status,
'max_file_size_mb': MAX_FILE_SIZE / (1024 * 1024)
})
@app.route('/health')
def health_check():
"""فحص صحة الخدمة"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'service': 'File Analyzer',
'supported_formats_count': len(ALLOWED_EXTENSIONS)
})
@app.route('/upload', methods=['POST'])
def upload_file():
"""معالجة رفع الملفات"""
try:
if 'file' not in request.files:
return jsonify({'error': 'no_file_part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'no_selected_file'}), 400
if not file or not allowed_file(file.filename):
return jsonify({
'error': 'file_type_not_allowed',
'allowed_extensions': list(ALLOWED_EXTENSIONS),
'your_file_extension': file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else 'unknown'
}), 400
# تأمين اسم الملف
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower()
save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# حفظ الملف
file.stream.seek(0)
file.save(save_path)
# التحقق من حجم الملف
size = os.path.getsize(save_path)
if size > MAX_FILE_SIZE:
os.remove(save_path)
return jsonify({'error': 'file_too_large', 'max_size_mb': MAX_FILE_SIZE / (1024 * 1024)}), 413
# التحقق من هجمات Zip Bomb للملفات المضغوطة
if ext in ['docx', 'xlsx', 'pptx']:
try:
check_zip_bomb(save_path)
except ValueError as e:
os.remove(save_path)
return jsonify({'error': 'security_risk', 'message': str(e)}), 400
# استخراج البيانات الوصفية
metadata = extract_metadata(save_path)
# تحليل الملف بناءً على الامتداد
analysis_functions = {
'txt': analyze_txt,
'json': analyze_json,
'xml': analyze_xml
}
# إضافة الدوال المتوفرة فقط
if DOCX_AVAILABLE:
analysis_functions['docx'] = analyze_docx
if EXCEL_AVAILABLE:
analysis_functions['xlsx'] = analyze_xlsx
if PPTX_AVAILABLE:
analysis_functions['pptx'] = analyze_pptx
if PDF_AVAILABLE:
analysis_functions['pdf'] = analyze_pdf
with open(save_path, 'rb') as f:
if ext in analysis_functions:
result = analysis_functions[ext](f)
else:
result = {'error': 'unsupported_extension', 'extension': ext}
# تنظيف الملف المؤقت (اختياري - يمكن الاحتفاظ به للتحليل المستقبلي)
try:
os.remove(save_path)
except Exception as e:
logger.warning(f"Could not remove temporary file: {str(e)}")
response = {
'filename': filename,
'extension': ext,
'size_bytes': size,
'upload_time': datetime.now().isoformat(),
'metadata': metadata,
'analysis': result
}
logger.info(f"File analyzed successfully: {filename} ({size} bytes)")
return jsonify(response)
except MemoryError:
logger.error("Memory error during file processing")
return jsonify({'error': 'file_too_large_memory'}), 413
except Exception as e:
logger.error(f"Upload error: {str(e)}")
return jsonify({'error': 'internal_server_error', 'message': str(e)}), 500
@app.errorhandler(413)
def too_large(e):
"""معالجة أخطاء حجم الملف الكبير"""
return jsonify({'error': 'file_too_large', 'max_size_mb': MAX_FILE_SIZE / (1024 * 1024)}), 413
@app.errorhandler(500)
def internal_error(e):
"""معالجة الأخطاء الداخلية"""
return jsonify({'error': 'internal_server_error'}), 500
if __name__ == '__main__':
logger.info("Starting File Analyzer Service...")
logger.info(f"Upload folder: {os.path.abspath(UPLOAD_FOLDER)}")
logger.info(f"Allowed extensions: {ALLOWED_EXTENSIONS}")
logger.info(f"Available libraries: DOCX={DOCX_AVAILABLE}, EXCEL={EXCEL_AVAILABLE}, PPTX={PPTX_AVAILABLE}, PDF={PDF_AVAILABLE}")
print(f"\n🎯 File Analyzer Service Ready!")
print(f"📁 Supported formats: {', '.join(sorted(ALLOWED_EXTENSIONS))}")
print(f"🌐 Access at: http://127.0.0.1:4580")
print(f"💾 Upload folder: {os.path.abspath(UPLOAD_FOLDER)}")
app.run(
debug=True,
host='0.0.0.0', # السماح بالوصول من أي عنوان
port=8490
)