File size: 8,376 Bytes
9847531
 
 
 
58bdeac
 
 
 
 
9847531
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58bdeac
 
 
9847531
 
58bdeac
 
 
 
 
 
 
 
 
9847531
 
58bdeac
 
9847531
58bdeac
 
 
9847531
58bdeac
 
9847531
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import os
import shutil
from abc import ABC, abstractmethod
import json
try:
    from azure.storage.blob import BlobServiceClient
    AZURE_AVAILABLE = True
except ImportError:
    AZURE_AVAILABLE = False

class StorageInterface(ABC):
    """
    Abstract interface for file storage backends.

    Paths are plain strings; each concrete backend decides how they map onto
    its underlying store.
    """

    @abstractmethod
    def save_file(self, file_path: str, content: bytes) -> str:
        """Write ``content`` to ``file_path``, replacing any existing file, and return the path written."""

    @abstractmethod
    def load_file(self, file_path: str) -> bytes:
        """Return the raw bytes stored at ``file_path``."""

    @abstractmethod
    def list_files(self, directory: str) -> list[str]:
        """
        List the files stored under ``directory``.

        NOTE: the return format is backend-specific (bare names vs. full
        paths); check the concrete implementation before relying on it.
        """

    @abstractmethod
    def file_exists(self, file_path: str) -> bool:
        """Return True if ``file_path`` exists in this storage."""

    @abstractmethod
    def delete_file(self, file_path: str) -> None:
        """Remove the file at ``file_path``."""

    @abstractmethod
    def create_directory(self, directory: str) -> None:
        """Ensure ``directory`` exists (a no-op where the backend has no real directories)."""

    @abstractmethod
    def delete_directory(self, directory: str) -> None:
        """Remove ``directory`` and everything stored under it."""

    @abstractmethod
    def upload(self, local_path: str, destination_path: str) -> None:
        """Copy the local file at ``local_path`` into storage at ``destination_path``."""

    @abstractmethod
    def append_file(self, file_path: str, content: bytes) -> None:
        """Append ``content`` to ``file_path``, creating the file if it does not exist."""

    @abstractmethod
    def get_modified_time(self, file_path: str) -> float:
        """Return the last-modified time of ``file_path`` as a UNIX timestamp in seconds."""

    @abstractmethod
    def directory_exists(self, directory: str) -> bool:
        """Return True if ``directory`` exists in this storage."""

    def load_json(self, file_path: str):
        """
        Load and parse a UTF-8 encoded JSON file through this backend.

        The bytes are read via ``self.load_file`` rather than ``open()`` so
        that every backend is honoured, not just the local filesystem.

        Returns:
            The parsed JSON value, or None if the file could not be read or
            parsed. Failures are printed rather than raised, so callers must
            check for None.
        """
        try:
            content = self.load_file(file_path)
            return json.loads(content.decode('utf-8'))
        except Exception as e:
            # Deliberate best-effort: report and signal failure with None.
            print(f"Error loading JSON from {file_path}: {str(e)}")
            return None


class LocalStorage(StorageInterface):
    """
    Storage backend that operates directly on the local filesystem.

    Paths are handed straight to ``os``/``shutil``, so relative paths are
    resolved against the current working directory.
    """

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the parent directory of ``path`` (if it has one) and any missing ancestors."""
        parent = os.path.dirname(path)
        # A bare filename has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create when there is a parent component.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def save_file(self, file_path: str, content: bytes) -> str:
        """Write ``content`` to ``file_path`` (truncating it), creating parent dirs; return the path."""
        self._ensure_parent_dir(file_path)
        with open(file_path, 'wb') as f:
            f.write(content)
        return file_path

    def load_file(self, file_path: str) -> bytes:
        """Return the raw bytes of ``file_path``."""
        with open(file_path, 'rb') as f:
            return f.read()

    def list_files(self, directory: str) -> list[str]:
        """Return the names (not full paths) of regular files directly inside ``directory``."""
        return [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]

    def file_exists(self, file_path: str) -> bool:
        """Return True if anything (file or directory) exists at ``file_path``."""
        return os.path.exists(file_path)

    def delete_file(self, file_path: str) -> None:
        """Remove ``file_path``; raises if it does not exist."""
        os.remove(file_path)

    def create_directory(self, directory: str) -> None:
        """Create ``directory`` and any missing parents; no error if it already exists."""
        os.makedirs(directory, exist_ok=True)

    def delete_directory(self, directory: str) -> None:
        """Recursively remove ``directory`` and its contents."""
        shutil.rmtree(directory)

    def upload(self, local_path: str, destination_path: str) -> None:
        """Copy ``local_path`` to ``destination_path``, creating the destination's parent dirs."""
        self._ensure_parent_dir(destination_path)
        shutil.copy(local_path, destination_path)

    def append_file(self, file_path: str, content: bytes) -> None:
        """Append ``content`` to ``file_path``, creating the file and its parent dirs if needed."""
        self._ensure_parent_dir(file_path)
        with open(file_path, 'ab') as f:
            f.write(content)

    def get_modified_time(self, file_path: str) -> float:
        """Return the filesystem mtime of ``file_path`` (seconds since the epoch)."""
        return os.path.getmtime(file_path)

    def directory_exists(self, directory: str) -> bool:
        """Return True only if ``directory`` exists and is a directory."""
        # os.path.exists would also answer True for a regular file.
        return os.path.isdir(directory)


class BlobStorage(StorageInterface):
    """
    Writes to blob storage, using local disk as a cache.

    The remote container is the source of truth. Writes go to the blob
    first and are then mirrored to the same path on local disk. Reads
    re-download the blob when the local copy is missing or older than the
    remote one.

    TODO: Allow configuration of temp dir instead of just using the same paths in both local and remote
    """
    def __init__(self, connection_string: str, container_name: str):
        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        self.container_client = self.blob_service_client.get_container_client(container_name)
        # Local cache, addressed with exactly the same paths as the blobs.
        self.local_storage = LocalStorage()

    @staticmethod
    def _is_within(blob_name: str, directory: str) -> bool:
        """
        Return True if ``blob_name`` (already known to start with ``directory``) lies under it.

        ``name_starts_with`` is a raw string-prefix filter, so the prefix
        "data" would also match "database/x" or "data.json". Require a path
        separator right after the prefix, unless the caller's prefix is empty
        or already ends with one.
        """
        if not directory or directory.endswith(('/', '\\')):
            return True
        return blob_name[len(directory):len(directory) + 1] in ('/', '\\')

    def _blobs_in(self, directory: str):
        """Yield the blob entries whose names lie under ``directory``."""
        for blob in self.container_client.list_blobs(name_starts_with=directory):
            if self._is_within(blob.name, directory):
                yield blob

    def download(self, file_path: str) -> bytes:
        """Return the full remote contents of the blob at ``file_path``."""
        blob_client = self.container_client.get_blob_client(file_path)
        return blob_client.download_blob().readall()

    def sync(self, file_path: str) -> None:
        """Refresh the local cached copy of ``file_path`` if it is missing or older than the blob."""
        if not self.local_storage.file_exists(file_path):
            print(f"DEBUG: missing local version of {file_path} - downloading")
            self.local_storage.save_file(file_path, self.download(file_path))
        else:
            local_timestamp = self.local_storage.get_modified_time(file_path)
            remote_timestamp = self.get_modified_time(file_path)
            if local_timestamp < remote_timestamp:
                # We always write remotely before writing locally, so we expect local_timestamp to be > remote timestamp
                print(f"DEBUG: local version of {file_path} out of date - downloading")
                self.local_storage.save_file(file_path, self.download(file_path))

    def save_file(self, file_path: str, content: bytes) -> str:
        """Overwrite the blob at ``file_path`` with ``content``, then mirror it locally; return the path."""
        blob_client = self.container_client.get_blob_client(file_path)
        blob_client.upload_blob(content, overwrite=True)
        self.local_storage.save_file(file_path, content)
        return file_path

    def load_file(self, file_path: str) -> bytes:
        """Return the contents of ``file_path``, served from the local cache after syncing it."""
        self.sync(file_path)
        return self.local_storage.load_file(file_path)

    def list_files(self, directory: str) -> list[str]:
        """
        Return the full names of all blobs under ``directory`` (recursively).

        NOTE: unlike LocalStorage.list_files (bare names, non-recursive), this
        returns full blob names including nested ones.
        """
        return [blob.name for blob in self._blobs_in(directory)]

    def file_exists(self, file_path: str) -> bool:
        """Return True if a blob exists at ``file_path`` (the local cache is not consulted)."""
        blob_client = self.container_client.get_blob_client(file_path)
        return blob_client.exists()

    def delete_file(self, file_path: str) -> None:
        """Delete the blob at ``file_path`` and its local cached copy, if one exists."""
        # Remote first: it is the source of truth. The file may never have
        # been downloaded, so deleting locally first could raise and skip
        # the remote delete.
        blob_client = self.container_client.get_blob_client(file_path)
        blob_client.delete_blob()
        if self.local_storage.file_exists(file_path):
            self.local_storage.delete_file(file_path)

    def create_directory(self, directory: str) -> None:
        """Create ``directory`` locally only."""
        # Blob storage doesn't have directories, so only create it locally
        self.local_storage.create_directory(directory)

    def delete_directory(self, directory: str) -> None:
        """Delete every blob under ``directory`` and the local cached directory, if present."""
        for blob in self._blobs_in(directory):
            self.container_client.delete_blob(blob.name)
        # The directory may never have been cached locally.
        if os.path.isdir(directory):
            self.local_storage.delete_directory(directory)

    def upload(self, local_path: str, destination_path: str) -> None:
        """Upload the local file ``local_path`` to the blob ``destination_path`` and cache it locally."""
        with open(local_path, "rb") as data:
            blob_client = self.container_client.get_blob_client(destination_path)
            blob_client.upload_blob(data, overwrite=True)
        # Local and remote share paths, so uploading a file to its own path is
        # expected; shutil.copy would raise SameFileError in that case.
        if os.path.abspath(local_path) != os.path.abspath(destination_path):
            self.local_storage.upload(local_path, destination_path)

    def append_file(self, file_path: str, content: bytes) -> None:
        """
        Append ``content`` to the blob at ``file_path`` and to its local copy.

        NOTE(review): append_block only works on append blobs. A blob created
        by save_file/upload (plain upload_blob) is presumably a block blob and
        would be rejected here; confirm before mixing the two.
        """
        blob_client = self.container_client.get_blob_client(file_path)
        if not blob_client.exists():
            blob_client.create_append_blob()
            # Any local file at this path predates the new blob. Appending to
            # it would leave the cache permanently diverged, because it will
            # look newer than the remote copy.
            if self.local_storage.file_exists(file_path):
                self.local_storage.delete_file(file_path)
        else:
            self.sync(file_path)

        blob_client.append_block(content)
        self.local_storage.append_file(file_path, content)

    def get_modified_time(self, file_path: str) -> float:
        """Return the blob's last-modified time as a UNIX timestamp."""
        blob_client = self.container_client.get_blob_client(file_path)
        properties = blob_client.get_blob_properties()
        # Convert the UTC datetime to a UNIX timestamp
        return properties.last_modified.timestamp()

    def directory_exists(self, directory: str) -> bool:
        """Return True if at least one blob lies under ``directory``."""
        return next(self._blobs_in(directory), None) is not None


class StorageFactory:
    """Builds the storage backend selected by environment variables."""

    @staticmethod
    def get_storage() -> StorageInterface:
        """
        Return a storage backend chosen by ``STORAGE_TYPE`` (default ``'local'``).

        - ``'local'``: LocalStorage.
        - ``'azure'`` / ``'blob'``: BlobStorage built from
          ``AZURE_STORAGE_CONNECTION_STRING`` and ``AZURE_STORAGE_CONTAINER_NAME``.
          Falls back to LocalStorage, with a printed warning, when the Azure
          SDK is not importable or either variable is unset or empty.

        ``STORAGE_TYPE`` is matched case-insensitively, ignoring surrounding
        whitespace.

        Raises:
            ValueError: if ``STORAGE_TYPE`` names any other backend.
        """
        raw_type = os.getenv('STORAGE_TYPE', 'local')
        # Env values are often typed as "Azure" or carry stray whitespace.
        storage_type = raw_type.strip().lower()

        # Always return LocalStorage if STORAGE_TYPE is 'local'
        if storage_type == 'local':
            return LocalStorage()

        # Handle Azure storage with fallback
        if storage_type in ['azure', 'blob']:
            # If Azure SDK isn't available, fall back to local
            if not AZURE_AVAILABLE:
                print("Warning: Azure Storage SDK not available, falling back to local storage")
                return LocalStorage()

            # Try to get Azure credentials
            connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
            container_name = os.getenv('AZURE_STORAGE_CONTAINER_NAME')

            # If credentials are missing, fall back to local
            if not connection_string or not container_name:
                print("Warning: Azure credentials not found, falling back to local storage")
                return LocalStorage()

            return BlobStorage(connection_string, container_name)

        raise ValueError(f"Unsupported storage type: {raw_type}")