Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| os.environ["PLAYWRIGHT_BROWSERS_PATH"] = "/home/user/.cache/ms-playwright" | |
| import logging | |
| from dotenv import load_dotenv | |
| import io | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import json | |
| import requests | |
| import uuid | |
| import importlib | |
| import io | |
| import boto3 | |
| from flask import Flask, request, jsonify, send_file | |
| import numpy as np | |
| from app.util.gen_ai_base import GenAIBaseClient | |
| from app.util.browser_agent import BrowserAgent | |
| from app.util.db_utils import DBManager | |
| from app.util.passport_photo_engine.haar_face_detector import HaarFaceDetector | |
| from app.util.passport_photo_engine.manual_face_extractor import ManualFaceExtractor | |
| from app.util.passport_photo_engine.passport_cropper import PassportCropper | |
| from app.util.passport_photo_engine.segmenter_rmbg import SegmenterRMBG | |
| from app.util.parameter_utils import init_secret | |
| import sys | |
| sys.stdout.reconfigure(line_buffering=True) | |
| API = "https://api-dev.spun.global" | |
| print("--- Loading Passport AI Models (This happens once) ---") | |
| passport_models = { | |
| "segmenter": SegmenterRMBG(), # Heavy model (GPU/CPU) | |
| "detector": HaarFaceDetector(), # Fast model | |
| "extractor": ManualFaceExtractor() | |
| } | |
| print("--- Passport Models Ready ---") | |
| PASSPORT_COLORS = { | |
| "white": (255, 255, 255), | |
| "id_red": (0, 0, 219), | |
| "id_blue": (219, 0, 0), | |
| "light_blue": (235, 206, 135) | |
| } | |
| def create_app() -> Flask: | |
| load_dotenv() | |
| app = Flask(__name__) | |
| # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| try: | |
| aws_param_path = os.getenv("AWS_PARAMETER_STORE_PATH") | |
| print("AWS PARAM PATH:", aws_param_path) | |
| if aws_param_path: | |
| init_secret(aws_param_path) | |
| logging.info("Secrets loaded from AWS SSM Parameter Store.") | |
| except Exception as e: | |
| logging.exception(f"Could not load secrets from SSM: {e}") | |
| print("ENV VARS:", dict(os.environ)) | |
| GENERATOR_MAP = { | |
| "japan-multientry-tourist": ( | |
| "app.util.japan_multientry_visa_letter_generator", | |
| "JapanMultiEntryVisaLetterGenerator", | |
| ), | |
| "schengen": ( | |
| "app.util.schengen_visa_letter_generator", | |
| "SchengenVisaLetterGenerator", | |
| ), | |
| "sponsorship": ( | |
| "app.util.sponsorship_letter_generator", | |
| "SponsorshipLetterGenerator" | |
| ), | |
| "housewife-statement": ( | |
| "app.util.housewife_statement_letter_generator", | |
| "HousewifeStatementLetterGenerator" | |
| ), | |
| "passport-collection": ( | |
| "app.util.passport_collection_letter_generator", | |
| "PassportCollectionLetterGenerator" | |
| ) | |
| } | |
| WORKER_API_KEY = os.getenv("WORKER_API_KEY") | |
| def get_prefill_data(visa_type, application_id): | |
| """ | |
| Dynamically fetches and merges data for a specific visa type | |
| to pre-fill the frontend form. | |
| """ | |
| try: | |
| if visa_type.lower() not in GENERATOR_MAP: | |
| return jsonify({"error": f"Unsupported visa type: {visa_type}"}), 400 | |
| module_path, class_name = GENERATOR_MAP[visa_type.lower()] | |
| module = importlib.import_module(module_path) | |
| generator_class = getattr(module, class_name) | |
| # Instantiate the generator, passing the application_id | |
| letter_generator = generator_class(data={"application_id": application_id}) | |
| # Call the new public method to fetch and merge data | |
| # This method MUST exist on the generator class (see step 2) | |
| prefill_data = letter_generator.get_prefill_data() | |
| if not prefill_data: | |
| return jsonify({"error": "Failed to prepare data or no data found."}), 500 | |
| return jsonify(prefill_data), 200 | |
| except AttributeError as e: | |
| # This error happens if the class doesn't have "get_prefill_data" | |
| msg = f"Generator {class_name} does not implement 'get_prefill_data'. {e}" | |
| logging.error(msg) | |
| return jsonify({"error": msg}), 501 # 501 Not Implemented | |
| except Exception as e: | |
| logging.error(f"Error in /prefill/{visa_type}/{application_id}: {e}", exc_info=True) | |
| return jsonify({"error": str(e)}), 500 | |
| async def scrape(): | |
| try: | |
| raw = request.get_data(as_text=True) | |
| print("Raw body:", raw) | |
| body = json.loads(raw) | |
| logging.info(f"Headers: {dict(request.headers)}") | |
| logging.info(f"Raw data: {request.data}") | |
| url = body.get('url') | |
| max_depth = body.get('max_depth', 2) | |
| if not url: | |
| return jsonify({"error": "URL is required"}), 400 | |
| api_key = os.getenv("GOOGLE_AI_STUDIO_API_KEY") | |
| explorer = GenAIBaseClient(api_key=api_key) | |
| try: | |
| async with BrowserAgent(model=explorer, max_depth=max_depth) as agent: | |
| root_node = await agent.run(start_url=url) | |
| if not root_node: | |
| return jsonify({"error": "Exploration failed or returned no data"}), 500 | |
| except Exception as e: | |
| logging.error(f"Error during scraping: {e}") | |
| return jsonify({"error": str(e)}), 500 | |
| response_data = { | |
| "link_map": {href: node.model_dump() for href, node in agent.link_map.items()}, | |
| "token_usage": explorer.token_usage | |
| } | |
| headers = { | |
| "X-WORKER-Api-Key": WORKER_API_KEY, | |
| "Content-Type": "application/json" | |
| } | |
| body = { | |
| "external_id": str(uuid.uuid4()), | |
| "history": response_data['link_map'], | |
| "smart_type": "data_extractor", | |
| "provider_type": "google_ai_studio", | |
| "total_input_amount": explorer.token_usage['input'], | |
| "total_output_amount": explorer.token_usage['output'] | |
| } | |
| try: | |
| record_response = requests.post( | |
| f"{API}/v1/smart/token/record-usage", | |
| headers=headers, | |
| json=body | |
| ) | |
| record_response.raise_for_status() # Raise exception for 4xx/5xx errors | |
| logging.info("Token usage recorded successfully.") | |
| except requests.exceptions.RequestException as e: | |
| # Log the error, but don't fail the whole request. | |
| # The user should still get their scrape data. | |
| logging.error(f"Failed to record token usage: {e}") | |
| return jsonify(response_data), 200 | |
| except json.JSONDecodeError as e: | |
| return jsonify({"error": f"Invalid JSON: {e}"}), 400 | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"error": str(e)}), 500 | |
| def generate_visa_letter(visa_type): | |
| """ | |
| Dynamically generates visa letters based on <visa_type>. | |
| Example: | |
| POST /generate/schengen | |
| POST /generate/japan-multientry-tourist | |
| POST /generate/sponsorship | |
| POST /generate/housewife-statement | |
| """ | |
| try: | |
| data = request.get_json() | |
| if not data: | |
| return jsonify({"error": "No JSON payload provided"}), 400 | |
| if visa_type.lower() not in GENERATOR_MAP: | |
| return jsonify({"error": f"Unsupported visa type: {visa_type}"}), 400 | |
| module_path, class_name = GENERATOR_MAP[visa_type.lower()] | |
| module = importlib.import_module(module_path) | |
| generator_class = getattr(module, class_name) | |
| letter_generator = generator_class(data) | |
| with tempfile.TemporaryDirectory() as tempdir: | |
| pdf_filepath, error_msg = letter_generator.compile_pdf(tempdir) | |
| if error_msg: | |
| return jsonify({"error": error_msg}), 500 | |
| with open(pdf_filepath, "rb") as f: | |
| pdf_bytes = io.BytesIO(f.read()) | |
| pdf_bytes.seek(0) | |
| return send_file( | |
| pdf_bytes, | |
| mimetype="application/pdf", | |
| as_attachment=True, | |
| download_name=f"{visa_type.replace('-', '_')}_visa_letter.pdf", | |
| ) | |
| except Exception as e: | |
| print(f"Error in /generate/{visa_type}: {e}") | |
| return jsonify({"error": str(e)}), 500 | |
| def generate_passport_visa(): | |
| data = request.get_json() | |
| bg_color_name = data.get('bg_color_name', 'white') | |
| output_size = data.get('output_size', (600, 800)) | |
| if not data.get("raw_photo"): | |
| return jsonify({"error": "Missing required field: 'raw_photo'"}), 400 | |
| response = requests.get(data['raw_photo'], stream=True) | |
| if response.status_code != 200: | |
| return jsonify({"error": f"Failed to download image from S3. Status: {response.status_code}"}), 400 | |
| try: | |
| # Read image | |
| in_memory_file = io.BytesIO(response.content) | |
| pil_image = Image.open(in_memory_file).convert("RGB") | |
| img_rgb = np.array(pil_image) | |
| # Get models | |
| seg = passport_models["segmenter"] | |
| det = passport_models["detector"] | |
| ext = passport_models["extractor"] | |
| # 1. Segment | |
| mask = seg.segment(img_rgb) | |
| # 2. Detect | |
| face_rect = det.detect(img_rgb) | |
| if face_rect is None: | |
| return jsonify({"error": "No face detected"}), 400 | |
| x, y, w, h = face_rect | |
| # 3. Angle | |
| info = ext.extract_face(img_rgb, mask) | |
| angle = info.get("chin_angle", 0.0) | |
| # 4. Process | |
| # TODO: adjust bg color and size based on visa requirements | |
| selected_bg = PASSPORT_COLORS.get(bg_color_name, (255, 255, 255)) | |
| cropper = PassportCropper(output_size=output_size, bg_color=selected_bg) | |
| img_clean = cropper.composite(img_rgb, mask) | |
| final_passport = cropper.crop_with_dynamic_zoom( | |
| img_clean, | |
| angle, | |
| (x, y, w, h), | |
| top_margin=0.4, | |
| bottom_margin=1 | |
| ) | |
| # Return result | |
| # Encode image to memory buffer | |
| is_success, buffer = cv2.imencode(".jpg", final_passport) | |
| if not is_success: | |
| return jsonify({"error": "Failed to encode image"}), 500 | |
| # Prepare for S3 Upload | |
| file_stream = io.BytesIO(buffer) | |
| s3_client = boto3.client('s3', | |
| aws_access_key_id=os.getenv("AWS_ACCESS_KEY"), | |
| aws_secret_access_key=os.getenv("AWS_SECRET_KEY"), | |
| region_name=os.getenv("AWS_REGION") | |
| ) | |
| # You should set this env var or hardcode your bucket | |
| S3_BUCKET_NAME = os.getenv("S3_PUBLIC_BUCKET", "spun-core-api-temp-development") | |
| file_key = f"generated-visa-photo/{uuid.uuid4()}.jpg" | |
| # Upload | |
| s3_client.upload_fileobj( | |
| file_stream, | |
| S3_BUCKET_NAME, | |
| file_key, | |
| ExtraArgs={'ContentType': 'image/jpeg'} | |
| ) | |
| s3_url = s3_client.generate_presigned_url( | |
| 'get_object', | |
| Params={'Bucket': S3_BUCKET_NAME, 'Key': file_key}, | |
| ExpiresIn=3600 | |
| ) | |
| return jsonify({ | |
| "status": "success", | |
| "url": s3_url | |
| }) | |
| except Exception as e: | |
| print(f"Passport Error: {e}") | |
| return jsonify({"error": str(e)}), 500 | |
| def photo_metadata(): | |
| """ | |
| Endpoint to get visa photo metadata per service. | |
| """ | |
| try: | |
| db_utils = DBManager() | |
| metadata = db_utils.get_visa_photo_metadata_per_service() | |
| return jsonify(metadata), 200 | |
| except Exception as e: | |
| logging.error(f"Error in /photo-metadata: {e}", exc_info=True) | |
| return jsonify({"error": str(e)}), | |
| def hello_world(): | |
| return "Flask server is running.", 200 | |
| return app | |
| app = create_app() | |
| if __name__ == '__main__': | |
| app.run(debug=True) |