Spaces:
Sleeping
Sleeping
| """ | |
| Batch orchestration: URL processing runs in a background thread so the | |
| generator can yield every 100ms for live timer updates. | |
| """ | |
| import csv | |
| import logging | |
| import os | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from batch_store import create_batch, finish_batch, update_url | |
| from config import LOG_DIR | |
| from logger import logger | |
| from main import process_article | |
| from smtp_mailer import send_batch_summary | |
| def _safe_name(batch_name): | |
| return "".join(c if c.isalnum() or c in "-_" else "_" for c in batch_name.strip()) | |
def _make_batch_id(batch_name):
    """Return a batch id of the form ``<sanitized name, max 40 chars>-<YYYY-MM-DD-HHMMSS>``.

    The timestamp is local time from ``datetime.now()``; two batches with the
    same name started within the same second would get the same id.
    """
    prefix = _safe_name(batch_name)[:40]
    stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
    return "-".join((prefix, stamp))
def _make_log_path(batch_name):
    """Return ``(log_path, date_str)`` for a batch.

    ``log_path`` is ``LOG_DIR/<sanitized name>_<YYYY-MM-DD>.log`` and
    ``date_str`` is the local date used in that name. The name carries no
    time component, so batches with the same name on the same day map to
    the same file (a default-mode FileHandler appends to it).
    """
    today = datetime.now().strftime("%Y-%m-%d")
    log_file = "{}_{}.log".format(_safe_name(batch_name), today)
    return os.path.join(LOG_DIR, log_file), today
| def _fmt(seconds): | |
| """Format seconds as MM:SS.T (e.g. 01:23.4).""" | |
| m = int(seconds // 60) | |
| s = seconds % 60 | |
| tenths = int(s * 10) % 10 | |
| return f"{m:02d}:{int(s):02d}.{tenths}" | |
def _read_status_rows(urls):
    """Read ``LOG_DIR/production_status.csv`` and return its rows keyed by URL.

    Only rows whose ``url`` column is one of *urls* are kept. If a URL
    appears on several rows, the last one wins.

    This is best-effort enrichment and never raises for the expected
    failures:
      * a missing CSV file returns ``{}``;
      * read, decode or parse errors are logged as a warning, and the rows
        collected before the error are returned.
    """
    csv_path = os.path.join(LOG_DIR, "production_status.csv")
    wanted = set(urls)
    data = {}
    try:
        with open(csv_path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                url = row.get("url")
                if url in wanted:
                    data[url] = row
    except FileNotFoundError:
        # No status file yet: nothing to enrich with, not an error.
        return data
    except (OSError, csv.Error, ValueError) as exc:
        # ValueError covers UnicodeDecodeError from a non-UTF-8 file.
        logger.warning(f"Could not read {csv_path}: {exc}")
    return data
def _attach_log_handler(log_path):
    """Attach a DEBUG-level FileHandler writing to *log_path* to the shared logger and return it."""
    handler = logging.FileHandler(log_path, encoding="utf-8")
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S")
    )
    logger.addHandler(handler)
    return handler


def _detach_log_handler(handler):
    """Detach *handler* from the shared logger, then flush and close it.

    The handler is removed first. A closed FileHandler that still receives
    records can reopen its file, which would leak a descriptor.
    """
    logger.removeHandler(handler)
    try:
        handler.flush()
        handler.close()
    except OSError as exc:
        logger.warning(f"Could not close batch log handler: {exc}")


def _final_status(succeeded, failed):
    """Map per-URL outcome counts to the status passed to finish_batch()."""
    if failed == 0:
        return "completed"
    return "failed" if succeeded == 0 else "partial"


def _enrich_results(results, urls):
    """Add title/new_title/new_post_url/failure_step from production_status.csv to each result."""
    csv_data = _read_status_rows(urls)
    enriched = []
    for r in results:
        csv_row = csv_data.get(r["url"], {})
        enriched.append({
            **r,
            "title": csv_row.get("title", ""),
            "new_title": csv_row.get("new_title", ""),
            "new_post_url": csv_row.get("new_post_url") or r.get("post_url", ""),
            "failure_step": csv_row.get("failure_step", ""),
        })
    return enriched


def _send_summary(batch_name, date_str, enriched, email_recipient, log_path):
    """Try to send the summary email and return a note for the final status line.

    A ValueError from send_batch_summary is reported as "skipped". Any other
    Exception is reported as "failed". Neither is re-raised.
    """
    try:
        send_batch_summary(batch_name, date_str, enriched, email_recipient, log_path)
    except ValueError as e:
        logger.warning(str(e))
        return f" Email skipped: {e}"
    except Exception as e:
        logger.error(f"Email send failed: {e}")
        return f" Email failed: {e}"
    logger.info(f"Summary email sent to {email_recipient}")
    return f" Email sent to {email_recipient}."


def run_batch(batch_name, urls, email_recipient):
    """
    Generator that runs a batch of URLs and streams its progress.

    URL processing runs in a daemon thread. This generator wakes every 100ms,
    snapshots the shared state under a lock, and yields
    ``(rows, status_msg, log_path|None, batch_id, elapsed_str, is_done)``.

    rows columns: [#, URL, Status, Details, New Post URL, Time]
    elapsed_str: "MM:SS.T" stopwatch string
    log_path: only set on the final yield, after the log handler is closed
    is_done: True on the final yield (after the email attempt or the crash cleanup)

    Durations use time.monotonic(), so wall-clock adjustments cannot make
    the timers jump or go negative.

    Errors from create_batch() or from starting the thread propagate to the
    caller on the first next(). The per-batch log handler is detached first.

    NOTE: the handler is attached to the shared module logger. If several
    batches run at once, each batch's log file will also receive the
    others' records.
    """
    batch_id = _make_batch_id(batch_name)
    log_path, date_str = _make_log_path(batch_name)
    batch_start = time.monotonic()

    handler = _attach_log_handler(log_path)
    try:
        logger.info(f"=== Batch '{batch_name}' started — {len(urls)} URL(s) ===")
        create_batch(batch_id, batch_name, email_recipient, urls)
    except BaseException:
        # Don't leave a dangling handler on the shared logger.
        _detach_log_handler(handler)
        raise

    lock = threading.Lock()
    rows = [[i + 1, url, "Pending", "—", "", "—"] for i, url in enumerate(urls)]
    state = {
        "msg": f"Starting '{batch_name}' — {len(urls)} URL(s)...",
        "current_i": -1,      # index of URL currently being processed
        "url_start": None,    # time.monotonic() when current URL started
        "done": False,
        "log_path": None,
    }

    def worker():
        succeeded = 0
        failed = 0
        results = []
        finished = False   # True once finish_batch() has returned normally
        final_msg = None
        try:
            for i, url in enumerate(urls):
                url_start = time.monotonic()
                with lock:
                    rows[i][2] = "In progress"
                    rows[i][3] = "Scraping + AI rewrite..."
                    rows[i][5] = "0.0s"
                    state["current_i"] = i
                    state["url_start"] = url_start
                    state["msg"] = f"[{i + 1}/{len(urls)}] {url[:70]}"
                update_url(batch_id, i + 1, "In progress", "Scraping + AI rewrite...")

                # Only process_article() failures mark the URL as failed. A
                # bookkeeping error after a successful publish must not relabel
                # the URL "Failed" or count it as both succeeded and failed.
                try:
                    post_info = process_article(url)
                    post_url = post_info.get("url") or post_info.get("edit_url") or ""
                except Exception as e:
                    error_msg = str(e)[:100]
                    url_time = f"{time.monotonic() - url_start:.1f}s"
                    with lock:
                        rows[i][2] = "Failed"
                        rows[i][3] = error_msg
                        rows[i][4] = ""
                        rows[i][5] = url_time
                        failed += 1
                        state["msg"] = (
                            f"[{i + 1}/{len(urls)}] Failed — {succeeded} ok, {failed} failed"
                        )
                    update_url(batch_id, i + 1, "Failed", error_msg, "", url_time)
                    results.append({"url": url, "status": "failed", "error": error_msg})
                else:
                    url_time = f"{time.monotonic() - url_start:.1f}s"
                    with lock:
                        rows[i][2] = "Done"
                        rows[i][3] = "All steps OK"
                        rows[i][4] = post_url
                        rows[i][5] = url_time
                        succeeded += 1
                        state["msg"] = (
                            f"[{i + 1}/{len(urls)}] Done — {succeeded} ok, {failed} failed"
                        )
                    update_url(batch_id, i + 1, "Done", "All steps OK", post_url, url_time)
                    results.append({"url": url, "status": "success", "post_url": post_url})

            with lock:
                state["current_i"] = -1
                state["url_start"] = None

            finish_batch(batch_id, _final_status(succeeded, failed))
            finished = True
            logger.info(
                f"=== Batch '{batch_name}' complete — {succeeded}/{len(urls)} succeeded ==="
            )
            enriched = _enrich_results(results, urls)
            email_note = _send_summary(batch_name, date_str, enriched, email_recipient, log_path)
            final_msg = (
                f"Batch '{batch_name}' complete — "
                f"{succeeded}/{len(urls)} succeeded, {failed} failed.{email_note}"
            )
        except Exception as exc:
            logger.exception(f"Worker thread crashed: {exc}")
            final_msg = f"Batch crashed: {exc}"
            if not finished:
                # Best-effort: close the batch record so it is not left unfinished.
                try:
                    finish_batch(batch_id, "partial" if succeeded else "failed")
                except Exception:
                    logger.exception(f"Could not finish crashed batch {batch_id}")
        finally:
            # Detach before publishing log_path so the file is complete when
            # the consumer gets it.
            _detach_log_handler(handler)
            with lock:
                crashed_i = state["current_i"]
                if crashed_i >= 0 and state["url_start"] is not None:
                    # Freeze the interrupted row's timer instead of leaving it ticking.
                    rows[crashed_i][5] = f"{time.monotonic() - state['url_start']:.1f}s"
                state["current_i"] = -1
                state["url_start"] = None
                if final_msg is not None:
                    state["msg"] = final_msg
                state["log_path"] = log_path
                state["done"] = True

    thread = threading.Thread(target=worker, name=f"batch-{batch_id}", daemon=True)
    try:
        thread.start()
    except BaseException:
        _detach_log_handler(handler)
        raise

    # Tick every 100ms and stream a snapshot of the shared state.
    while True:
        time.sleep(0.1)
        elapsed = _fmt(time.monotonic() - batch_start)
        with lock:
            current_rows = [r[:] for r in rows]
            msg = state["msg"]
            current_i = state["current_i"]
            url_start_t = state["url_start"]
            done = state["done"]
            final_log = state["log_path"]
        # Live-tick the in-progress row's Time cell.
        if current_i >= 0 and url_start_t is not None:
            current_rows[current_i][5] = f"{time.monotonic() - url_start_t:.1f}s"
        yield current_rows, msg, final_log if done else None, batch_id, elapsed, done
        if done:
            break