davidtran999 committed on
Commit
9030829
·
verified ·
1 Parent(s): 1462240

Upload backend/scripts/etl_load.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. backend/scripts/etl_load.py +368 -0
backend/scripts/etl_load.py ADDED
@@ -0,0 +1,368 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
import argparse
import csv
import os
import sys
from datetime import datetime, date
from pathlib import Path
from typing import Dict, Optional

import django
from pydantic import BaseModel, ValidationError, field_validator


# This file lives at backend/scripts/etl_load.py, so parents[2] is the repo root.
ROOT_DIR = Path(__file__).resolve().parents[2]
BACKEND_DIR = ROOT_DIR / "backend"
HUE_PORTAL_DIR = BACKEND_DIR / "hue_portal"
# "tài nguyên" (Vietnamese for "resources") is the default CSV folder;
# override with the ETL_DATA_DIR environment variable.
DEFAULT_DATA_DIR = ROOT_DIR / "tài nguyên"
DATA_DIR = Path(os.environ.get("ETL_DATA_DIR", DEFAULT_DATA_DIR))
LOG_DIR = ROOT_DIR / "backend" / "logs" / "data_quality"

# Add backend directory to sys.path so Django can find hue_portal package
# Django needs to import hue_portal.hue_portal.settings, so backend/ must be in path
# IMPORTANT: Only add BACKEND_DIR, not HUE_PORTAL_DIR, because Django needs to find
# the hue_portal package (which is in backend/hue_portal), not the hue_portal directory itself
if str(BACKEND_DIR) not in sys.path:
    sys.path.insert(0, str(BACKEND_DIR))

# Add root for other imports if needed (but not HUE_PORTAL_DIR as it breaks Django imports)
if str(ROOT_DIR) not in sys.path:
    sys.path.insert(0, str(ROOT_DIR))

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hue_portal.hue_portal.settings")
django.setup()  # must run before importing any Django model below

# Model import is deliberately after django.setup(); noqa silences the E402 "import
# not at top of file" warning that ordering requires.
from hue_portal.core.models import Fine, Office, Procedure, Advisory  # noqa: E402


# Ensure the data-quality log directory exists before any loader writes to it.
LOG_DIR.mkdir(parents=True, exist_ok=True)
class OfficeRecord(BaseModel):
    """Validated row of danh_ba_diem_tiep_dan.csv (public-service office directory).

    Fix: in Pydantic v2 an ``Optional[X]`` annotation without a default is a
    *required* field — a row missing ``latitude``/``longitude``/``updated_at``
    would fail validation with "Field required". Explicit ``= None`` defaults
    make those fields genuinely optional (backward compatible for callers that
    always supply the keys).
    """

    unit_name: str
    address: Optional[str] = ""
    district: Optional[str] = ""
    working_hours: Optional[str] = ""
    phone: Optional[str] = ""
    email: Optional[str] = ""
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    service_scope: Optional[str] = ""
    updated_at: Optional[datetime] = None

    @field_validator("unit_name")
    @classmethod
    def unit_name_not_blank(cls, value: str) -> str:
        """Reject empty unit_name — it is the upsert key in load_offices."""
        if not value:
            raise ValueError("unit_name is required")
        return value
class FineRecord(BaseModel):
    """Validated row of muc_phat_theo_hanh_vi.csv (traffic-fine catalogue).

    Fix: in Pydantic v2 an ``Optional[X]`` annotation without a default is a
    *required* field — ``min_fine``/``max_fine``/``updated_at`` now default to
    ``None`` so rows lacking those keys validate instead of erroring.
    """

    violation_code: str
    violation_name: Optional[str] = ""
    article: Optional[str] = ""
    decree: Optional[str] = ""
    min_fine: Optional[float] = None
    max_fine: Optional[float] = None
    # Stored as free text (e.g. ranges), not an int — see CSV source.
    license_points: Optional[str] = ""
    remedial_measures: Optional[str] = ""
    source_url: Optional[str] = ""
    updated_at: Optional[datetime] = None

    @field_validator("violation_code")
    @classmethod
    def code_not_blank(cls, value: str) -> str:
        """Reject empty violation_code — it is the upsert key in load_fines."""
        if not value:
            raise ValueError("violation_code is required")
        return value
class ProcedureRecord(BaseModel):
    """Validated row of thu_tuc_hanh_chinh.csv (administrative procedures).

    Fix: in Pydantic v2 an ``Optional[X]`` annotation without a default is a
    *required* field — ``updated_at`` now defaults to ``None`` so rows lacking
    the key validate instead of erroring.
    """

    title: str
    domain: Optional[str] = ""
    level: Optional[str] = ""
    conditions: Optional[str] = ""
    dossier: Optional[str] = ""
    fee: Optional[str] = ""
    duration: Optional[str] = ""
    authority: Optional[str] = ""
    source_url: Optional[str] = ""
    updated_at: Optional[datetime] = None

    @field_validator("title")
    @classmethod
    def title_not_blank(cls, value: str) -> str:
        """Reject empty title — (title, domain) is the upsert key in load_procedures."""
        if not value:
            raise ValueError("title is required")
        return value
class AdvisoryRecord(BaseModel):
    """Validated row of canh_bao_lua_dao.csv (scam/fraud advisories).

    Fix: in Pydantic v2 an ``Optional[X]`` annotation without a default is a
    *required* field — ``published_at`` now defaults to ``None`` so rows
    lacking the key validate instead of erroring.
    """

    title: str
    summary: str
    source_url: Optional[str] = ""
    published_at: Optional[date] = None

    @field_validator("title")
    @classmethod
    def title_not_blank(cls, value: str) -> str:
        """Reject empty title — it is the upsert key in load_advisories."""
        if not value:
            raise ValueError("title is required")
        return value

    @field_validator("summary")
    @classmethod
    def summary_not_blank(cls, value: str) -> str:
        """Reject empty summary — advisories without body text are useless."""
        if not value:
            raise ValueError("summary is required")
        return value
def parse_datetime(value: Optional[str]) -> Optional[datetime]:
    """Best-effort parse of a timestamp string.

    Tries a fixed list of common CSV date formats first, then falls back to
    ``datetime.fromisoformat``. Returns None for empty or unparseable input.
    """
    if not value:
        return None
    known_formats = ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d", "%d/%m/%Y")
    for pattern in known_formats:
        try:
            return datetime.strptime(value, pattern)
        except ValueError:
            pass
    # Last resort: anything else ISO-8601-ish (e.g. with a "T" separator).
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        return None
def parse_date(value: Optional[str]) -> Optional[date]:
    """Parse a date string into a ``datetime.date`` (for Advisory.published_at).

    Fix: the return annotation previously said ``Optional[datetime]`` although
    the function returns ``date`` objects.

    Accepts ISO (YYYY-MM-DD), slash-separated (YYYY/MM/DD) and day-first
    (DD/MM/YYYY) formats. Returns None for empty or unparseable input.
    """
    if not value:
        return None
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            continue
    return None
def log_error(file_handle, dataset: str, row: Dict[str, str], error: str) -> None:
    """Append one structured error line to the data-quality log file."""
    stamp = datetime.utcnow().isoformat()
    line = f"[{stamp}Z] dataset={dataset} error={error} row={row}\n"
    file_handle.write(line)
def should_skip(updated_at: Optional[datetime], since: Optional[datetime]) -> bool:
    """Return True when incremental mode is active and the record predates it.

    Records without an updated_at value are never skipped.
    """
    return bool(since and updated_at and updated_at < since)
def load_offices(since: Optional[datetime], dry_run: bool, log_file) -> int:
    """Load contact-point offices from danh_ba_diem_tiep_dan.csv.

    Validates each row with OfficeRecord, logs invalid rows to ``log_file``,
    skips rows older than ``since`` (incremental mode) and upserts the rest
    keyed on unit_name. With ``dry_run`` no DB writes are made.

    Returns the number of valid, non-skipped rows.

    Fix: previously a ragged CSV row (more cells than header columns) made
    ``csv.DictReader`` store a *list* under the ``None`` key, crashing on
    ``.strip()``. Now uses the same defensive clean-up as load_procedures.
    """
    path = DATA_DIR / "danh_ba_diem_tiep_dan.csv"
    if not path.exists():
        log_error(log_file, "offices", {}, f"File không tồn tại: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            # Defensive clean-up: DictReader can yield a None key (extra
            # cells, value is a list) or None values (missing cells).
            clean_row = {}
            for k, v in row.items():
                key = str(k).strip() if k else ""
                clean_row[key] = v.strip() if isinstance(v, str) else ""
            # Empty coordinate strings must become None for float validation.
            for key in ("latitude", "longitude"):
                if clean_row.get(key) == "":
                    clean_row[key] = None
            clean_row["updated_at"] = parse_datetime(clean_row.get("updated_at"))
            try:
                record = OfficeRecord(**clean_row)
            except ValidationError as exc:
                log_error(log_file, "offices", clean_row, str(exc))
                continue

            if should_skip(record.updated_at, since):
                continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on unit_name so re-runs stay idempotent.
            Office.objects.update_or_create(
                unit_name=record.unit_name,
                defaults={
                    "address": record.address or "",
                    "district": record.district or "",
                    "working_hours": record.working_hours or "",
                    "phone": record.phone or "",
                    "email": record.email or "",
                    "latitude": record.latitude,
                    "longitude": record.longitude,
                    "service_scope": record.service_scope or "",
                },
            )
    return processed
def load_fines(since: Optional[datetime], dry_run: bool, log_file) -> int:
    """Load traffic fines from muc_phat_theo_hanh_vi.csv.

    Validates each row with FineRecord, logs invalid rows to ``log_file``,
    skips rows older than ``since`` (incremental mode) and upserts the rest
    keyed on the violation code. With ``dry_run`` no DB writes are made.

    Returns the number of valid, non-skipped rows.

    Fix: previously a ragged CSV row (more cells than header columns) made
    ``csv.DictReader`` store a *list* under the ``None`` key, crashing on
    ``.strip()``. Now uses the same defensive clean-up as load_procedures.
    """
    path = DATA_DIR / "muc_phat_theo_hanh_vi.csv"
    if not path.exists():
        log_error(log_file, "fines", {}, f"File không tồn tại: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            # Defensive clean-up: DictReader can yield a None key (extra
            # cells, value is a list) or None values (missing cells).
            clean_row = {}
            for k, v in row.items():
                key = str(k).strip() if k else ""
                clean_row[key] = v.strip() if isinstance(v, str) else ""
            # Empty fine amounts must become None for float validation.
            for key in ("min_fine", "max_fine"):
                if clean_row.get(key) == "":
                    clean_row[key] = None
            clean_row["updated_at"] = parse_datetime(clean_row.get("updated_at"))
            try:
                record = FineRecord(**clean_row)
            except ValidationError as exc:
                log_error(log_file, "fines", clean_row, str(exc))
                continue

            if should_skip(record.updated_at, since):
                continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on code so re-runs stay idempotent. Note the model
            # field names differ from the CSV/record names (code, name, remedial).
            Fine.objects.update_or_create(
                code=record.violation_code,
                defaults={
                    "name": record.violation_name or "",
                    "article": record.article or "",
                    "decree": record.decree or "",
                    "min_fine": record.min_fine,
                    "max_fine": record.max_fine,
                    "license_points": record.license_points or "",
                    "remedial": record.remedial_measures or "",
                    "source_url": record.source_url or "",
                },
            )
    return processed
def load_procedures(since: Optional[datetime], dry_run: bool, log_file) -> int:
    """Load administrative procedures from thu_tuc_hanh_chinh.csv.

    Validates each row with ProcedureRecord, logs invalid rows to
    ``log_file``, honours incremental ``since`` filtering and upserts the
    rest keyed on (title, domain). With ``dry_run`` no DB writes are made.

    Returns the number of valid, non-skipped rows.
    """
    path = DATA_DIR / "thu_tuc_hanh_chinh.csv"
    if not path.exists():
        log_error(log_file, "procedures", {}, f"File không tồn tại: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        for raw_row in csv.DictReader(handle):
            # Normalise the row: keys and values must end up as clean strings
            # (DictReader may produce None keys/values on ragged rows).
            clean_row = {}
            for raw_key, raw_value in raw_row.items():
                key = str(raw_key).strip() if raw_key else ""
                if not raw_value:
                    clean_row[key] = ""
                elif isinstance(raw_value, str):
                    clean_row[key] = raw_value.strip()
                else:
                    clean_row[key] = str(raw_value)
            clean_row["updated_at"] = parse_datetime(clean_row.get("updated_at"))

            try:
                record = ProcedureRecord(**clean_row)
            except ValidationError as exc:
                log_error(log_file, "procedures", clean_row, str(exc))
                continue
            if should_skip(record.updated_at, since):
                continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on (title, domain) so re-runs stay idempotent.
            Procedure.objects.update_or_create(
                title=record.title,
                domain=record.domain or "",
                defaults={
                    "level": record.level or "",
                    "conditions": record.conditions or "",
                    "dossier": record.dossier or "",
                    "fee": record.fee or "",
                    "duration": record.duration or "",
                    "authority": record.authority or "",
                    "source_url": record.source_url or "",
                },
            )
    return processed
def load_advisories(since: Optional[datetime], dry_run: bool, log_file) -> int:
    """Load scam advisories from canh_bao_lua_dao.csv.

    Validates each row with AdvisoryRecord, logs invalid rows to
    ``log_file`` and upserts the rest keyed on title. With ``dry_run`` no
    DB writes are made.

    Returns the number of valid, non-skipped rows.
    """
    path = DATA_DIR / "canh_bao_lua_dao.csv"
    if not path.exists():
        log_error(log_file, "advisories", {}, f"File không tồn tại: {path}")
        return 0

    processed = 0
    with path.open(encoding="utf-8") as handle:
        for raw_row in csv.DictReader(handle):
            # Normalise the row: keys and values must end up as clean strings
            # (DictReader may produce None keys/values on ragged rows).
            clean_row = {}
            for raw_key, raw_value in raw_row.items():
                key = str(raw_key).strip() if raw_key else ""
                if not raw_value:
                    clean_row[key] = ""
                elif isinstance(raw_value, str):
                    clean_row[key] = raw_value.strip()
                else:
                    clean_row[key] = str(raw_value)
            clean_row["published_at"] = parse_date(clean_row.get("published_at"))

            try:
                record = AdvisoryRecord(**clean_row)
            except ValidationError as exc:
                log_error(log_file, "advisories", clean_row, str(exc))
                continue

            # Advisories have no updated_at; incremental mode filters on
            # published_at instead, when both values are available.
            if since and record.published_at and record.published_at < since.date():
                continue

            processed += 1
            if dry_run:
                continue

            # Upsert keyed on title so re-runs stay idempotent.
            Advisory.objects.update_or_create(
                title=record.title,
                defaults={
                    "summary": record.summary or "",
                    "source_url": record.source_url or "",
                    "published_at": record.published_at,
                },
            )
    return processed
def parse_args():
    """Build and evaluate the command-line interface of the ETL runner."""
    parser = argparse.ArgumentParser(description="ETL dữ liệu chatbot")
    parser.add_argument(
        "--since",
        help="Chỉ xử lý bản ghi có updated_at >= giá trị này (ISO date)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Chỉ kiểm tra dữ liệu, không ghi vào DB",
    )
    parser.add_argument(
        "--datasets",
        nargs="*",
        default=["offices", "fines"],
        choices=["offices", "fines", "procedures", "advisories"],
        help="Chọn dataset cần nạp",
    )
    return parser.parse_args()
def main():
    """Entry point: parse CLI arguments and run the selected dataset loaders."""
    args = parse_args()
    since = parse_datetime(args.since) if args.since else None
    log_path = LOG_DIR / f"etl_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}.log"

    # Ordered dispatch table: (dataset flag, loader, printed label).
    loaders = (
        ("offices", load_offices, "Offices"),
        ("fines", load_fines, "Fines"),
        ("procedures", load_procedures, "Procedures"),
        ("advisories", load_advisories, "Advisories"),
    )
    with log_path.open("a", encoding="utf-8") as log_file:
        for dataset, loader, label in loaders:
            if dataset in args.datasets:
                total = loader(since, args.dry_run, log_file)
                print(f"{label} processed: {total}")

    print(f"Log ghi tại {log_path}")
# Allow running directly: python backend/scripts/etl_load.py [--dry-run ...]
if __name__ == "__main__":
    main()