"""
Background worker for processing document generation jobs using batched Claude API.
Runs as an RQ worker process.
"""

import asyncio
import io
import json
import os
import pathlib
import tempfile
import time
import traceback
import zipfile
import shutil
import base64
import math
from typing import Dict, Any, List, Callable
from datetime import datetime

from .config import settings
from .supabase_client import supabase_client
from .google_drive import GoogleDriveClient
from .utils import (
    download_image_to_base64,
    create_token_mapping_json,
    download_seed_images,
    build_prompt,
    extract_html_documents_from_response,
    extract_ground_truth,
    extract_css_from_html,
    increase_handwriting_font_size,
    unmark_visual_elements,
    render_html_to_pdf,
    preprocess_html_for_pdf,
    extract_bboxes_from_rendered_pdf,
    extract_all_bboxes_from_pdf,
    extract_raw_annotations_from_geometries,
    process_stage3_complete,
    process_stage4_ocr,
    process_stage5_complete,
    validate_html_structure,
    validate_pdf,
    validate_bboxes,
    retry_on_network_error
)
from docgenie.generation.pipeline_01.claude_batching import ClaudeBatchedClient
from docgenie import ENV


VERBOSE_LOGGING = os.getenv('WORKER_VERBOSE_LOGGING', 'false').lower() in ('true', '1', 'yes')


def log_verbose(message: str):
    """Log message only if verbose logging is enabled"""
    if VERBOSE_LOGGING:
        print(message)


def validate_worker_config():
    """Validate worker configuration at startup"""
    print("=" * 60)
    print("🔧 Worker Configuration Check")
    print("=" * 60)

    if settings.ANTHROPIC_API_KEY:
        print("✅ ANTHROPIC_API_KEY: Set")
    else:
        print("❌ ANTHROPIC_API_KEY: NOT SET (REQUIRED)")

    if settings.SUPABASE_URL and settings.SUPABASE_KEY:
        print(f"✅ SUPABASE: {settings.SUPABASE_URL[:30]}...")
    else:
        print("❌ SUPABASE: NOT SET (REQUIRED)")

    if settings.GOOGLE_CLIENT_ID and settings.GOOGLE_CLIENT_SECRET:
        print(f"✅ GOOGLE_CLIENT_ID: {settings.GOOGLE_CLIENT_ID[:20]}...")
        print("✅ GOOGLE_CLIENT_SECRET: Set")
        print("   → Token auto-refresh: ENABLED")
    else:
        print("❌ GOOGLE_CLIENT_ID/SECRET: Not set")
        print("   → Token auto-refresh: DISABLED")
        print("   → Users must provide fresh access tokens that don't expire during processing")

    print("=" * 60)


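# Run the configuration check once at import time, when the RQ worker loads this module.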
validate_worker_config()


async def process_document_generation_job_async(request_id: str, request_data: Dict[str, Any]):
    """
    Async background job function - processes document generation using the batched Claude API.

    This function:
    1. Creates a Claude batch (one message per chunk of documents to generate)
    2. Polls the batch until completion
    3. Processes all documents (PDFs, handwriting, etc.)
    4. Uploads a ZIP to the user's Google Drive
    5. Updates Supabase with the results

    Args:
        request_id: Document request UUID from Supabase
        request_data: Request parameters dict containing:
            - user_id: int
            - seed_images: List[str] (URLs)
            - prompt_params: Dict (language, doc_type, num_solutions, etc.)

    Raises:
        Exception: Any error during processing (logged to Supabase)
    """
    user_id = request_data['user_id']
    # The literal placeholder value "string" (e.g. an interactive API docs default) is treated as "not provided".
    google_drive_token = request_data.get('google_drive_token')
    if google_drive_token == "string":
        google_drive_token = None
    google_drive_refresh_token = request_data.get('google_drive_refresh_token')
    if google_drive_refresh_token == "string":
        google_drive_refresh_token = None
    seed_image_urls = request_data['seed_images']
    prompt_params = request_data['prompt_params']

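    # Delete results from any previous run of this request before regenerating.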
    log_verbose(f"[Job {request_id}] Cleaning up old results for request {request_id}...")
    try:
        supabase_client.delete_generated_documents(request_id)
    except Exception as cleanup_err:
        print(f"[Job {request_id}] ❌ Cleanup of old records failed: {cleanup_err}")

    if google_drive_refresh_token:
        if not settings.GOOGLE_CLIENT_ID or not settings.GOOGLE_CLIENT_SECRET:
            print(f"[Job {request_id}] ⚠️ WARNING: refresh_token provided but GOOGLE_CLIENT_ID/SECRET not configured")
            print(f"[Job {request_id}] Token auto-refresh will fail. Ensure access token remains valid.")

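    # All intermediate artifacts live in a temporary working directory that is removed automatically when the job ends.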
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = pathlib.Path(tmp_dir)
        batch_dir = tmp_path / "batches"
        message_dir = tmp_path / "messages"
        batch_dir.mkdir(exist_ok=True)
        message_dir.mkdir(exist_ok=True)

        from .dataset_exporter import DatasetExporter
        exporter = DatasetExporter(tmp_path, dataset_name="docgenie_documents")

        try:
            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "processing"))
            print(f"[Job {request_id}] Status: processing (fetching seed images)")

            log_verbose(f"[Job {request_id}] Downloading {len(seed_image_urls)} seed images...")
            seed_images_base64 = retry_on_network_error(lambda: download_seed_images(seed_image_urls))
            log_verbose(f"[Job {request_id}] Downloaded {len(seed_images_base64)} images")

            prompt_template_path = ENV.PROMPT_TEMPLATES_DIR / "ClaudeRefined12" / "seed-based-json.txt"
            if not prompt_template_path.exists():
                raise FileNotFoundError(f"Prompt template not found: {prompt_template_path}")

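            # Split the requested num_solutions into chunks of BATCH_PROMPT_CHUNK_SIZE; each chunk becomes one prompt (one batch task).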
            num_solutions = prompt_params.get('num_solutions', 1)
            chunk_size = settings.BATCH_PROMPT_CHUNK_SIZE
            num_prompts = math.ceil(num_solutions / chunk_size)

            prompts = []
            images_base64_list = []
            image_docids_list = []

            for i in range(num_prompts):
                current_num_solutions = min(chunk_size, num_solutions - (i * chunk_size))

                p = build_prompt(
                    language=prompt_params.get('language', 'English'),
                    doc_type=prompt_params.get('doc_type', 'business and administrative'),
                    gt_type=prompt_params.get('gt_type', 'Questions and answers'),
                    gt_format=prompt_params.get('gt_format', '{"question": "answer"}'),
                    num_solutions=current_num_solutions,
                    num_seed_images=len(seed_images_base64),
                    prompt_template_path=prompt_template_path,
                    enable_visual_elements=prompt_params.get('enable_visual_elements', False),
                    visual_element_types=prompt_params.get('visual_element_types', [])
                )
                prompts.append(p)
                images_base64_list.append(seed_images_base64)
                image_docids_list.append(["seed"] * len(seed_images_base64))

            log_verbose(f"[Job {request_id}] Created {num_prompts} prompts (chunk size: {chunk_size})")

            log_verbose(f"[Job {request_id}] Creating Claude batch with {num_prompts} messages...")

            client = ClaudeBatchedClient(api_key=settings.ANTHROPIC_API_KEY)

            client.send_batch(
                model=settings.CLAUDE_MODEL,
                prompts=prompts,
                images_base64=images_base64_list,
                image_docids=image_docids_list,
                batch_data_directory=batch_dir,
                max_tokens=16384
            )

            print(f"[Job {request_id}] ⏳ Batch created with {num_prompts} tasks, awaiting processing...")

            client.await_batches(
                batch_data_directory=batch_dir,
                message_data_directory=message_dir,
                sleep_seconds_between_batch=2,
                sleep_seconds_iteration=settings.BATCH_POLL_INTERVAL
            )

            print(f"[Job {request_id}] ✅ Batch complete")

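            # Each completed batch task was written to message_dir as a JSON file; parse them, collect the generated HTML documents, and record token costs.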
            message_files = list(message_dir.glob("*.json"))

            if not message_files:
                raise RuntimeError("No message results found after batch completion")

            html_documents = []
            for msg_file in message_files:
                try:
                    message_data = json.loads(msg_file.read_text())
                    if message_data.get('result_type') == 'succeeded':
                        llm_response = message_data['response']
                        docs = extract_html_documents_from_response(llm_response)
                        html_documents.extend(docs)

                        from .utils import calculate_message_cost
                        i_tokens = message_data.get('usage_input_tokens', 0)
                        o_tokens = message_data.get('usage_output_tokens', 0)
                        c_create = message_data.get('cache_creation_input_tokens', 0)
                        c_read = message_data.get('cache_read_input_tokens', 0)

                        cost = calculate_message_cost(
                            model=settings.CLAUDE_MODEL,
                            input_tokens=i_tokens,
                            output_tokens=o_tokens,
                            cache_creation_input_tokens=c_create,
                            cache_read_input_tokens=c_read
                        )
                        exporter.add_cost(cost, i_tokens, o_tokens, c_create, c_read)

                        log_verbose(f"  ✅ Extracted {len(docs)} documents from task {msg_file.stem} (Cost: ${cost:.4f})")
                    else:
                        error_msg = message_data.get('error', 'Unknown error')
                        print(f"[Job {request_id}] ❌ Task {msg_file.stem} failed: {error_msg}")
                except Exception as e:
                    print(f"[Job {request_id}] ❌ Error reading message result {msg_file.name}: {e}")

            if not html_documents:
                raise RuntimeError("No valid HTML documents found in any batch results")

            print(f"[Job {request_id}] ✅ Combined total of {len(html_documents)} documents from all tasks")

            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "generating"))
            print(f"[Job {request_id}] Status: generating (processing documents)")

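            # Download any user-uploaded assets from Supabase storage; Stage 3 receives them later as assets_dir.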
            assets_temp_dir = None
            try:
                assets_path = f"{user_id}/{request_id}/assets"
                files = retry_on_network_error(lambda: supabase_client.list_files("doc_storage", assets_path))

                asset_files = [f for f in files if f and f.get('id') is not None] if files else []

                if asset_files:
                    assets_temp_dir = pathlib.Path(tempfile.mkdtemp())
                    print(f"[Job {request_id}] Found {len(asset_files)} assets in storage, downloading...")

                    for file_info in asset_files:
                        file_name = file_info['name']
                        try:
                            file_content = retry_on_network_error(lambda: supabase_client.download_file("doc_storage", f"{assets_path}/{file_name}"))
                            with open(assets_temp_dir / file_name, 'wb') as f:
                                f.write(file_content)
                            log_verbose(f"  ✅ Downloaded {file_name}")
                        except Exception as download_err:
                            print(f"  ❌ Failed to download {file_name}: {download_err}")
                else:
                    log_verbose(f"[Job {request_id}] No assets found in {assets_path}")
            except Exception as e:
                print(f"[Job {request_id}] ❌ Asset check/download failed: {e}")

            pdf_files = []
            metadata = []

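            # Process each generated HTML document: validate, render to PDF, optionally add handwriting / visual elements, run OCR, and package it for the dataset.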
            for idx, html in enumerate(html_documents):
                try:
                    doc_id = f"document_{idx + 1}"
                    log_verbose(f"[Job {request_id}] Processing document {idx + 1}/{len(html_documents)}")

                    original_pdf_path = None

                    is_valid, error_msg = validate_html_structure(html)
                    if not is_valid:
                        print(f"[Job {request_id}] Document {idx + 1} HTML validation failed: {error_msg}")
                        continue

                    gt, html_clean = extract_ground_truth(html)
                    css, _ = extract_css_from_html(html_clean)

                    pdf_path = tmp_path / f"{doc_id}.pdf"
                    pdf_path, width_mm, height_mm, geometries = await render_html_to_pdf(
                        html=html_clean,
                        output_pdf_path=pdf_path
                    )

                    original_pdf_path = pdf_path

                    is_valid, error_msg = validate_pdf(pdf_path)
                    if not is_valid:
                        print(f"[Job {request_id}] Document {idx + 1} PDF validation failed: {error_msg}")
                        continue

                    bboxes_raw = extract_bboxes_from_rendered_pdf(pdf_path)

                    is_valid, error_msg = validate_bboxes(bboxes_raw, min_bbox_count=1)
                    if not is_valid:
                        print(f"[Job {request_id}] Document {idx + 1} BBox validation warning: {error_msg}")

                    log_verbose(f"[Job {request_id}] Document {idx + 1}: Extracted {len(bboxes_raw)} bboxes")

                    final_image_b64 = None
                    handwriting_regions = []
                    visual_elements = []
                    handwriting_images = {}
                    visual_element_images = {}
                    ocr_results = None
                    pdf_with_handwriting_path = None
                    pdf_final_path = None

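                    # Stage 3: overlay synthetic handwriting and/or visual elements on the rendered PDF.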
                    if prompt_params.get('enable_handwriting') or prompt_params.get('enable_visual_elements'):
                        if prompt_params.get('enable_handwriting'):
                            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "handwriting"))
                            log_verbose(f"[Job {request_id}] Status: handwriting (generating handwritten text)")

                        log_verbose(f"[Job {request_id}] Document {idx + 1}: Processing handwriting/visual elements...")

                        try:
                            final_image_b64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path = await process_stage3_complete(
                                pdf_path=pdf_path,
                                geometries=geometries,
                                ground_truth=gt,
                                bboxes_raw=bboxes_raw,
                                page_width_mm=width_mm,
                                page_height_mm=height_mm,
                                enable_handwriting=prompt_params.get('enable_handwriting', False),
                                handwriting_ratio=prompt_params.get('handwriting_ratio', 0.3),
                                handwriting_apply_ink_filter=prompt_params.get('handwriting_apply_ink_filter', True),
                                handwriting_enable_enhancements=prompt_params.get('handwriting_enable_enhancements', False),
                                handwriting_num_inference_steps=prompt_params.get('handwriting_num_inference_steps', 1000),
                                handwriting_writer_ids=prompt_params.get('handwriting_writer_ids', [404, 347, 156, 253, 354, 166, 320]),
                                enable_visual_elements=prompt_params.get('enable_visual_elements', False),
                                visual_element_types=prompt_params.get('visual_element_types', []),
                                seed=prompt_params.get('seed'),
                                assets_dir=assets_temp_dir,
                                barcode_number=prompt_params.get('barcode_number')
                            )

                            if pdf_final_path and pdf_final_path.exists():
                                pdf_path = pdf_final_path
                            elif pdf_with_handwriting_path and pdf_with_handwriting_path.exists():
                                pdf_path = pdf_with_handwriting_path

                            log_verbose(f"[Job {request_id}] Document {idx + 1}: {len(handwriting_regions)} handwriting, {len(visual_elements)} visual elements")

                        except Exception as e:
                            print(f"[Job {request_id}] Document {idx + 1}: Stage 3 failed: {str(e)}")

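                    # Stage 4: run OCR on the current PDF (which may already include handwriting/visual elements).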
                    if prompt_params.get('enable_ocr'):
                        retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "ocr"))
                        log_verbose(f"[Job {request_id}] Status: ocr (running OCR on documents)")

                        log_verbose(f"[Job {request_id}] Document {idx + 1}: Processing OCR...")

                        try:
                            stage4_image, ocr_results = await process_stage4_ocr(
                                pdf_path=pdf_path,
                                enable_ocr=True,
                                dpi=settings.OCR_DPI
                            )

                            if ocr_results:
                                log_verbose(f"[Job {request_id}] Document {idx + 1}: OCR complete - {len(ocr_results.get('words', []))} words")

                        except Exception as e:
                            print(f"[Job {request_id}] Document {idx + 1}: OCR failed: {str(e)}")

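                    # Stage 5: optional post-processing - bbox normalization, ground-truth verification, analysis stats, debug visualization.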
                    stage5_results = {}
                    if any([
                        prompt_params.get('enable_bbox_normalization'),
                        prompt_params.get('enable_gt_verification'),
                        prompt_params.get('enable_analysis'),
                        prompt_params.get('enable_debug_visualization')
                    ]):
                        if prompt_params.get('enable_gt_verification'):
                            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "validation"))
                            log_verbose(f"[Job {request_id}] Status: validation (validating ground truth)")

                        log_verbose(f"[Job {request_id}] Document {idx + 1}: Processing dataset packaging...")

                        try:
                            stage5_results = await process_stage5_complete(
                                document_id=doc_id,
                                pdf_path=pdf_path,
                                image_base64=final_image_b64,
                                ocr_results=ocr_results,
                                ground_truth=gt,
                                has_handwriting=prompt_params.get('enable_handwriting', False),
                                has_visual_elements=prompt_params.get('enable_visual_elements', False),
                                layout_elements=visual_elements,
                                enable_bbox_normalization=prompt_params.get('enable_bbox_normalization', False),
                                enable_gt_verification=prompt_params.get('enable_gt_verification', False),
                                enable_analysis=prompt_params.get('enable_analysis', False),
                                enable_debug_visualization=prompt_params.get('enable_debug_visualization', False)
                            )

                        except Exception as e:
                            print(f"[Job {request_id}] Document {idx + 1}: Stage 5 failed: {str(e)}")

                    if original_pdf_path and pdf_path != original_pdf_path:
                        pdf_files.append(original_pdf_path)
                        pdf_files.append(pdf_path)
                    else:
                        pdf_files.append(pdf_path)

                    log_verbose(f"[Job {request_id}] Document {idx + 1}: 📦 Extracting bbox_pdf (word + char level) from original PDF...")

                    try:
                        bboxes_pdf = extract_all_bboxes_from_pdf(original_pdf_path if original_pdf_path else pdf_path)
                        bbox_pdf_word = bboxes_pdf.get('word', [])
                        bbox_pdf_char = bboxes_pdf.get('char', [])
                        log_verbose(f"[Job {request_id}] Document {idx + 1}: ✅ Extracted {len(bbox_pdf_word)} word bboxes, {len(bbox_pdf_char)} char bboxes from PDF")
                    except Exception as e:
                        print(f"[Job {request_id}] Document {idx + 1}: ❌ bbox_pdf extraction failed: {e}")
                        bbox_pdf_word = bboxes_raw
                        bbox_pdf_char = []

                    raw_annotations = None
                    if geometries:
                        log_verbose(f"[Job {request_id}] Document {idx + 1}: 📦 Extracting raw_annotations from geometries...")
                        try:
                            raw_annotations = extract_raw_annotations_from_geometries(geometries)
                            log_verbose(f"[Job {request_id}] Document {idx + 1}: ✅ Extracted {len(raw_annotations)} layout annotations")
                        except Exception as e:
                            print(f"[Job {request_id}] Document {idx + 1}: ❌ raw_annotations extraction failed: {e}")

                    final_image_bytes = None
                    if final_image_b64:
                        final_image_bytes = base64.b64decode(final_image_b64)

                    debug_viz_bytes = None
                    if stage5_results.get('debug_visualization'):
                        debug_viz_dict = stage5_results['debug_visualization']
                        if debug_viz_dict and 'bbox_overlay_base64' in debug_viz_dict:
                            debug_viz_b64 = debug_viz_dict['bbox_overlay_base64']
                            debug_viz_bytes = base64.b64decode(debug_viz_b64)

                    output_detail = prompt_params.get('output_detail', 'minimal')
                    token_mapping_data = None
                    if output_detail in ["dataset", "complete"]:
                        token_mapping_data = create_token_mapping_json(
                            handwriting_regions,
                            handwriting_images,
                            visual_elements,
                            visual_element_images
                        )
                        log_verbose(f"[Job {request_id}] Document {idx + 1}: 📦 Output detail '{output_detail}': Prepared {len(handwriting_images)} handwriting tokens, {len(visual_element_images)} visual elements")

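                    # Final-image bboxes: prefer OCR results when available, otherwise fall back to the PDF-extracted word bboxes.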
                    bbox_final_word = None
                    bbox_final_segment = None
                    if ocr_results and ocr_results.get('words'):
                        bbox_final_word = ocr_results.get('words', [])
                        bbox_final_segment = ocr_results.get('lines', [])
                    else:
                        bbox_final_word = bbox_pdf_word
                        bbox_final_segment = []

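                    # Collect the PDF variants (initial render, with handwriting, with visual elements, final) as bytes for the exporter.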
                    pdf_initial_bytes = original_pdf_path.read_bytes()

                    pdf_with_handwriting_bytes = None
                    pdf_final_bytes = None
                    pdf_with_visual_elements_bytes = None

                    if pdf_with_handwriting_path and pdf_with_handwriting_path.exists():
                        pdf_with_handwriting_bytes = pdf_with_handwriting_path.read_bytes()

                    if pdf_final_path and pdf_final_path.exists():
                        pdf_final_bytes = pdf_final_path.read_bytes()

                    if pdf_final_bytes and not pdf_with_handwriting_bytes:
                        pdf_with_visual_elements_bytes = pdf_final_bytes
                        pdf_final_bytes = None

                    log_verbose(f"[Job {request_id}] Document {idx + 1}: 📦 Adding document to dataset exporter...")
                    exporter.add_document(
                        document_id=doc_id,
                        html=html_clean,
                        css=css,
                        pdf_initial=pdf_initial_bytes,
                        pdf_with_handwriting=pdf_with_handwriting_bytes,
                        pdf_with_visual_elements=pdf_with_visual_elements_bytes,
                        pdf_final=pdf_final_bytes,
                        final_image=final_image_bytes,
                        ground_truth=gt,
                        raw_annotations=raw_annotations,
                        bboxes_pdf_word=bbox_pdf_word,
                        bboxes_pdf_char=bbox_pdf_char,
                        bboxes_final_word=bbox_final_word,
                        bboxes_final_segment=bbox_final_segment,
                        bboxes_normalized_word=stage5_results.get('normalized_bboxes_word'),
                        bboxes_normalized_segment=stage5_results.get('normalized_bboxes_segment'),
                        gt_verification=stage5_results.get('gt_verification'),
                        token_mapping=token_mapping_data,
                        handwriting_regions=handwriting_regions,
                        handwriting_images=handwriting_images,
                        visual_elements=visual_elements,
                        visual_element_images=visual_element_images,
                        layout_elements=visual_elements,
                        geometries=geometries,
                        ocr_results=ocr_results,
                        analysis_stats=stage5_results.get('analysis_stats'),
                        debug_visualization=debug_viz_bytes
                    )
                    log_verbose(f"[Job {request_id}] Document {idx + 1}: ✅ Document {doc_id} added to dataset")

                    metadata.append({
                        "document_id": doc_id,
                        "filename": f"{doc_id}.pdf",
                        "bboxes": bboxes_raw,
                        "ground_truth": gt,
                        "geometries": geometries,
                        "page_width_mm": width_mm,
                        "page_height_mm": height_mm,
                        "handwriting_regions": handwriting_regions,
                        "visual_elements": visual_elements,
                        # Needed by the per-document upload step below, which reads doc_data["visual_element_images"].
                        "visual_element_images": visual_element_images,
                        "has_stage3_image": final_image_b64 is not None,
                        "ocr_results": ocr_results,
                        "normalized_bboxes_word": stage5_results.get('normalized_bboxes_word'),
                        "normalized_bboxes_segment": stage5_results.get('normalized_bboxes_segment'),
                        "gt_verification": stage5_results.get('gt_verification'),
                        "analysis_stats": stage5_results.get('analysis_stats'),
                        "debug_visualization_available": stage5_results.get('debug_visualization') is not None
                    })

                except Exception as e:
                    print(f"[Job {request_id}] Error processing document {idx + 1}: {str(e)}")
                    traceback.print_exc()
                    continue

            if not pdf_files:
                raise RuntimeError("Failed to process any documents")

            log_verbose(f"[Job {request_id}] Processed {len(pdf_files)} PDF files")

            log_verbose(f"[Job {request_id}] 📦 Finalizing dataset export...")
            exporter.finalize(
                request_id=request_id,
                user_id=user_id,
                prompt_params=prompt_params,
                api_mode="async"
            )
            log_verbose(f"[Job {request_id}] ✅ Dataset structure finalized at {exporter.base_path}")

            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "zipping"))
            print(f"[Job {request_id}] Status: zipping (creating ZIP archive)")

            log_verbose(f"[Job {request_id}] 📦 Creating ZIP archive from dataset...")
            zip_path = tmp_path / f"docgenie_{request_id}.zip"

            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
                for file_path in exporter.base_path.rglob('*'):
                    if file_path.is_file():
                        arcname = file_path.relative_to(exporter.base_path.parent)
                        zip_file.write(file_path, arcname)

            zip_size_mb = zip_path.stat().st_size / (1024 * 1024)
            log_verbose(f"[Job {request_id}] ✅ ZIP created: {zip_size_mb:.2f} MB")

            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "uploading"))
            print(f"[Job {request_id}] Status: uploading (uploading to Google Drive)")

            print(f"[Job {request_id}] ⬆️ Uploading to Google Drive...")

            google_drive_url = None
            gdrive_failed = False
            gdrive_skipped = False

            if not google_drive_token or google_drive_token == "string":
                print(f"[Job {request_id}] No valid Google Drive token provided. Skipping Google Drive upload.")
                gdrive_skipped = True
            else:
                try:
                    drive_client = GoogleDriveClient(
                        access_token=google_drive_token,
                        refresh_token=google_drive_refresh_token
                    )
                    google_drive_url = drive_client.upload_file(
                        file_path=zip_path,
                        filename=f"docgenie_{request_id}.zip",
                        folder_name=settings.GOOGLE_DRIVE_FOLDER_NAME
                    )

                    print(f"[Job {request_id}] ✅ Uploaded to Google Drive: {google_drive_url}")

                except Exception as e:
                    print(f"[Job {request_id}] Google Drive upload failed: {str(e)}")
                    gdrive_failed = True

            log_verbose(f"[Job {request_id}] Saving results to Supabase...")
            log_verbose(f"[Job {request_id}] URL: {google_drive_url}")

            zip_url = None
            try:
                zip_storage_path = f"{user_id}/{request_id}/generated/docgenie_{request_id}.zip"
                retry_on_network_error(lambda: supabase_client.upload_to_storage("doc_storage", zip_storage_path, zip_path.read_bytes(), "application/zip"))
                zip_url = supabase_client.get_public_url("doc_storage", zip_storage_path)
                print(f"[Job {request_id}] ✅ Uploaded ZIP to Supabase: {zip_url}")
            except Exception as e:
                print(f"[Job {request_id}] ❌ Supabase ZIP upload failed: {e}")

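            # Upload each document's PDF, ground truth, HTML source and bbox JSON to Supabase storage, then create one DB record per document.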
            print(f"[Job {request_id}] Uploading individual documents to Supabase...")
            for idx, doc_data in enumerate(metadata):
                doc_id = doc_data["document_id"]
                try:
                    doc_storage_path = f"{user_id}/{request_id}/generated/{idx}_doc.pdf"
                    gt_storage_path = f"{user_id}/{request_id}/generated/{idx}_gt.json"
                    src_storage_path = f"{user_id}/{request_id}/generated/{idx}_src.html"
                    bbox_storage_path = f"{user_id}/{request_id}/generated/{idx}_bbox.json"

                    doc_path = exporter.pdf_final_dir / f"{doc_id}.pdf"
                    if not doc_path.exists():
                        doc_path = exporter.pdf_initial_dir / f"{doc_id}.pdf"

                    gt_path = exporter.gt_dir / f"{doc_id}.json"
                    src_path = exporter.html_dir / f"{doc_id}.html"
                    bbox_path = exporter.bbox_pdf_word_dir / f"{doc_id}.json"

                    try:
                        if doc_path.exists():
                            retry_on_network_error(lambda: supabase_client.upload_to_storage("doc_storage", doc_storage_path, doc_path.read_bytes(), "application/pdf"))

                        if gt_path.exists():
                            retry_on_network_error(lambda: supabase_client.upload_to_storage("doc_storage", gt_storage_path, gt_path.read_bytes(), "application/json"))

                        if src_path.exists():
                            retry_on_network_error(lambda: supabase_client.upload_to_storage("doc_storage", src_storage_path, src_path.read_bytes(), "text/html"))

                        if bbox_path.exists():
                            retry_on_network_error(lambda: supabase_client.upload_to_storage("doc_storage", bbox_storage_path, bbox_path.read_bytes(), "application/json"))

                        if doc_data.get("visual_elements") and doc_data.get("visual_element_images"):
                            for ve_id, img_b64 in doc_data["visual_element_images"].items():
                                ve_storage_path = f"{user_id}/{request_id}/generated/{idx}_ve_{ve_id}.png"
                                try:
                                    img_bytes = base64.b64decode(img_b64)
                                    retry_on_network_error(lambda: supabase_client.upload_to_storage("doc_storage", ve_storage_path, img_bytes, "image/png"))
                                except Exception as ve_err:
                                    print(f"  ❌ Failed to upload visual element {ve_id}: {ve_err}")
                    except Exception as upload_err:
                        print(f"  ❌ Some file uploads failed for document {idx+1}: {upload_err}")

                    try:
                        log_verbose(f"  📦 Creating DB record for document {idx+1} (index {idx})...")
                        record_id = retry_on_network_error(lambda: supabase_client.create_generated_document(
                            request_id=request_id,
                            file_url=supabase_client.get_public_url("doc_storage", doc_storage_path),
                            model_version=settings.LLM_MODEL,
                            doc_index=idx,
                            doc_storage_path=doc_storage_path,
                            gt_storage_path=gt_storage_path,
                            html_storage_path=src_storage_path,
                            bbox_storage_path=bbox_storage_path
                        ))
                        print(f"  ✅ Processed document {idx+1} and created DB record {record_id}")
                    except Exception as db_err:
                        print(f"  ❌ Failed to create DB record for document {idx+1}: {db_err}")
                except Exception as doc_err:
                    print(f"  ❌ Unexpected error processing document {idx+1}: {doc_err}")

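            # Pick the final request status depending on how the Google Drive upload went.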
            if gdrive_skipped:
                final_status = "completed_no_gdrive"
            elif gdrive_failed:
                final_status = "completed_gdrive_failed"
            else:
                final_status = "completed"

            retry_on_network_error(lambda: supabase_client.update_request_status(
                request_id=request_id,
                status=final_status,
                zip_url=zip_url
            ))

            print(f"[Job {request_id}] ✅ Job completed successfully!")

            retry_on_network_error(lambda: supabase_client.log_analytics_event(
                user_id=user_id,
                event_type="document_generation_completed",
                entity_id=request_id
            ))

        except Exception as e:
            error_message = f"{type(e).__name__}: {str(e)}"
            print(f"[Job {request_id}] ❌ Job failed: {error_message}")
            traceback.print_exc()

            retry_on_network_error(lambda: supabase_client.update_request_status(
                request_id=request_id,
                status="failed",
                error_message=error_message,
                # zip_url may not exist yet if the failure happened before the ZIP upload step.
                zip_url=locals().get('zip_url')
            ))

            retry_on_network_error(lambda: supabase_client.log_analytics_event(
                user_id=user_id,
                event_type="document_generation_failed",
                entity_id=request_id
            ))

            raise
        finally:
            if 'assets_temp_dir' in locals() and assets_temp_dir and assets_temp_dir.exists():
                try:
                    shutil.rmtree(assets_temp_dir, ignore_errors=True)
                    print(f"[Job {request_id}] ✅ Cleaned up assets directory {assets_temp_dir}")
                except Exception:
                    pass


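# RQ entry point. Illustrative enqueue call (assumed; the real call lives in the API layer and may differ):
#     from redis import Redis
#     from rq import Queue
#     Queue(connection=Redis()).enqueue(process_document_generation_job, request_id, request_data)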
def process_document_generation_job(request_id: str, request_data: Dict[str, Any]):
    """
    Synchronous wrapper for RQ - calls the async function with asyncio.run().

    This is the function that the RQ worker calls. It runs the async version using asyncio.
    """
    print(f"{'='*60}")
    print(f"🎯 Worker picked up job: {request_id}")
    print(f"   User ID: {request_data.get('user_id', 'N/A')}")
    print(f"   Num documents: {request_data.get('prompt_params', {}).get('num_solutions', 'N/A')}")
    print(f"{'='*60}")

    return asyncio.run(process_document_generation_job_async(request_id, request_data))