| from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify, send_from_directory |
| import cv2 |
| import numpy as np |
| from unstructured.partition.pdf import partition_pdf |
| import json |
| import base64 |
| import io |
| import os |
| from PIL import Image, ImageEnhance, ImageDraw |
| from imutils.perspective import four_point_transform |
| from dotenv import load_dotenv |
| import pytesseract |
| from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq |
| from langchain_community.document_loaders.image_captions import ImageCaptionLoader |
| from werkzeug.utils import secure_filename |
| import tempfile |
| import torch |
| from langchain_groq import ChatGroq |
| from langgraph.prebuilt import create_react_agent |
| import logging, time |
|
|
| |
# --- Logging -----------------------------------------------------------------
# DEBUG and above is sent both to app.log and to the console stream.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
)

logger = logging.getLogger(__name__)

# --- Environment / LLM -------------------------------------------------------
# Pull variables from a local .env file (if present) into the environment.
load_dotenv()

groq_api_key = os.getenv("GROQ_API_KEY")

# NOTE(review): groq_api_key is not passed to ChatGroq below; presumably the
# client picks GROQ_API_KEY up from the environment itself -- confirm.
llm = ChatGroq(
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    max_tokens=None,
)

app = Flask(__name__)

# --- External tool locations -------------------------------------------------
# The Windows install paths remain the defaults, but can be overridden with the
# TESSERACT_CMD / POPPLER_PATH environment variables so the app can also run on
# hosts where the tools live elsewhere.
pytesseract.pytesseract.tesseract_cmd = os.getenv(
    "TESSERACT_CMD", r"C:\Program Files\Tesseract-OCR\tesseract.exe")
poppler_path = os.getenv("POPPLER_PATH", r"C:\poppler-23.11.0\Library\bin")

count = 0

# --- Output folders ----------------------------------------------------------
OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")

# Create every output folder up front so later writes cannot fail on a
# missing directory.
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
    os.makedirs(path, exist_ok=True)
|
|
| |
# Load the SmolVLM processor and model once, at import time, on the CPU.
# Any failure aborts start-up with a RuntimeError that keeps the original
# exception attached as its cause.
try:
    smolvlm256m_processor = AutoProcessor.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct")

    smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct",
        # Prefer bfloat16 when this torch build exposes it, else float32.
        torch_dtype=torch.bfloat16 if hasattr(
            torch, "bfloat16") else torch.float32,
        _attn_implementation="eager",
    ).to("cpu")
except Exception as e:
    raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}") from e
|
|
| |
|
|
|
|
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
    """Generate text for ``image`` with the module-level SmolVLM model.

    Args:
        image: Image handed to the SmolVLM processor.
        prompt: Instruction text. If it contains no ``<image>`` placeholder,
            one is prepended. The final prompt must contain exactly one.

    Returns:
        The decoded model output, or a string starting with
        ``"❌ Error during caption generation: "`` when anything fails.
        This function never raises; callers must inspect the returned text.
    """
    try:
        # Make sure the prompt carries an <image> placeholder.
        if "<image>" not in prompt:
            prompt = f"<image> {prompt.strip()}"

        num_image_tokens = prompt.count("<image>")
        if num_image_tokens != 1:
            raise ValueError(
                f"Prompt must contain exactly 1 <image> token. Found {num_image_tokens}")

        inputs = smolvlm256m_processor(
            images=[image], text=[prompt], return_tensors="pt").to("cpu")
        output_ids = smolvlm256m_model.generate(**inputs, max_new_tokens=100)
        return smolvlm256m_processor.decode(output_ids[0], skip_special_tokens=True)
    except Exception as e:
        # The error is reported through the return value, so also log the
        # traceback; otherwise the failure leaves no trace in the logs.
        logging.getLogger(__name__).exception("Caption generation failed")
        return f"❌ Error during caption generation: {str(e)}"
|
|
| |
|
|
|
|
def _ask_agent_about_image(agent, instruction, img_base64):
    """Send one text instruction plus a base64-encoded PNG to ``agent``.

    Returns the ``content`` of the last message in the agent's response.
    """
    content = [
        {
            "type": "text",
            "text": instruction
        },
        {
            "type": "image_url",
            "image_url": {
                # The payload is the PNG file written just before this call,
                # so the data URL is labelled image/png.
                "url": f"data:image/png;base64,{img_base64}"
            }
        }
    ]
    response = agent.invoke(
        {"messages": [{"role": "user", "content": content}]})
    print(response)
    return response["messages"][-1].content


def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract images from a PDF and generate a structured sprite JSON.

    The PDF is partitioned with ``partition_pdf``; the raw elements are
    written to ``extracted.json``. Every element carrying an
    ``image_base64`` payload is saved as a PNG, sent to the LLM agent twice
    (once for a description, once for a name) and recorded in
    ``extracted_sprites.json``.

    Args:
        pdf_path: Path of the PDF to process.
        output_json_path: Ignored. The path is always recomputed as
            ``<JSON_FOLDER_PATH>/<pdf name>/extracted.json``. The parameter
            is kept so existing callers keep working.

    Returns:
        Tuple ``(final_json_path, manipulated_json)``: the path of the
        written sprite JSON and the dict that was written.

    Raises:
        RuntimeError: On any failure; the original exception is chained.
    """
    try:
        pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]
        # Stored verbatim in each sprite entry under "file-path".
        pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")

        # Per-PDF output folders.
        extracted_image_subdir = os.path.join(
            DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
        json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
        os.makedirs(extracted_image_subdir, exist_ok=True)
        os.makedirs(json_subdir, exist_ok=True)

        output_json_path = os.path.join(json_subdir, "extracted.json")
        final_json_path = os.path.join(json_subdir, "extracted_sprites.json")

        try:
            elements = partition_pdf(
                filename=pdf_path,
                strategy="hi_res",
                extract_image_block_types=["Image"],
                extract_image_block_to_payload=True,
            )
        except Exception as e:
            raise RuntimeError(
                f"❌ Failed to extract images from PDF: {str(e)}") from e

        try:
            start_time = time.perf_counter()
            with open(output_json_path, "w") as f:
                json.dump([element.to_dict()
                          for element in elements], f, indent=4)
            elapsed = time.perf_counter() - start_time
            logger.info(f"✅ extracted.json write in {elapsed:.2f} seconds")
        except Exception as e:
            raise RuntimeError(
                f"❌ Failed to write extracted.json: {str(e)}") from e

        try:
            # Read the file back so the loop below works on plain dicts.
            with open(output_json_path, 'r') as file:
                file_elements = json.load(file)
        except Exception as e:
            raise RuntimeError(
                f"❌ Failed to read extracted.json: {str(e)}") from e

        manipulated_json = {}

        system_prompt = """
        You are an expert in visual scene understanding.
        Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements.

        Guidelines:
        - Focus only the images given in Square Shape.
        - Don't Consider Blank areas in Image as.
        - Don't include generic summary or explanation outside the fields.
        Return only string.
        """

        agent = create_react_agent(
            model=llm,
            tools=[],
            prompt=system_prompt
        )

        # Continue numbering after the highest "Sprite N" key already on disk.
        # NOTE(review): the existing entries are used for numbering only; the
        # file is overwritten below with just the sprites from this run.
        # Confirm whether earlier entries were meant to be kept.
        if os.path.exists(final_json_path):
            with open(final_json_path, "r") as existing_file:
                manipulated = json.load(existing_file)
            existing_keys = [int(k.replace("Sprite ", ""))
                             for k in manipulated]
            start_count = max(existing_keys, default=0) + 1
        else:
            start_count = 1

        prompt_description = "Give a brief Captioning."
        prompt_name = "give a short name caption of this Image."

        sprite_count = start_count
        start_time = time.perf_counter()
        for i, element in enumerate(file_elements):
            if "image_base64" not in element["metadata"]:
                continue
            try:
                image_data = base64.b64decode(
                    element["metadata"]["image_base64"])
                image = Image.open(io.BytesIO(image_data)).convert("RGB")
                image_path = os.path.join(
                    extracted_image_subdir, f"Sprite_{i+1}.png")
                image.save(image_path)

                # Re-encode the saved PNG; this is what the agent receives.
                with open(image_path, "rb") as image_file:
                    image_bytes = image_file.read()
                img_base64 = base64.b64encode(image_bytes).decode("utf-8")

                description = _ask_agent_about_image(
                    agent, prompt_description, img_base64)
                name = _ask_agent_about_image(
                    agent, prompt_name, img_base64)

                manipulated_json[f"Sprite {sprite_count}"] = {
                    "name": name,
                    "base64": element["metadata"]["image_base64"],
                    "file-path": pdf_dir_path,
                    "description": description
                }
                sprite_count += 1
            except Exception as e:
                # Best effort: one bad image must not abort the whole PDF,
                # but keep the traceback in the log.
                logger.exception(
                    f"⚠️ Error processing Sprite {i+1}: {str(e)}")
        elapsed = time.perf_counter() - start_time
        logger.info(f"✅ extracted_sprites.json write in {elapsed:.2f} seconds")

        with open(final_json_path, "w") as sprite_file:
            json.dump(manipulated_json, sprite_file, indent=4)

        print(f"✅ Manipulated sprite JSON saved: {final_json_path}")
        return final_json_path, manipulated_json

    except Exception as e:
        raise RuntimeError(
            f"❌ Error in extract_images_from_pdf: {str(e)}") from e
|
|
|
|
def similarity_matching(input_json_path: str) -> str:
    """Match extracted sprites to reference assets and assemble a project.json.

    Steps:
      1. Read the sprite JSON at ``input_json_path`` and decode each sprite's
         ``base64`` image.
      2. Embed the sprite images with ``OpenCLIPEmbeddings``.
      3. Compare them (dot product) against the embeddings stored in
         ``<OUTPUT_FOLDER>/embeddings.json`` and take the best row per sprite.
      4. Copy the assets of each matched sprite/backdrop folder into a fresh
         ``outputs/project_<id>`` folder and merge their JSON into one project.

    Args:
        input_json_path: Path to a JSON object whose values each contain a
            ``"base64"`` image string.

    Returns:
        Path of the written ``project.json``.

    Raises:
        RuntimeError: If either embedding matrix is empty or their shapes
            cannot be multiplied.
    """
    import uuid
    import shutil
    import tempfile
    from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings
    from io import BytesIO

    logger.info("🔍 Running similarity matching...")

    # Reference asset folders (overridable through the environment).
    backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")
    sprite_images_path = os.getenv("SPRITE_FOLDER_PATH", "/app/reference/sprites")
    image_dirs = [backdrop_images_path, sprite_images_path]

    # Fresh, uniquely named output folder for this run.
    # NOTE(review): this uses "outputs" while OUTPUT_FOLDER is "OUTPUTS"; on a
    # case-sensitive filesystem these are different directories -- confirm.
    random_id = uuid.uuid4().hex
    project_folder = os.path.join("outputs", f"project_{random_id}")
    os.makedirs(project_folder, exist_ok=True)
    project_json_path = os.path.join(project_folder, "project.json")

    with open(input_json_path, 'r') as f:
        sprites_data = json.load(f)

    # Only the base64 payloads are needed for image-to-image matching.
    start_time = time.perf_counter()
    sprite_base64 = [sprite["base64"] for sprite in sprites_data.values()]
    elapsed = time.perf_counter() - start_time
    logger.info(f"✅ Append Sprite's Name and Description in {elapsed:.2f} seconds")

    clip_embd = OpenCLIPEmbeddings()

    # Collect every reference image found under the reference folders.
    folder_image_paths = []
    for image_dir in image_dirs:
        for root, _, files in os.walk(image_dir):
            for fname in files:
                if fname.lower().endswith((".png", ".jpg", ".jpeg")):
                    folder_image_paths.append(os.path.join(root, fname))

    # Decode the sprites into a scratch directory that is removed again as
    # soon as the embeddings exist, so repeated runs do not pile up temp files.
    temp_dir = tempfile.mkdtemp()
    try:
        sprite_image_paths = []
        start_time = time.perf_counter()
        for idx, b64 in enumerate(sprite_base64):
            # Accept both raw base64 and "data:...;base64,<payload>" strings.
            image_data = base64.b64decode(b64.split(",")[-1])
            img = Image.open(BytesIO(image_data)).convert("RGB")
            temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
            img.save(temp_path)
            sprite_image_paths.append(temp_path)
        elapsed = time.perf_counter() - start_time
        logger.info(f"✅ Decoded Sprite Base64 in {elapsed:.2f} seconds")

        sprite_features = clip_embd.embed_image(sprite_image_paths)
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Precomputed reference embeddings: a list of objects with "embeddings".
    with open(f"{OUTPUT_FOLDER}/embeddings.json", "r") as f:
        embedding_json = json.load(f)

    img_matrix = np.array([img["embeddings"] for img in embedding_json])
    sprite_matrix = np.array(sprite_features)

    if sprite_matrix.size == 0 or img_matrix.size == 0:
        raise RuntimeError("❌ No valid embeddings found for sprites or reference images.")

    # Row i of the embedding file is later used as index i into
    # folder_image_paths, so the two must describe the same images in the same
    # order. A count mismatch is a strong hint that they do not.
    if img_matrix.shape[0] != len(folder_image_paths):
        logger.warning(
            "Reference embedding count (%d) differs from discovered reference "
            "image count (%d); matches may point at the wrong files.",
            img_matrix.shape[0], len(folder_image_paths))

    try:
        similarity = np.matmul(sprite_matrix, img_matrix.T)
    except ValueError as ve:
        if "matmul" in str(ve) and "size" in str(ve):
            logger.error("❌ Matrix multiplication failed due to shape mismatch. Likely due to empty or invalid embeddings.")
            raise RuntimeError("Matrix shape mismatch: CLIP embedding input is invalid or empty.")
        else:
            raise
    most_similar_indices = np.argmax(similarity, axis=1)

    # ---- Sprites: one entry per distinct matched folder ---------------------
    project_data = []
    copied_folders = set()

    for matched_idx in most_similar_indices:
        matched_image_path = os.path.normpath(folder_image_paths[matched_idx])
        matched_folder = os.path.dirname(matched_image_path)

        if matched_folder in copied_folders:
            continue
        copied_folders.add(matched_folder)
        logger.info(f"Matched image path: {matched_image_path}")

        sprite_json_path = os.path.join(matched_folder, 'sprite.json')
        if not os.path.exists(sprite_json_path):
            logger.warning(f"sprite.json not found in: {matched_folder}")
            continue

        with open(sprite_json_path, 'r') as f:
            sprite_data = json.load(f)
        print(f"SPRITE DATA: \n{sprite_data}")

        # Copy every asset except the matched image itself and sprite.json.
        for fname in os.listdir(matched_folder):
            fpath = os.path.join(matched_folder, fname)
            if os.path.isfile(fpath) and fname not in {os.path.basename(matched_image_path), 'sprite.json'}:
                shutil.copy2(fpath, os.path.join(project_folder, fname))
                logger.info(f"Copied Sprite asset: {fname}")
        project_data.append(sprite_data)

    # ---- Backdrops: matches located under the backdrop reference folder -----
    backdrop_data = []
    backdrop_root = os.path.normpath(backdrop_images_path)
    # Several sprites can match the same backdrop; handle each folder once so
    # its stage costumes are not appended repeatedly.
    seen_backdrop_folders = set()

    for matched_idx in most_similar_indices:
        matched_image_path = os.path.normpath(folder_image_paths[matched_idx])
        if not matched_image_path.startswith(backdrop_root):
            continue

        matched_folder = os.path.dirname(matched_image_path)
        if matched_folder in seen_backdrop_folders:
            continue
        seen_backdrop_folders.add(matched_folder)

        logger.info(f"Backdrop matched image: {matched_image_path}")

        # Copy every asset except the matched image itself and project.json.
        for fname in os.listdir(matched_folder):
            fpath = os.path.join(matched_folder, fname)
            if os.path.isfile(fpath) and fname not in {os.path.basename(matched_image_path), 'project.json'}:
                shutil.copy2(fpath, os.path.join(project_folder, fname))
                logger.info(f"Copied Backdrop asset: {fname}")

        # Keep only the stage target(s) of the backdrop's project.json.
        backdrop_json_path = os.path.join(matched_folder, 'project.json')
        if os.path.exists(backdrop_json_path):
            with open(backdrop_json_path, 'r') as f:
                backdrop_json_data = json.load(f)
            print(f"SPRITE DATA: \n{backdrop_json_data}")
            if "targets" in backdrop_json_data:
                for target in backdrop_json_data["targets"]:
                    if target.get("isStage") == True:
                        backdrop_data.append(target)
        else:
            logger.warning(f"project.json not found in: {matched_folder}")

    # ---- Assemble the final project -----------------------------------------
    final_project = {
        "targets": [],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.3.0",
            "agent": "OpenAI ScratchVision Agent"
        }
    }
    start_time = time.perf_counter()
    for sprite in project_data:
        if not sprite.get("isStage", False):
            final_project["targets"].append(sprite)
    elapsed = time.perf_counter() - start_time
    logger.info(f"✅ Append sprite 'targets' in {elapsed:.2f} seconds")

    if backdrop_data:
        # Merge all matched stages into one: costumes from every backdrop,
        # sounds from the first one only.
        all_costumes, sounds = [], []
        for idx, bd in enumerate(backdrop_data):
            all_costumes.extend(bd.get("costumes", []))
            if idx == 0 and "sounds" in bd:
                sounds = bd["sounds"]
        final_project["targets"].append({
            "isStage": True,
            "name": "Stage",
            "variables": {},
            "lists": {},
            "broadcasts": {},
            "blocks": {},
            "comments": {},
            "currentCostume": 1 if len(all_costumes) > 1 else 0,
            "costumes": all_costumes,
            "sounds": sounds,
            "volume": 100,
            "layerOrder": 0,
            "tempo": 60,
            "videoTransparency": 50,
            "videoState": "on",
            "textToSpeechLanguage": None
        })

    with open(project_json_path, 'w') as f:
        json.dump(final_project, f, indent=2)

    logger.info(f"🎉 Final project saved: {project_json_path}")
    return project_json_path
|
|
|
|
@app.route('/')
def index():
    """Serve the landing page rendered from the ``app_index.html`` template."""
    landing_template = 'app_index.html'
    return render_template(landing_template)
|
|
| |
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
    """Accept a PDF upload under the form key ``pdf_file``.

    The upload is validated and saved into a temporary directory, then a
    fixed placeholder JSON payload is returned. The extraction pipeline is
    not invoked from this handler.

    Returns:
        * 200 with the placeholder payload on success.
        * 400 when the file part is missing or its name is empty/unusable.
        * 500 with the error text on any unexpected failure.
    """
    try:
        logger.info("Received request to process PDF.")
        if 'pdf_file' not in request.files:
            logger.warning("No PDF file found in request.")
            return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400

        pdf_file = request.files['pdf_file']
        if not pdf_file.filename:
            return jsonify({"error": "Empty filename"}), 400

        # secure_filename() may strip a name down to an empty string; saving
        # to the bare directory path would then fail with a 500.
        filename = secure_filename(pdf_file.filename)
        if not filename:
            logger.warning("Uploaded filename is empty after sanitisation.")
            return jsonify({"error": "Invalid filename"}), 400

        # The temporary directory is removed when the block exits, so any
        # processing of the saved PDF has to happen inside it.
        with tempfile.TemporaryDirectory() as temp_dir:
            saved_pdf_path = os.path.join(temp_dir, filename)
            pdf_file.save(saved_pdf_path)
            logger.info(f"Saved uploaded PDF to: {saved_pdf_path}")

        logger.info("Finished processing PDF request.")

        return jsonify({
            "message": "✅ PDF processed successfully",
            "output_json": "output_path",
            "sprites": "result",
            "project_output_json": "project_output",
            "test_url":r"https://prthm11-scratch-vision-game.hf.space/download_sb3/Event_test"
        })
    except Exception as e:
        logger.exception("❌ Failed to process PDF")
        return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500
|
|
| |
@app.route("/download_sb3/<project_id>", methods=["GET"])
def download_sb3(project_id):
    """
    Allows users to download the generated .sb3 Scratch project file.

    Args:
        project_id: URL segment; the served file is ``<project_id>.sb3``
            inside the application's ``game_samples`` folder.

    Returns:
        The file as an attachment, a 404 JSON error when it does not exist,
        or a 500 JSON error when serving fails.
    """
    sb3_filename = f"{project_id}.sb3"
    # send_from_directory() resolves a relative directory against the
    # application root, not the working directory. Use one absolute path so
    # the existence check and the actual send look at the same location.
    samples_dir = os.path.join(app.root_path, "game_samples")
    sb3_filepath = os.path.join(samples_dir, sb3_filename)

    try:
        if not os.path.isfile(sb3_filepath):
            logger.warning(f"SB3 file not found for ID: {project_id}")
            return jsonify({"error": "Scratch project file not found"}), 404

        logger.info(f"Serving SB3 file for project ID: {project_id}")
        return send_from_directory(
            directory=samples_dir,
            path=sb3_filename,
            as_attachment=True,
            download_name=sb3_filename
        )
    except Exception as e:
        logger.error(f"Error serving SB3 file for ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve Scratch project file"}), 500
|
|
if __name__ == '__main__':
    # The server binds to every interface, and Flask's debug mode exposes an
    # interactive debugger that can execute code. Debug is therefore off
    # unless explicitly requested with FLASK_DEBUG=1 (or true/yes/on).
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {
        "1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)