# NOTE: Hugging Face Spaces status banner captured during export ("Spaces: Sleeping") — not part of the application code.
| # updated app.py with cleaned full pdf text output | |
| import os | |
| import uuid | |
| import time | |
| import shutil | |
| import threading | |
| import schedule | |
| import datetime | |
| from pydub import AudioSegment | |
| import re | |
| from gradio_client import Client | |
| import uuid # you already import, keep if present | |
| import fitz | |
| from flask import Flask, request, send_file, render_template, jsonify, session | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| from pdf_utils import ( | |
| pdf_to_text_cleaned, | |
| extract_non_dictionary_words, | |
| DEFAULT_PRONUNCIATION_PATH, | |
| ) | |
| from pdf_utils_finalclean_airmac_final import pdf_to_final_cleaned_text | |
| from pdf_utils_finalclean_NYmac_final import pdf_to_final_cleaned_text as ny_cleaned_text | |
# Flask app setup. The secret key is random per process, so sessions do not
# survive restarts — presumably acceptable for this throwaway-upload workflow.
app = Flask(__name__)
app.secret_key = os.urandom(24)

# Root of the per-session upload workspaces (one subfolder per session id).
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

import json  # NOTE(review): mid-file import; json is not used in this chunk — confirm it is needed elsewhere

# Google service-account bootstrap: on hosted deployments the credentials
# arrive via an environment variable and are materialized to credentials.json
# so the google-auth file loader below can read them.
GOOGLE_SECRET_JSON = os.getenv("GOOGLE_CREDENTIALS_JSON")
SERVICE_ACCOUNT_FILE = "credentials.json"
if GOOGLE_SECRET_JSON:
    with open(SERVICE_ACCOUNT_FILE, "w", encoding="utf-8") as f:
        f.write(GOOGLE_SECRET_JSON)

# Read-only Sheets scope; the sheet holds the word/pronunciation table
# consumed by fetch_sheet_rows_and_pronunciations().
SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly"]
SPREADSHEET_ID = "1o_LfUI3FaNH_ZGJ7iMAsOSJw1Sw3z12n7nLSXE60LKs"
RANGE_NAME = "Sheet1!A:D"
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=SCOPES
)
def assign_session_folder():
    """Ensure the current Flask session owns a private uploads subfolder.

    On first use a uuid4-named directory is created under UPLOAD_FOLDER;
    both the id and the absolute-ish path are stored in the session.
    """
    if "session_id" not in session:
        session["session_id"] = str(uuid.uuid4())
    folder = os.path.join(UPLOAD_FOLDER, session["session_id"])
    os.makedirs(folder, exist_ok=True)
    session["session_path"] = folder
def fetch_sheet_rows_and_pronunciations(retries=5, delay=3):
    """Fetch all sheet rows plus a lowercase word -> pronunciation map.

    Retries up to *retries* times with *delay* seconds between attempts
    (transient network/quota errors are common with the Sheets API).
    Returns (rows, pron_map); on total failure returns ([], {}) so callers
    degrade gracefully instead of failing the request.

    Sheet layout assumption: column C is the word, column D its
    pronunciation, and rows[0] is a header row — confirm against the sheet.
    """
    for attempt in range(retries):
        try:
            service = build("sheets", "v4", credentials=credentials)
            result = service.spreadsheets().values().get(
                spreadsheetId=SPREADSHEET_ID, range=RANGE_NAME
            ).execute()
            rows = result.get("values", [])
            pron_map = {}
            for r in rows[1:]:  # skip the header row
                if len(r) >= 4:
                    word = r[2].strip().lower()
                    pron = r[3].strip()
                    if word:
                        pron_map[word] = pron
            return rows, pron_map
        except Exception:
            # Previously this swallowed the error silently (unused `e`);
            # log it so repeated failures are diagnosable.
            app.logger.warning(
                "Sheets fetch attempt %d/%d failed",
                attempt + 1, retries, exc_info=True,
            )
            # Don't sleep after the final attempt — just fall through.
            if attempt < retries - 1:
                time.sleep(delay)
    return [], {}
def call_tts(client, chunk, voice, rate, pitch):
    """Invoke the remote TTS endpoint with retries and a signature fallback.

    The 4-argument form (text, voice, rate, pitch) is tried first; if the
    Space rejects it, the call is retried as (text, voice) only. Up to three
    attempts are made with a growing backoff, after which RuntimeError is
    raised.
    """
    for attempt_no in (1, 2, 3):
        try:
            try:
                return client.predict(chunk, voice, rate, pitch, api_name="/tts_interface")
            except Exception:
                # Some Spaces only accept (text, voice)
                return client.predict(chunk, voice, api_name="/tts_interface")
        except Exception:
            time.sleep(1.2 * attempt_no)  # back off: 1.2s, 2.4s, 3.6s
    raise RuntimeError("Upstream TTS failed after retries")
def tts():
    """Synthesize speech for the posted text via the remote HF TTS Space.

    Expects a JSON body: {"text": str, "voice": str, "rate": int, "pitch": int}.
    Long inputs are split with chunk_text(); multi-chunk audio is stitched
    together with pydub and the result is served from ./static.

    Returns JSON {"url": ..., "chunks": n}, 400 on empty text, 502 when the
    upstream TTS fails after retries, 500 if the upstream file is missing.

    NOTE(review): this looks like a Flask view for a /tts endpoint, but no
    @app.route decorator is visible in this chunk — confirm it is registered
    elsewhere.
    """
    data = request.get_json(force=True)
    text_input = (data.get("text") or "").strip()
    if not text_input:
        return jsonify({"error": "No text provided"}), 400

    voice = (data.get("voice") or "en-US-AriaNeural - en-US (Female)").strip()

    # Coerce rate/pitch defensively; previously these used bare `except:`,
    # which also swallows SystemExit/KeyboardInterrupt.
    try:
        rate = int(data.get("rate", 0))
    except (TypeError, ValueError):
        rate = 0
    try:
        pitch = int(data.get("pitch", 0))
    except (TypeError, ValueError):
        pitch = 0
    rate = max(-50, min(50, rate))
    pitch = max(-20, min(20, pitch))

    # Smaller chunks than chunk_text's default, for upstream stability.
    parts = chunk_text(text_input, 1800)

    client = Client("https://altafo-free-tts-unlimted-words.hf.space/")
    os.makedirs("static", exist_ok=True)
    out_name = f"{uuid.uuid4().hex}.mp3"
    out_path = os.path.join("static", out_name)

    # Single-chunk fast path: copy the upstream mp3 straight through.
    if len(parts) == 1:
        try:
            result = call_tts(client, parts[0], voice, rate, pitch)
        except Exception as e:
            app.logger.exception("TTS failed")
            return jsonify({"error": str(e)}), 502
        tmp_mp3 = result[0]
        if not os.path.exists(tmp_mp3):
            return jsonify({"error": "TTS failed"}), 500
        shutil.copy(tmp_mp3, out_path)
        return jsonify({"url": f"/static/{out_name}", "chunks": 1})

    # Multi-chunk path: synthesize each piece and concatenate the audio.
    combined = None
    for chunk in parts:
        try:
            result = call_tts(client, chunk, voice, rate, pitch)
        except Exception as e:
            app.logger.exception("TTS failed")
            return jsonify({"error": str(e)}), 502
        seg = AudioSegment.from_file(result[0], format="mp3")
        combined = seg if combined is None else combined + seg
        time.sleep(0.4)  # small pause between chunk requests
    combined.export(out_path, format="mp3")
    return jsonify({"url": f"/static/{out_name}", "chunks": len(parts)})
# ======= New from index-cleaning app =======
def extract_text_from_pdf(path):
    """Return the plain text of every page of the PDF at *path*, newline-joined."""
    document = fitz.open(path)
    pages = [page.get_text() for page in document]
    return "\n".join(pages)
def clean_body_script(text: str) -> str:
    """Strip broadcast-rundown boilerplate from extracted script text.

    Removes, in order: NTD banners and ntdtv.com URLs, "N of M" page
    counters, dated timestamps, slug lines (e.g. "A123 FOO_BAR"),
    bracketed all-caps production tags, whole lines containing any known
    production keyword, lone single-letter cue lines, empty brackets, and
    finally collapses runs of blank lines. Returns the stripped result.
    """
    # Inline (non-anchored) noise patterns, applied first.
    for noise in (
        r"NTD TODAY.*",
        r"http.*?ntdtv\.com.*",
        r"\d+ of \d+",
        r"\d{1,2}/\d{1,2}/\d{4}.*?(AM|PM)",
    ):
        text = re.sub(noise, "", text)

    # Slug lines and bracketed production tags.
    text = re.sub(r"^[A-Z]\d{2,3}[\s\-A-Z0-9_]*\n?", "", text, flags=re.MULTILINE)
    text = re.sub(r"\[\s*[A-Z0-9\-_\s]+\s*\]", "", text)

    # Any line containing one of these production keywords is dropped whole.
    kill_keywords = [
        "DAY BREAK", "EVE BREAK", "DAY OPEN",
        "COMMERCIAL_DAY_", "NTD COMMERCIAL",
        "ENG_", "UK_",
        "BLACK_OPEN_DIRECTOR_ONLY",
        "mixA", "mixB",
        "zzz-", "zzz_",
        "Start:", "End:", "Printed:",
        "TAKE SOT", "CAM1", "CAM2", "CAM3", "PKG", "VO",
        "o Content",
    ]
    for kw in kill_keywords:
        text = re.sub(rf"^.*{re.escape(kw)}.*\n?", "", text, flags=re.MULTILINE)

    # Lone cue letters, empty brackets, then blank-line runs.
    text = re.sub(r"^[ABE]\s*$\n?", "", text, flags=re.MULTILINE)
    text = re.sub(r"^\[\s*\]\s*\n?", "", text, flags=re.MULTILINE)
    text = re.sub(r"\n\s*\n", "\n", text)
    return text.strip()
# ============================================
def chunk_text(text: str, max_len: int = 9000):
    """Split *text* into chunks of at most *max_len* characters.

    Whitespace is collapsed to single spaces first; each chunk is then cut
    at the latest sentence boundary inside the window so sentences stay
    intact where possible, falling back to a hard cut when no boundary is
    found.

    Fixes vs. the previous version: str.rfind returns -1 (never None), so
    the old `c is not None` filter was a no-op; and the cut landed on the
    punctuation's index, so each chunk lost its terminal ". "/"! "/"? " —
    the delimiter is now kept with its sentence.
    """
    s = re.sub(r'\s+', ' ', text).strip()
    chunks, start, n = [], 0, len(s)
    # Sentence-ending delimiters (including closing smart quotes).
    seps = ('. ', '! ', '? ', '.” ', '.”', '!’ ', '!’', '?” ', '?”')
    while start < n:
        end = min(start + max_len, n)
        if end == n:  # the remainder fits in one chunk
            chunks.append(s[start:end].strip())
            break
        window = s[start:end]
        # Prefer the latest sentence boundary; keep the delimiter in-chunk.
        cut = -1
        for sep in seps:
            pos = window.rfind(sep)
            if pos != -1:
                cut = max(cut, pos + len(sep))
        if cut <= 0:
            cut = len(window)  # no boundary found: hard cut mid-text
        chunk = window[:cut].strip() or window.strip()
        chunks.append(chunk)
        start += cut  # advance past everything consumed (incl. the space)
    return chunks
def index():
    """Serve the main UI page (templates/index.html).

    NOTE(review): no @app.route decorator is visible in this chunk —
    confirm the route is registered elsewhere.
    """
    return render_template("index.html")
def sheet_json():
    """Return the raw Google Sheet rows as a JSON array (pron map discarded)."""
    sheet_rows, _pron = fetch_sheet_rows_and_pronunciations()
    return jsonify(sheet_rows)
def _parse_word_sections(txt_path):
    """Split the extractor's report at *txt_path* into its labelled sections.

    Headers are matched by prefix ("non-dictionary words", "long words",
    "least frequent"); a blank line ends the current section. Returns
    {"non_dict": [...], "long": [...], "least": [...]}.
    """
    sections = {"non_dict": [], "long": [], "least": []}
    current = None
    with open(txt_path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.rstrip("\n")
            if line.startswith("non-dictionary words"):
                current = "non_dict"
                continue
            if line.startswith("long words"):
                current = "long"
                continue
            if line.startswith("least frequent"):
                current = "least"
                continue
            if not line.strip():
                current = None
                continue
            if current:
                sections[current].append(line)
    return sections


def _write_word_report(report_path, sections):
    """Write the combined three-section word report to *report_path*."""
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("non-dictionary words\n")
        for w in sections["non_dict"]:
            f.write(f"{w}\n")
        f.write("\nlong words\n")
        for w in sections["long"]:
            f.write(f"{w}\n")
        f.write("\nleast frequent 150 known words (dictionary ∩ en_full.txt)\n")
        for w in sections["least"]:
            f.write(f"{w}\n")


def upload_pdf():
    """Handle a PDF upload end-to-end.

    Saves the file into the session's folder, runs the three text cleaners
    (raw, body-script, air/NY final), extracts unfamiliar words with the
    sheet pronunciations merged in, and rewrites the combined session
    report. Returns a dict with the download URL and the cleaned texts.

    Fix vs. previous version: a missing session folder now yields a clean
    400 (consistent with download_output) instead of an unhandled
    KeyError/500; the unused `file_list` debug variable was removed.
    NOTE(review): assumes assign_session_folder() normally runs first
    (e.g. via a before_request hook) — confirm.
    """
    file = request.files.get("pdf")
    if not file:
        return "No PDF supplied", 400
    user_folder = session.get("session_path")
    if not user_folder:
        return "Session expired", 400

    file_id = str(uuid.uuid4())
    pdf_path = os.path.join(user_folder, f"{file_id}.pdf")
    txt_out = os.path.join(user_folder, f"{file_id}.txt")
    os.makedirs(user_folder, exist_ok=True)
    file.save(pdf_path)

    # Three independent extractions/cleanups of the same PDF.
    raw_text = pdf_to_text_cleaned(pdf_path)
    cleaned_text = clean_body_script(extract_text_from_pdf(pdf_path))
    final_cleaned_text = pdf_to_final_cleaned_text(pdf_path)
    ny_final_cleaned_text = ny_cleaned_text(pdf_path)

    # Merge sheet pronunciations into the unfamiliar-word extraction.
    _, sheet_pron = fetch_sheet_rows_and_pronunciations()
    extract_non_dictionary_words(
        raw_text,
        dictionary_path="words.txt",
        output_txt_path=txt_out,
        pronunciation_path=DEFAULT_PRONUNCIATION_PATH,
        extra_pronunciations=sheet_pron,
    )

    # Re-group the per-file report into the combined session report.
    sections = _parse_word_sections(txt_out)
    _write_word_report(os.path.join(user_folder, "all_unfamiliar.txt"), sections)

    return {
        "download_url": f"/download/{file_id}",
        "pdf_text": cleaned_text,
        "final_pdf_text": final_cleaned_text,
        "ny_final_pdf_text": ny_final_cleaned_text,
    }
def download_output(file_id):
    """Send the session's combined word report as a download.

    Returns 400 when the session has no folder and 404 when the report has
    not been generated yet.

    NOTE(review): *file_id* is unused — the report path is fixed per
    session. It looks like a route parameter (/download/<file_id>); confirm
    the registration elsewhere.
    """
    folder = session.get("session_path")
    if not folder:
        return "Session expired", 400
    report = os.path.join(folder, "all_unfamiliar.txt")
    if not os.path.isfile(report):
        return "File not found", 404
    return send_file(report, as_attachment=True, download_name="output_words.txt")
def cleanup_session():
    """Delete the current session's upload folder, if any; always 204."""
    folder = session.get("session_path")
    if folder and os.path.exists(folder):
        shutil.rmtree(folder, ignore_errors=True)
    return "", 204
def delete_old_upload_folders(base_path="uploads", max_age=3600):
    """Remove subfolders of *base_path* whose mtime is older than *max_age* seconds.

    Fixes vs. previous version: a missing *base_path* is treated as
    "nothing to clean" instead of raising FileNotFoundError (the scheduler
    calls this unconditionally), and a folder vanishing between listdir and
    stat no longer aborts the sweep.
    """
    if not os.path.isdir(base_path):
        return
    now = time.time()
    for folder in os.listdir(base_path):
        fpath = os.path.join(base_path, folder)
        try:
            is_stale = os.path.isdir(fpath) and now - os.path.getmtime(fpath) > max_age
        except OSError:
            continue  # raced with a concurrent delete; skip
        if is_stale:
            shutil.rmtree(fpath, ignore_errors=True)
def daily_midnight_cleanup():
    """Scheduled job: purge stale upload folders, logging the run time to stdout."""
    started_at = datetime.datetime.now()
    print(f"[{started_at}] Running midnight cleanup...")
    delete_old_upload_folders()
def run_scheduler():
    """Background loop for the `schedule` library; never returns.

    Registers the daily midnight cleanup job, then polls for pending jobs
    once a minute. Intended to run in a daemon thread so it dies with the
    process.
    """
    schedule.every().day.at("00:00").do(daily_midnight_cleanup)
    while True:
        schedule.run_pending()
        time.sleep(60)
# Module import side effects: wipe any uploads left over from a previous
# run, recreate the workspace, and start the cleanup scheduler in a daemon
# thread (daemon=True so it does not block process exit).
shutil.rmtree("uploads", ignore_errors=True)
os.makedirs("uploads", exist_ok=True)
threading.Thread(target=run_scheduler, daemon=True).start()
def list_uploads():
    """Return the filenames in this session's upload folder as JSON.

    404 when the session folder is missing; 500 with the error message on
    any other failure.
    """
    try:
        folder = session.get("session_path")
        if not folder or not os.path.isdir(folder):
            return jsonify({"error": "Session folder not found"}), 404
        return jsonify({"files": os.listdir(folder)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def ping():
    """Liveness probe — always answers plain 'pong' with HTTP 200."""
    return ("pong", 200)
# Dev entry point; port 7860 is the Hugging Face Spaces convention. In
# production this module is presumably served by a WSGI runner instead.
if __name__ == "__main__":
    print("👨💻 Flask dev server running (__main__)")
    app.run(host="0.0.0.0", port=7860)