| import os |
| import sqlite3 |
| import json |
| import csv |
| import hashlib |
| from datetime import datetime |
| from huggingface_hub import HfApi, hf_hub_download |
|
|
| |
# Hugging Face publishing configuration.
HF_TOKEN = os.environ.get("HF_TOKEN")  # auth token; all uploads are skipped when unset
REPO_ID = "Imsidag-community/libretranslate-suggestions"
DEST_JSON_PATH_IN_REPO = "suggestions.json"
DEST_CSV_PATH_IN_REPO = "suggestions.csv"
REPO_TYPE = "dataset"


# Local scratch locations for the exported files and the checksum caches
# used to decide whether an upload is needed.
JSON_OUTPUT_PATH = "/tmp/suggestions.json"
CSV_OUTPUT_PATH = "/tmp/suggestions.csv"
CHECKSUM_FILE_JSON = "/tmp/.last_suggestions_json_checksum"
CHECKSUM_FILE_CSV = "/tmp/.last_suggestions_csv_checksum"


# Candidate locations for LibreTranslate's suggestions database; the first
# path that exists wins (see find_db).
possible_paths = [
    "/app/db/suggestions.db",
    "/app/suggestions.db",
    "/root/.local/share/db/suggestions.db",
    "/home/libretranslate/.local/share/db/suggestions.db"
]
|
|
def find_db():
    """Locate the LibreTranslate suggestions database.

    Probes every candidate location in ``possible_paths`` and returns the
    first one that exists on disk, or None when no copy is found.
    """
    print(f"Running in CWD: {os.getcwd()}")
    found = next((p for p in possible_paths if os.path.exists(p)), None)
    if found is None:
        print("suggestions.db not found in any known path.")
        return None
    print(f"Found suggestions.db at {found}")
    return found
|
|
def extract_suggestions(db_path):
    """Read every row from the ``suggestions`` table into a list of dicts.

    Each dict carries a deterministic ``id`` (MD5 over the concatenated row
    fields) so repeated exports can de-duplicate, plus an export timestamp
    (naive local time, matching the rest of this script).

    Args:
        db_path: filesystem path to the SQLite database.

    Returns:
        List of suggestion dicts; empty on any SQLite error.
    """
    suggestions = []
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT q, s, source, target FROM suggestions")
        rows = cursor.fetchall()

        for row in rows:
            # Skip None fields instead of crashing on string concatenation;
            # for fully populated TEXT rows the digest is identical to the
            # previous row[0]+row[1]+row[2]+row[3] scheme.
            unique_id = hashlib.md5(
                "".join(str(v) for v in row if v is not None).encode()
            ).hexdigest()
            suggestions.append({
                "id": unique_id,
                "source_text": row[0],
                "suggested_text": row[1],
                "source_lang": row[2],
                "target_lang": row[3],
                "timestamp": datetime.now().isoformat(),
            })
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
    finally:
        # Close unconditionally: the original leaked the connection whenever
        # execute()/fetchall() raised before reaching conn.close().
        if conn is not None:
            conn.close()
    return suggestions
|
|
def download_existing_json():
    """Fetch the currently published suggestions.json from the HF dataset repo.

    Returns:
        Local filesystem path of the downloaded file, or None when it cannot
        be retrieved (missing repo/file, bad token, no network, ...).
    """
    try:
        local_path = hf_hub_download(
            repo_id=REPO_ID,
            repo_type=REPO_TYPE,
            filename=DEST_JSON_PATH_IN_REPO,
            token=HF_TOKEN,
            local_dir="/tmp",
        )
    except Exception as exc:
        print(f"Could not fetch existing suggestions from HF: {exc}")
        return None
    print("Downloaded existing suggestions from Hugging Face.")
    return local_path
|
|
def merge_with_existing(suggestions, existing_json_path):
    """Merge freshly extracted suggestions into the previously published set.

    Entries are keyed by their content-hash ``id``; entries already present
    keep their stored timestamp. When at least one new entry appears, the
    merged list is written to JSON_OUTPUT_PATH (and mirrored to
    CSV_OUTPUT_PATH) and the JSON path is returned. Otherwise nothing is
    written and None is returned.
    """
    merged = {}

    # Seed with whatever was last published, if we managed to download it.
    if existing_json_path and os.path.exists(existing_json_path):
        try:
            with open(existing_json_path, "r", encoding="utf-8") as handle:
                previous = json.load(handle)
        except Exception as exc:
            print(f"Failed to read existing JSON: {exc}")
        else:
            for entry in previous:
                merged[entry["id"]] = {
                    "source_text": entry["source_text"],
                    "suggested_text": entry["suggested_text"],
                    "source_lang": entry["source_lang"],
                    "target_lang": entry["target_lang"],
                    "timestamp": entry.get("timestamp", datetime.now().isoformat()),
                }

    # Fold in anything the local database produced that we have not seen yet.
    added_any = False
    for item in suggestions:
        if item["id"] in merged:
            continue
        added_any = True
        merged[item["id"]] = {
            "source_text": item["source_text"],
            "suggested_text": item["suggested_text"],
            "source_lang": item["source_lang"],
            "target_lang": item["target_lang"],
            "timestamp": datetime.now().isoformat(),
        }

    if not added_any:
        print("No new suggestions — skipping write/upload.")
        return None

    # Flatten back to the on-disk list-of-dicts layout (id inlined per entry).
    flattened = [{**payload, "id": key} for key, payload in merged.items()]

    with open(JSON_OUTPUT_PATH, "w", encoding="utf-8") as handle:
        json.dump(flattened, handle, indent=2, ensure_ascii=False)

    write_csv(flattened, CSV_OUTPUT_PATH)

    return JSON_OUTPUT_PATH
|
|
def write_csv(suggestions, csv_path):
    """Dump *suggestions* (a list of dicts) to *csv_path* with a fixed header."""
    fieldnames = [
        "id", "source_text", "suggested_text", "source_lang", "target_lang", "timestamp",
    ]
    with open(csv_path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(suggestions)
|
|
def get_checksum(filepath):
    """Return the MD5 hex digest of *filepath*, or None when it does not exist."""
    if not os.path.exists(filepath):
        return None
    with open(filepath, "rb") as handle:
        digest = hashlib.md5(handle.read())
    return digest.hexdigest()
|
|
def upload_if_updated(filepath, dest_path, checksum_file):
    """Push *filepath* to the HF repo at *dest_path* only if its content changed.

    The MD5 of the last successfully uploaded file is cached in
    *checksum_file*; a matching checksum skips the upload entirely.
    """
    if not filepath or not os.path.exists(filepath):
        return

    current = get_checksum(filepath)
    previous = None
    if os.path.exists(checksum_file):
        with open(checksum_file, "r") as handle:
            previous = handle.read().strip()

    if current == previous:
        print(f"No changes in {os.path.basename(dest_path)} — skipping upload.")
        return

    print(f"Uploading updated {os.path.basename(dest_path)} to Hugging Face...")
    try:
        HfApi().upload_file(
            path_or_fileobj=filepath,
            path_in_repo=dest_path,
            repo_id=REPO_ID,
            repo_type=REPO_TYPE,
            token=HF_TOKEN,
        )
        # Remember the checksum only after the upload actually succeeded.
        with open(checksum_file, "w") as handle:
            handle.write(current)
        print(f"Upload successful: {dest_path} at {datetime.now().isoformat()}")
    except Exception as exc:
        print(f"Upload failed for {dest_path}:", exc)
|
|
def main():
    """Export LibreTranslate suggestions and publish them to Hugging Face.

    Pipeline: locate the SQLite DB, extract rows, merge with the previously
    published JSON, then upload JSON and CSV when their content changed.
    """
    print(f"===== Application Startup at {datetime.now().isoformat()} =====")

    # Without a token nothing can be published, so bail out immediately.
    if not HF_TOKEN:
        print("HF_TOKEN not set — skipping upload.")
        return

    db_path = find_db()
    if db_path is None:
        return

    local_suggestions = extract_suggestions(db_path)
    if not local_suggestions:
        print("No suggestions found — skipping.")
        return

    previous_json = download_existing_json()
    if merge_with_existing(local_suggestions, previous_json) is not None:
        upload_if_updated(JSON_OUTPUT_PATH, DEST_JSON_PATH_IN_REPO, CHECKSUM_FILE_JSON)
        upload_if_updated(CSV_OUTPUT_PATH, DEST_CSV_PATH_IN_REPO, CHECKSUM_FILE_CSV)


if __name__ == "__main__":
    main()
|
|