""" Batch orchestration: URL processing runs in a background thread so the generator can yield every 100ms for live timer updates. """ import csv import logging import os import threading import time from datetime import datetime from batch_store import create_batch, finish_batch, update_url from config import LOG_DIR from logger import logger from main import process_article from smtp_mailer import send_batch_summary def _safe_name(batch_name): return "".join(c if c.isalnum() or c in "-_" else "_" for c in batch_name.strip()) def _make_batch_id(batch_name): ts = datetime.now().strftime("%Y-%m-%d-%H%M%S") return f"{_safe_name(batch_name)[:40]}-{ts}" def _make_log_path(batch_name): date_str = datetime.now().strftime("%Y-%m-%d") filename = f"{_safe_name(batch_name)}_{date_str}.log" return os.path.join(LOG_DIR, filename), date_str def _fmt(seconds): """Format seconds as MM:SS.T (e.g. 01:23.4).""" m = int(seconds // 60) s = seconds % 60 tenths = int(s * 10) % 10 return f"{m:02d}:{int(s):02d}.{tenths}" def _read_status_rows(urls): """Read production_status.csv and return a dict keyed by URL.""" csv_path = os.path.join(LOG_DIR, "production_status.csv") url_set = set(urls) data = {} if not os.path.exists(csv_path): return data try: with open(csv_path, newline="", encoding="utf-8") as f: for row in csv.DictReader(f): if row.get("url") in url_set: data[row["url"]] = row except Exception: pass return data def run_batch(batch_name, urls, email_recipient): """ Generator — yields (rows, status_msg, log_path|None, batch_id, elapsed_str, is_done) every 100ms. URL processing runs in a daemon thread; the generator just streams the shared state on each tick. rows columns: [#, URL, Status, Details, New Post URL, Time] elapsed_str: "MM:SS.T" stopwatch string is_done: True on the final yield (after email is sent) """ batch_id = _make_batch_id(batch_name) log_path, date_str = _make_log_path(batch_name) batch_start = time.time() handler = logging.FileHandler(log_path, encoding="utf-8") handler.setLevel(logging.DEBUG) handler.setFormatter( logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S") ) logger.addHandler(handler) logger.info(f"=== Batch '{batch_name}' started — {len(urls)} URL(s) ===") create_batch(batch_id, batch_name, email_recipient, urls) lock = threading.Lock() rows = [[i + 1, url, "Pending", "—", "", "—"] for i, url in enumerate(urls)] state = { "msg": f"Starting '{batch_name}' — {len(urls)} URL(s)...", "current_i": -1, # index of URL currently being processed "url_start": None, # time.time() when current URL started "done": False, "log_path": None, } def worker(): succeeded = 0 failed = 0 results = [] try: for i, url in enumerate(urls): url_start = time.time() with lock: rows[i][2] = "In progress" rows[i][3] = "Scraping + AI rewrite..." rows[i][5] = "0.0s" state["current_i"] = i state["url_start"] = url_start state["msg"] = f"[{i + 1}/{len(urls)}] {url[:70]}" update_url(batch_id, i + 1, "In progress", "Scraping + AI rewrite...") try: post_info = process_article(url) post_url = post_info.get("url") or post_info.get("edit_url", "") url_time = f"{time.time() - url_start:.1f}s" with lock: rows[i][2] = "Done" rows[i][3] = "All steps OK" rows[i][4] = post_url rows[i][5] = url_time succeeded += 1 state["msg"] = ( f"[{i + 1}/{len(urls)}] Done — {succeeded} ok, {failed} failed" ) update_url(batch_id, i + 1, "Done", "All steps OK", post_url, url_time) results.append({"url": url, "status": "success", "post_url": post_url}) except Exception as e: error_msg = str(e)[:100] url_time = f"{time.time() - url_start:.1f}s" with lock: rows[i][2] = "Failed" rows[i][3] = error_msg rows[i][4] = "" rows[i][5] = url_time failed += 1 state["msg"] = ( f"[{i + 1}/{len(urls)}] Failed — {succeeded} ok, {failed} failed" ) update_url(batch_id, i + 1, "Failed", error_msg, "", url_time) results.append({"url": url, "status": "failed", "error": error_msg}) with lock: state["current_i"] = -1 state["url_start"] = None final_status = ( "completed" if failed == 0 else ("failed" if succeeded == 0 else "partial") ) finish_batch(batch_id, final_status) logger.info( f"=== Batch '{batch_name}' complete — {succeeded}/{len(urls)} succeeded ===" ) # Enrich results with title/new_title from production_status.csv csv_data = _read_status_rows(urls) enriched = [] for r in results: csv_row = csv_data.get(r["url"], {}) enriched.append({ **r, "title": csv_row.get("title", ""), "new_title": csv_row.get("new_title", ""), "new_post_url": csv_row.get("new_post_url") or r.get("post_url", ""), "failure_step": csv_row.get("failure_step", ""), }) email_note = "" try: send_batch_summary(batch_name, date_str, enriched, email_recipient, log_path) email_note = f" Email sent to {email_recipient}." logger.info(f"Summary email sent to {email_recipient}") except ValueError as e: logger.warning(str(e)) email_note = f" Email skipped: {e}" except Exception as e: logger.error(f"Email send failed: {e}") email_note = f" Email failed: {e}" handler.flush() handler.close() logger.removeHandler(handler) with lock: state["msg"] = ( f"Batch '{batch_name}' complete — " f"{succeeded}/{len(urls)} succeeded, {failed} failed.{email_note}" ) state["log_path"] = log_path state["done"] = True except Exception as exc: logger.exception(f"Worker thread crashed: {exc}") try: handler.flush() handler.close() logger.removeHandler(handler) except Exception: pass with lock: state["msg"] = f"Batch crashed: {exc}" state["log_path"] = log_path state["done"] = True thread = threading.Thread(target=worker, daemon=True) thread.start() # Tick every 100ms and stream current state while True: time.sleep(0.1) elapsed = _fmt(time.time() - batch_start) with lock: current_rows = [r[:] for r in rows] msg = state["msg"] current_i = state["current_i"] url_start_t = state["url_start"] done = state["done"] final_log = state["log_path"] # Live-tick the in-progress row's Time cell if current_i >= 0 and url_start_t is not None: current_rows[current_i][5] = f"{time.time() - url_start_t:.1f}s" yield current_rows, msg, final_log if done else None, batch_id, elapsed, done if done: break