# anchordashboard / app.py
# (hosting-page header residue — "ntdservices: Update app.py, dbf1685 verified" —
# converted to a comment so the file remains valid Python)
# updated app.py with cleaned full pdf text output
import os
import uuid
import time
import shutil
import threading
import schedule
import datetime
from pydub import AudioSegment
import re
from gradio_client import Client
import uuid # you already import, keep if present
import fitz
from flask import Flask, request, send_file, render_template, jsonify, session
from google.oauth2 import service_account
from googleapiclient.discovery import build
from pdf_utils import (
pdf_to_text_cleaned,
extract_non_dictionary_words,
DEFAULT_PRONUNCIATION_PATH,
)
from pdf_utils_finalclean_airmac_final import pdf_to_final_cleaned_text
from pdf_utils_finalclean_NYmac_final import pdf_to_final_cleaned_text as ny_cleaned_text
app = Flask(__name__)
app.secret_key = os.urandom(24)
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
import json
GOOGLE_SECRET_JSON = os.getenv("GOOGLE_CREDENTIALS_JSON")
SERVICE_ACCOUNT_FILE = "credentials.json"
if GOOGLE_SECRET_JSON:
with open(SERVICE_ACCOUNT_FILE, "w", encoding="utf-8") as f:
f.write(GOOGLE_SECRET_JSON)
SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly"]
SPREADSHEET_ID = "1o_LfUI3FaNH_ZGJ7iMAsOSJw1Sw3z12n7nLSXE60LKs"
RANGE_NAME = "Sheet1!A:D"
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE, scopes=SCOPES
)
@app.before_request
def assign_session_folder():
if "session_id" not in session:
session["session_id"] = str(uuid.uuid4())
session_path = os.path.join(UPLOAD_FOLDER, session["session_id"])
os.makedirs(session_path, exist_ok=True)
session["session_path"] = session_path
def fetch_sheet_rows_and_pronunciations(retries=5, delay=3):
for attempt in range(retries):
try:
service = build("sheets", "v4", credentials=credentials)
result = service.spreadsheets().values().get(
spreadsheetId=SPREADSHEET_ID, range=RANGE_NAME
).execute()
rows = result.get("values", [])
pron_map = {}
for r in rows[1:]:
if len(r) >= 4:
word = r[2].strip().lower()
pron = r[3].strip()
if word:
pron_map[word] = pron
return rows, pron_map
except Exception as e:
time.sleep(delay)
return [], {}
def call_tts(client, chunk, voice, rate, pitch):
"""Try TTS with retries + fallback signature."""
for attempt in range(3):
try:
try:
return client.predict(chunk, voice, rate, pitch, api_name="/tts_interface")
except Exception:
# Some Spaces only accept (text, voice)
return client.predict(chunk, voice, api_name="/tts_interface")
except Exception:
time.sleep(1.2 * (attempt + 1))
raise RuntimeError("Upstream TTS failed after retries")
# REPLACE your existing /tts with this
@app.route("/tts", methods=["POST"])
def tts():
data = request.get_json(force=True)
text_input = (data.get("text") or "").strip()
if not text_input:
return jsonify({"error": "No text provided"}), 400
voice = (data.get("voice") or "en-US-AriaNeural - en-US (Female)").strip()
try: rate = int(data.get("rate", 0))
except: rate = 0
try: pitch = int(data.get("pitch", 0))
except: pitch = 0
rate = max(-50, min(50, rate))
pitch = max(-20, min(20, pitch))
# smaller chunks for stability
parts = chunk_text(text_input, 1800)
client = Client("https://altafo-free-tts-unlimted-words.hf.space/")
os.makedirs("static", exist_ok=True)
out_name = f"{uuid.uuid4().hex}.mp3"
out_path = os.path.join("static", out_name)
# single-chunk fast path
if len(parts) == 1:
try:
result = call_tts(client, parts[0], voice, rate, pitch)
except Exception as e:
app.logger.exception("TTS failed")
return jsonify({"error": str(e)}), 502
tmp_mp3 = result[0]
if not os.path.exists(tmp_mp3):
return jsonify({"error": "TTS failed"}), 500
shutil.copy(tmp_mp3, out_path)
return jsonify({"url": f"/static/{out_name}", "chunks": 1})
# multi-chunk path (note the indentation under `else:`)
else:
combined = None
for i, chunk in enumerate(parts, 1):
try:
result = call_tts(client, chunk, voice, rate, pitch)
except Exception as e:
app.logger.exception("TTS failed")
return jsonify({"error": str(e)}), 502
tmp_mp3 = result[0]
seg = AudioSegment.from_file(tmp_mp3, format="mp3")
combined = seg if combined is None else (combined + seg)
time.sleep(0.4) # small pause between chunk requests
combined.export(out_path, format="mp3")
return jsonify({"url": f"/static/{out_name}", "chunks": len(parts)})
# ======= New from index-cleaning app =======
def extract_text_from_pdf(path):
doc = fitz.open(path)
return "\n".join(page.get_text() for page in doc)
def clean_body_script(text: str) -> str:
text = re.sub(r"NTD TODAY.*", "", text)
text = re.sub(r"http.*?ntdtv\.com.*", "", text)
text = re.sub(r"\d+ of \d+", "", text)
text = re.sub(r"\d{1,2}/\d{1,2}/\d{4}.*?(AM|PM)", "", text)
text = re.sub(r"^[A-Z]\d{2,3}[\s\-A-Z0-9_]*\n?", "", text, flags=re.MULTILINE)
text = re.sub(r"\[\s*[A-Z0-9\-_\s]+\s*\]", "", text)
kill_keywords = [
"DAY BREAK", "EVE BREAK", "DAY OPEN",
"COMMERCIAL_DAY_", "NTD COMMERCIAL",
"ENG_", "UK_",
"BLACK_OPEN_DIRECTOR_ONLY",
"mixA", "mixB",
"zzz-", "zzz_",
"Start:", "End:", "Printed:",
"TAKE SOT", "CAM1", "CAM2", "CAM3", "PKG", "VO",
"o Content",
]
for kw in kill_keywords:
text = re.sub(rf"^.*{re.escape(kw)}.*\n?", "", text, flags=re.MULTILINE)
text = re.sub(r"^[ABE]\s*$\n?", "", text, flags=re.MULTILINE)
text = re.sub(r"^\[\s*\]\s*\n?", "", text, flags=re.MULTILINE)
text = re.sub(r"\n\s*\n", "\n", text)
return text.strip()
# ============================================
# ADD
def chunk_text(text: str, max_len: int = 9000):
s = re.sub(r'\s+', ' ', text).strip()
chunks, start, n = [], 0, len(s)
while start < n:
end = min(start + max_len, n)
if end == n:
chunks.append(s[start:end].strip()); break
window = s[start:end]
# prefer cutting at sentence boundaries
cuts = [window.rfind(x) for x in ('. ', '! ', '? ', '.” ', '.”', '!’ ', '!’', '?” ', '?”', '\n\n')]
cut = max(c for c in cuts if c is not None)
if cut == -1:
cut = len(window)
chunk = window[:cut].strip() or window.strip()
chunks.append(chunk)
start += len(chunk)
return chunks
@app.route("/")
def index():
return render_template("index.html")
@app.route("/sheet")
def sheet_json():
rows, _ = fetch_sheet_rows_and_pronunciations()
return jsonify(rows)
@app.route("/upload", methods=["POST"])
def upload_pdf():
file = request.files.get("pdf")
if not file:
return "No PDF supplied", 400
user_folder = session["session_path"]
file_id = str(uuid.uuid4())
pdf_path = os.path.join(user_folder, f"{file_id}.pdf")
txt_out = os.path.join(user_folder, f"{file_id}.txt")
os.makedirs(user_folder, exist_ok=True)
file.save(pdf_path)
raw_text = pdf_to_text_cleaned(pdf_path)
cleaned_text = clean_body_script(extract_text_from_pdf(pdf_path))
final_cleaned_text = pdf_to_final_cleaned_text(pdf_path)
ny_final_cleaned_text = ny_cleaned_text(pdf_path)
_, sheet_pron = fetch_sheet_rows_and_pronunciations()
extract_non_dictionary_words(
raw_text,
dictionary_path="words.txt",
output_txt_path=txt_out,
pronunciation_path=DEFAULT_PRONUNCIATION_PATH,
extra_pronunciations=sheet_pron,
)
output_txt = os.path.join(user_folder, "all_unfamiliar.txt")
sections = {"non_dict": [], "long": [], "least": []}
current = None
with open(txt_out, "r", encoding="utf-8") as f:
for raw in f:
line = raw.rstrip("\n")
if line.startswith("non-dictionary words"):
current = "non_dict"
continue
if line.startswith("long words"):
current = "long"
continue
if line.startswith("least frequent"):
current = "least"
continue
if not line.strip():
current = None
continue
if current:
sections[current].append(line)
with open(output_txt, "w", encoding="utf-8") as f:
f.write("non-dictionary words\n")
for w in sections["non_dict"]:
f.write(f"{w}\n")
f.write("\nlong words\n")
for w in sections["long"]:
f.write(f"{w}\n")
f.write("\nleast frequent 150 known words (dictionary ∩ en_full.txt)\n")
for w in sections["least"]:
f.write(f"{w}\n")
# Final debug output of what files are present
file_list = os.listdir(user_folder)
return {
"download_url": f"/download/{file_id}",
"pdf_text": cleaned_text,
"final_pdf_text": final_cleaned_text,
"ny_final_pdf_text": ny_final_cleaned_text,
}
@app.route("/download/<file_id>")
def download_output(file_id):
user_folder = session.get("session_path")
if not user_folder:
return "Session expired", 400
output_path = os.path.join(user_folder, "all_unfamiliar.txt")
if not os.path.isfile(output_path):
return "File not found", 404
return send_file(output_path, as_attachment=True, download_name="output_words.txt")
@app.route("/cleanup", methods=["POST"])
def cleanup_session():
user_folder = session.get("session_path")
if user_folder and os.path.exists(user_folder):
shutil.rmtree(user_folder, ignore_errors=True)
return "", 204
def delete_old_upload_folders(base_path="uploads", max_age=3600):
now = time.time()
for folder in os.listdir(base_path):
fpath = os.path.join(base_path, folder)
if os.path.isdir(fpath) and now - os.path.getmtime(fpath) > max_age:
shutil.rmtree(fpath, ignore_errors=True)
def daily_midnight_cleanup():
print(f"[{datetime.datetime.now()}] Running midnight cleanup...")
delete_old_upload_folders()
def run_scheduler():
schedule.every().day.at("00:00").do(daily_midnight_cleanup)
while True:
schedule.run_pending()
time.sleep(60)
shutil.rmtree("uploads", ignore_errors=True)
os.makedirs("uploads", exist_ok=True)
threading.Thread(target=run_scheduler, daemon=True).start()
@app.route("/list-uploads", methods=["GET"])
def list_uploads():
try:
session_path = session.get("session_path")
if not session_path or not os.path.isdir(session_path):
return jsonify({"error": "Session folder not found"}), 404
files = os.listdir(session_path)
return jsonify({"files": files})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/ping")
def ping():
return "pong", 200
if __name__ == "__main__":
print("👨‍💻 Flask dev server running (__main__)")
app.run(host="0.0.0.0", port=7860)