# NOTE(review): removed paste/extraction artifacts ("Spaces:", "Runtime error")
# that were not part of the original Python source.
| # Copyright (c) 2025 Stephen G. Pope | |
| # | |
| # This program is free software; you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation; either version 2 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU General Public License along | |
| # with this program; if not, write to the Free Software Foundation, Inc., | |
| # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
import json
import os
import threading
import time
import uuid
from functools import wraps
from queue import Queue

from flask import Flask, request

from app_utils import log_job_status, discover_and_register_blueprints  # Import the discover_and_register_blueprints function
from services.gcp_toolkit import trigger_cloud_run_job
from services.webhook import send_webhook
from version import BUILD_NUMBER  # Import the BUILD_NUMBER
# Maximum number of jobs allowed in the in-process queue before new
# submissions are rejected with HTTP 429; 0 (the default) means unlimited.
MAX_QUEUE_LENGTH = int(os.environ.get('MAX_QUEUE_LENGTH', 0))
def create_app():
    """Create and configure the Flask application.

    Sets up a single in-process job queue drained by one daemon worker
    thread, and attaches a ``queue_task`` decorator factory to the app
    (``app.queue_task``) that route handlers use to run work either:

    * synchronously (inside a GCP Cloud Run Job instance, when
      ``bypass_queue`` is set, or when the request has no ``webhook_url``),
    * by submitting a new GCP Cloud Run Job execution (when
      ``GCP_JOB_NAME`` is set and the caller supplied a ``webhook_url``), or
    * asynchronously via the local queue (result delivered by webhook).

    Returns:
        Flask: the configured application with all blueprints registered.
    """
    app = Flask(__name__)

    # Single in-process FIFO of pending jobs; the queue object's id() is
    # used as a worker-unique identifier in status logs and responses.
    task_queue = Queue()
    queue_id = id(task_queue)  # Generate a single queue_id for this worker

    def process_queue():
        """Worker loop: execute queued jobs one at a time, forever.

        FIX: each job is now guarded by try/except/finally so that an
        exception raised by the task (or by webhook delivery) cannot kill
        the worker thread and silently stall the whole queue.
        """
        while True:
            job_id, data, task_func, queue_start_time = task_queue.get()
            pid = os.getpid()  # PID of the actual processing thread's process
            try:
                queue_time = time.time() - queue_start_time
                run_start_time = time.time()

                # Log job status as running
                log_job_status(job_id, {
                    "job_status": "running",
                    "job_id": job_id,
                    "queue_id": queue_id,
                    "process_id": pid,
                    "response": None
                })

                # Tasks return a (body, endpoint, status_code) tuple.
                response = task_func()
                run_time = time.time() - run_start_time
                total_time = time.time() - queue_start_time

                response_data = {
                    "endpoint": response[1],
                    "code": response[2],
                    "id": data.get("id"),
                    "job_id": job_id,
                    "response": response[0] if response[2] == 200 else None,
                    "message": "success" if response[2] == 200 else response[0],
                    "pid": pid,
                    "queue_id": queue_id,
                    "run_time": round(run_time, 3),
                    "queue_time": round(queue_time, 3),
                    "total_time": round(total_time, 3),
                    "queue_length": task_queue.qsize(),
                    "build_number": BUILD_NUMBER  # Add build number to response
                }

                # Log job status as done
                log_job_status(job_id, {
                    "job_status": "done",
                    "job_id": job_id,
                    "queue_id": queue_id,
                    "process_id": pid,
                    "response": response_data
                })

                # Only send webhook if webhook_url has an actual value
                # (None and "" are both falsy, so one truthiness test suffices).
                if data.get("webhook_url"):
                    send_webhook(data.get("webhook_url"), response_data)
            except Exception as e:
                # Record the failure and, best-effort, notify the caller;
                # never let the exception escape the worker loop.
                error_data = {
                    "code": 500,
                    "id": data.get("id"),
                    "job_id": job_id,
                    "response": None,
                    "message": str(e),
                    "pid": pid,
                    "queue_id": queue_id,
                    "build_number": BUILD_NUMBER
                }
                log_job_status(job_id, {
                    "job_status": "done",
                    "job_id": job_id,
                    "queue_id": queue_id,
                    "process_id": pid,
                    "response": error_data
                })
                if data.get("webhook_url"):
                    try:
                        send_webhook(data.get("webhook_url"), error_data)
                    except Exception:
                        pass  # notification is best-effort only
            finally:
                task_queue.task_done()

    # Start the queue processing in a separate daemon thread.
    threading.Thread(target=process_queue, daemon=True).start()

    def queue_task(bypass_queue=False):
        """Decorator factory: queue the wrapped route handler or run it inline.

        Args:
            bypass_queue: when True the handler always runs synchronously,
                skipping the local queue (a missing ``webhook_url`` in the
                request body has the same effect).
        """
        def decorator(f):
            # FIX: @wraps preserves f.__name__ so Flask derives a unique
            # endpoint name per view instead of "wrapper" for all of them
            # (which would raise on the second registered route).
            @wraps(f)
            def wrapper(*args, **kwargs):
                job_id = str(uuid.uuid4())
                data = request.json if request.is_json else {}
                pid = os.getpid()  # Get PID for non-queued tasks
                start_time = time.time()

                # Case 1: running inside a GCP Cloud Run Job instance ->
                # execute synchronously, no queueing.
                if os.environ.get("CLOUD_RUN_JOB"):
                    # Get execution name from Google's env var
                    execution_name = os.environ.get("CLOUD_RUN_EXECUTION", "gcp_job")

                    # Log job status as running
                    log_job_status(job_id, {
                        "job_status": "running",
                        "job_id": job_id,
                        "queue_id": execution_name,
                        "process_id": pid,
                        "response": None
                    })

                    # Execute the function directly (no queue)
                    response = f(job_id=job_id, data=data, *args, **kwargs)
                    run_time = time.time() - start_time

                    # Build response object
                    response_obj = {
                        "code": response[2],
                        "id": data.get("id"),
                        "job_id": job_id,
                        "response": response[0] if response[2] == 200 else None,
                        "message": "success" if response[2] == 200 else response[0],
                        "run_time": round(run_time, 3),
                        "queue_time": 0,
                        "total_time": round(run_time, 3),
                        "pid": pid,
                        "queue_id": execution_name,
                        "build_number": BUILD_NUMBER
                    }

                    # Log job status as done
                    log_job_status(job_id, {
                        "job_status": "done",
                        "job_id": job_id,
                        "queue_id": execution_name,
                        "process_id": pid,
                        "response": response_obj
                    })

                    # Send webhook if webhook_url is provided
                    if data.get("webhook_url"):
                        send_webhook(data.get("webhook_url"), response_obj)

                    return response_obj, response[2]

                # Case 2: job offloading is configured and the caller can be
                # notified -> submit a new GCP Cloud Run Job execution.
                if os.environ.get("GCP_JOB_NAME") and data.get("webhook_url"):
                    try:
                        overrides = {
                            'container_overrides': [
                                {
                                    'env': [
                                        # Environment variables to pass to the GCP Cloud Run Job
                                        {
                                            'name': 'GCP_JOB_PATH',
                                            'value': request.path  # Endpoint to call
                                        },
                                        {
                                            'name': 'GCP_JOB_PAYLOAD',
                                            'value': json.dumps(data)  # Payload as a string
                                        },
                                    ]
                                }
                            ],
                            'task_count': 1
                        }

                        # Call trigger_cloud_run_job with the overrides dictionary
                        response = trigger_cloud_run_job(
                            job_name=os.environ.get("GCP_JOB_NAME"),
                            location=os.environ.get("GCP_JOB_LOCATION", "us-central1"),
                            overrides=overrides  # Pass overrides to the job
                        )

                        if not response.get("job_submitted"):
                            raise Exception(f"GCP job trigger failed: {response}")

                        # Extract execution name and short ID for tracking
                        execution_name = response.get("execution_name", "")
                        gcp_queue_id = execution_name.split('/')[-1] if execution_name else "gcp_job"

                        # Prepare the response object
                        response_obj = {
                            "code": 200,
                            "id": data.get("id"),
                            "job_id": job_id,
                            "message": response,
                            "job_name": os.environ.get("GCP_JOB_NAME"),
                            "location": os.environ.get("GCP_JOB_LOCATION", "us-central1"),
                            "pid": pid,
                            "queue_id": gcp_queue_id,
                            "build_number": BUILD_NUMBER
                        }

                        log_job_status(job_id, {
                            "job_status": "submitted",
                            "job_id": job_id,
                            "queue_id": gcp_queue_id,
                            "process_id": pid,
                            "response": response_obj
                        })

                        return response_obj, 200  # Return 200 since it's a submission success
                    except Exception as e:
                        error_response = {
                            "code": 500,
                            "id": data.get("id"),
                            "job_id": job_id,
                            "message": f"GCP Cloud Run Job trigger failed: {str(e)}",
                            "job_name": os.environ.get("GCP_JOB_NAME"),
                            "location": os.environ.get("GCP_JOB_LOCATION", "us-central1"),
                            "pid": pid,
                            "queue_id": "gcp_job",
                            "build_number": BUILD_NUMBER
                        }
                        log_job_status(job_id, {
                            "job_status": "failed",
                            "job_id": job_id,
                            "queue_id": "gcp_job",
                            "process_id": pid,
                            "response": error_response
                        })
                        return error_response, 500

                # Case 3: run synchronously in this process.
                elif bypass_queue or 'webhook_url' not in data:
                    # Log job status as running immediately (bypassing queue)
                    log_job_status(job_id, {
                        "job_status": "running",
                        "job_id": job_id,
                        "queue_id": queue_id,
                        "process_id": pid,
                        "response": None
                    })

                    response = f(job_id=job_id, data=data, *args, **kwargs)
                    run_time = time.time() - start_time

                    response_obj = {
                        "code": response[2],
                        "id": data.get("id"),
                        "job_id": job_id,
                        "response": response[0] if response[2] == 200 else None,
                        "message": "success" if response[2] == 200 else response[0],
                        "run_time": round(run_time, 3),
                        "queue_time": 0,
                        "total_time": round(run_time, 3),
                        "pid": pid,
                        "queue_id": queue_id,
                        "queue_length": task_queue.qsize(),
                        "build_number": BUILD_NUMBER  # Add build number to response
                    }

                    # Log job status as done
                    log_job_status(job_id, {
                        "job_status": "done",
                        "job_id": job_id,
                        "queue_id": queue_id,
                        "process_id": pid,
                        "response": response_obj
                    })

                    return response_obj, response[2]

                # Case 4: enqueue for the background worker; 202 now,
                # result delivered later via webhook.
                else:
                    if MAX_QUEUE_LENGTH > 0 and task_queue.qsize() >= MAX_QUEUE_LENGTH:
                        error_response = {
                            "code": 429,
                            "id": data.get("id"),
                            "job_id": job_id,
                            "message": f"MAX_QUEUE_LENGTH ({MAX_QUEUE_LENGTH}) reached",
                            "pid": pid,
                            "queue_id": queue_id,
                            "queue_length": task_queue.qsize(),
                            "build_number": BUILD_NUMBER  # Add build number to response
                        }
                        # Log the queue overflow error
                        log_job_status(job_id, {
                            "job_status": "done",
                            "job_id": job_id,
                            "queue_id": queue_id,
                            "process_id": pid,
                            "response": error_response
                        })
                        return error_response, 429

                    # Log job status as queued
                    log_job_status(job_id, {
                        "job_status": "queued",
                        "job_id": job_id,
                        "queue_id": queue_id,
                        "process_id": pid,
                        "response": None
                    })

                    task_queue.put((job_id, data, lambda: f(job_id=job_id, data=data, *args, **kwargs), start_time))

                    return {
                        "code": 202,
                        "id": data.get("id"),
                        "job_id": job_id,
                        "message": "processing",
                        "pid": pid,
                        "queue_id": queue_id,
                        "max_queue_length": MAX_QUEUE_LENGTH if MAX_QUEUE_LENGTH > 0 else "unlimited",
                        "queue_length": task_queue.qsize(),
                        "build_number": BUILD_NUMBER  # Add build number to response
                    }, 202
            return wrapper
        return decorator

    app.queue_task = queue_task

    # Register special route for Next.js root asset paths first so it takes
    # precedence over discovered blueprints.
    from routes.v1.media.feedback import create_root_next_routes
    create_root_next_routes(app)

    # Use the discover_and_register_blueprints function to register all blueprints
    discover_and_register_blueprints(app)

    return app
# Module-level WSGI application instance (picked up by WSGI servers such as
# gunicorn via "app:app").
app = create_app()

if __name__ == '__main__':
    # Local development entry point; production deployments should use a
    # proper WSGI server instead of Flask's built-in one.
    app.run(host='0.0.0.0', port=8080)