# suppfactsdaily / batch_runner.py
# RidhiD.
# Rich email from status CSV; timer stops with checkmark on completion
# ce58d7c
"""
Batch orchestration: URL processing runs in a background thread so the
generator can yield every 100ms for live timer updates.
"""
import csv
import logging
import os
import threading
import time
from datetime import datetime
from batch_store import create_batch, finish_batch, update_url
from config import LOG_DIR
from logger import logger
from main import process_article
from smtp_mailer import send_batch_summary
def _safe_name(batch_name):
return "".join(c if c.isalnum() or c in "-_" else "_" for c in batch_name.strip())
def _make_batch_id(batch_name):
    """Build a batch id of the form ``<safe name, max 40 chars>-<YYYY-mm-dd-HHMMSS>``.

    The timestamp has one-second resolution, so two batches with the same
    name started within the same second get the same id.
    """
    stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
    prefix = _safe_name(batch_name)[:40]
    return "-".join((prefix, stamp))
def _make_log_path(batch_name):
    """Return ``(log_path, date_str)`` for a batch's log file.

    The file lives in ``LOG_DIR`` and is named ``<safe name>_<YYYY-mm-dd>.log``;
    ``date_str`` is that same ``YYYY-mm-dd`` date string.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    log_file = _safe_name(batch_name) + "_" + today + ".log"
    return os.path.join(LOG_DIR, log_file), today
def _fmt(seconds):
"""Format seconds as MM:SS.T (e.g. 01:23.4)."""
m = int(seconds // 60)
s = seconds % 60
tenths = int(s * 10) % 10
return f"{m:02d}:{int(s):02d}.{tenths}"
def _read_status_rows(urls):
"""Read production_status.csv and return a dict keyed by URL."""
csv_path = os.path.join(LOG_DIR, "production_status.csv")
url_set = set(urls)
data = {}
if not os.path.exists(csv_path):
return data
try:
with open(csv_path, newline="", encoding="utf-8") as f:
for row in csv.DictReader(f):
if row.get("url") in url_set:
data[row["url"]] = row
except Exception:
pass
return data
def _attach_batch_log_handler(log_path):
    """Attach a DEBUG-level FileHandler for *log_path* to the shared logger and return it."""
    handler = logging.FileHandler(log_path, encoding="utf-8")
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S")
    )
    logger.addHandler(handler)
    return handler


def _detach_batch_log_handler(handler):
    """Remove *handler* from the shared logger, then flush and close it (never raises)."""
    # Remove before closing so no other thread can emit to a closed
    # FileHandler (in append mode logging would silently reopen the file).
    logger.removeHandler(handler)
    try:
        handler.flush()
        handler.close()
    except Exception:
        pass  # a broken log file must not mask the batch outcome


def _enrich_results(results, urls):
    """Merge title/new_title/new_post_url/failure_step from production_status.csv into each result."""
    csv_data = _read_status_rows(urls)
    enriched = []
    for r in results:
        csv_row = csv_data.get(r["url"], {})
        enriched.append({
            **r,
            "title": csv_row.get("title", ""),
            "new_title": csv_row.get("new_title", ""),
            # Prefer the URL recorded in the CSV; fall back to process_article's result.
            "new_post_url": csv_row.get("new_post_url") or r.get("post_url", ""),
            "failure_step": csv_row.get("failure_step", ""),
        })
    return enriched


def _send_summary_email(batch_name, date_str, enriched, email_recipient, log_path):
    """Send the batch summary email; return a note (with leading space) for the status line."""
    try:
        send_batch_summary(batch_name, date_str, enriched, email_recipient, log_path)
    except ValueError as e:
        # NOTE(review): ValueError is treated as "skipped", presumably raised by
        # send_batch_summary for missing config/recipient — confirm in smtp_mailer.
        logger.warning(str(e))
        return f" Email skipped: {e}"
    except Exception as e:
        logger.error(f"Email send failed: {e}")
        return f" Email failed: {e}"
    logger.info(f"Summary email sent to {email_recipient}")
    return f" Email sent to {email_recipient}."


def run_batch(batch_name, urls, email_recipient):
    """
    Generator — yields (rows, status_msg, log_path|None, batch_id, elapsed_str, is_done)
    every 100ms. URL processing runs in a daemon thread; the generator just
    streams the shared state on each tick.

    rows columns: [#, URL, Status, Details, New Post URL, Time]
    elapsed_str: "MM:SS.T" stopwatch string
    log_path: the batch log file path on the final yield, None before that
    is_done: True on the final yield (after email is sent, or after a crash)

    A per-batch log file handler is attached to the shared logger for the
    batch's lifetime and is always detached again: on setup failure,
    on success, and on a worker crash. Nothing signals the worker to stop,
    so it runs to completion (and cleans up) even if the consumer stops
    iterating this generator.
    """
    batch_id = _make_batch_id(batch_name)
    log_path, date_str = _make_log_path(batch_name)
    batch_start = time.time()
    total = len(urls)

    # Shared state between the worker thread (writer) and this generator (reader).
    lock = threading.Lock()
    rows = [[i + 1, url, "Pending", "—", "", "—"] for i, url in enumerate(urls)]
    state = {
        "msg": f"Starting '{batch_name}' — {total} URL(s)...",
        "current_i": -1,  # index of URL currently being processed
        "url_start": None,  # time.time() when current URL started
        "done": False,
        "log_path": None,
    }

    handler = _attach_batch_log_handler(log_path)

    def worker():
        succeeded = 0
        failed = 0
        results = []
        finished = False  # True once finish_batch() has recorded the final status
        final_msg = "Batch worker exited unexpectedly."
        try:
            for i, url in enumerate(urls):
                url_start = time.time()
                with lock:
                    rows[i][2] = "In progress"
                    rows[i][3] = "Scraping + AI rewrite..."
                    rows[i][5] = "0.0s"
                    state["current_i"] = i
                    state["url_start"] = url_start
                    state["msg"] = f"[{i + 1}/{total}] {url[:70]}"
                update_url(batch_id, i + 1, "In progress", "Scraping + AI rewrite...")
                # Only the article pipeline belongs in this try: a failing store
                # write must not turn an already-counted success into a failure.
                try:
                    post_info = process_article(url)
                    post_url = post_info.get("url") or post_info.get("edit_url", "")
                except Exception as e:
                    error_msg = str(e)[:100]
                    url_time = f"{time.time() - url_start:.1f}s"
                    with lock:
                        rows[i][2] = "Failed"
                        rows[i][3] = error_msg
                        rows[i][4] = ""
                        rows[i][5] = url_time
                        failed += 1
                        # Stop the live timer together with the final row update so
                        # the ticker can't overwrite this row's final Time cell.
                        state["current_i"] = -1
                        state["url_start"] = None
                        state["msg"] = (
                            f"[{i + 1}/{total}] Failed — {succeeded} ok, {failed} failed"
                        )
                    update_url(batch_id, i + 1, "Failed", error_msg, "", url_time)
                    results.append({"url": url, "status": "failed", "error": error_msg})
                else:
                    url_time = f"{time.time() - url_start:.1f}s"
                    with lock:
                        rows[i][2] = "Done"
                        rows[i][3] = "All steps OK"
                        rows[i][4] = post_url
                        rows[i][5] = url_time
                        succeeded += 1
                        state["current_i"] = -1
                        state["url_start"] = None
                        state["msg"] = (
                            f"[{i + 1}/{total}] Done — {succeeded} ok, {failed} failed"
                        )
                    update_url(batch_id, i + 1, "Done", "All steps OK", post_url, url_time)
                    results.append({"url": url, "status": "success", "post_url": post_url})

            final_status = (
                "completed" if failed == 0 else ("failed" if succeeded == 0 else "partial")
            )
            finish_batch(batch_id, final_status)
            finished = True
            logger.info(
                f"=== Batch '{batch_name}' complete — {succeeded}/{total} succeeded ==="
            )
            email_note = _send_summary_email(
                batch_name, date_str, _enrich_results(results, urls), email_recipient, log_path
            )
            final_msg = (
                f"Batch '{batch_name}' complete — "
                f"{succeeded}/{total} succeeded, {failed} failed.{email_note}"
            )
        except Exception as exc:
            logger.exception(f"Worker thread crashed: {exc}")
            final_msg = f"Batch crashed: {exc}"
            if not finished:
                # Don't leave the batch in the store without a final status.
                try:
                    finish_batch(batch_id, "failed")
                except Exception:
                    logger.exception("Could not record crashed batch as failed")
        finally:
            # Always runs: without done=True the polling loop below never exits.
            _detach_batch_log_handler(handler)
            with lock:
                state["current_i"] = -1
                state["url_start"] = None
                state["msg"] = final_msg
                state["log_path"] = log_path
                state["done"] = True

    try:
        logger.info(f"=== Batch '{batch_name}' started — {total} URL(s) ===")
        create_batch(batch_id, batch_name, email_recipient, urls)
        thread = threading.Thread(target=worker, daemon=True)
        thread.start()
    except BaseException:
        # The worker never took ownership of the handler; don't leak it onto
        # the shared logger.
        _detach_batch_log_handler(handler)
        raise

    # Tick every 100ms and stream current state
    while True:
        time.sleep(0.1)
        elapsed = _fmt(time.time() - batch_start)
        with lock:
            current_rows = [r[:] for r in rows]
            msg = state["msg"]
            current_i = state["current_i"]
            url_start_t = state["url_start"]
            done = state["done"]
            final_log = state["log_path"]
        # Live-tick the in-progress row's Time cell
        if current_i >= 0 and url_start_t is not None:
            current_rows[current_i][5] = f"{time.time() - url_start_t:.1f}s"
        yield current_rows, msg, final_log if done else None, batch_id, elapsed, done
        if done:
            break