| import sqlite3 |
| import json |
| from pathlib import Path |
| import datetime |
|
|
| import os |
# Output directory: taken from the OUTPUT_DIR environment variable when set,
# otherwise an ``output`` folder next to this file's parent directory.
_OUTPUT = Path(
    os.environ.get(
        'OUTPUT_DIR',
        str(Path(__file__).resolve().parent.parent / 'output'),
    )
)
# Created at import time so the database file below always has a home.
_OUTPUT.mkdir(parents=True, exist_ok=True)

# Location of the SQLite file that backs the job queue.
DB_PATH = _OUTPUT / 'jobs.db'
|
|
|
|
def _conn():
    """Open and return a new SQLite connection to the jobs database.

    Declared column types are parsed (``sqlite3.PARSE_DECLTYPES``) and rows
    are produced as ``sqlite3.Row`` so callers can index them by column name
    or convert them with ``dict(row)``. The caller owns the connection and
    must close it.
    """
    connection = sqlite3.connect(
        str(DB_PATH),
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def init_db():
    """Ensure the ``jobs`` table exists and carries every expected column.

    Creates the table when it is missing, then adds the columns that are not
    part of the base ``CREATE TABLE`` statement (``user_id``, ``company_id``,
    ``industry_override``) if ``PRAGMA table_info`` does not list them yet.
    Safe to call repeatedly. The connection is always closed, even when a
    statement fails.
    """
    # Columns absent from the base CREATE TABLE statement: (name, SQL type).
    # These are fixed constants, so interpolating them into the ALTER
    # statement below is not an injection risk.
    extra_columns = (
        ('user_id', 'INTEGER'),
        ('company_id', 'INTEGER'),
        ('industry_override', 'TEXT'),
    )

    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                org_name TEXT,
                org_url TEXT,
                max_pages INTEGER,
                runs INTEGER,
                status TEXT,
                progress TEXT,
                result_path TEXT,
                created_at TIMESTAMP,
                updated_at TIMESTAMP
            )
        ''')

        # Row index 1 of each PRAGMA table_info result is the column name.
        cur.execute("PRAGMA table_info(jobs)")
        existing = {r[1] for r in cur.fetchall()}

        for name, sql_type in extra_columns:
            if name in existing:
                continue
            try:
                cur.execute(f'ALTER TABLE jobs ADD COLUMN {name} {sql_type}')
            except sqlite3.Error:
                # Best-effort migration: another connection may have added
                # the column between the PRAGMA check and this statement.
                # Only database errors are tolerated here; anything else is
                # a programming error and should surface.
                pass
        c.commit()
    finally:
        c.close()
|
|
|
|
def enqueue_job(url, org_name, org_url, max_pages=3, runs=1, user_id=None, company_id=None, industry_override=None):
    """Insert a new job in the ``pending`` state and return its row id.

    Args:
        url: Stored in the ``url`` column.
        org_name: Stored in the ``org_name`` column.
        org_url: Stored in the ``org_url`` column.
        max_pages: Stored in the ``max_pages`` column (default 3).
        runs: Stored in the ``runs`` column (default 1).
        user_id: Optional value for the ``user_id`` column.
        company_id: Optional value for the ``company_id`` column.
        industry_override: Optional value for the ``industry_override`` column.

    Returns:
        The integer ``id`` of the inserted row.

    The job starts with an empty JSON object as ``progress``; ``created_at``
    and ``updated_at`` are both set to the current naive UTC time.
    """
    init_db()
    now = datetime.datetime.utcnow()
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute(
            '''INSERT INTO jobs (url,org_name,org_url,max_pages,runs,status,progress,created_at,updated_at,user_id,company_id,industry_override) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''',
            (url, org_name, org_url, max_pages, runs, 'pending', json.dumps({}), now, now, user_id, company_id, industry_override),
        )
        jid = cur.lastrowid
        c.commit()
        return jid
    finally:
        # Close even if the INSERT or commit raises, so the handle never leaks.
        c.close()
|
|
|
|
def list_jobs(limit=50):
    """Return up to ``limit`` jobs, newest first.

    Args:
        limit: Maximum number of rows to return (default 50).

    Returns:
        A list of dicts, one per job row, ordered by ``created_at``
        descending.
    """
    init_db()
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?', (limit,))
        return [dict(r) for r in cur.fetchall()]
    finally:
        # Close even if the query raises, so the handle never leaks.
        c.close()
|
|
|
|
def get_job(job_id):
    """Fetch a single job by primary key.

    Args:
        job_id: Value of the ``id`` column to look up.

    Returns:
        The job row as a dict, or ``None`` when no row has that id.
    """
    init_db()
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('SELECT * FROM jobs WHERE id=?', (job_id,))
        row = cur.fetchone()
        return dict(row) if row else None
    finally:
        # Close even if the query raises, so the handle never leaks.
        c.close()
|
|
|
|
def update_job(job_id, status=None, progress=None, result_path=None):
    """Update selected fields of a job and refresh its ``updated_at``.

    Only arguments that are not ``None`` are written. When every optional
    argument is ``None`` the row is left untouched (``updated_at`` included).

    Args:
        job_id: ``id`` of the job row to update.
        status: New value for the ``status`` column, if given.
        progress: Object serialised with ``json.dumps`` into the ``progress``
            column, if given.
        result_path: New value for the ``result_path`` column, if given.
    """
    init_db()

    assignments = []
    params = []
    if status is not None:
        assignments.append('status=?')
        params.append(status)
    if progress is not None:
        assignments.append('progress=?')
        params.append(json.dumps(progress))
    if result_path is not None:
        assignments.append('result_path=?')
        params.append(result_path)

    if not assignments:
        # Nothing to write: skip opening a connection for an empty commit.
        return

    # Trailing parameters bind to "updated_at=?" and "WHERE id=?".
    params.append(datetime.datetime.utcnow())
    params.append(job_id)
    # Only fixed column fragments are interpolated; all values are bound.
    sql = f"UPDATE jobs SET {', '.join(assignments)}, updated_at=? WHERE id=?"

    c = _conn()
    try:
        c.cursor().execute(sql, params)
        c.commit()
    finally:
        # Close even if the UPDATE or commit raises, so the handle never leaks.
        c.close()
|
|
|
|
def claim_next_job():
    """Claim the oldest pending job by switching its status to 'running'.

    The oldest ``pending`` row (by ``created_at``) is looked up first, then
    updated with a ``status='pending'`` guard in the WHERE clause, so the
    claim only succeeds if the row is still pending at update time.

    Returns:
        The claimed job row as a dict, or ``None`` when there is no pending
        job or the guarded update did not match exactly one row.
    """
    init_db()
    db = _conn()
    cursor = db.cursor()
    try:
        cursor.execute("SELECT id FROM jobs WHERE status='pending' ORDER BY created_at ASC LIMIT 1")
        candidate = cursor.fetchone()
        if not candidate:
            return None

        job_id = candidate['id']
        stamp = datetime.datetime.utcnow()
        cursor.execute(
            "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
            ('running', stamp, job_id),
        )
        won_claim = cursor.rowcount == 1
        db.commit()
        if not won_claim:
            return None

        # Re-read the row so the caller sees the post-claim state.
        cursor.execute('SELECT * FROM jobs WHERE id=?', (job_id,))
        claimed = cursor.fetchone()
        return dict(claimed) if claimed else None
    finally:
        db.close()
|
|
|
|
def claim_job(job_id):
    """Try to claim one specific job.

    Sets the job's status to 'running' (and refreshes ``updated_at``) only
    if the row with this id is currently 'pending'.

    Returns:
        ``True`` when exactly one row was updated, ``False`` otherwise.
    """
    init_db()
    db = _conn()
    cursor = db.cursor()
    try:
        stamp = datetime.datetime.utcnow()
        cursor.execute(
            "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
            ('running', stamp, job_id),
        )
        # Capture the outcome before committing; commit happens either way.
        was_claimed = cursor.rowcount == 1
        db.commit()
        return was_claimed
    finally:
        db.close()
|
|