| |
| """ |
| Script to queue verification jobs for tenders that have completed primary analysis. |
| Run this script after the primary worker completes analysis to trigger verification. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import argparse |
| from datetime import datetime, timezone |
| from typing import List, Dict, Any |
|
|
| import psycopg |
| from psycopg.rows import dict_row |
|
|
def connect_database() -> psycopg.Connection:
    """Open a PostgreSQL connection using the ``DATABASE_URL`` environment variable.

    The connection is opened with ``autocommit=False`` (statements run inside a
    transaction until committed) and ``prepare_threshold=None`` (psycopg never
    creates server-side prepared statements).

    Raises:
        RuntimeError: if ``DATABASE_URL`` is unset or empty.
    """
    dsn = os.getenv("DATABASE_URL")
    if dsn:
        return psycopg.connect(dsn, autocommit=False, prepare_threshold=None)
    raise RuntimeError("DATABASE_URL environment variable is required")
|
|
def get_tenders_for_verification(
    conn: psycopg.Connection,
    limit: int = 50,
    organization_id: str = None,
) -> List[Dict[str, Any]]:
    """Return up to ``limit`` tenders that are ready for verification.

    A tender qualifies when all of the following hold:

    * its status is ``ANALYSIS_READY``;
    * its ``verification_status`` is ``PENDING`` or NULL;
    * it has at least one extraction with a non-null ``structured_output``;
    * it has no ``VERIFY`` job in ``QUEUED`` or ``RUNNING`` status. This is the
      same "active job" definition that ``queue_verification_job`` uses.

    The most recently updated tenders come first.

    Args:
        conn: Open database connection.
        limit: Maximum number of tenders to return.
        organization_id: When given, only tenders of this organization are
            considered. The filter is applied in SQL, before ``LIMIT``, so
            other organizations' tenders cannot crowd it out.

    Returns:
        One dict per tender with keys ``id``, ``title``, ``organization_id``,
        ``status``, ``verification_status``, ``has_analysis`` (always true for
        returned rows) and ``pending_verification_jobs`` (always 0 for returned
        rows; kept so existing readers of the key keep working).
    """
    query = """
        SELECT
            t.id,
            t.title,
            t.organization_id,
            t.status,
            t.verification_status,
            TRUE AS has_analysis,
            0 AS pending_verification_jobs
        FROM public.tenders t
        WHERE t.status = 'ANALYSIS_READY'
          AND (t.verification_status = 'PENDING' OR t.verification_status IS NULL)
          AND EXISTS (
              SELECT 1
              FROM public.extractions e
              WHERE e.tender_id = t.id
                AND e.structured_output IS NOT NULL
          )
          AND NOT EXISTS (
              SELECT 1
              FROM public.processing_jobs vj
              WHERE vj.tender_id = t.id
                AND vj.job_type = 'VERIFY'
                AND vj.status IN ('QUEUED', 'RUNNING')
          )
    """
    params: List[Any] = []
    if organization_id:
        # Compared in SQL, so the string argument is cast by PostgreSQL to the
        # column's type (e.g. uuid) rather than compared against a Python object.
        query += " AND t.organization_id = %s"
        params.append(organization_id)

    query += " ORDER BY t.updated_at DESC LIMIT %s"
    params.append(limit)

    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute(query, params)
        return cur.fetchall()


def queue_verification_job(conn: psycopg.Connection, tender_id: str) -> bool:
    """Insert a QUEUED ``VERIFY`` job for one tender and mark it ``PROCESSING``.

    The duplicate check, the job insert and the tender update run inside one
    ``conn.transaction()`` block, so a failure undoes all three. If the
    connection already has a transaction open (autocommit is off and an
    earlier query ran), psycopg runs this block as a savepoint, and the caller
    must still commit the outer transaction.

    Args:
        conn: Open database connection.
        tender_id: Id of the tender to queue.

    Returns:
        True if a job was inserted. False if an active (QUEUED/RUNNING) VERIFY
        job already exists, or if any error occurred; errors are printed, not
        raised.
    """
    try:
        with conn.transaction():
            with conn.cursor() as cur:
                # Skip tenders that already have an active verification job.
                cur.execute("""
                    SELECT id FROM public.processing_jobs
                    WHERE tender_id = %s AND job_type = 'VERIFY' AND status IN ('QUEUED', 'RUNNING')
                """, (tender_id,))

                if cur.fetchone():
                    print(f"Verification job already exists for tender {tender_id}")
                    return False

                cur.execute("""
                    INSERT INTO public.processing_jobs
                        (tender_id, job_type, payload, status, max_attempts, available_at, created_at, updated_at)
                    VALUES (%s, 'VERIFY', %s::jsonb, 'QUEUED', 3, now(), now(), now())
                    RETURNING id
                """, (tender_id, "{}"))

                job_id = cur.fetchone()[0]

                cur.execute("""
                    UPDATE public.tenders
                    SET verification_status = 'PROCESSING',
                        updated_at = now()
                    WHERE id = %s
                """, (tender_id,))

        # Reported only after the transaction/savepoint block has completed.
        print(f"Queued verification job {job_id} for tender {tender_id}")
        return True

    except Exception as e:
        print(f"Failed to queue verification job for tender {tender_id}: {e}")
        return False


def queue_all_verifications(limit: int = 50, organization_id: str = None) -> int:
    """Queue verification jobs for all eligible tenders.

    Args:
        limit: Maximum number of tenders to consider.
        organization_id: When given, only tenders of this organization are
            considered. The filter is applied before ``limit``.

    Returns:
        Number of jobs queued and committed. Returns 0 if anything fails
        outside the per-tender handling (connection, query, or commit). In
        that case the ``with`` block rolls back, and nothing is persisted.
    """
    queued_count = 0

    try:
        with connect_database() as conn:
            tenders = get_tenders_for_verification(conn, limit, organization_id)

            print(f"Found {len(tenders)} tenders ready for verification")

            for tender in tenders:
                if queue_verification_job(conn, tender["id"]):
                    queued_count += 1

            # Autocommit is off, so the SELECT above opened a transaction and
            # every queued job lives inside it. Commit before claiming success.
            conn.commit()
            print(f"Successfully queued {queued_count} verification jobs")

    except Exception as e:
        print(f"Error queueing verification jobs: {e}")
        return 0

    return queued_count
|
|
def get_verification_status(conn: psycopg.Connection, tender_id: str = None) -> List[Dict[str, Any]]:
    """Return verification status rows for tenders in ``ANALYSIS_READY`` status.

    Each tender is LEFT JOINed with every ``VERIFY`` processing job and every
    ``webai_verifications`` row for it. A tender therefore appears on one row
    per job/verification combination, or on a single row with NULL job and
    verification columns when it has none.

    Args:
        conn: Open database connection.
        tender_id: When given, restrict the result to that tender.

    Returns:
        List of dicts. Keys: ``tender_id``, ``tender_title``,
        ``organization_id``, ``tender_status``, ``verification_status``,
        ``verification_score``, ``last_verified_at``, ``job_id``, ``job_type``,
        ``job_status``, ``job_created_at``, ``job_started_at``, ``last_error``,
        ``has_verification``, ``agreement_score``. ``agreement_score`` is text,
        extracted with ``->>`` from the ``comparison`` JSON.
    """
    query = """
        SELECT
            t.id as tender_id,
            t.title as tender_title,
            t.organization_id,
            t.status as tender_status,
            t.verification_status,
            t.verification_score,
            t.last_verified_at,
            j.id as job_id,
            j.job_type,
            j.status as job_status,
            j.created_at as job_created_at,
            j.locked_at as job_started_at,
            j.last_error,
            v.analysis is not null as has_verification,
            v.comparison->>'agreement_score' as agreement_score
        FROM public.tenders t
        LEFT JOIN public.processing_jobs j ON j.tender_id = t.id AND j.job_type = 'VERIFY'
        LEFT JOIN public.webai_verifications v ON v.tender_id = t.id
        WHERE t.status = 'ANALYSIS_READY'
    """

    params: List[Any] = []
    if tender_id:
        query += " AND t.id = %s"
        params.append(tender_id)

    # updated_at alone is not unique. Tie-break on the tender id so one tender's
    # rows stay together, then show its newest job first, so the output order
    # is deterministic.
    query += " ORDER BY t.updated_at DESC, t.id, j.created_at DESC NULLS LAST"

    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute(query, params)
        return cur.fetchall()
|
|
def print_verification_status(tender_id: str = None, organization_id: str = None):
    """Print a fixed-width verification status table to stdout.

    Args:
        tender_id: When given, only that tender is reported. This filter is
            applied in SQL by ``get_verification_status``.
        organization_id: When given, only rows with a matching
            ``organization_id`` are shown. Values are compared as strings:
            if the column is uuid-typed, psycopg returns ``uuid.UUID``
            objects, which never compare equal to a ``str`` argument.

    Any error (connection, query, formatting) is printed instead of raised.
    """
    try:
        with connect_database() as conn:
            results = get_verification_status(conn, tender_id)

            if organization_id:
                results = [
                    r for r in results
                    if r["organization_id"] is not None
                    and str(r["organization_id"]) == organization_id
                ]

            if not results:
                print("No verification data found")
                return

            print(f"\n{'Tender ID':<36} {'Title':<40} {'Status':<12} {'Agreement':<10} {'Job Status':<12}")
            print("-" * 120)

            for result in results:
                agreement = result["agreement_score"] or "N/A"
                if agreement != "N/A":
                    try:
                        agreement = f"{float(agreement):.2f}"
                    except (TypeError, ValueError):
                        # The value comes from JSON via ->> and may not be
                        # numeric: show it raw instead of aborting the report.
                        agreement = str(agreement)

                title = (result["tender_title"] or "Untitled")[:37]
                # Convert to str first: non-str values such as uuid.UUID raise
                # TypeError when given a width spec like :<36.
                tender_col = str(result["tender_id"])
                status_col = str(result["verification_status"] or "PENDING")
                job_col = str(result["job_status"] or "NONE")
                print(f"{tender_col:<36} {title:<40} {status_col:<12} {agreement:<10} {job_col:<12}")

    except Exception as e:
        print(f"Error getting verification status: {e}")
|
|
def main():
    """Command-line entry point with ``queue`` and ``status`` sub-commands.

    Without a sub-command, prints the help text and returns.
    """
    cli = argparse.ArgumentParser(description="Manage WebAI verification jobs")
    commands = cli.add_subparsers(dest="command", help="Available commands")

    # "queue": enqueue VERIFY jobs for eligible tenders.
    queue_cmd = commands.add_parser("queue", help="Queue verification jobs")
    queue_cmd.add_argument("--limit", type=int, default=50, help="Maximum number of jobs to queue")
    queue_cmd.add_argument("--org", type=str, help="Filter by organization ID")

    # "status": print the verification status table.
    status_cmd = commands.add_parser("status", help="Show verification status")
    status_cmd.add_argument("--tender", type=str, help="Specific tender ID")
    status_cmd.add_argument("--org", type=str, help="Filter by organization ID")

    opts = cli.parse_args()

    if opts.command == "queue":
        queued = queue_all_verifications(opts.limit, opts.org)
        print(f"\nQueued {queued} verification jobs")
    elif opts.command == "status":
        print_verification_status(opts.tender, opts.org)
    else:
        # argparse only allows the choices above, so this is the
        # "no sub-command given" case.
        cli.print_help()


if __name__ == "__main__":
    main()
|
|