Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import tempfile | |
| from dotenv import load_dotenv | |
| # Safe stdout/stderr wrapper to prevent OSError: [Errno 22] when stdout pipe is closed/unbuffered in background | |
| class SafeStream: | |
| def __init__(self, original_stream): | |
| self.original_stream = original_stream | |
| def write(self, data): | |
| try: | |
| if self.original_stream: | |
| self.original_stream.write(data) | |
| except OSError as e: | |
| if e.errno != 22: | |
| raise | |
| def flush(self): | |
| try: | |
| if self.original_stream: | |
| self.original_stream.flush() | |
| except OSError: | |
| pass | |
| def __getattr__(self, attr): | |
| return getattr(self.original_stream, attr) | |
| sys.stdout = SafeStream(sys.stdout) | |
| sys.stderr = SafeStream(sys.stderr) | |
| # Load .env variables (including HF_HOME and GROQ_API_KEY) before imports | |
| load_dotenv() | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| # Import modular components | |
| from config import Config | |
| from models.groq_client import GroqClient | |
| from models.clip_classifier import CLIPClassifier | |
| from models.tesseract_ocr import TesseractOCR | |
| from models.huggingface_models import HuggingFaceModels | |
| from services.groq_vision_classifier import GroqVisionScriptClassifier | |
| from services.script_detector import ScriptDetectionService | |
| from utils.image_utils import validate_image | |
| from utils.text_utils import clean_text | |
| from processors.cuneiform_processor import CuneiformProcessor | |
| from utils.gpu_diagnostics import log_gpu_info | |
| # Initialize Flask app | |
| app = Flask(__name__) | |
| # CORS — restrict origins in production via ALLOWED_ORIGINS env var | |
| # Example: ALLOWED_ORIGINS=https://your-frontend.vercel.app,https://custom-domain.com | |
| allowed_origins = os.getenv( | |
| "ALLOWED_ORIGINS", | |
| "http://localhost:3000,http://localhost:5173,http://localhost:5000" | |
| ) | |
| CORS(app, origins=allowed_origins.split(",")) | |
| # Global components | |
| config = Config() | |
| groq_client = None | |
| clip_classifier = None | |
| hf_models = None | |
| script_detector = None | |
| cuneiform_processor = None | |
| references = {} | |
| def load_references(): | |
| """Load references from JSON file""" | |
| global references | |
| try: | |
| import json | |
| with open(config.REFERENCES_PATH, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| references = { | |
| "egypt_symbol_notes": data.get("egypt_symbol_notes", {}), | |
| "greek_symbol_notes": data.get("greek_symbol_notes", {}), | |
| "greek_hint": data.get("greek_hint", "If no specific character note is found, treat as lexical marker considering diacriticals (breathing marks, accents, vowel quantity) which affect pronunciation, meaning, and grammatical function in ancient Greek texts."), | |
| "latin_symbol_notes": data.get("latin_symbol_notes", {}), | |
| "latin_hint": data.get("latin_hint", "If no specific character note is found, consider standard Latin letters or medieval scribal abbreviations."), | |
| # Cuneiform references | |
| "cuneiform_symbol_notes": data.get("cuneiform_symbol_notes", {}), | |
| "cuneiform_hint": data.get("cuneiform_hint", "Cuneiform signs represent syllables, words, or concepts in ancient Mesopotamian languages (Sumerian, Akkadian, etc.)") | |
| } | |
| print(f"[INFO] Loaded references from {config.REFERENCES_PATH}") | |
| except Exception as e: | |
| print(f"[WARN] Failed to load references: {e}") | |
| references = { | |
| "egypt_symbol_notes": {}, | |
| "greek_symbol_notes": {}, | |
| "greek_hint": "Possible Greek lexical marker.", | |
| "latin_symbol_notes": {}, | |
| "latin_hint": "Latin scribal practice.", | |
| "cuneiform_symbol_notes": {}, | |
| "cuneiform_hint": "Ancient cuneiform sign." | |
| } | |
| def initialize_components(): | |
| """Initialize lightweight component wrappers synchronously. | |
| No heavy model weights are loaded here — all ML models use lazy loading | |
| and will download/load on their first inference call. This ensures the | |
| app starts instantly on resource-constrained environments like HF Spaces. | |
| """ | |
| global groq_client, clip_classifier, hf_models, script_detector, cuneiform_processor | |
| import time as _time | |
| _t0 = _time.time() | |
| print("[INIT] Initializing components (lazy loading — no model weights loaded yet)...", flush=True) | |
| # Log GPU Diagnostics | |
| log_gpu_info() | |
| # Load references (small JSON file, instant) | |
| load_references() | |
| # Groq client (API key check only, no model download) | |
| groq_client = GroqClient() | |
| groq_status = "ready" if groq_client.is_available() else "unavailable" | |
| print(f"[INIT] Groq client: {groq_status}", flush=True) | |
| # CLIP classifier (lazy — model loads on first classify call) | |
| clip_classifier = CLIPClassifier() | |
| # HF Translator (lazy — model loads on first translate call) | |
| hf_models = HuggingFaceModels() | |
| # Cuneiform processor (lazy — CLIP & translator load on first use) | |
| try: | |
| cuneiform_processor = CuneiformProcessor( | |
| groq_client=groq_client, | |
| references=references, | |
| clip_classifier=clip_classifier | |
| ) | |
| except Exception as e: | |
| print(f"[ERROR] Failed to create cuneiform processor: {e}", flush=True) | |
| cuneiform_processor = None | |
| # Script detection service (creates processor instances, all lazy) | |
| script_detector = ScriptDetectionService( | |
| groq_client=groq_client, | |
| references=references, | |
| clip_classifier=clip_classifier, | |
| translator_pipe=hf_models.get_translator(), | |
| cuneiform_processor=cuneiform_processor | |
| ) | |
| print(f"[INIT] All components ready in {_time.time()-_t0:.1f}s (models will load on first request)", flush=True) | |
| def analyze(): | |
| """Main analysis endpoint with Groq Vision classification""" | |
| tmp_path = None | |
| try: | |
| # Validate request | |
| if 'image' not in request.files: | |
| return jsonify({"error": "No image uploaded"}), 400 | |
| img_file = request.files['image'] | |
| if img_file.filename == '': | |
| return jsonify({"error": "Empty filename"}), 400 | |
| # Validate image file | |
| try: | |
| validate_image(img_file) | |
| except ValueError as e: | |
| return jsonify({"error": str(e)}), 400 | |
| # Save temporary file | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp: | |
| tmp_path = tmp.name | |
| img_file.save(tmp_path) | |
| # Process image with Groq Vision classification | |
| result = script_detector.detect_and_process(tmp_path) | |
| if not result: | |
| return jsonify({"error": "Could not process image"}), 500 | |
| # Get Vision classification info | |
| vision_classification = result.get('vision_classification', 'unknown') | |
| classification_method = result.get('classification_method', 'unknown') | |
| classification_confidence = result.get('classification_confidence', 0.0) | |
| script_type = result.get('script_type', 'egyptian') | |
| # Base response with Vision classification info | |
| base_response = { | |
| "script_type": script_type, | |
| "vision_classification": vision_classification, | |
| "classification_method": classification_method, | |
| "classification_confidence": classification_confidence, | |
| "confidence": result.get('confidence', 0.0), | |
| "historical_context": result.get('historical_context', {}), | |
| "creative_story": result.get('creative_story', ''), | |
| "model_used": "llama-3.2-90b-vision-preview" | |
| } | |
| # Handle cuneiform processing | |
| if script_type == 'cuneiform': | |
| if not cuneiform_processor or not cuneiform_processor.cuneiform_available: | |
| return jsonify({ | |
| **base_response, | |
| "error": "Cuneiform processing unavailable", | |
| "labels": [], | |
| "gardiner_codes": [], | |
| "translation": "Cuneiform translation model not available", | |
| "translation_ok": False | |
| }), 200 | |
| try: | |
| # Process cuneiform text | |
| processed_result = result.get('processed_result', {}) | |
| cuneiform_text = processed_result.get('text', '') | |
| # Translate cuneiform to English | |
| translation = "" | |
| translation_ok = False | |
| if cuneiform_text and len(cuneiform_text.strip()) > 2: | |
| print(f"[INFO] Translating cuneiform: {cuneiform_text[:50]}...") | |
| translation = cuneiform_processor.translate_cuneiform(cuneiform_text) | |
| translation_ok = bool(translation and not translation.startswith("Error")) | |
| else: | |
| translation = "No readable cuneiform text extracted" | |
| # Build cuneiform response | |
| response_data = { | |
| **base_response, | |
| "labels": [], | |
| "gardiner_codes": [], | |
| "translation": translation, | |
| "translation_ok": translation_ok, | |
| "cuneiform_text": cuneiform_text, | |
| "validation": { | |
| "quality_score": processed_result.get('validation', {}).get('quality_score', 0.0), | |
| "cuneiform_ratio": processed_result.get('validation', {}).get('cuneiform_ratio', 0.0), | |
| "atf_ratio": processed_result.get('validation', {}).get('atf_ratio', 0.0), | |
| "char_analysis": processed_result.get('char_analysis', {}), | |
| "ocr_method": "praeclarum/cuneiform (T5-based translation)", | |
| "supports_translation": True, | |
| "input_format": processed_result.get('char_analysis', {}).get('text_format', 'Unknown') | |
| } | |
| } | |
| return jsonify(response_data) | |
| except Exception as e: | |
| print(f"[ERROR] Cuneiform processing failed: {e}") | |
| return jsonify({ | |
| **base_response, | |
| "error": f"Cuneiform processing error: {str(e)}", | |
| "labels": [], | |
| "gardiner_codes": [], | |
| "translation": "Cuneiform processing failed", | |
| "translation_ok": False | |
| }), 200 | |
| elif script_type in ['greek', 'latin']: | |
| processed_result = result.get('processed_result', {}) | |
| validation = processed_result.get('validation', {}) | |
| response_data = { | |
| **base_response, | |
| "labels": [], | |
| "gardiner_codes": [], | |
| "translation": processed_result.get('text', ''), | |
| "translation_ok": True, | |
| } | |
| # Add enhanced validation info for Greek | |
| if script_type == 'greek': | |
| response_data["validation"] = { | |
| "quality_score": validation.get('quality_score', 0.0), | |
| "greek_ratio": validation.get('greek_ratio', 0.0), | |
| "has_polytonic": validation.get('has_polytonic', False), | |
| "char_analysis": processed_result.get('char_analysis', {}), | |
| "ocr_method": "ancient_greek_ocr" if validation.get('quality_score', 0) > 0.7 else "standard_greek_ocr" | |
| } | |
| elif script_type == 'latin': | |
| response_data["validation"] = { | |
| "quality_score": validation.get('quality_score', 0.0), | |
| "latin_ratio": validation.get('latin_ratio', 0.0), | |
| "trocr_used": validation.get('tridis_used', False) or any(m in validation.get('ocr_method', '') for m in ['tridis', 'trocr-base-printed']), | |
| "char_analysis": processed_result.get('char_analysis', {}), | |
| "ocr_method": validation.get('ocr_method', 'standard_latin_ocr'), | |
| "writing_style": validation.get('writing_style', 'cursive') | |
| } | |
| return jsonify(response_data) | |
| else: # Egyptian | |
| processed = result['processed_result'] | |
| return jsonify({ | |
| **base_response, | |
| "labels": processed['labels'], | |
| "gardiner_codes": processed['codes'], | |
| "translation": processed['translation'], | |
| "translation_ok": processed['translation_ok'] | |
| }) | |
| except Exception as e: | |
| print(f"[ERROR] Analysis failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"error": "Processing failed"}), 500 | |
| finally: | |
| # Cleanup temporary file | |
| if tmp_path: | |
| try: | |
| os.remove(tmp_path) | |
| except Exception: | |
| pass | |
| def chat(): | |
| """Chatbot endpoint for manuscript queries and general dialogue""" | |
| try: | |
| data = request.get_json() or {} | |
| message = data.get("message", "") | |
| history = data.get("history", []) | |
| context = data.get("context", "") | |
| if not message: | |
| return jsonify({"error": "Message is required"}), 400 | |
| system_prompt = ( | |
| "You are DecipherAI's helpful historical assistant. You are an expert paleographer and historian.\n" | |
| "Answer the user's questions about ancient scripts, translations, and history in a helpful, " | |
| "academic yet accessible manner. Cite historical sources when appropriate." | |
| ) | |
| if context: | |
| system_prompt += f"\n\nHere is the context of the current manuscript translation:\n{context}" | |
| if not groq_client or not groq_client.is_available(): | |
| reply = ( | |
| f"Thank you for your question: '{message}'. I'm currently running in offline fallback mode " | |
| f"because the Groq API key is not set. Once configured, I will be able to answer all your " | |
| f"scholarly questions about the translated scripts, historical context, and paleography in real time!" | |
| ) | |
| else: | |
| prompt = "" | |
| for turn in history[-5:]: | |
| role = turn.get("role", "user") | |
| content = turn.get("content", "") | |
| prompt += f"{role.upper()}: {content}\n" | |
| prompt += f"USER: {message}\nASSISTANT:" | |
| reply = groq_client.generate_response( | |
| system_prompt=system_prompt, | |
| user_prompt=prompt, | |
| max_tokens=500 | |
| ) or "I'm sorry, I encountered an error generating a response." | |
| return jsonify({"reply": reply}) | |
| except Exception as e: | |
| print(f"[ERROR] Chat failed: {e}") | |
| return jsonify({"error": "Failed to process chat message"}), 500 | |
| def health_check(): | |
| """Health check endpoint — app is always ready, models load lazily on demand""" | |
| models_loaded = { | |
| "groq": groq_client.is_available() if groq_client else False, | |
| "clip": clip_classifier.is_loaded if clip_classifier else False, | |
| "translator": hf_models is not None if hf_models else False, | |
| "cuneiform": cuneiform_processor is not None if cuneiform_processor else False, | |
| "script_detector": script_detector is not None | |
| } | |
| return jsonify({ | |
| "status": "healthy", | |
| "architecture": "lazy_loading", | |
| "models_loaded": models_loaded | |
| }) | |
| def info(): | |
| """Information endpoint""" | |
| return jsonify({ | |
| "app": "Ancient Script Recognition System", | |
| "version": "2.1.0", | |
| "supported_scripts": [ | |
| "Egyptian Hieroglyphs", | |
| "Ancient Greek", | |
| "Latin", | |
| "Ancient Cuneiform" | |
| ], | |
| "features": [ | |
| "Multi-script detection", | |
| "OCR text extraction", | |
| "Historical context generation", | |
| "Creative story generation", | |
| "Cuneiform translation (Sumerian/Akkadian → English)" | |
| ] | |
| }) | |
| # --- Component initialization --- | |
| # Lightweight init runs synchronously at module level. No heavy model weights | |
| # are loaded here — all ML models use lazy loading on first inference call. | |
| def _auto_initialize(): | |
| """Initialize components when running under a WSGI server (gunicorn, waitress, etc.)""" | |
| if os.getenv("WERKZEUG_RUN_MAIN") == "true": | |
| # Flask reloader child process — handled by __main__ block | |
| return | |
| print("[INIT] WSGI server detected — initializing components...", flush=True) | |
| initialize_components() | |
| if __name__ == "__main__": | |
| print("[INIT] Starting Ancient Script Recognition System (lazy loading)...", flush=True) | |
| # Start Flask app | |
| port = int(os.getenv("PORT", 7860)) | |
| debug = os.getenv("DEBUG", "False").lower() == "true" | |
| # Initialize lightweight components (only in child process if debug mode is on) | |
| if not debug or os.environ.get("WERKZEUG_RUN_MAIN") == "true": | |
| initialize_components() | |
| else: | |
| print("[INFO] Reloader active. Component initialization deferred to child process.") | |
| print(f"[INFO] Starting server on port {port}", flush=True) | |
| app.run(host="0.0.0.0", port=port, debug=debug) | |
| else: | |
| # Running under gunicorn / WSGI | |
| _auto_initialize() | |