# Copyright (c) 2025 Stephen G. Pope
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from flask import Flask, request
from queue import Queue
from services.webhook import send_webhook
import functools
import threading
import uuid
import os
import time
import json
from version import BUILD_NUMBER  # Import the BUILD_NUMBER
from app_utils import log_job_status, discover_and_register_blueprints  # Import the discover_and_register_blueprints function
from services.gcp_toolkit import trigger_cloud_run_job

# Upper bound on the number of queued jobs. Values <= 0 (the default is 0)
# disable the limit; the 202 response then reports it as "unlimited".
MAX_QUEUE_LENGTH = int(os.environ.get('MAX_QUEUE_LENGTH', 0))


def create_app():
    """Build the Flask application together with its in-process job queue.

    The factory:

    * creates a ``Queue`` and starts one daemon thread that drains it,
    * attaches the ``queue_task`` decorator factory to the app as
      ``app.queue_task``,
    * registers the root Next.js asset routes and then every blueprint found
      by ``discover_and_register_blueprints``.

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__)

    # Create a queue to hold tasks
    task_queue = Queue()
    queue_id = id(task_queue)  # Generate a single queue_id for this worker

    def process_queue():
        """Run queued jobs one at a time, forever.

        Each queue item is ``(job_id, data, task_func, queue_start_time)``.
        ``task_func`` is called with no arguments and must return an
        indexable ``(payload, endpoint, status_code)``.

        A failure while handling one job is logged and the loop moves on to
        the next item. Without that guard, a single uncaught exception would
        end this thread and every job queued afterwards would be accepted
        with a 202 but never executed.
        """
        while True:
            job_id, data, task_func, queue_start_time = task_queue.get()
            try:
                queue_time = time.time() - queue_start_time
                run_start_time = time.time()
                # os.getpid() is the process ID; it is the same for every
                # thread of this worker process.
                pid = os.getpid()

                # Log job status as running
                log_job_status(job_id, {
                    "job_status": "running",
                    "job_id": job_id,
                    "queue_id": queue_id,
                    "process_id": pid,
                    "response": None
                })

                response = task_func()
                run_time = time.time() - run_start_time
                total_time = time.time() - queue_start_time
                succeeded = response[2] == 200

                response_data = {
                    "endpoint": response[1],
                    "code": response[2],
                    "id": data.get("id"),
                    "job_id": job_id,
                    "response": response[0] if succeeded else None,
                    "message": "success" if succeeded else response[0],
                    "pid": pid,
                    "queue_id": queue_id,
                    "run_time": round(run_time, 3),
                    "queue_time": round(queue_time, 3),
                    "total_time": round(total_time, 3),
                    "queue_length": task_queue.qsize(),
                    "build_number": BUILD_NUMBER  # Add build number to response
                }

                # Log job status as done
                log_job_status(job_id, {
                    "job_status": "done",
                    "job_id": job_id,
                    "queue_id": queue_id,
                    "process_id": pid,
                    "response": response_data
                })

                # Only send webhook if webhook_url has an actual value
                # (missing, None and "" are all falsy).
                webhook_url = data.get("webhook_url")
                if webhook_url:
                    send_webhook(webhook_url, response_data)
            except Exception:
                # Keep the single worker thread alive; record the traceback.
                app.logger.exception("Queued job %s failed", job_id)
            finally:
                # Always balance the get() above, even when the job failed.
                task_queue.task_done()

    # Start the queue processing in a separate thread
    threading.Thread(target=process_queue, daemon=True).start()

    def queue_task(bypass_queue=False):
        """Return a decorator that runs a view inline, queued, or as a GCP job.

        The decorated function ``f`` is always invoked as
        ``f(*args, job_id=<uuid4 str>, data=<request JSON or {}>, **kwargs)``
        and must return an indexable ``(payload, endpoint, status_code)``.

        Dispatch order inside the wrapper:

        1. ``CLOUD_RUN_JOB`` is set: run ``f`` synchronously, send the
           webhook if one was supplied, and return the result.
        2. ``GCP_JOB_NAME`` is set and the request has a truthy
           ``webhook_url``: call ``trigger_cloud_run_job`` and return 200 on
           submission or 500 on failure.
        3. ``bypass_queue`` is true or the payload has no ``webhook_url``
           key: run ``f`` synchronously and return the result.
        4. Otherwise: enqueue the job (429 if ``MAX_QUEUE_LENGTH`` is
           reached) and return 202 immediately.

        Args:
            bypass_queue: When true, never enqueue; run the view inline
                (unless rule 1 or 2 applies first).

        Returns:
            A decorator whose wrapper returns ``(response_dict, status_code)``.
        """
        def decorator(f):
            # functools.wraps keeps f's __name__/__doc__ on the wrapper so
            # decorated views are distinguishable from one another.
            @functools.wraps(f)
            def wrapper(*args, **kwargs):
                job_id = str(uuid.uuid4())
                data = request.json if request.is_json else {}
                pid = os.getpid()  # Get PID for non-queued tasks
                start_time = time.time()

                # If running inside a GCP Cloud Run Job instance, execute synchronously
                if os.environ.get("CLOUD_RUN_JOB"):
                    # Get execution name from Google's env var
                    execution_name = os.environ.get("CLOUD_RUN_EXECUTION", "gcp_job")

                    # Log job status as running
                    log_job_status(job_id, {
                        "job_status": "running",
                        "job_id": job_id,
                        "queue_id": execution_name,
                        "process_id": pid,
                        "response": None
                    })

                    # Execute the function directly (no queue)
                    response = f(*args, job_id=job_id, data=data, **kwargs)
                    run_time = time.time() - start_time
                    succeeded = response[2] == 200

                    # Build response object
                    response_obj = {
                        "code": response[2],
                        "id": data.get("id"),
                        "job_id": job_id,
                        "response": response[0] if succeeded else None,
                        "message": "success" if succeeded else response[0],
                        "run_time": round(run_time, 3),
                        "queue_time": 0,
                        "total_time": round(run_time, 3),
                        "pid": pid,
                        "queue_id": execution_name,
                        "build_number": BUILD_NUMBER
                    }

                    # Log job status as done
                    log_job_status(job_id, {
                        "job_status": "done",
                        "job_id": job_id,
                        "queue_id": execution_name,
                        "process_id": pid,
                        "response": response_obj
                    })

                    # Send webhook if webhook_url is provided
                    webhook_url = data.get("webhook_url")
                    if webhook_url:
                        send_webhook(webhook_url, response_obj)

                    return response_obj, response[2]

                gcp_job_name = os.environ.get("GCP_JOB_NAME")
                if gcp_job_name and data.get("webhook_url"):
                    gcp_job_location = os.environ.get("GCP_JOB_LOCATION", "us-central1")
                    try:
                        overrides = {
                            'container_overrides': [
                                {
                                    'env': [
                                        # Environment variables to pass to the GCP Cloud Run Job
                                        {
                                            'name': 'GCP_JOB_PATH',
                                            'value': request.path  # Endpoint to call
                                        },
                                        {
                                            'name': 'GCP_JOB_PAYLOAD',
                                            'value': json.dumps(data)  # Payload as a string
                                        },
                                    ]
                                }
                            ],
                            'task_count': 1
                        }

                        # Call trigger_cloud_run_job with the overrides dictionary
                        response = trigger_cloud_run_job(
                            job_name=gcp_job_name,
                            location=gcp_job_location,
                            overrides=overrides  # Pass overrides to the job
                        )

                        if not response.get("job_submitted"):
                            # Handled by the except clause below, which turns
                            # it into the 500 response.
                            raise RuntimeError(f"GCP job trigger failed: {response}")

                        # Extract execution name and short ID for tracking
                        execution_name = response.get("execution_name", "")
                        gcp_queue_id = execution_name.split('/')[-1] if execution_name else "gcp_job"

                        # Prepare the response object
                        response_obj = {
                            "code": 200,
                            "id": data.get("id"),
                            "job_id": job_id,
                            "message": response,
                            "job_name": gcp_job_name,
                            "location": gcp_job_location,
                            "pid": pid,
                            "queue_id": gcp_queue_id,
                            "build_number": BUILD_NUMBER
                        }
                        log_job_status(job_id, {
                            "job_status": "submitted",
                            "job_id": job_id,
                            "queue_id": gcp_queue_id,
                            "process_id": pid,
                            "response": response_obj
                        })
                        return response_obj, 200  # Return 200 since it's a submission success

                    except Exception as e:
                        error_response = {
                            "code": 500,
                            "id": data.get("id"),
                            "job_id": job_id,
                            "message": f"GCP Cloud Run Job trigger failed: {e}",
                            "job_name": gcp_job_name,
                            "location": gcp_job_location,
                            "pid": pid,
                            "queue_id": "gcp_job",
                            "build_number": BUILD_NUMBER
                        }
                        log_job_status(job_id, {
                            "job_status": "failed",
                            "job_id": job_id,
                            "queue_id": "gcp_job",
                            "process_id": pid,
                            "response": error_response
                        })
                        return error_response, 500

                # NOTE(review): the test is key presence, not truthiness, so a
                # payload with "webhook_url": "" is queued below and no webhook
                # is ever sent for it -- confirm this is intended.
                if bypass_queue or 'webhook_url' not in data:
                    # Log job status as running immediately (bypassing queue)
                    log_job_status(job_id, {
                        "job_status": "running",
                        "job_id": job_id,
                        "queue_id": queue_id,
                        "process_id": pid,
                        "response": None
                    })

                    response = f(*args, job_id=job_id, data=data, **kwargs)
                    run_time = time.time() - start_time
                    succeeded = response[2] == 200

                    response_obj = {
                        "code": response[2],
                        "id": data.get("id"),
                        "job_id": job_id,
                        "response": response[0] if succeeded else None,
                        "message": "success" if succeeded else response[0],
                        "run_time": round(run_time, 3),
                        "queue_time": 0,
                        "total_time": round(run_time, 3),
                        "pid": pid,
                        "queue_id": queue_id,
                        "queue_length": task_queue.qsize(),
                        "build_number": BUILD_NUMBER  # Add build number to response
                    }

                    # Log job status as done
                    log_job_status(job_id, {
                        "job_status": "done",
                        "job_id": job_id,
                        "queue_id": queue_id,
                        "process_id": pid,
                        "response": response_obj
                    })

                    return response_obj, response[2]

                # Queued path: reject when the configured limit is reached.
                if MAX_QUEUE_LENGTH > 0 and task_queue.qsize() >= MAX_QUEUE_LENGTH:
                    error_response = {
                        "code": 429,
                        "id": data.get("id"),
                        "job_id": job_id,
                        "message": f"MAX_QUEUE_LENGTH ({MAX_QUEUE_LENGTH}) reached",
                        "pid": pid,
                        "queue_id": queue_id,
                        "queue_length": task_queue.qsize(),
                        "build_number": BUILD_NUMBER  # Add build number to response
                    }

                    # Log the queue overflow error
                    log_job_status(job_id, {
                        "job_status": "done",
                        "job_id": job_id,
                        "queue_id": queue_id,
                        "process_id": pid,
                        "response": error_response
                    })

                    return error_response, 429

                # Log job status as queued
                log_job_status(job_id, {
                    "job_status": "queued",
                    "job_id": job_id,
                    "queue_id": queue_id,
                    "process_id": pid,
                    "response": None
                })

                # The lambda closes over this call's job_id/data/args so the
                # worker thread can run the view later with no arguments.
                task_queue.put((
                    job_id,
                    data,
                    lambda: f(*args, job_id=job_id, data=data, **kwargs),
                    start_time,
                ))

                return {
                    "code": 202,
                    "id": data.get("id"),
                    "job_id": job_id,
                    "message": "processing",
                    "pid": pid,
                    "queue_id": queue_id,
                    "max_queue_length": MAX_QUEUE_LENGTH if MAX_QUEUE_LENGTH > 0 else "unlimited",
                    "queue_length": task_queue.qsize(),
                    "build_number": BUILD_NUMBER  # Add build number to response
                }, 202
            return wrapper
        return decorator

    app.queue_task = queue_task

    # Register special route for Next.js root asset paths first
    from routes.v1.media.feedback import create_root_next_routes
    create_root_next_routes(app)

    # Use the discover_and_register_blueprints function to register all blueprints
    discover_and_register_blueprints(app)

    return app


app = create_app()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)