File size: 4,078 Bytes
54b2662
 
 
 
9348624
 
54b2662
 
 
 
 
92c9b4d
 
54b2662
 
b91b0a5
54b2662
 
 
92c9b4d
54b2662
 
 
 
 
 
92c9b4d
54b2662
 
 
 
 
 
 
92c9b4d
54b2662
 
92c9b4d
54b2662
 
 
 
 
9348624
 
54b2662
92c9b4d
9348624
 
92c9b4d
54b2662
 
92c9b4d
54b2662
9348624
54b2662
 
9348624
54b2662
9348624
 
54b2662
 
92c9b4d
54b2662
9348624
54b2662
 
9348624
 
 
54b2662
 
92c9b4d
54b2662
 
 
 
 
92c9b4d
54b2662
 
 
 
 
92c9b4d
54b2662
 
92c9b4d
54b2662
 
 
 
9348624
54b2662
 
 
92c9b4d
9348624
 
 
 
 
92c9b4d
9348624
92c9b4d
9348624
54b2662
92c9b4d
9348624
 
54b2662
 
92c9b4d
54b2662
 
 
92c9b4d
54b2662
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import hashlib
import json
import logging
import os
import tempfile
import shutil
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime

# Constants
CHUNK_SIZE = 8192  # Read files in 8KB chunks
DEFAULT_FILE_EXTENSION = '.pdf'


class HashProcessor:

    def __init__(self, verbose: bool = True):

        self.verbose = verbose
        self.logger = logging.getLogger(__name__)
        if not verbose:
            self.logger.setLevel(logging.WARNING)
    
    def get_file_hash(self, path: str) -> Optional[str]:

        h = hashlib.sha256()
        try:
            with open(path, "rb") as f:
                while chunk := f.read(CHUNK_SIZE):
                    h.update(chunk)
            return h.hexdigest()
        except (IOError, OSError) as e:
            self.logger.error(f"Error reading file {path}: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Unexpected error processing file {path}: {e}")
            return None
    
    def scan_files_for_hash(
        self, 
        source_dir: str, 
        file_extension: str = DEFAULT_FILE_EXTENSION,
        recursive: bool = False
    ) -> Dict[str, List[Dict[str, str]]]:

        source_path = Path(source_dir)
        if not source_path.exists():
            raise FileNotFoundError(f"Directory not found: {source_dir}")
            
        hash_to_files = defaultdict(list)
        self.logger.info(f"Scanning files in: {source_dir}")
        
        pattern = f"**/*{file_extension}" if recursive else f"*{file_extension}"
        
        try:
            files = list(source_path.glob(pattern))
            
            for file_path in files:
                if not file_path.is_file():
                    continue
                    
                self.logger.info(f"Computing hash for: {file_path.name}")
                
                file_hash = self.get_file_hash(str(file_path))
                if file_hash:
                    hash_to_files[file_hash].append({
                        'filename': file_path.name,
                        'path': str(file_path),
                        'size': file_path.stat().st_size
                    })
        except PermissionError as e:
            self.logger.error(f"Permission error: {e}")
            raise
        
        return hash_to_files
    
    def load_processed_index(self, index_file: str) -> Dict:

        if os.path.exists(index_file):
            try:
                with open(index_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except json.JSONDecodeError as e:
                self.logger.error(f"Error reading index file {index_file}: {e}")
                return {}
            except Exception as e:
                self.logger.error(f"Unexpected error reading index: {e}")
                return {}
        return {}
    
    def save_processed_index(self, index_file: str, processed_hashes: Dict) -> None:
        temp_name = None
        try:
            os.makedirs(os.path.dirname(index_file), exist_ok=True)
            
            # Write to temp file first
            dir_name = os.path.dirname(index_file)
            with tempfile.NamedTemporaryFile('w', dir=dir_name, delete=False, encoding='utf-8') as tmp_file:
                json.dump(processed_hashes, tmp_file, indent=2, ensure_ascii=False)
                temp_name = tmp_file.name
            
            # Atomic rename temp to target (POSIX)
            shutil.move(temp_name, index_file)
            self.logger.info(f"Saved index file safely: {index_file}")
            
        except Exception as e:
            self.logger.error(f"Error saving index file {index_file}: {e}")
            if temp_name and os.path.exists(temp_name):
                os.remove(temp_name)
    
    def get_current_timestamp(self) -> str:

        return datetime.now().isoformat()
    
    def get_string_hash(self, text: str) -> str:

        return hashlib.sha256(text.encode('utf-8')).hexdigest()