import hashlib
import logging
import os
from datetime import datetime

from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.http import HttpResponse, Http404
from gridfs import GridFS
from pymongo import MongoClient


logger = logging.getLogger(__name__)


class MongoDBFileManager:
    """MongoDB file manager backed by GridFS.

    File contents are stored in GridFS. Each stored file has one document in
    the ``file_metadata`` collection (owner, sharing flags, counters, and the
    ``gridfs_id`` of its content), and operations are recorded in the
    ``file_access_logs`` collection.
    """

    # Block size used when hashing a plain file object that has no ``chunks()``.
    _HASH_BLOCK_SIZE = 64 * 1024

    # Extension -> MIME type; anything else is 'application/octet-stream'.
    _MIME_TYPES = {
        '.pdf': 'application/pdf',
        '.doc': 'application/msword',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.mp4': 'video/mp4',
        '.mp3': 'audio/mpeg',
        '.zip': 'application/zip',
    }

    # Extension -> category; anything else is 'other'.
    _CATEGORY_BY_EXTENSION = {
        '.jpg': 'image', '.jpeg': 'image', '.png': 'image', '.gif': 'image', '.bmp': 'image',
        '.mp4': 'video', '.avi': 'video', '.mov': 'video', '.wmv': 'video',
        '.mp3': 'audio', '.wav': 'audio', '.flac': 'audio', '.aac': 'audio',
        '.pdf': 'document', '.doc': 'document', '.docx': 'document',
        '.txt': 'document', '.ppt': 'document', '.pptx': 'document',
        '.zip': 'archive', '.rar': 'archive', '.7z': 'archive', '.tar': 'archive', '.gz': 'archive',
    }

    def __init__(self, host='localhost', port=27017, db_name='enise_filesystem'):
        """Connect to MongoDB and bind the GridFS bucket and collections.

        Args:
            host: MongoDB host name (defaults to the previously hard-coded value).
            port: MongoDB port (defaults to the previously hard-coded value).
            db_name: Name of the database holding files and metadata.
        """
        self.client = MongoClient(host, port)
        self.db = self.client[db_name]
        self.fs = GridFS(self.db)

        self.metadata_collection = self.db['file_metadata']
        self.access_logs = self.db['file_access_logs']
        self.permissions = self.db['user_permissions']

    def upload_file(self, file_data, filename, user_id, is_public=False, tags=None):
        """Store a file in GridFS and create its metadata document.

        Args:
            file_data: ``bytes``, an object exposing ``chunks()`` and ``size``
                (e.g. a Django uploaded file), or a plain binary file object.
            filename: Name recorded for the file; also drives MIME type and
                category detection.
            user_id: Identifier recorded as the owner.
            is_public: Whether every user may read the file.
            tags: Optional list of tags; stored as ``[]`` when omitted.

        Returns:
            A dict with ``success`` and ``message``; ``file_id`` (the metadata
            document id as a string) is present on success and when identical
            content already exists. Errors are reported in the dict rather
            than raised.
        """
        try:
            file_hash = self._calculate_file_hash(file_data)

            # Content-based de-duplication: identical bytes are stored once.
            existing_file = self.metadata_collection.find_one({'file_hash': file_hash})
            if existing_file:
                return {'success': False, 'message': 'File already exists', 'file_id': str(existing_file['_id'])}

            file_size = self._get_file_size(file_data)

            # Hashing/sizing leaves a file object at EOF; without rewinding,
            # GridFS would read nothing and store an empty file.
            if not isinstance(file_data, bytes) and hasattr(file_data, 'seek'):
                file_data.seek(0)

            gridfs_id = self.fs.put(file_data, filename=filename)

            # NOTE(review): datetime.now() is naive local time; PyMongo treats
            # naive datetimes as UTC - confirm whether UTC should be used.
            now = datetime.now()
            metadata = {
                'filename': filename,
                'original_filename': filename,
                'file_size': file_size,
                'file_hash': file_hash,
                'mime_type': self._get_mime_type(filename),
                'category': self._get_file_category(filename),
                'tags': tags or [],
                'owner_id': user_id,
                'is_public': is_public,
                'allowed_users': [],
                # Link to the stored content: the metadata _id and the GridFS
                # file id are different values.
                'gridfs_id': gridfs_id,
                'created_at': now,
                'updated_at': now,
                'download_count': 0,
                'view_count': 0,
            }

            try:
                result = self.metadata_collection.insert_one(metadata)
            except Exception:
                # Do not leave an unreachable blob behind in GridFS.
                self.fs.delete(gridfs_id)
                raise

            self._log_access(str(result.inserted_id), user_id, 'upload')

            return {
                'success': True,
                'file_id': str(result.inserted_id),
                'message': 'File uploaded successfully',
            }
        except Exception as e:
            logger.exception("Upload of %r failed", filename)
            return {'success': False, 'message': str(e)}

    def get_file(self, file_id, user_id=None):
        """Fetch a file's content and metadata if the user may read it.

        Args:
            file_id: Metadata document id, either as stored or as the string
                returned by ``upload_file``.
            user_id: Identifier of the requesting user, or ``None``.

        Returns:
            ``(file_data, metadata)`` where ``file_data`` is whatever
            ``GridFS.get`` returns, or ``None`` when the file is unknown,
            access is denied, or an error occurs.
        """
        try:
            doc_id = self._to_object_id(file_id)
            metadata = self.metadata_collection.find_one({'_id': doc_id})
            if not metadata:
                return None

            if not self._check_permission(metadata, user_id):
                return None

            self._log_access(file_id, user_id, 'view')

            # Documents written before 'gridfs_id' existed fall back to the
            # metadata id, which is what the lookup used previously.
            file_data = self.fs.get(metadata.get('gridfs_id', doc_id))

            self.metadata_collection.update_one(
                {'_id': doc_id},
                {'$inc': {'view_count': 1}, '$set': {'last_accessed': datetime.now()}}
            )

            return file_data, metadata
        except Exception:
            logger.exception("Could not retrieve file %r", file_id)
            return None

    def list_files(self, user_id, category=None, tags=None):
        """List metadata of files the user can access, newest first.

        A file is listed when it is public, owned by ``user_id``, or lists
        ``user_id`` in ``allowed_users``. ``category`` and ``tags`` (match any)
        narrow the result when given. Returns ``[]`` on error.
        """
        try:
            query = {
                '$or': [
                    {'is_public': True},
                    {'owner_id': user_id},
                    {'allowed_users': user_id},
                ]
            }
            if category:
                query['category'] = category
            if tags:
                query['tags'] = {'$in': tags}

            files = self.metadata_collection.find(query).sort('created_at', -1)
            return list(files)
        except Exception:
            logger.exception("Could not list files for user %r", user_id)
            return []

    def delete_file(self, file_id, user_id, is_admin=False):
        """Delete a file's content and metadata.

        Only the owner may delete, unless ``is_admin`` is true.

        Returns:
            ``True`` when the file was deleted; ``False`` when it is unknown,
            the user is not allowed to delete it, or an error occurs.
        """
        try:
            doc_id = self._to_object_id(file_id)
            metadata = self.metadata_collection.find_one({'_id': doc_id})
            if not metadata:
                return False

            if not is_admin and metadata['owner_id'] != user_id:
                return False

            # Remove the content under its own GridFS id, then the metadata.
            self.fs.delete(metadata.get('gridfs_id', doc_id))
            self.metadata_collection.delete_one({'_id': doc_id})

            self._log_access(file_id, user_id, 'delete')
            return True
        except Exception:
            logger.exception("Could not delete file %r", file_id)
            return False

    def grant_access(self, file_id, user_ids):
        """Add users to a file's ``allowed_users`` list.

        Args:
            file_id: Metadata document id (stored form or string form).
            user_ids: List of user identifiers to add.

        Returns:
            ``True`` when the file exists and the update was issued, ``False``
            when no such file exists or an error occurs.
        """
        try:
            result = self.metadata_collection.update_one(
                {'_id': self._to_object_id(file_id)},
                {'$addToSet': {'allowed_users': {'$each': user_ids}}}
            )
            return result.matched_count > 0
        except Exception:
            logger.exception("Could not grant access to file %r", file_id)
            return False

    def revoke_access(self, file_id, user_ids):
        """Remove users from a file's ``allowed_users`` list.

        Args:
            file_id: Metadata document id (stored form or string form).
            user_ids: List of user identifiers to remove.

        Returns:
            ``True`` when the file exists and the update was issued, ``False``
            when no such file exists or an error occurs.
        """
        try:
            result = self.metadata_collection.update_one(
                {'_id': self._to_object_id(file_id)},
                {'$pull': {'allowed_users': {'$in': user_ids}}}
            )
            return result.matched_count > 0
        except Exception:
            logger.exception("Could not revoke access to file %r", file_id)
            return False

    def get_access_logs(self, file_id=None, user_id=None):
        """Return access-log entries, newest first.

        ``file_id`` and ``user_id`` filter the entries when given; they are
        compared to the logged values as-is.
        """
        query = {}
        if file_id:
            query['file_id'] = file_id
        if user_id:
            query['user_id'] = user_id

        return list(self.access_logs.find(query).sort('timestamp', -1))

    @staticmethod
    def _to_object_id(file_id):
        """Turn the string form of an ObjectId back into an ``ObjectId``.

        ``upload_file`` hands out ids as strings, while ``_id`` values are
        stored in their native type. Values that are not strings, or strings
        that are not valid ObjectIds, are returned unchanged.
        """
        if not isinstance(file_id, str):
            return file_id
        # Function-scope import: bson is distributed with PyMongo, which this
        # module already requires.
        from bson import ObjectId
        return ObjectId(file_id) if ObjectId.is_valid(file_id) else file_id

    @staticmethod
    def _get_file_size(file_data):
        """Return the content length in bytes of an upload payload."""
        if isinstance(file_data, bytes):
            return len(file_data)
        size = getattr(file_data, 'size', None)
        if size is not None:
            return size
        # Plain file object: measure by seeking to the end.
        file_data.seek(0, os.SEEK_END)
        return file_data.tell()

    def _calculate_file_hash(self, file_data):
        """Return the SHA-256 hex digest of the payload.

        Accepts ``bytes``, an object exposing ``chunks()``, or a plain binary
        file object (read from its start in fixed-size blocks).
        """
        hash_sha256 = hashlib.sha256()
        if isinstance(file_data, bytes):
            hash_sha256.update(file_data)
        elif hasattr(file_data, 'chunks'):
            for chunk in file_data.chunks():
                hash_sha256.update(chunk)
        else:
            if hasattr(file_data, 'seek'):
                file_data.seek(0)
            for chunk in iter(lambda: file_data.read(self._HASH_BLOCK_SIZE), b''):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def _get_mime_type(self, filename):
        """Return the MIME type for the filename's extension (case-insensitive)."""
        ext = os.path.splitext(filename)[1].lower()
        return self._MIME_TYPES.get(ext, 'application/octet-stream')

    def _get_file_category(self, filename):
        """Return the category for the filename's extension (case-insensitive).

        One of 'image', 'video', 'audio', 'document', 'archive' or 'other'.
        """
        ext = os.path.splitext(filename)[1].lower()
        return self._CATEGORY_BY_EXTENSION.get(ext, 'other')

    def _check_permission(self, metadata, user_id):
        """Return whether ``user_id`` may read the file described by ``metadata``.

        Access is granted when the file is public, when the user owns it, or
        when the user appears in ``allowed_users``.
        """
        if metadata['is_public']:
            return True
        if metadata['owner_id'] == user_id:
            return True
        return user_id in metadata.get('allowed_users', [])

    def _log_access(self, file_id, user_id, access_type, success=True):
        """Append one entry to the access-log collection."""
        log_entry = {
            'file_id': file_id,
            'user_id': user_id,
            'access_type': access_type,
            'timestamp': datetime.now(),
            'success': success,
        }
        self.access_logs.insert_one(log_entry)