meccatronis committed on
Commit
68ef616
·
verified ·
1 Parent(s): 1ec3d37

Upload core/file_analyzer.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. core/file_analyzer.py +522 -0
core/file_analyzer.py ADDED
@@ -0,0 +1,522 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ File Analyzer Module
3
+ ====================
4
+
5
+ Analyzes files for recovery potential and extracts metadata.
6
+ Supports various file formats including images, videos, documents, and databases.
7
+ """
8
+
9
import logging
import os
import struct
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
class FileHealth(Enum):
    """Health classification assigned to a file after analysis."""

    # The checks that were run on the file all passed.
    HEALTHY = "healthy"
    # The file is recognisable but failed a check (e.g. a missing end marker).
    DAMAGED = "damaged"
    # The file is missing, unreadable, or its leading bytes are invalid.
    CORRUPTED = "corrupted"
    # Only part of the expected structure was found.
    PARTIAL = "partial"
    # Health could not be determined.
    UNKNOWN = "unknown"
27
+
28
+
29
@dataclass
class FileMetadata:
    """Metadata extracted from a file.

    Every field has a neutral default (empty string, zero, ``None`` or an
    empty dict) so an instance can be created first and filled in step by
    step as the analysis progresses.
    """

    # Detected type, as reported by signature matching.
    file_type: str = ""   # human-readable description, e.g. "JPEG Image"
    mime_type: str = ""   # e.g. "image/jpeg"
    extension: str = ""   # extension without the dot, e.g. "jpg"

    # File-system facts.
    size: int = 0                        # size in bytes
    created: Optional[datetime] = None   # from file-system stat or EXIF DateTime
    modified: Optional[datetime] = None  # last modification time

    # Image dimensions in pixels.
    width: int = 0
    height: int = 0

    # Stream properties; units are not established by this class
    # (presumably seconds and bits per second -- confirm with the producer).
    duration: float = 0.0
    bitrate: int = 0
    codec: str = ""

    # Audio tags.
    artist: str = ""
    title: str = ""
    album: str = ""

    # Camera / EXIF information.
    camera_make: str = ""
    camera_model: str = ""
    gps_latitude: float = 0.0    # decimal degrees, negative = south
    gps_longitude: float = 0.0   # decimal degrees, negative = west

    # Free-form, format-specific values.  A default factory gives every
    # instance its own dict instead of a shared ``None`` placeholder.
    extra: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Normalise ``extra`` so it is always a dict."""
        # Callers that explicitly pass ``extra=None`` keep working.
        if self.extra is None:
            self.extra = {}
55
+
56
+
57
class FileAnalyzer:
    """
    Analyzes files for recovery and extracts metadata.

    Features:
    - File signature detection
    - Metadata extraction (EXIF, ID3, etc.)
    - File health assessment
    - Recovery potential estimation
    """

    # File signatures (magic bytes) -> (mime_type, extension, description)
    SIGNATURES = {
        # Images
        b'\xff\xd8\xff\xe0': ('image/jpeg', 'jpg', 'JPEG Image'),
        b'\xff\xd8\xff\xe1': ('image/jpeg', 'jpg', 'JPEG Image with EXIF'),
        b'\xff\xd8\xff\xdb': ('image/jpeg', 'jpg', 'JPEG Image'),
        b'\x89PNG\r\n\x1a\n': ('image/png', 'png', 'PNG Image'),
        b'GIF87a': ('image/gif', 'gif', 'GIF Image'),
        b'GIF89a': ('image/gif', 'gif', 'GIF Image'),
        b'BM': ('image/bmp', 'bmp', 'BMP Image'),
        # RIFF is a container; _identify_file_type refines it via _RIFF_FORMS.
        b'RIFF': ('image/webp', 'webp', 'WebP Image'),
        b'\x00\x00\x01\x00': ('image/x-icon', 'ico', 'ICO Icon'),

        # Videos
        b'\x00\x00\x00\x18ftyp': ('video/mp4', 'mp4', 'MP4 Video'),
        b'\x00\x00\x00\x1cftyp': ('video/mp4', 'mp4', 'MP4 Video'),
        b'\x00\x00\x00\x20ftyp': ('video/mp4', 'mp4', 'MP4 Video'),
        b'\x1aE\xdf\xa3': ('video/x-matroska', 'mkv', 'Matroska Video'),
        b'\x00\x00\x00\x14ftyp3gp': ('video/3gpp', '3gp', '3GP Video'),
        b'FLV\x01': ('video/x-flv', 'flv', 'Flash Video'),

        # Audio
        b'ID3': ('audio/mpeg', 'mp3', 'MP3 Audio'),
        b'\xff\xfb': ('audio/mpeg', 'mp3', 'MP3 Audio'),
        b'\xff\xf3': ('audio/mpeg', 'mp3', 'MP3 Audio'),
        b'\xff\xf2': ('audio/mpeg', 'mp3', 'MP3 Audio'),
        b'OggS': ('audio/ogg', 'ogg', 'OGG Audio'),
        b'fLaC': ('audio/flac', 'flac', 'FLAC Audio'),

        # Documents
        b'%PDF': ('application/pdf', 'pdf', 'PDF Document'),
        b'PK\x03\x04': ('application/zip', 'zip', 'ZIP Archive'),  # Also docx, xlsx
        b'\xd0\xcf\x11\xe0': ('application/msword', 'doc', 'MS Office Document'),
        # The backslash must be escaped; b'{\rtf' would contain a carriage return.
        b'{\\rtf': ('application/rtf', 'rtf', 'RTF Document'),

        # Databases
        b'SQLite format 3': ('application/x-sqlite3', 'db', 'SQLite Database'),

        # Archives
        b'\x1f\x8b\x08': ('application/gzip', 'gz', 'GZIP Archive'),
        b'Rar!\x1a\x07': ('application/x-rar', 'rar', 'RAR Archive'),
        b'7z\xbc\xaf\x27\x1c': ('application/x-7z-compressed', '7z', '7-Zip Archive'),
    }

    # RIFF form type (header bytes 8..11) -> (mime_type, extension, description)
    _RIFF_FORMS = {
        b'WEBP': ('image/webp', 'webp', 'WebP Image'),
        b'WAVE': ('audio/wav', 'wav', 'WAV Audio'),
        b'AVI ': ('video/x-msvideo', 'avi', 'AVI Video'),
    }

    # JPEG markers
    JPEG_MARKERS = {
        0xD8: 'SOI',   # Start of Image
        0xE0: 'APP0',  # JFIF
        0xE1: 'APP1',  # EXIF
        0xDB: 'DQT',   # Define Quantization Table
        0xC0: 'SOF0',  # Start of Frame (Baseline)
        0xC2: 'SOF2',  # Start of Frame (Progressive)
        0xC4: 'DHT',   # Define Huffman Table
        0xDA: 'SOS',   # Start of Scan
        0xD9: 'EOI',   # End of Image
    }

    # ID3v2 text frame id -> FileMetadata attribute (4-char ids: v2.3/v2.4,
    # 3-char ids: v2.2).
    _ID3_TEXT_FRAMES = {
        'TIT2': 'title', 'TPE1': 'artist', 'TALB': 'album',
        'TT2': 'title', 'TP1': 'artist', 'TAL': 'album',
    }

    # ID3v2 text-encoding byte -> Python codec name.
    _ID3_TEXT_ENCODINGS = {0: 'latin-1', 1: 'utf-16', 2: 'utf-16-be', 3: 'utf-8'}

    def __init__(self):
        """Initialize File Analyzer (the analyzer keeps no state)."""

    def analyze_file(self, filepath: str) -> Tuple[FileMetadata, FileHealth]:
        """
        Analyze a file and extract metadata.

        Args:
            filepath: Path to the file

        Returns:
            Tuple of (FileMetadata, FileHealth). A file that cannot be
            stat'ed or opened is reported as CORRUPTED.
        """
        metadata = FileMetadata()
        health = FileHealth.UNKNOWN

        # One stat call yields both size and timestamps and also covers the
        # "file does not exist" case.
        try:
            stat = os.stat(filepath)
        except OSError:
            return metadata, FileHealth.CORRUPTED

        metadata.size = stat.st_size
        try:
            metadata.modified = datetime.fromtimestamp(stat.st_mtime)
            # st_ctime is the inode-change time on most Unix systems; prefer
            # the real birth time where the platform exposes it.
            metadata.created = datetime.fromtimestamp(
                getattr(stat, 'st_birthtime', stat.st_ctime)
            )
        except (OverflowError, OSError, ValueError) as e:
            logger.debug("Could not convert file times: %s", e)

        # Read file header
        try:
            with open(filepath, 'rb') as f:
                header = f.read(32)
        except OSError as e:
            logger.error("Error reading file: %s", e)
            return metadata, FileHealth.CORRUPTED

        # Identify file type
        file_info = self._identify_file_type(header)
        if file_info:
            metadata.mime_type, metadata.extension, metadata.file_type = file_info

        # Extract type-specific metadata
        mime = metadata.mime_type
        if mime.startswith('image/'):
            metadata, health = self._analyze_image(filepath, metadata)
        elif mime.startswith('video/'):
            metadata, health = self._analyze_video(filepath, metadata)
        elif mime.startswith('audio/'):
            metadata, health = self._analyze_audio(filepath, metadata)
        elif mime == 'application/pdf':
            metadata, health = self._analyze_pdf(filepath, metadata)
        elif mime == 'application/x-sqlite3':
            metadata, health = self._analyze_sqlite(filepath, metadata)
        else:
            # No dedicated analyzer: a non-empty file is assumed intact.
            health = FileHealth.HEALTHY if metadata.size > 0 else FileHealth.CORRUPTED

        return metadata, health

    def _identify_file_type(self, header: bytes) -> Optional[Tuple[str, str, str]]:
        """
        Identify file type from header bytes.

        Args:
            header: First bytes of the file

        Returns:
            Tuple of (mime_type, extension, description) or None
        """
        # RIFF is shared by WebP, WAV and AVI; the form type at bytes 8..11
        # says which one this is.
        if header.startswith(b'RIFF'):
            form = header[8:12]
            if len(form) < 4:
                # Header too short to tell: keep the table's default guess.
                return self.SIGNATURES[b'RIFF']
            return self._RIFF_FORMS.get(form)

        for signature, info in self.SIGNATURES.items():
            if header.startswith(signature):
                return info

        # Check for JPEG (various markers)
        if header[:2] == b'\xff\xd8':
            return ('image/jpeg', 'jpg', 'JPEG Image')

        return None

    def _analyze_image(self, filepath: str, metadata: FileMetadata) -> Tuple[FileMetadata, FileHealth]:
        """
        Analyze an image file.

        Uses Pillow when it is installed to read the dimensions and EXIF
        data and to verify the file. If Pillow is missing or rejects the
        file, JPEGs fall back to a marker check and other formats are
        reported as UNKNOWN.

        Args:
            filepath: Path to the image
            metadata: Metadata object to fill in

        Returns:
            Tuple of (FileMetadata, FileHealth)
        """
        health = FileHealth.HEALTHY

        try:
            # Try to use PIL for detailed analysis
            from PIL import Image
            from PIL.ExifTags import TAGS

            with Image.open(filepath) as img:
                metadata.width = img.width
                metadata.height = img.height
                self._apply_exif(img, metadata, TAGS)

                # Verify image integrity
                img.verify()

        except Exception as e:
            # Deliberately broad: covers a missing Pillow install as well as
            # the various exception types Pillow raises for broken files.
            logger.debug("PIL analysis failed: %s", e)
            # Fallback to basic analysis
            if metadata.mime_type == 'image/jpeg':
                health = self._check_jpeg_integrity(filepath)
            else:
                health = FileHealth.UNKNOWN

        return metadata, health

    def _apply_exif(self, img: Any, metadata: FileMetadata, tags: Dict) -> None:
        """Copy camera, date and GPS EXIF values from a PIL image into metadata.

        EXIF extraction is best effort: formats without EXIF support and
        unreadable EXIF blocks leave the metadata untouched and do not
        affect the health verdict.
        """
        # Only some PIL image classes provide _getexif(); calling it blindly
        # raised AttributeError and downgraded healthy images to UNKNOWN.
        read_exif = getattr(img, '_getexif', None)
        if not callable(read_exif):
            return

        try:
            exif_data = read_exif()
        except Exception as e:
            logger.debug("EXIF extraction failed: %s", e)
            return
        if not exif_data:
            return

        for tag_id, value in exif_data.items():
            tag = tags.get(tag_id, tag_id)

            if tag == 'Make':
                metadata.camera_make = str(value)
            elif tag == 'Model':
                metadata.camera_model = str(value)
            elif tag == 'DateTime':
                try:
                    metadata.created = datetime.strptime(value, '%Y:%m:%d %H:%M:%S')
                except (TypeError, ValueError):
                    # Malformed timestamp: keep the file-system time.
                    pass
            elif tag == 'GPSInfo':
                gps = self._parse_gps_info(value)
                if gps:
                    metadata.gps_latitude, metadata.gps_longitude = gps

    def _parse_gps_info(self, gps_info: Dict) -> Optional[Tuple[float, float]]:
        """
        Parse GPS information from EXIF data.

        Args:
            gps_info: EXIF GPS dictionary keyed by GPS tag id (1 = latitude
                ref, 2 = latitude, 3 = longitude ref, 4 = longitude)

        Returns:
            (latitude, longitude) in decimal degrees, negative for south
            and west, or None when the data is missing or malformed
        """
        def convert_to_degrees(value):
            degrees, minutes, seconds = (float(value[i]) for i in range(3))
            return degrees + (minutes / 60.0) + (seconds / 3600.0)

        try:
            lat = convert_to_degrees(gps_info[2])
            if gps_info[1] in ('S', b'S'):
                lat = -lat

            lon = convert_to_degrees(gps_info[4])
            if gps_info[3] in ('W', b'W'):
                lon = -lon
        except (KeyError, IndexError, TypeError, ValueError, ZeroDivisionError):
            return None

        return (lat, lon)

    def _check_jpeg_integrity(self, filepath: str) -> FileHealth:
        """
        Check JPEG file integrity by looking at the first and last marker.

        Returns:
            CORRUPTED when the file cannot be read or does not start with
            SOI, HEALTHY when it also ends with EOI, DAMAGED otherwise
        """
        try:
            with open(filepath, 'rb') as f:
                # Check SOI marker
                if f.read(2) != b'\xff\xd8':
                    return FileHealth.CORRUPTED

                # Look for the EOI marker in the last two bytes
                f.seek(-2, os.SEEK_END)
                if f.read(2) == b'\xff\xd9':
                    return FileHealth.HEALTHY

                # EOI not found at end, might be damaged
                return FileHealth.DAMAGED

        except OSError:
            return FileHealth.CORRUPTED

    def _analyze_video(self, filepath: str, metadata: FileMetadata) -> Tuple[FileMetadata, FileHealth]:
        """
        Analyze a video file.

        For MP4-family files (an 'ftyp' tag in the first 32 bytes) the file
        is HEALTHY when a 'moov' box is present and PARTIAL otherwise. Other
        video formats are not inspected and keep the default HEALTHY.

        Args:
            filepath: Path to the video
            metadata: Metadata object (returned unchanged)

        Returns:
            Tuple of (FileMetadata, FileHealth); UNKNOWN on read errors
        """
        health = FileHealth.HEALTHY

        try:
            with open(filepath, 'rb') as f:
                header = f.read(32)

                # Check for MP4/MOV
                if b'ftyp' in header:
                    file_size = f.seek(0, os.SEEK_END)

                    # 'moov' is frequently written at the END of the file,
                    # so walk the box structure instead of only scanning the
                    # first megabyte.
                    found = self._has_top_level_box(f, b'moov', file_size)
                    if not found:
                        # Box chain broken: fall back to the plain scan of
                        # the first 1MB.
                        f.seek(0)
                        found = b'moov' in f.read(min(1024 * 1024, file_size))

                    health = FileHealth.HEALTHY if found else FileHealth.PARTIAL

        except (OSError, struct.error) as e:
            logger.debug("Video analysis failed: %s", e)
            health = FileHealth.UNKNOWN

        return metadata, health

    def _has_top_level_box(self, f: Any, box_type: bytes, file_size: int) -> bool:
        """Return True if an ISO-BMFF top-level box of ``box_type`` exists.

        Walks the chain of box headers (4-byte big-endian size + 4-byte
        type) by seeking, so no box payload is read. Stops at the first
        malformed or open-ended box.
        """
        offset = 0
        while offset + 8 <= file_size:
            f.seek(offset)
            box_header = f.read(8)
            if len(box_header) < 8:
                break

            size, kind = struct.unpack('>I4s', box_header)
            if kind == box_type:
                return True

            if size == 1:
                # 64-bit "largesize" follows the regular header.
                extended = f.read(8)
                if len(extended) < 8:
                    break
                size = struct.unpack('>Q', extended)[0]
                if size < 16:
                    break
            elif size < 8:
                # 0 means "box runs to end of file"; 2..7 is invalid.
                break

            offset += size

        return False

    def _analyze_audio(self, filepath: str, metadata: FileMetadata) -> Tuple[FileMetadata, FileHealth]:
        """
        Analyze an audio file.

        When the file starts with an ID3v2 tag its title/artist/album frames
        are copied into the metadata. Audio without an ID3v2 tag is not
        inspected further and keeps the default HEALTHY.

        Args:
            filepath: Path to the audio file
            metadata: Metadata object to fill in

        Returns:
            Tuple of (FileMetadata, FileHealth); UNKNOWN when the tag header
            is truncated or the file cannot be read
        """
        health = FileHealth.HEALTHY

        try:
            with open(filepath, 'rb') as f:
                # The ID3v2 header is exactly 10 bytes and the frames follow
                # immediately, so read no more than that before the tag body.
                header = f.read(10)

                if header[:3] == b'ID3':
                    if len(header) < 10:
                        return metadata, FileHealth.UNKNOWN

                    version = header[3]
                    flags = header[5]
                    size = self._decode_syncsafe_int(header[6:10])

                    id3_data = f.read(size)

                    # Skip the optional extended header (flag bit 6).
                    if flags & 0x40 and len(id3_data) >= 4:
                        if version >= 4:
                            # v2.4: syncsafe size that includes itself
                            ext_size = self._decode_syncsafe_int(id3_data[:4])
                        else:
                            # v2.3: plain size that excludes the size field
                            ext_size = struct.unpack('>I', id3_data[:4])[0] + 4
                        id3_data = id3_data[ext_size:]

                    metadata = self._parse_id3_tags(id3_data, metadata, version)

        except (OSError, IndexError, struct.error) as e:
            logger.debug("Audio analysis failed: %s", e)
            health = FileHealth.UNKNOWN

        return metadata, health

    def _decode_syncsafe_int(self, data: bytes) -> int:
        """Decode a 4-byte ID3v2 syncsafe integer (7 significant bits per byte)."""
        # Mask the top bit, which is always zero in a valid syncsafe value.
        return (
            ((data[0] & 0x7F) << 21)
            | ((data[1] & 0x7F) << 14)
            | ((data[2] & 0x7F) << 7)
            | (data[3] & 0x7F)
        )

    def _decode_id3_text(self, frame_data: bytes) -> str:
        """Decode the payload of an ID3v2 text frame.

        The first byte selects the text encoding; unknown values are decoded
        as UTF-8. Undecodable bytes are dropped and NUL terminators removed.
        """
        if not frame_data:
            return ''
        codec = self._ID3_TEXT_ENCODINGS.get(frame_data[0], 'utf-8')
        return frame_data[1:].decode(codec, errors='ignore').strip('\x00')

    def _parse_id3_tags(self, data: bytes, metadata: FileMetadata, version: int = 3) -> FileMetadata:
        """
        Parse ID3v2 tags.

        Args:
            data: Tag body (everything after the 10-byte ID3v2 header and
                any extended header)
            metadata: Metadata object whose title/artist/album are filled
            version: ID3v2 major version (2, 3 or 4). It decides the frame
                header layout and how frame sizes are encoded.

        Returns:
            The same metadata object
        """
        if version == 2:
            id_len, size_len, header_len = 3, 3, 6   # v2.2: no flag bytes
        else:
            id_len, size_len, header_len = 4, 4, 10

        total = len(data)
        pos = 0
        while pos + header_len <= total:
            frame_id = data[pos:pos + id_len].decode('latin-1', errors='ignore')
            # Blank or NUL-leading ids mark the padding after the last frame.
            if not frame_id.strip() or frame_id[0] == '\x00':
                break

            size_bytes = data[pos + id_len:pos + id_len + size_len]
            if version == 2:
                frame_size = int.from_bytes(size_bytes, 'big')
            elif version >= 4:
                frame_size = self._decode_syncsafe_int(size_bytes)
            else:
                frame_size = struct.unpack('>I', size_bytes)[0]

            attribute = self._ID3_TEXT_FRAMES.get(frame_id)
            if attribute is not None:
                body_start = pos + header_len
                frame_data = data[body_start:body_start + frame_size]
                setattr(metadata, attribute, self._decode_id3_text(frame_data))

            pos += header_len + frame_size

        return metadata

    def _analyze_pdf(self, filepath: str, metadata: FileMetadata) -> Tuple[FileMetadata, FileHealth]:
        """
        Analyze a PDF file.

        Returns:
            Tuple of (FileMetadata, FileHealth): CORRUPTED without a %PDF
            header, HEALTHY when %%EOF appears in the last 1024 bytes,
            DAMAGED when it does not, UNKNOWN on read errors
        """
        health = FileHealth.HEALTHY

        try:
            with open(filepath, 'rb') as f:
                # Check header
                if not f.read(8).startswith(b'%PDF'):
                    return metadata, FileHealth.CORRUPTED

                # Check for EOF marker. Clamp the offset so files smaller
                # than 1024 bytes do not make seek() fail.
                file_size = f.seek(0, os.SEEK_END)
                f.seek(max(0, file_size - 1024))
                tail = f.read()

                health = FileHealth.HEALTHY if b'%%EOF' in tail else FileHealth.DAMAGED

        except OSError as e:
            logger.debug("PDF analysis failed: %s", e)
            health = FileHealth.UNKNOWN

        return metadata, health

    def _analyze_sqlite(self, filepath: str, metadata: FileMetadata) -> Tuple[FileMetadata, FileHealth]:
        """
        Analyze a SQLite database file.

        Runs ``PRAGMA integrity_check`` and stores the number of tables in
        ``metadata.extra['table_count']``.

        Returns:
            Tuple of (FileMetadata, FileHealth): HEALTHY when the integrity
            check reports 'ok', DAMAGED when it reports problems, CORRUPTED
            when the database cannot be opened or queried
        """
        health = FileHealth.HEALTHY

        try:
            import sqlite3
            from contextlib import closing
            from pathlib import Path

            # Open read-only: analysing a file must never modify it.
            uri = Path(filepath).resolve().as_uri() + '?mode=ro'

            # closing() guarantees the connection is released on every path.
            with closing(sqlite3.connect(uri, uri=True)) as conn:
                cursor = conn.cursor()

                # Run integrity check
                cursor.execute("PRAGMA integrity_check")
                result = cursor.fetchone()
                if result and result[0] == 'ok':
                    health = FileHealth.HEALTHY
                else:
                    health = FileHealth.DAMAGED

                # Get table count
                cursor.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'")
                metadata.extra['table_count'] = cursor.fetchone()[0]

        except Exception as e:
            # Deliberately broad: any failure to open or query the database
            # means it is not usable as-is.
            logger.debug("SQLite analysis failed: %s", e)
            health = FileHealth.CORRUPTED

        return metadata, health

    def estimate_recovery_chance(self, filepath: str) -> float:
        """
        Estimate the chance of successful recovery.

        Args:
            filepath: Path to the file

        Returns:
            Recovery chance as percentage (0-100)
        """
        _, health = self.analyze_file(filepath)

        chances = {
            FileHealth.HEALTHY: 100.0,
            FileHealth.DAMAGED: 75.0,
            FileHealth.PARTIAL: 50.0,
            FileHealth.CORRUPTED: 25.0,
        }
        # UNKNOWN (or any future state) is treated as an even chance.
        return chances.get(health, 50.0)

    def get_file_preview(self, filepath: str, max_size: int = 1024) -> bytes:
        """
        Get a preview of file contents.

        Args:
            filepath: Path to the file
            max_size: Maximum preview size in bytes; values below zero are
                treated as zero

        Returns:
            Preview bytes, or b'' when the file cannot be read
        """
        # A negative count would make read() return the whole file.
        limit = max(0, max_size)
        try:
            with open(filepath, 'rb') as f:
                return f.read(limit)
        except OSError:
            return b''

    def _file_digest(self, filepath: str) -> Optional[str]:
        """Return the SHA-256 hex digest of a file, or None if unreadable."""
        import hashlib

        hasher = hashlib.sha256()
        try:
            with open(filepath, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b''):
                    hasher.update(chunk)
        except OSError:
            return None
        return hasher.hexdigest()

    def compare_files(self, file1: str, file2: str) -> Dict[str, Any]:
        """
        Compare two files.

        Args:
            file1: Path to first file
            file2: Path to second file

        Returns:
            Dict with the boolean keys 'size_match', 'type_match',
            'hash_match' and 'identical' (all three of the others true).
            Missing files count as size 0 and never produce a hash match.
        """
        result = {
            'identical': False,
            'size_match': False,
            'type_match': False,
            'hash_match': False,
        }

        # Compare sizes
        size1 = os.path.getsize(file1) if os.path.exists(file1) else 0
        size2 = os.path.getsize(file2) if os.path.exists(file2) else 0
        result['size_match'] = size1 == size2

        # Compare types
        meta1, _ = self.analyze_file(file1)
        meta2, _ = self.analyze_file(file2)
        result['type_match'] = meta1.mime_type == meta2.mime_type

        # Compare hashes. Files of different size cannot have equal content,
        # so hashing is skipped in that case.
        if result['size_match']:
            hash1 = self._file_digest(file1)
            result['hash_match'] = hash1 is not None and hash1 == self._file_digest(file2)

        result['identical'] = all([
            result['size_match'],
            result['type_match'],
            result['hash_match'],
        ])

        return result