Samfredoly commited on
Commit
8f66a5f
·
verified ·
1 Parent(s): 531d32b

Create file_processor.py

Browse files
Files changed (1) hide show
  1. file_processor.py +368 -0
file_processor.py ADDED
@@ -0,0 +1,368 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ File processor class for handling dataset file operations
3
+ """
4
+ import os
5
+ import json
6
+ import time
7
+ import asyncio
8
+ from pathlib import Path
9
+ from typing import Dict, List, Optional, Tuple
10
+ from datetime import datetime
11
+ import aiohttp
12
+ from sqlalchemy.orm import Session
13
+ from models import ProcessingState, ProcessingStatusEnum
14
+ import logging
15
+
16
+ logger = logging.getLogger(__name__)
17
+
18
+
19
class FileProcessor:
    """Object-oriented file processor for dataset integration.

    Downloads matching JSON file pairs from two Hugging Face dataset
    repositories ("ALL" records and "ATO" transcriptions), merges each
    transcription into its ALL record, and saves the integrated result
    under ``processed_files_dir``. Progress can optionally be tracked in
    a ``ProcessingState`` database row.
    """

    def __init__(self, processed_files_dir: str = "processed_files"):
        """
        Initialize file processor.

        Args:
            processed_files_dir: Directory to store processed files.
        """
        self.processed_files_dir = Path(processed_files_dir)
        self.all_raw_dir = self.processed_files_dir / "all_raw"
        self.ato_raw_dir = self.processed_files_dir / "ato_raw"

        # Create directories eagerly so later writes never fail on a
        # missing parent.
        self.processed_files_dir.mkdir(parents=True, exist_ok=True)
        self.all_raw_dir.mkdir(parents=True, exist_ok=True)
        self.ato_raw_dir.mkdir(parents=True, exist_ok=True)

    async def download_file(
        self,
        repo_id: str,
        filename: str,
        local_dir: Path,
        token: Optional[str] = None,
    ) -> Optional[str]:
        """
        Download a single file from a Hugging Face dataset repository.

        Args:
            repo_id: Repository ID (e.g., "samfred2/ALL")
            filename: Repo-relative path of the file to download
            local_dir: Local directory to save the file under
            token: Optional HF token for authentication

        Returns:
            Path to downloaded file or None if failed.
        """
        try:
            logger.info("Downloading %s from %s...", filename, repo_id)
            await asyncio.sleep(1)  # Crude rate limiting between downloads

            local_path = local_dir / filename
            local_path.parent.mkdir(parents=True, exist_ok=True)

            # BUGFIX: file content is served from the plain "resolve" URL;
            # the "/api/datasets/..." prefix is for repo metadata only.
            url = f"https://huggingface.co/datasets/{repo_id}/resolve/main/{filename}"
            headers = {"Authorization": f"Bearer {token}"} if token else {}

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        logger.error(
                            "Failed to download %s: HTTP %s", filename, response.status
                        )
                        return None

                    content = await response.read()
                    local_path.write_bytes(content)
                    logger.info("Downloaded to %s", local_path)
                    return str(local_path)

        except Exception as e:
            logger.error("Error downloading %s: %s", filename, e)
            return None

    def load_json_file(self, file_path: str) -> Optional[Dict]:
        """
        Load and parse a JSON file.

        Args:
            file_path: Path to JSON file

        Returns:
            Parsed JSON data or None if loading/parsing failed.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            logger.error("Error loading JSON from %s: %s", file_path, e)
            return None

    def find_matching_all_file(
        self, ato_filename: str, all_filenames: List[str]
    ) -> Optional[str]:
        """
        Find the ALL file matching an ATO file using suffix matching.

        An ALL filename matches when it ends with the full ATO filename.

        Args:
            ato_filename: ATO filename to match
            all_filenames: List of ALL filenames

        Returns:
            First matching ALL filename or None.
        """
        return next(
            (name for name in all_filenames if name.endswith(ato_filename)),
            None,
        )

    async def list_json_files(
        self, repo_id: str, token: Optional[str] = None
    ) -> List[str]:
        """
        List all JSON files in a dataset repository.

        Args:
            repo_id: Repository ID
            token: Optional HF token

        Returns:
            List of JSON filenames (empty on any failure).
        """
        try:
            # Repo metadata endpoint; "siblings" lists every file in the repo.
            url = f"https://huggingface.co/api/datasets/{repo_id}"
            headers = {"Authorization": f"Bearer {token}"} if token else {}

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        logger.error("Failed to list files from %s", repo_id)
                        return []

                    data = await response.json()
                    siblings = data.get("siblings", [])

                    return [
                        f["rfilename"]
                        for f in siblings
                        if f["rfilename"].endswith(".json")
                    ]

        except Exception as e:
            logger.error("Error listing files from %s: %s", repo_id, e)
            return []

    def _update_state(self, db: Optional[Session], **fields) -> None:
        """Apply field updates to the singleton ProcessingState row.

        No-op when no session is given; creates the row if missing.
        """
        if not db:
            return
        state = db.query(ProcessingState).first()
        if state is None:
            state = ProcessingState(**fields)
            db.add(state)
        else:
            for key, value in fields.items():
                setattr(state, key, value)
        db.commit()

    async def _integrate_pair(
        self,
        all_repo_id: str,
        ato_repo_id: str,
        ato_filename: str,
        all_filename: str,
        hf_token: Optional[str],
    ) -> bool:
        """Download one ATO/ALL pair, merge, and save; True on success."""
        ato_path = await self.download_file(
            ato_repo_id, ato_filename, self.ato_raw_dir, hf_token
        )
        if not ato_path:
            return False

        # `is None` (not falsiness) so valid-but-empty JSON is accepted.
        ato_data = self.load_json_file(ato_path)
        if ato_data is None:
            return False

        all_path = await self.download_file(
            all_repo_id, all_filename, self.all_raw_dir, hf_token
        )
        if not all_path:
            return False

        all_data = self.load_json_file(all_path)
        if all_data is None:
            return False

        # Integrate the transcription into the ALL record.
        logger.info("Integrating transcription...")
        all_data["transcription_content"] = ato_data
        all_data["transcription_content"]["full_course_name"] = all_filename

        # Save the integrated file under the ALL filename.
        output_path = self.processed_files_dir / all_filename
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(all_data, f, indent=4)

        logger.info("Saved integrated file to %s", output_path)
        return True

    async def process_datasets(
        self,
        all_repo_id: str,
        ato_repo_id: str,
        hf_token: Optional[str] = None,
        max_files: int = 0,
        db: Optional[Session] = None,
    ) -> Dict:
        """
        Process datasets: download, match, integrate, and save.

        Args:
            all_repo_id: Source ALL repository ID
            ato_repo_id: Source ATO repository ID
            hf_token: Optional HF token
            max_files: Maximum files to process (0 = all)
            db: Database session for state tracking

        Returns:
            Result dict with ``success``, ``total_files``,
            ``matched_pairs``, ``processed_files`` and, on failure,
            ``error``.
        """
        try:
            # NOTE: utcnow() is naive; kept for compatibility with the
            # existing ProcessingState schema.
            self._update_state(
                db,
                status=ProcessingStatusEnum.DOWNLOADING,
                started_at=datetime.utcnow(),
            )

            logger.info("Listing repository files...")
            all_files = await self.list_json_files(all_repo_id, hf_token)
            ato_files = await self.list_json_files(ato_repo_id, hf_token)

            logger.info("Found %d files in %s", len(all_files), all_repo_id)
            logger.info("Found %d files in %s", len(ato_files), ato_repo_id)

            # Match each ATO file to its ALL counterpart by suffix.
            logger.info("Matching ATO to ALL files...")
            match_map: Dict[str, str] = {}
            for ato_file in ato_files:
                matching_all = self.find_matching_all_file(ato_file, all_files)
                if matching_all:
                    match_map[ato_file] = matching_all

            matched_count = len(match_map)
            logger.info("Found %d matching pairs", matched_count)

            self._update_state(
                db,
                status=ProcessingStatusEnum.MATCHING,
                total_files=len(ato_files),
                matched_pairs=matched_count,
            )

            # Download and integrate each matched pair.
            logger.info("Processing matched files...")
            processed_count = 0

            for ato_filename, all_filename in match_map.items():
                if max_files > 0 and processed_count >= max_files:
                    logger.info("Reached limit of %d files", max_files)
                    break

                logger.info("Processing: %s <-> %s", ato_filename, all_filename)
                if await self._integrate_pair(
                    all_repo_id, ato_repo_id, ato_filename, all_filename, hf_token
                ):
                    processed_count += 1
                    self._update_state(
                        db,
                        status=ProcessingStatusEnum.INTEGRATING,
                        processed_files=processed_count,
                    )

            logger.info("Processing complete")

            self._update_state(
                db,
                status=ProcessingStatusEnum.COMPLETED,
                completed_at=datetime.utcnow(),
            )

            return {
                "success": True,
                "total_files": len(ato_files),
                "matched_pairs": matched_count,
                "processed_files": processed_count,
            }

        except Exception as e:
            logger.error("Processing error: %s", e)

            self._update_state(
                db,
                status=ProcessingStatusEnum.ERROR,
                error_message=str(e),
                completed_at=datetime.utcnow(),
            )

            return {
                "success": False,
                "total_files": 0,
                "matched_pairs": 0,
                "processed_files": 0,
                "error": str(e),
            }

    def get_processed_files(self) -> List[str]:
        """
        Get list of processed JSON files ready for upload.

        Returns:
            List of "/"-separated paths relative to ``processed_files_dir``.
        """
        return [
            path.relative_to(self.processed_files_dir).as_posix()
            for path in self.processed_files_dir.rglob("*.json")
        ]

    def _resolve_safe_path(self, filename: str) -> Optional[Path]:
        """Resolve *filename* under ``processed_files_dir``.

        Returns None for directory-traversal attempts or missing files.
        """
        file_path = self.processed_files_dir / filename

        # Security: reject paths that escape the processed-files root.
        try:
            file_path.resolve().relative_to(self.processed_files_dir.resolve())
        except ValueError:
            logger.warning("Directory traversal attempt: %s", filename)
            return None

        if not file_path.exists():
            return None
        return file_path

    def get_file_content(self, filename: str) -> Optional[Dict]:
        """
        Get file content for preview.

        Args:
            filename: Relative filename

        Returns:
            Parsed file content or None.
        """
        file_path = self._resolve_safe_path(filename)
        if file_path is None:
            return None
        return self.load_json_file(str(file_path))

    def get_file_size(self, filename: str) -> Optional[int]:
        """
        Get file size in bytes.

        Args:
            filename: Relative filename

        Returns:
            File size or None if the path is unsafe or missing.
        """
        file_path = self._resolve_safe_path(filename)
        if file_path is None:
            return None
        return file_path.stat().st_size