| from flask import Flask, request, jsonify, render_template, send_from_directory, send_file |
| import cv2, json,base64,io,os,tempfile,logging, re |
| import numpy as np |
| from unstructured.partition.pdf import partition_pdf |
| from PIL import Image, ImageOps, ImageEnhance |
| from dotenv import load_dotenv |
| from werkzeug.utils import secure_filename |
| from langchain_groq import ChatGroq |
| from langgraph.prebuilt import create_react_agent |
| from pdf2image import convert_from_path, convert_from_bytes |
| from typing import Dict, TypedDict, Optional, Any, List, Tuple |
| from collections import defaultdict |
| from langgraph.graph import StateGraph, END |
| import uuid |
| import shutil, time, functools |
| from io import BytesIO |
| from pathlib import Path |
| from utils.block_relation_builder import block_builder, separate_scripts, transform_logic_to_action_flow, analyze_opcode_counts, process_text |
| from difflib import get_close_matches |
| import torch |
| from transformers import AutoImageProcessor, AutoModel |
| import torch |
| import json |
| import cv2 |
| from imagededup.methods import PHash |
| from image_match.goldberg import ImageSignature |
| import sys |
| import math |
| import hashlib |
|
|
| |
| DINOV2_MODEL = "facebook/dinov2-small" |
|
|
| |
| MAX_PHASH_BITS = 64 |
|
|
| |
| |
| |
| print("Initializing models and helpers...") |
| DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| if DEVICE.type == "cpu": |
| torch.set_num_threads(4) |
|
|
| dinov2_processor = AutoImageProcessor.from_pretrained(DINOV2_MODEL) |
| dinov2_model = AutoModel.from_pretrained(DINOV2_MODEL) |
| dinov2_model.to(DEVICE) |
| dinov2_model.eval() |
|
|
| phash = PHash() |
| gis = ImageSignature() |
|
|
| load_dotenv() |
| |
| groq_api_key = os.getenv("GROQ_API_KEY") |
|
|
| llm = ChatGroq( |
| model="meta-llama/llama-4-scout-17b-16e-instruct", |
| temperature=0, |
| max_tokens=None, |
| ) |
|
|
| app = Flask(__name__) |
|
|
| backdrop_images_path = r"app\blocks\Backdrops" |
| sprite_images_path = r"app\blocks\sprites" |
| code_blocks_image_path = r"app\blocks\code_blocks" |
|
|
| count = 0 |
|
|
| from pathlib import Path |
| BASE_DIR = Path(os.getenv("APP_BASE_DIR", Path(__file__).resolve().parent)) |
| BASE_DIR = Path("/app") |
| LOGS_DIR = Path(os.getenv("LOGS_DIR", "/tmp/logs")).resolve() |
| LOGS_DIR.mkdir(parents=True, exist_ok=True) |
| |
| GEN_PROJECT_DIR = BASE_DIR / "generated_projects" |
| |
| BLOCKS_DIR = BASE_DIR / "blocks" |
| BACKDROP_DIR = BLOCKS_DIR / "Backdrops" |
| SPRITE_DIR = BLOCKS_DIR / "sprites" |
| CODE_BLOCKS_DIR = BLOCKS_DIR / "code_blocks" |
| SOUND_DIR = BLOCKS_DIR / "sound" |
|
|
| |
| |
|
|
| |
| |
| |
| |
| OUTPUT_DIR = BASE_DIR / "outputs" |
|
|
| |
| MODEL = None |
| FAISS_INDEX = None |
| IMAGE_PATHS = None |
|
|
| |
| for d in ( |
| BLOCKS_DIR, |
| |
| GEN_PROJECT_DIR, |
| BACKDROP_DIR, |
| SPRITE_DIR, |
| CODE_BLOCKS_DIR, |
| SOUND_DIR, |
| OUTPUT_DIR, |
| ): |
| d.mkdir(parents=True, exist_ok=True) |
|
|
| def log_execution_time(func): |
| @functools.wraps(func) |
| def wrapper(*args, **kwargs): |
| start_time = time.time() |
| result = func(*args, **kwargs) |
| end_time = time.time() |
| logger.info(f"⏱ {func.__name__} executed in {end_time - start_time:.2f} seconds") |
| return result |
| return wrapper |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s [%(levelname)s] %(message)s", |
| handlers=[ |
| logging.FileHandler(str(LOGS_DIR / "app.log")), |
| logging.StreamHandler() |
| ] |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| class GameState(TypedDict): |
| project_json: dict |
| description: str |
| project_id: str |
| project_image: str |
| pseudo_code: dict |
| action_plan: Optional[Dict] |
| temporary_node: Optional[Dict] |
| page_count: int |
| processing: bool |
| temp_pseudo_code: list |
|
|
| SYSTEM_PROMPT ="""Your task is to process OCR-extracted text from images of Scratch 3.0 code blocks and produce precisely formatted pseudocode JSON. |
| |
| ### Core Role |
| - Treat this as an OCR refinement task: the input may contain typos or spacing issues. |
| - Intelligently correct OCR mistakes to align with valid Scratch 3.0 block syntax. |
| |
| ### Universal Rules |
| 1. **Code Detection:** If no Scratch blocks are detected, the `pseudocode` value must be "No Code-blocks". |
| 2. **Script Ownership:** Determine the target from "Script for:". If it matches a `Stage_costumes` name, set `name_variable` to "Stage". |
| 3. **Pseudocode Structure:** |
| - The pseudocode must be a single JSON string with `\n` for newlines. |
| - Indent nested blocks with 4 spaces. |
| - Every script (hat block) and every C-block (if, repeat, forever) MUST have a corresponding `end` at the correct indentation level. |
| 4. **Formatting Syntax:** |
| - Numbers & Text: `(5)`, `(hello)` |
| - Variables & Dropdowns: `[score v]`, `[space v]` |
| - Reporters: `((x position))` |
| - Booleans: `<condition>` |
| 5. **Final Output:** Your response must ONLY be the valid JSON object and nothing else.""" |
|
|
| SYSTEM_PROMPT_JSON_CORRECTOR = """ |
| You are a JSON correction assistant. Your ONLY task is to fix malformed JSON and return it in the correct format. |
| |
| REQUIRED OUTPUT FORMAT: |
| { |
| "refined_logic": { |
| "name_variable": "sprite_name_here", |
| "pseudocode": "pseudocode_string_here" |
| } |
| } |
| |
| RULES: |
| 1. Extract the sprite name and pseudocode from the input |
| 2. Return ONLY valid JSON in the exact format above |
| 3. No explanations, no extra text, no other fields |
| 4. If you can't find the data, use "Unknown" for name_variable and "No pseudocode found" for pseudocode |
| """ |
|
|
| |
| agent = create_react_agent( |
| model=llm, |
| tools=[], |
| prompt=SYSTEM_PROMPT |
| ) |
|
|
| agent_json_resolver = create_react_agent( |
| model=llm, |
| tools=[], |
| prompt=SYSTEM_PROMPT_JSON_CORRECTOR |
| ) |
|
|
| |
| |
| |
| def make_json_serializable(obj): |
| """Recursively convert numpy and other objects into JSON-serializable types.""" |
| if obj is None: |
| return None |
| if isinstance(obj, (str, int, float, bool)): |
| return obj |
| if isinstance(obj, np.ndarray): |
| return obj.tolist() |
| if isinstance(obj, dict): |
| return {str(k): make_json_serializable(v) for k, v in obj.items()} |
| if isinstance(obj, (list, tuple)): |
| return [make_json_serializable(v) for v in obj] |
| |
| try: |
| return obj.tolist() |
| except Exception: |
| pass |
| |
| return str(obj) |
|
|
| |
| |
| |
| def pil_to_base64(pil_img, fmt="PNG"): |
| buffer = io.BytesIO() |
| pil_img.save(buffer, format=fmt) |
| return base64.b64encode(buffer.getvalue()).decode("utf-8") |
|
|
| def base64_to_pil(b64): |
| try: |
| data = base64.b64decode(b64) |
| return Image.open(io.BytesIO(data)) |
| except Exception as e: |
| print(f"[base64_to_pil] Error: {e}") |
| return None |
|
|
| |
| |
| |
| def load_image_pil(path): |
| try: |
| return Image.open(path) |
| except Exception as e: |
| print(f"[load_image_pil] Could not open {path}: {e}") |
| return None |
|
|
| def add_background(pil_img, bg_color=(255,255,255), size=None): |
| if pil_img is None: |
| return None |
| try: |
| target = size if size is not None else pil_img.size |
| bg = Image.new("RGB", target, bg_color) |
| img_rgba = pil_img.convert("RGBA") |
| if img_rgba.size != target: |
| x = (target[0] - img_rgba.size[0]) // 2 |
| y = (target[1] - img_rgba.size[1]) // 2 |
| else: |
| x, y = 0, 0 |
| mask = img_rgba.split()[3] if img_rgba.mode == "RGBA" else None |
| bg.paste(img_rgba.convert("RGB"), (x,y), mask=mask) |
| return bg |
| except Exception as e: |
| print(f"[add_background] Error: {e}") |
| return None |
|
|
| def preprocess_for_hash(pil_img, size=(256,256)): |
| try: |
| img = pil_img.convert("RGB") |
| img = ImageOps.grayscale(img) |
| img = ImageOps.equalize(img) |
| img = img.resize(size) |
| return np.array(img).astype(np.uint8) |
| except Exception as e: |
| print(f"[preprocess_for_hash] Error: {e}") |
| return None |
|
|
| def preprocess_for_model(pil_img): |
| try: |
| if pil_img.mode == "RGBA": |
| pil_img = pil_img.convert("RGB") |
| elif pil_img.mode == "L": |
| pil_img = pil_img.convert("RGB") |
| else: |
| pil_img = pil_img.convert("RGB") |
| return pil_img |
| except Exception as e: |
| print(f"[preprocess_for_model] Error: {e}") |
| return None |
|
|
| def get_dinov2_embedding_from_pil(pil_img): |
| try: |
| if pil_img is None: |
| return None |
| inputs = dinov2_processor(images=pil_img, return_tensors="pt").to(DEVICE) |
| with torch.no_grad(): |
| outputs = dinov2_model(**inputs) |
| |
| emb = outputs.last_hidden_state[:,0,:].squeeze(0).cpu().numpy() |
| n = np.linalg.norm(emb) |
| if n == 0 or np.isnan(n): |
| return None |
| return (emb / n).astype(float) |
| except Exception as e: |
| print(f"[get_dinov2_embedding_from_pil] Error: {e}") |
| return None |
|
|
| |
| |
| |
| def pil_to_bgr_np(pil_img): |
| arr = np.array(pil_img.convert("RGB")) |
| return cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) |
|
|
| def bgr_np_to_pil(bgr_np): |
| rgb = cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB) |
| return Image.fromarray(rgb) |
|
|
| def upscale_image_cv(bgr_np, scale=2): |
| h,w = bgr_np.shape[:2] |
| return cv2.resize(bgr_np, (w*scale, h*scale), interpolation=cv2.INTER_CUBIC) |
|
|
| def reduce_noise_cv(bgr_np): |
| return cv2.fastNlMeansDenoisingColored(bgr_np, None, 10,10,7,21) |
|
|
| def sharpen_cv(bgr_np): |
| kernel = np.array([[0,-1,0],[-1,5,-1],[0,-1,0]]) |
| return cv2.filter2D(bgr_np, -1, kernel) |
|
|
| def enhance_contrast_cv(bgr_np): |
| pil_img = Image.fromarray(cv2.cvtColor(bgr_np, cv2.COLOR_BGR2RGB)) |
| enhancer = ImageEnhance.Contrast(pil_img) |
| enhanced = enhancer.enhance(1.5) |
| return cv2.cvtColor(np.array(enhanced), cv2.COLOR_RGB2BGR) |
|
|
| def process_image_cv2_from_pil(pil_img, scale=2): |
| try: |
| bgr = pil_to_bgr_np(pil_img) |
| bgr = upscale_image_cv(bgr, scale=scale) if scale != 1 else bgr |
| bgr = reduce_noise_cv(bgr) |
| bgr = sharpen_cv(bgr) |
| bgr = enhance_contrast_cv(bgr) |
| return bgr_np_to_pil(bgr) |
| except Exception as e: |
| print(f"[process_image_cv2_from_pil] Error: {e}") |
| return None |
|
|
| |
| def cosine_similarity(a, b): |
| return float(np.dot(a, b)) |
|
|
|
|
| |
| |
| |
|
|
| def run_query_search_flow( |
| query_path: Optional[str] = None, |
| query_b64: Optional[str] = None, |
| processed_dir: str = "./processed", |
| embeddings_dict: Dict[str, np.ndarray] = None, |
| hash_dict: Dict[str, Any] = None, |
| signature_obj_map: Dict[str, Any] = None, |
| gis: Any = None, |
| phash: Any = None, |
| MAX_PHASH_BITS: int = 64, |
| k: int = 10, |
| ) -> Tuple[ |
| List[Tuple[str, float]], |
| List[Tuple[str, Any, float]], |
| List[Tuple[str, Any, float]], |
| List[Tuple[str, float, float, float, float]], |
| ]: |
| """ |
| Run the full query/search flow (base64 -> preprocess -> embed -> scoring). |
| Accepts either query_path (file on disk) OR query_b64 (base64 string). If both are |
| provided, query_b64 takes precedence. |
| |
| Returns: |
| embedding_results_sorted, |
| phash_results_sorted, |
| imgmatch_results_sorted, |
| combined_results_sorted |
| """ |
|
|
| |
| if (query_path is None or query_path == "") and (query_b64 is None or query_b64 == ""): |
| raise ValueError("Either query_path or query_b64 must be provided.") |
|
|
| |
| os.makedirs(processed_dir, exist_ok=True) |
|
|
| print("\n--- Query/Search Phase ---") |
|
|
| |
| if query_b64: |
| |
| query_from_b64 = base64_to_pil(query_b64) |
| if query_from_b64 is None: |
| raise RuntimeError("Could not decode provided base64 query. Exiting.") |
| query_pil_orig = query_from_b64 |
| else: |
| |
| if not os.path.exists(query_path): |
| raise FileNotFoundError(f"Query image not found: {query_path}") |
| query_pil_orig = load_image_pil(query_path) |
| if query_pil_orig is None: |
| raise RuntimeError("Could not load query image from path. Exiting.") |
|
|
| |
| try: |
| query_b64 = pil_to_base64(query_pil_orig, fmt="PNG") |
| except Exception as e: |
| raise RuntimeError(f"Could not base64 query from disk image: {e}") |
| |
| query_from_b64 = base64_to_pil(query_b64) |
| if query_from_b64 is None: |
| raise RuntimeError("Could not decode query base64 after roundtrip. Exiting.") |
|
|
| |
| |
| enhanced_query_pil = process_image_cv2_from_pil(query_from_b64, scale=2) |
| if enhanced_query_pil is None: |
| print("[Query] OpenCV enhancement failed; falling back to base64-decoded image.") |
| enhanced_query_pil = query_from_b64 |
|
|
| |
| query_enhanced_path = os.path.join(processed_dir, "query_enhanced.png") |
| try: |
| enhanced_query_pil.save(query_enhanced_path, format="PNG") |
| except Exception: |
| try: |
| enhanced_query_pil.convert("RGB").save(query_enhanced_path, format="PNG") |
| except Exception: |
| print("[Warning] Could not save enhanced query image for inspection.") |
|
|
| |
| prepped = preprocess_for_model(enhanced_query_pil) |
| query_emb = get_dinov2_embedding_from_pil(prepped) |
| if query_emb is None: |
| raise RuntimeError("Could not compute query embedding. Exiting.") |
|
|
| |
| query_hash_arr = preprocess_for_hash(enhanced_query_pil) |
| if query_hash_arr is None: |
| raise RuntimeError("Could not compute query phash array. Exiting.") |
| query_phash = phash.encode_image(image_array=query_hash_arr) |
|
|
| |
| query_sig = None |
| query_sig_path = os.path.join(processed_dir, "query_for_sig.png") |
| try: |
| enhanced_query_pil.save(query_sig_path, format="PNG") |
| except Exception: |
| try: |
| enhanced_query_pil.convert("RGB").save(query_sig_path, format="PNG") |
| except Exception: |
| query_sig_path = None |
|
|
| if query_sig_path: |
| try: |
| query_sig = gis.generate_signature(query_sig_path) |
| except Exception as e: |
| print(f"[ImageSignature] failed for query: {e}") |
| query_sig = None |
|
|
| |
| |
| |
| embeddings_dict = embeddings_dict or {} |
| hash_dict = hash_dict or {} |
| signature_obj_map = signature_obj_map or {} |
|
|
| image_paths = list(embeddings_dict.keys()) |
| image_embeddings = np.array(list(embeddings_dict.values()), dtype=float) if embeddings_dict else np.array([]) |
|
|
| def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: |
| try: |
| return float(np.dot(a, b)) |
| except Exception: |
| return -1.0 |
|
|
| |
| embedding_results: List[Tuple[str, float]] = [] |
| phash_results: List[Tuple[str, Any, float]] = [] |
| imgmatch_results: List[Tuple[str, Any, float]] = [] |
| combined_results: List[Tuple[str, float, float, float, float]] = [] |
|
|
| |
| for idx, path in enumerate(image_paths): |
| |
| try: |
| stored_emb = image_embeddings[idx] |
| emb_sim = cosine_similarity(query_emb, stored_emb) |
| except Exception: |
| emb_sim = -1.0 |
| embedding_results.append((path, emb_sim)) |
|
|
| |
| try: |
| stored_ph = hash_dict.get(path) |
| if stored_ph is not None: |
| hd = phash.hamming_distance(query_phash, stored_ph) |
| ph_sim = max(0.0, 1.0 - (hd / float(MAX_PHASH_BITS))) |
| else: |
| hd = None |
| ph_sim = 0.0 |
| except Exception: |
| hd = None |
| ph_sim = 0.0 |
| phash_results.append((path, hd, ph_sim)) |
|
|
| |
| try: |
| stored_sig = signature_obj_map.get(path) |
| if stored_sig is not None and query_sig is not None: |
| dist = gis.normalized_distance(stored_sig, query_sig) |
| im_sim = max(0.0, 1.0 - dist) |
| else: |
| dist = None |
| im_sim = 0.0 |
| except Exception: |
| dist = None |
| im_sim = 0.0 |
| imgmatch_results.append((path, dist, im_sim)) |
|
|
| |
| emb_clamped = max(0.0, min(1.0, emb_sim)) |
| combined = (emb_clamped + ph_sim + im_sim) / 3.0 |
| combined_results.append((path, combined, emb_clamped, ph_sim, im_sim)) |
|
|
| |
| |
| |
| embedding_results.sort(key=lambda x: x[1], reverse=True) |
| phash_results_sorted = sorted(phash_results, key=lambda x: (x[2] is not None, x[2]), reverse=True) |
| imgmatch_results_sorted = sorted(imgmatch_results, key=lambda x: (x[2] is not None, x[2]), reverse=True) |
| combined_results.sort(key=lambda x: x[1], reverse=True) |
|
|
| |
| |
| |
| print("\nTop results by DINOv2 Embeddings:") |
| for i, (path, score) in enumerate(embedding_results[:k], start=1): |
| print(f"Rank {i}: {path} | Cosine: {score:.4f}") |
|
|
| print("\nTop results by PHash (Hamming distance & normalized sim):") |
| for i, (path, hd, sim) in enumerate(phash_results_sorted[:k], start=1): |
| print(f"Rank {i}: {path} | Hamming: {hd} | NormSim: {sim:.4f}") |
|
|
| print("\nTop results by ImageSignature (normalized similarity = 1 - distance):") |
| for i, (path, dist, sim) in enumerate(imgmatch_results_sorted[:k], start=1): |
| print(f"Rank {i}: {path} | NormDist: {dist} | NormSim: {sim:.4f}") |
|
|
| print("\nTop results by Combined Score (avg of embedding|phash|image-match):") |
| for i, (path, combined, emb_clamped, ph_sim, im_sim) in enumerate(combined_results[:k], start=1): |
| print(f"Rank {i}: {path} | Combined: {combined:.4f} | emb: {emb_clamped:.4f} | phash_sim: {ph_sim:.4f} | imgmatch_sim: {im_sim:.4f}") |
|
|
| print("\nSearch complete.") |
|
|
| |
| return embedding_results, phash_results_sorted, imgmatch_results_sorted, combined_results |
|
|
| |
| |
| |
| from collections import defaultdict |
| import math |
|
|
| def choose_top_candidates(embedding_results, phash_results, imgmatch_results, top_k=10, |
| method_weights=(0.5, 0.3, 0.2), verbose=True): |
| """ |
| embedding_results: list of (path, emb_sim) where emb_sim roughly in [-1,1] (we'll clamp to 0..1) |
| phash_results: list of (path, hamming, ph_sim) where ph_sim in [0,1] |
| imgmatch_results: list of (path, dist, im_sim) where im_sim in [0,1] |
| method_weights: weights for (emb, phash, imgmatch) when using weighted average |
| returns dict with top candidates from three methods and diagnostics |
| """ |
| |
| emb_map = {p: float(s) for p, s in embedding_results} |
| ph_map = {p: float(sim) for p, _, sim in phash_results} |
| im_map = {p: float(sim) for p, _, sim in imgmatch_results} |
|
|
| |
| all_paths = sorted(set(list(emb_map.keys()) + list(ph_map.keys()) + list(im_map.keys()))) |
|
|
| |
| def normalize_map(m): |
| vals = [m.get(p, None) for p in all_paths] |
| |
| present = [v for v in vals if v is not None and not math.isnan(v)] |
| if not present: |
| return {p: 0.0 for p in all_paths} |
| vmin, vmax = min(present), max(present) |
| if vmax == vmin: |
| |
| return {p: (1.0 if (m.get(p, None) is not None) else 0.0) for p in all_paths} |
| norm = {} |
| for p in all_paths: |
| v = m.get(p, None) |
| if v is None or math.isnan(v): |
| norm[p] = 0.0 |
| else: |
| norm[p] = (v - vmin) / (vmax - vmin) |
| |
| if norm[p] < 0: norm[p] = 0.0 |
| if norm[p] > 1: norm[p] = 1.0 |
| return norm |
|
|
| |
| emb_map_clamped = {} |
| for p, v in emb_map.items(): |
| |
| emb_map_clamped[p] = max(0.0, v) |
|
|
| emb_norm = normalize_map(emb_map_clamped) |
| ph_norm = normalize_map(ph_map) |
| im_norm = normalize_map(im_map) |
|
|
| |
| w_emb, w_ph, w_im = method_weights |
| weighted_scores = {} |
| for p in all_paths: |
| weighted_scores[p] = (w_emb * emb_norm.get(p, 0.0) |
| + w_ph * ph_norm.get(p, 0.0) |
| + w_im * im_norm.get(p, 0.0)) |
|
|
| top_weighted = sorted(weighted_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] |
|
|
| |
| |
| def ranks_from_map(m_norm): |
| |
| items = sorted(m_norm.items(), key=lambda x: x[1], reverse=True) |
| ranks = {} |
| for i, (p, _) in enumerate(items): |
| ranks[p] = i + 1 |
| |
| worst = len(items) + 1 |
| for p in all_paths: |
| if p not in ranks: |
| ranks[p] = worst |
| return ranks |
|
|
| rank_emb = ranks_from_map(emb_norm) |
| rank_ph = ranks_from_map(ph_norm) |
| rank_im = ranks_from_map(im_norm) |
|
|
| rank_sum = {} |
| for p in all_paths: |
| rank_sum[p] = rank_emb.get(p, 9999) + rank_ph.get(p, 9999) + rank_im.get(p, 9999) |
| top_rank_sum = sorted(rank_sum.items(), key=lambda x: x[1])[:top_k] |
|
|
| |
| harm_scores = {} |
| for p in all_paths: |
| a = emb_norm.get(p, 0.0) |
| b = ph_norm.get(p, 0.0) |
| c = im_norm.get(p, 0.0) |
| |
| if a + b + c == 0: |
| harm = 0.0 |
| else: |
| |
| if a == 0 or b == 0 or c == 0: |
| harm = 0.0 |
| else: |
| harm = 3.0 / ((1.0/a) + (1.0/b) + (1.0/c)) |
| harm_scores[p] = harm |
| top_harm = sorted(harm_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] |
|
|
| |
| def topk_set_by_map(m_norm, k=top_k): |
| return set([p for p,_ in sorted(m_norm.items(), key=lambda x: x[1], reverse=True)[:k]]) |
| cons_set = topk_set_by_map(emb_norm, top_k) & topk_set_by_map(ph_norm, top_k) & topk_set_by_map(im_norm, top_k) |
|
|
| |
| result = { |
| "emb_norm": emb_norm, |
| "ph_norm": ph_norm, |
| "im_norm": im_norm, |
| "weighted_topk": top_weighted, |
| "rank_sum_topk": top_rank_sum, |
| "harmonic_topk": top_harm, |
| "consensus_topk": list(cons_set), |
| "weighted_scores_full": weighted_scores, |
| "rank_sum_full": rank_sum, |
| "harmonic_full": harm_scores |
| } |
|
|
| if verbose: |
| print("\nTop by Weighted Normalized Average (weights emb,ph,img = {:.2f},{:.2f},{:.2f}):".format(w_emb, w_ph, w_im)) |
| for i,(p,s) in enumerate(result["weighted_topk"], start=1): |
| print(f" {i}. {p} score={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") |
|
|
| print("\nTop by Rank-sum (lower is better):") |
| for i,(p,s) in enumerate(result["rank_sum_topk"], start=1): |
| print(f" {i}. {p} rank_sum={s} emb_rank={rank_emb.get(p)} ph_rank={rank_ph.get(p)} img_rank={rank_im.get(p)}") |
|
|
| print("\nTop by Harmonic mean (requires non-zero on all metrics):") |
| for i,(p,s) in enumerate(result["harmonic_topk"], start=1): |
| print(f" {i}. {p} harm={s:.4f} emb={emb_norm.get(p,0):.3f} ph={ph_norm.get(p,0):.3f} im={im_norm.get(p,0):.3f}") |
|
|
| print("\nConsensus (in top-{0} of ALL metrics): {1}".format(top_k, result["consensus_topk"])) |
|
|
| return result |
|
|
| def is_subpath(path: str, base: str) -> bool: |
| """Return True if path is inside base (works across OSes).""" |
| try: |
| p = os.path.normpath(os.path.abspath(path)) |
| b = os.path.normpath(os.path.abspath(base)) |
| if os.name == "nt": p = p.lower(); b = b.lower() |
| return os.path.commonpath([p, b]) == b |
| except Exception: |
| return False |
| |
| |
| def _load_block_catalog(block_type: str) -> Dict: |
| """ |
| Loads the Scratch block catalog named '{block_type}_blocks.json' |
| from the <project_root>/blocks/ folder. Returns {} on any error. |
| """ |
| catalog_path = BLOCKS_DIR / f"{block_type}.json" |
|
|
| try: |
| text = catalog_path.read_text() |
| catalog = json.loads(text) |
| logger.info(f"Successfully loaded block catalog from {catalog_path}") |
| return catalog |
| except FileNotFoundError: |
| logger.error(f"Error: Block catalog file not found at {catalog_path}") |
| except json.JSONDecodeError as e: |
| logger.error(f"Error decoding JSON from {catalog_path}: {e}") |
| except Exception as e: |
| logger.error(f"Unexpected error loading {catalog_path}: {e}") |
|
|
| def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None: |
| """ |
| Search a single catalog (with keys "description" and "blocks": List[dict]) |
| for a block whose 'op_code' matches the given opcode. |
| Returns the block dict or None if not found. |
| """ |
| for block in catalog_data["blocks"]: |
| if block.get("op_code") == opcode: return block |
| return None |
|
|
| |
| def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None: |
| """ |
| Search across multiple catalogs for a given opcode. |
| Returns the first matching block dict or None. |
| """ |
| for catalog in all_catalogs: |
| blk = get_block_by_opcode(catalog, opcode) |
| if blk is not None: return blk |
| return None |
|
|
| def variable_intialization(project_data): |
| """ |
| Updates variable and broadcast definitions in a Scratch project JSON, |
| populating the 'variables' and 'broadcasts' sections of the Stage target |
| and extracting initial values for variables. |
| Args: project_data (dict): The loaded JSON data of the Scratch project. |
| Returns: dict: The updated project JSON data. |
| """ |
|
|
| stage_target = None |
| for target in project_data['targets']: |
| if target.get('isStage'): |
| stage_target = target |
| break |
| if stage_target is None: |
| print("Error: Stage target not found in the project data.") |
| return project_data |
| |
| if "variables" not in stage_target: |
| stage_target["variables"] = {} |
| if "broadcasts" not in stage_target: |
| stage_target["broadcasts"] = {} |
|
|
| |
| def process_dict(obj): |
| if isinstance(obj, dict): |
| |
| if obj.get("opcode") == "data_setvariableto": |
| variable_field = obj.get("fields", {}).get("VARIABLE") |
| value_input = obj.get("inputs", {}).get("VALUE") |
|
|
| if variable_field and isinstance(variable_field, list) and len(variable_field) == 2: |
| var_name = variable_field[0] |
| var_id = variable_field[1] |
|
|
| initial_value = "" |
| if value_input and isinstance(value_input, list) and len(value_input) > 1 and \ |
| isinstance(value_input[1], list) and len(value_input[1]) > 1: |
| if value_input[1][0] == 10: |
| initial_value = str(value_input[1][1]) |
| elif value_input[1][0] == 12 and len(value_input) > 2 and isinstance(value_input[2], list) and value_input[2][0] == 10: |
| initial_value = str(value_input[2][1]) |
| elif isinstance(value_input[1], (str, int, float)): |
| initial_value = str(value_input[1]) |
| stage_target["variables"][var_id] = [var_name, initial_value] |
|
|
| for key, value in obj.items(): |
| |
| if key == "BROADCAST_INPUT" and isinstance(value, list) and len(value) == 2 and \ |
| isinstance(value[1], list) and len(value[1]) == 3 and value[1][0] == 11: |
| broadcast_name = value[1][1] |
| broadcast_id = value[1][2] |
| stage_target["broadcasts"][broadcast_id] = broadcast_name |
|
|
| |
| elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2: |
| broadcast_name = value[0] |
| broadcast_id = value[1] |
| stage_target["broadcasts"][broadcast_id] = broadcast_name |
| |
| |
| process_dict(value) |
|
|
| elif isinstance(obj, list): |
| for i, item in enumerate(obj): |
| |
| if isinstance(item, list) and len(item) == 3 and item[0] == 12: |
| var_name = item[1] |
| var_id = item[2] |
| if var_id not in stage_target["variables"]: |
| stage_target["variables"][var_id] = [var_name, ""] |
| process_dict(item) |
|
|
| |
| for target in project_data['targets']: |
| if "blocks" in target: |
| for block_id, block_data in target["blocks"].items(): |
| process_dict(block_data) |
|
|
| return project_data |
|
|
| def deduplicate_variables(project_data): |
| """ |
| Removes duplicate variable entries in the 'variables' dictionary of the Stage target, |
| prioritizing entries with non-empty values. |
| Args: project_data (dict): The loaded JSON data of the Scratch project. |
| Returns: dict: The updated project JSON data with deduplicated variables. |
| """ |
| stage_target = None |
| for target in project_data['targets']: |
| if target.get('isStage'): |
| stage_target = target |
| break |
|
|
| if stage_target is None: |
| print("Error: Stage target not found in the project data.") |
| return project_data |
|
|
| if "variables" not in stage_target: |
| return project_data |
|
|
| resolved_variables = {} |
|
|
| for var_id, var_info in stage_target["variables"].items(): |
| var_name = var_info[0] |
| var_value = var_info[1] |
|
|
| if var_name not in resolved_variables: |
| |
| resolved_variables[var_name] = [var_id, var_name, var_value] |
| else: |
| |
| existing_id, existing_name, existing_value = resolved_variables[var_name] |
| |
| if var_value != "" and existing_value == "": |
| resolved_variables[var_name] = [var_id, var_name, var_value] |
| elif var_value != "" and existing_value != "": |
| resolved_variables[var_name] = [var_id, var_name, var_value] |
| elif var_value == "" and existing_value == "": |
| |
| resolved_variables[var_name] = [var_id, var_name, var_value] |
|
|
| |
| new_variables_dict = {} |
| for var_name, var_data in resolved_variables.items(): |
| var_id_to_keep = var_data[0] |
| var_name_to_keep = var_data[1] |
| var_value_to_keep = var_data[2] |
| new_variables_dict[var_id_to_keep] = [var_name_to_keep, var_value_to_keep] |
| stage_target["variables"] = new_variables_dict |
| return project_data |
|
|
| def variable_adder_main(project_data): |
| try: |
| declare_variable_json= variable_intialization(project_data) |
| print("declare_variable_json------->",declare_variable_json) |
| except Exception as e: |
| print(f"Error error in the variable initialization opcodes: {e}") |
| try: |
| processed_json= deduplicate_variables(declare_variable_json) |
| print("processed_json------->",processed_json) |
| return processed_json |
| except Exception as e: |
| print(f"Error error in the variable initialization opcodes: {e}") |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| def extract_json_from_llm_response(raw_response: str) -> dict: |
| """ |
| Finds and parses the first valid JSON object from a raw LLM response string. |
| """ |
| logger.debug("Attempting to extract JSON from raw LLM response...") |
|
|
| |
| match = re.search(r"```(?:json)?\s*({[\s\S]*?})\s*```", raw_response) |
| if match: |
| json_string = match.group(1) |
| logger.debug("Found JSON inside a markdown block.") |
| try: |
| return json.loads(json_string) |
| except json.JSONDecodeError as e: |
| logger.warning(f"Failed to parse JSON from markdown block: {e}") |
| |
| |
| |
| logger.debug("Markdown block not found or failed. Searching for outermost braces.") |
| try: |
| first_brace = raw_response.find('{') |
| last_brace = raw_response.rfind('}') |
| if first_brace != -1 and last_brace != -1 and first_brace < last_brace: |
| json_string = raw_response[first_brace : last_brace + 1] |
| return json.loads(json_string) |
| else: |
| logger.error("Could not find a valid JSON structure (outermost braces).") |
| raise json.JSONDecodeError("No valid JSON object found in the response.", raw_response, 0) |
| except json.JSONDecodeError as e: |
| logger.error(f"Final JSON parsing attempt failed: {e}") |
| |
| raise |
|
|
| def reduce_image_size_to_limit(clean_b64_str: str, max_kb: int = 4000) -> str: |
| """ |
| Input: clean_b64_str = BASE64 STRING (no data: prefix) |
| Output: BASE64 STRING (no data: prefix), sized as close as possible to max_kb KB. |
| Guarantees: returns a valid base64 string (never None). May still be larger than max_kb |
| if saving at lowest quality cannot get under the limit. |
| """ |
| |
| clean = re.sub(r"\s+", "", clean_b64_str).strip() |
| |
| missing = len(clean) % 4 |
| if missing: |
| clean += "=" * (4 - missing) |
|
|
| try: |
| image_data = base64.b64decode(clean) |
| except Exception as e: |
| raise ValueError("Invalid base64 input to reduce_image_size_to_limit") from e |
|
|
| try: |
| img = Image.open(io.BytesIO(image_data)) |
| img.load() |
| except Exception as e: |
| raise ValueError("Could not open image from base64") from e |
|
|
| |
| if img.mode in ("RGBA", "LA") or (img.mode == "P" and "transparency" in img.info): |
| background = Image.new("RGB", img.size, (255, 255, 255)) |
| background.paste(img, mask=img.split()[-1] if img.mode != "RGB" else None) |
| img = background |
| elif img.mode != "RGB": |
| img = img.convert("RGB") |
|
|
| low, high = 20, 95 |
| best_bytes = None |
| |
| while low <= high: |
| mid = (low + high) // 2 |
| buf = io.BytesIO() |
| try: |
| img.save(buf, format="JPEG", quality=mid, optimize=True) |
| except OSError: |
| |
| buf = io.BytesIO() |
| img.save(buf, format="JPEG", quality=mid) |
| size_kb = len(buf.getvalue()) / 1024.0 |
| if size_kb <= max_kb: |
| best_bytes = buf.getvalue() |
| low = mid + 1 |
| else: |
| high = mid - 1 |
|
|
| |
| if best_bytes is None: |
| buf = io.BytesIO() |
| try: |
| img.save(buf, format="JPEG", quality=20, optimize=True) |
| except OSError: |
| buf = io.BytesIO() |
| img.save(buf, format="JPEG", quality=20) |
| best_bytes = buf.getvalue() |
|
|
| return base64.b64encode(best_bytes).decode("utf-8") |
|
|
|
|
| def clean_base64_for_model(raw_b64, max_bytes_threshold=4000000) -> str: |
| """ |
| Accepts: raw_b64 can be: |
| - a data URI 'data:image/png;base64,...' |
| - a plain base64 string |
| - a PIL Image |
| - a list containing the above (take first) |
| Returns: a data URI string 'data:<mime>;base64,<base64>' guaranteed to be syntactically valid. |
| """ |
| |
| if not raw_b64: |
| return "" |
|
|
| if isinstance(raw_b64, list): |
| raw_b64 = raw_b64[0] if raw_b64 else "" |
| if not raw_b64: |
| return "" |
|
|
| if isinstance(raw_b64, Image.Image): |
| buf = io.BytesIO() |
| |
| img = raw_b64.convert("RGB") |
| img.save(buf, format="JPEG") |
| clean_b64 = base64.b64encode(buf.getvalue()).decode("utf-8") |
| mime = "image/jpeg" |
| return f"data:{mime};base64,{clean_b64}" |
|
|
| if not isinstance(raw_b64, str): |
| raise TypeError(f"Expected base64 string or PIL Image, got {type(raw_b64)}") |
|
|
| |
| m = re.match(r"^data:(image\/[a-zA-Z0-9.+-]+);base64,(.+)$", raw_b64, flags=re.DOTALL) |
| if m: |
| mime = m.group(1) |
| clean_b64 = m.group(2) |
| else: |
| |
| mime = "image/png" |
| clean_b64 = raw_b64 |
|
|
| |
| clean_b64 = re.sub(r"\s+", "", clean_b64).strip() |
| missing = len(clean_b64) % 4 |
| if missing: |
| clean_b64 += "=" * (4 - missing) |
|
|
| original_size_bytes = len(clean_b64.encode("utf-8")) |
| |
| print(f"Original base64 size (bytes): {original_size_bytes}, mime: {mime}") |
|
|
| if original_size_bytes > max_bytes_threshold: |
| |
| reduced_clean = reduce_image_size_to_limit(clean_b64, max_kb=4000) |
| |
| print(f"Reduced base64 size (bytes): {original_size_bytes}, mime: {mime}") |
| return f"data:image/jpeg;base64,{reduced_clean}" |
|
|
| |
| return f"data:{mime};base64,{clean_b64}" |
|
|
| SCRATCH_OPCODES = [ |
| 'motion_movesteps', 'motion_turnright', 'motion_turnleft', 'motion_goto', |
| 'motion_gotoxy', 'motion_glideto', 'motion_glidesecstoxy', 'motion_pointindirection', |
| 'motion_pointtowards', 'motion_changexby', 'motion_setx', 'motion_changeyby', |
| 'motion_sety', 'motion_ifonedgebounce', 'motion_setrotationstyle', 'looks_sayforsecs', |
| 'looks_say', 'looks_thinkforsecs', 'looks_think', 'looks_switchcostumeto', |
| 'looks_nextcostume', 'looks_switchbackdropto', 'looks_switchbackdroptowait', |
| 'looks_nextbackdrop', 'looks_changesizeby', 'looks_setsizeto', 'looks_changeeffectby', |
| 'looks_seteffectto', 'looks_cleargraphiceffects', 'looks_show', 'looks_hide', |
| 'looks_gotofrontback', 'looks_goforwardbackwardlayers', 'sound_playuntildone', |
| 'sound_play', 'sound_stopallsounds', 'sound_changevolumeby', 'sound_setvolumeto', |
| 'event_broadcast', 'event_broadcastandwait', 'control_wait', 'control_wait_until', |
| 'control_stop', 'control_create_clone_of', 'control_delete_this_clone', |
| 'data_setvariableto', 'data_changevariableby', 'data_addtolist', 'data_deleteoflist', |
| 'data_insertatlist', 'data_replaceitemoflist', 'data_showvariable', 'data_hidevariable', |
| 'data_showlist', 'data_hidelist', 'sensing_askandwait', 'sensing_resettimer', |
| 'sensing_setdragmode', 'procedures_call', 'operator_lt', 'operator_equals', |
| 'operator_gt', 'operator_and', 'operator_or', 'operator_not', 'operator_contains', |
| 'sensing_touchingobject', 'sensing_touchingcolor', 'sensing_coloristouchingcolor', |
| 'sensing_keypressed', 'sensing_mousedown', 'data_listcontainsitem', 'control_repeat', |
| 'control_forever', 'control_if', 'control_if_else', 'control_repeat_until', |
| 'motion_xposition', 'motion_yposition', 'motion_direction', 'looks_costumenumbername', |
| 'looks_size', 'looks_backdropnumbername', 'sound_volume', 'sensing_distanceto', |
| 'sensing_answer', 'sensing_mousex', 'sensing_mousey', 'sensing_loudness', |
| 'sensing_timer', 'sensing_of', 'sensing_current', 'sensing_dayssince2000', |
| 'sensing_username', 'operator_add', 'operator_subtract', 'operator_multiply', |
| 'operator_divide', 'operator_random', 'operator_join', 'operator_letterof', |
| 'operator_length', 'operator_mod', 'operator_round', 'operator_mathop', |
| 'data_variable', 'data_list', 'data_itemoflist', 'data_lengthoflist', |
| 'data_itemnumoflist', 'event_whenflagclicked', 'event_whenkeypressed', |
| 'event_whenthisspriteclicked', 'event_whenbackdropswitchesto', 'event_whengreaterthan', |
| 'event_whenbroadcastreceived', 'control_start_as_clone', 'procedures_definition' |
| ] |
|
|
| def validate_and_fix_opcodes(opcode_counts): |
| """ |
| Ensures all opcodes are valid. If an opcode is invalid, replace with closest match. |
| """ |
| corrected_list = [] |
| for item in opcode_counts: |
| opcode = item.get("opcode") |
| count = item.get("count", 1) |
|
|
| if opcode not in SCRATCH_OPCODES: |
| |
| match = get_close_matches(opcode, SCRATCH_OPCODES, n=1, cutoff=0.6) |
| if match: |
| print(f"Opcode '{opcode}' not found. Replacing with '{match[0]}'") |
| opcode = match[0] |
| else: |
| print(f"Opcode '{opcode}' not recognized and no close match found. Skipping.") |
| continue |
| |
| corrected_list.append({"opcode": opcode, "count": count}) |
|
|
| |
| merged = {} |
| for item in corrected_list: |
| merged[item["opcode"]] = merged.get(item["opcode"], 0) + item["count"] |
|
|
| return [{"opcode": k, "count": v} for k, v in merged.items()] |
| |
| def format_scratch_pseudo_code(code_string): |
| """ |
| Parses and formats Scratch pseudo-code with correct indentation, |
| specifically handling if/else/end structures correctly. |
| |
| Args: |
| code_string (str): A string containing Scratch pseudo-code with |
| potentially inconsistent indentation. |
| |
| Returns: |
| str: The correctly formatted and indented pseudo-code string. |
| """ |
| lines = code_string.strip().split('\n') |
| formatted_lines = [] |
| indent_level = 0 |
| |
| |
| indent_keywords = ['when', 'forever', 'if', 'repeat', 'else'] |
| |
| |
| unindent_keywords = ['end', 'else'] |
|
|
| for line in lines: |
| stripped_line = line.strip() |
| if not stripped_line: |
| continue |
|
|
| |
| if any(keyword in stripped_line for keyword in unindent_keywords): |
| |
| if 'else' in stripped_line: |
| |
| indentation = ' ' * (indent_level -1) |
| formatted_lines.append(indentation + stripped_line) |
| continue |
| |
| |
| indent_level = max(0, indent_level - 1) |
| |
| indentation = ' ' * indent_level |
| formatted_lines.append(indentation + stripped_line) |
|
|
| |
| if any(keyword in stripped_line for keyword in indent_keywords): |
| |
| if 'else' not in stripped_line: |
| indent_level += 1 |
|
|
| return '\n'.join(formatted_lines) |
|
|
| |
| def pseudo_generator_node(state: GameState): |
| logger.info("--- Running plan_logic_aligner_node ---") |
| image = state.get("project_image", "") |
| project_json = state["project_json"] |
| cnt =state["page_count"] |
| print(f"The page number recived at the pseudo_generator node:-----> {cnt}") |
| |
| |
| target_names = [t["name"] for t in project_json["targets"]] |
| stage_names = [t["name"] for t in project_json["targets"] if t.get("isStage")] |
| sprite_names = [t["name"] for t in project_json["targets"] if not t.get("isStage")] |
| |
| stage_costumes = [ |
| c["name"] |
| for t in project_json["targets"] if t.get("isStage") |
| for c in t.get("costumes", []) |
| ] |
| refinement_prompt = f""" |
| You are an expert Scratch 3.0 programmer. Your task is to analyze an image of Scratch code blocks and convert it into a structured JSON object containing precise pseudocode. |
| |
| --- |
| ## CONTEXT |
| - **Available Sprites:** {', '.join(sprite_names)} |
| - **Available Stage Costumes:** {', '.join(stage_costumes)} |
| |
| --- |
| ## INSTRUCTIONS |
| 1. **Identify the Target:** Find the text "Script for:" in the image to determine the target sprite or stage. |
| 2. **Apply Stage Rule:** If the identified target name exactly matches any name in the `Available Stage Costumes` list, you MUST set the output `name_variable` to `"Stage"`. Otherwise, use the identified target name. |
| 3. **Handle No Code:** If no Scratch blocks are visible in the image, return the specified "No Code-blocks" JSON format. |
| 4. **Generate Pseudocode:** If blocks are present, convert them to pseudocode according to the rules below. |
| 5. **Output ONLY JSON:** Your entire response must be a single, valid JSON object inside a ```json code block and nothing else. |
| |
| --- |
| ## PSEUDOCODE FORMATTING RULES |
| - **Numbers & Text:** Enclose in parentheses. `(10)`, `(-50)`, `(hello)`. |
| - **Variables & Dropdowns:** Enclose in square brackets with ` v`. `[score v]`, `[space v]`. |
| - **Reporter Blocks:** Enclose in double parentheses. `((x position))`. |
| - **Boolean Conditions:** Enclose in angle brackets. `<((score)) > (50)>`, `<not <touching [edge v]?>>`. |
| - **Specific Block Exceptions:** Self-contained blocks like `if on edge, bounce`, `next costume`, and `hide` should be written as-is, without any parentheses or brackets. |
| - **Line Breaks:** Use `\n` to separate each block onto a new line. The entire pseudocode must be a single JSON string. |
| - **Indentation:** Use **4 spaces** to indent blocks nested inside C-Blocks (like `if`, `if else`, `repeat`, `forever`). |
| - **Termination:** |
| - **Every script** (starting with a hat block) MUST conclude with `end`. |
| - **Every C-Block** (`if`, `repeat`, `forever`) MUST also have its own corresponding `end` at the correct indentation level. This is critical. |
| |
| --- |
| ## REQUIRED JSON FORMAT |
| If code blocks are found: |
| ```json |
| {{ |
| "refined_logic": {{ |
| "name_variable": "Name_Identified_From_Instructions", |
| "pseudocode": "Your fully formatted pseudocode as a single string with \\n newlines." |
| }} |
| }} |
| ```` |
| |
| If no code blocks are found: |
| |
| ```json |
| {{ |
| "refined_logic": {{ |
| "name_variable": "Name_Identified_From_Instructions", |
| "pseudocode": "No Code-blocks" |
| }} |
| }} |
| ``` |
| |
| ----- |
| |
| ## EXAMPLES |
| |
| **Example 1: Looping and Conditionals** |
| |
| ``` |
| when green flag clicked |
| go to x: (240) y: (-100) |
| set [speed v] to (-5) |
| forever |
| change x by ([speed v]) |
| if <((x position)) < (-240)> then |
| go to x: (240) y: (-100) |
| end |
| end |
| end |
| ``` |
| |
| **Example 2: Events and Broadcasting** |
| |
| ``` |
| when I receive [Game Over v] |
| if <((score)) > (([High Score v]))> then |
| set [High Score v] to ([score v]) |
| end |
| switch backdrop to [Game Over v] |
| end |
| ``` |
| """ |
| image_input = { |
| "type": "image_url", |
| "image_url": { |
| |
| "url": clean_base64_for_model(image[cnt]) |
| } |
| } |
|
|
| content = [ |
| {"type": "text", "text": refinement_prompt}, |
| image_input |
| ] |
|
|
| try: |
| |
| response = agent.invoke({"messages": [{"role": "user", "content": content}]}) |
| llm_output_raw = response["messages"][-1].content.strip() |
| print(f"llm_output_raw: {response}") |
| parsed_llm_output = extract_json_from_llm_response(llm_output_raw) |
| result = parsed_llm_output |
| print(f"result:\n\n {result}") |
| |
| except json.JSONDecodeError as error_json: |
| correction_prompt = f""" |
| Fix this malformed response and return only the corrected JSON: |
| |
| Input: {llm_output_raw if 'llm_output_raw' in locals() else 'No response available'} |
| |
| Extract the sprite name and pseudocode, then return in this exact format: |
| {{ |
| "refined_logic": {{ |
| "name_variable": "sprite_name", |
| "pseudocode": "pseudocode_here" |
| }} |
| }} |
| """ |
| try: |
| correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) |
| corrected_output = extract_json_from_llm_response(correction_response['messages'][-1].content) |
| result = corrected_output |
| print(f"result:\n\n {result}") |
| except Exception as e_corr: |
| logger.error(f"Failed to correct JSON output for even after retry: {e_corr}") |
| |
| |
| |
| state["pseudo_code"] = result |
| state["temp_pseudo_code"] += [result] |
| Data = state["temp_pseudo_code"] |
| print(f"[OVREALL REFINED PSEUDO CODE LOGIC]: {result}") |
| print(f"[OVREALL LISTS OF LOGICS]: {Data}") |
| logger.info("Plan refinement and block relation analysis completed for all plans.") |
| return state |
|
|
| |
| def node_optimizer(state: GameState): |
| logger.info("--- Running Node Optimizer Node ---") |
| project_json = state["project_json"] |
| raw = state.get("pseudo_code", {}) |
| refined_logic_data = raw.get("refined_logic", {}) |
| sprite_name = refined_logic_data.get("name_variable", "<unknown>") |
| pseudo = refined_logic_data.get("pseudocode", "") |
| sprite_name = {} |
| project_json_targets = state.get("project_json", {}).get("targets", []) |
| for target in project_json_targets: |
| sprite_name[target["name"]] = target["name"] |
| action_flow = state.get("action_plan", {}) |
| |
| try: |
| refined_logic_data["pseudocode"] = separate_scripts(str(pseudo)) |
| |
| state["pseudo_code"]["refined_logic"] = refined_logic_data |
| print(f"[The pseudo_code generated here]: { state['pseudo_code']}") |
| state["action_plan"] = transform_logic_to_action_flow(state["pseudo_code"]) |
| print(f"[The action plan generated here]: { state['action_plan']}") |
| |
| action_flow = state.get("action_plan", {}) |
| if action_flow.get("action_overall_flow", {}) == {}: |
| plan_data = action_flow.items() |
| else: |
| plan_data = action_flow.get("action_overall_flow", {}).items() |
| |
| refined_flow: Dict[str, Any] = {} |
| for sprite, sprite_data in plan_data: |
| refined_plans = [] |
| for plan in sprite_data.get("plans", []): |
| logic = plan.get("logic", "") |
| plan["opcode_counts"]= analyze_opcode_counts(str(logic)) |
| refined_plans.append(plan) |
| refined_flow[sprite] = { |
| "description": sprite_data.get("description", ""), |
| "plans": refined_plans |
| } |
| if refined_flow: |
| state["action_plan"] = refined_flow |
| logger.info("Node Optimization completed.") |
| |
| return state |
| except Exception as e: |
| logger.error(f"Error in Node Optimizer Node: {e}") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| def overall_block_builder_node_2(state: GameState): |
| logger.info("--- Running OverallBlockBuilderNode ---") |
| print("--- Running OverallBlockBuilderNode ---") |
| project_json = state["project_json"] |
| targets = project_json["targets"] |
|
|
| |
| |
| |
| sprite_map = {} |
| stage_target = None |
| for target in targets: |
| raw_name = target.get("name", "") |
| proc_name, proc_key = process_text(raw_name) |
| if not target.get("isStage"): |
| sprite_map[proc_name] = target |
| else: |
| stage_target = target |
| sprite_map[proc_name] = target |
|
|
| action_plan = state.get("action_plan", {}) |
| print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2)) |
| if not action_plan: |
| logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.") |
| return state |
|
|
| |
| script_y_offset = {} |
| |
| script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} |
|
|
| |
| if action_plan.get("action_overall_flow", {}) == {}: |
| plan_data = action_plan.items() |
| else: |
| plan_data = action_plan.get("action_overall_flow", {}).items() |
|
|
| |
| all_sprite_names = list(sprite_map.keys()) |
| all_variable_names = {} |
| all_list_names = {} |
| all_broadcast_messages = {} |
|
|
| for target in targets: |
| for var_id, var_info in target.get("variables", {}).items(): |
| all_variable_names[var_info[0]] = var_id |
| for list_id, list_info in target.get("lists", {}).items(): |
| all_list_names[list_info[0]] = list_id |
| for broadcast_id, broadcast_name in target.get("broadcasts", {}).items(): |
| all_broadcast_messages[broadcast_name] = broadcast_id |
|
|
| |
| for sprite_name, sprite_actions_data in plan_data: |
| |
| proc_sprite_name, proc_sprite_name_key = process_text(str(sprite_name)) |
|
|
| if proc_sprite_name in sprite_map: |
| current_sprite_target = sprite_map[proc_sprite_name] |
| if "blocks" not in current_sprite_target: |
| current_sprite_target["blocks"] = {} |
|
|
| |
| if proc_sprite_name not in script_y_offset: |
| script_y_offset[proc_sprite_name] = 0 |
| if proc_sprite_name not in script_x_offset_per_sprite: |
| script_x_offset_per_sprite[proc_sprite_name] = 0 |
|
|
| for plan_entry in sprite_actions_data.get("plans", []): |
| logic_sequence = str(plan_entry["logic"]) |
| opcode_counts = plan_entry.get("opcode_counts", {}) |
| refined_indent_logic = format_scratch_pseudo_code(logic_sequence) |
| print(f"\n--------------------------- refined indent logic: {refined_indent_logic}-------------------------------\n") |
| try: |
| generated_blocks = block_builder(opcode_counts, refined_indent_logic) |
|
|
| |
| if not isinstance(generated_blocks, dict): |
| logger.error(f"block_builder for sprite '{sprite_name}' returned non-dict type: {type(generated_blocks)}. Skipping block update.") |
| continue |
|
|
| if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): |
| logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") |
| generated_blocks = generated_blocks["blocks"] |
|
|
| |
| for block_id, block_data in generated_blocks.items(): |
| if block_data.get("topLevel"): |
| block_data["x"] = script_x_offset_per_sprite.get(proc_sprite_name, 0) |
| block_data["y"] = script_y_offset[proc_sprite_name] |
| script_y_offset[proc_sprite_name] += 150 |
|
|
| current_sprite_target["blocks"].update(generated_blocks) |
| print(f"[current_sprite_target block updated]: {current_sprite_target['blocks']}") |
| state["iteration_count"] = 0 |
| logger.info(f"Action blocks added for sprite '{sprite_name}' (processed as '{proc_sprite_name}') by OverallBlockBuilderNode.") |
| except Exception as e: |
| logger.error(f"Error generating blocks for sprite '{sprite_name}' (processed as '{proc_sprite_name}'): {e}") |
| else: |
| |
| logger.debug(f"Plan entry for sprite '{sprite_name}' (processed as '{proc_sprite_name}') not found in sprite_map. Skipping.") |
| state["project_json"] = project_json |
| return state |
|
|
| |
| def variable_adder_node(state: GameState): |
| logger.info("--- Running Variable Adder Node ---") |
| project_json = state["project_json"] |
| try: |
| updated_project_json = variable_adder_main(project_json) |
| if updated_project_json is not None: |
| print("Variable added inside the project successfully!") |
| state["project_json"]=updated_project_json |
| else: |
| print("Variable adder unable to add any variable inside the project!") |
| state["project_json"]=project_json |
| state["page_count"] +=1 |
| return state |
| except Exception as e: |
| logger.error(f"Error in variable adder node while updating project_json': {e}") |
| raise |
|
|
| |
| def layer_order_correction(state: GameState): |
| """ |
| Ensures that all sprites (isStage: false) have unique layerOrder values >= 1. |
| If duplicates are found, they are reassigned sequentially. |
| """ |
| logger.info("--- Running Layer Order Correction Node ---") |
| try: |
| project_json = state.get("project_json", {}) |
| targets = project_json.get("targets", []) |
|
|
| |
| sprites = [t for t in targets if not t.get("isStage", False)] |
|
|
| |
| for idx, sprite in enumerate(sprites, start=1): |
| old_lo = sprite.get("layerOrder", None) |
| sprite["layerOrder"] = idx |
| logger.debug(f"Sprite '{sprite.get('name')}' layerOrder: {old_lo} -> {idx}") |
|
|
| |
| for target in targets: |
| if target.get("isStage", False): |
| target["layerOrder"] = 0 |
|
|
| |
| state["project_json"]["targets"] = targets |
| logger.info("Layer Order Correction completed successfully.") |
|
|
| return state |
|
|
| except Exception as e: |
| logger.error(f"Error in Layer Order Correction Node: {e}") |
| return state |
|
|
| |
| def processed_page_node(state: GameState): |
| logger.info("--- Processing the Pages Node ---") |
| image = state.get("project_image", "") |
| cnt =state["page_count"] |
| print(f"The page processed for page:--------------> {cnt}") |
| if cnt<len(image): |
| state["processing"]= True |
| else: |
| state["processing"]= False |
| return state |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def extract_images_from_pdf(pdf_stream, output_dir): |
| manipulated_json = {} |
| try: |
| pdf_id = uuid.uuid4().hex |
| elements = partition_pdf( |
| file=pdf_stream, |
| strategy="hi_res", |
| extract_image_block_types=["Image"], |
| hi_res_model_name="yolox", |
| extract_image_block_to_payload=False, |
| extract_image_block_output_dir=BLOCKS_DIR, |
| ) |
| file_elements = [element.to_dict() for element in elements] |
| sprite_count = 1 |
| for el in file_elements: |
| img_path = el["metadata"].get("image_path") |
|
|
| |
| if not img_path: |
| continue |
|
|
| with open(img_path, "rb") as f: |
| base_file = base64.b64encode(f.read()).decode("utf-8") |
|
|
| image_uuid = str(uuid.uuid4()) |
| manipulated_json[f"Sprite {sprite_count}"] = { |
| "base64": base_file, |
| "file-path": img_path, |
| "pdf-id": pdf_id, |
| "image-uuid": image_uuid, |
| } |
|
|
| sprite_count += 1 |
|
|
| return manipulated_json |
| except Exception as e: |
| raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") |
|
|
| def similarity_matching(sprites_data: dict, project_folder: str, top_k: int = 1, min_similarity: float = None) -> str: |
| print("🔍 Running similarity matching…") |
| import os |
| import json |
| import numpy as np |
| import torch |
| from PIL import Image, ImageOps, ImageEnhance |
| from imagededup.methods import PHash |
| from transformers import AutoImageProcessor, AutoModel |
| import io |
| import base64 |
| from pathlib import Path |
| import cv2 |
| |
| from image_match.goldberg import ImageSignature |
| import sys |
| import math |
| import hashlib |
| from typing import List, Tuple |
| os.makedirs(project_folder, exist_ok=True) |
|
|
| |
| |
| |
| backdrop_base_path = os.path.normpath(str(BACKDROP_DIR)) |
| sprite_base_path = os.path.normpath(str(SPRITE_DIR)) |
| code_blocks_path = os.path.normpath(str(CODE_BLOCKS_DIR)) |
| |
| |
| project_json_path = os.path.join(project_folder, "project.json") |
|
|
| |
| |
| |
| sprite_ids, sprite_base64 = [], [] |
| for sid, sprite in sprites_data.items(): |
| sprite_ids.append(sid) |
| sprite_base64.append(sprite["base64"]) |
|
|
| sprite_images_bytes = [] |
| sprite_b64_clean = [] |
| for b64 in sprite_base64: |
| |
| raw_b64 = b64.split(",")[-1] |
| sprite_b64_clean.append(raw_b64) |
|
|
| |
| img = Image.open(BytesIO(base64.b64decode(raw_b64))).convert("RGB") |
| buffer = BytesIO() |
| img.save(buffer, format="PNG") |
| buffer.seek(0) |
| sprite_images_bytes.append(buffer) |
| |
| def hybrid_similarity_matching(sprite_images_bytes, sprite_ids, min_similarity=None, top_k=5, method_weights=(0.5,0.3,0.2)): |
| from PIL import Image |
| |
| embeddings_path = os.path.join(BLOCKS_DIR, "hybrid_embeddings.json") |
| hash_path = os.path.join(BLOCKS_DIR, "phash_data.json") |
| signature_path = os.path.join(BLOCKS_DIR, "signature_data.json") |
| |
| |
| embedding_json = {} |
| if os.path.exists(embeddings_path): |
| with open(embeddings_path, "r", encoding="utf-8") as f: |
| embedding_json = json.load(f) |
| |
| |
| hash_dict = {} |
| if os.path.exists(hash_path): |
| try: |
| with open(hash_path, "r", encoding="utf-8") as f: |
| hash_data = json.load(f) |
| for path, hash_str in hash_data.items(): |
| try: |
| hash_dict[path] = hash_str |
| except Exception: |
| pass |
| except Exception: |
| pass |
| |
| |
| signature_dict = {} |
| sig_data = {} |
| if os.path.exists(signature_path): |
| try: |
| with open(signature_path, "r", encoding="utf-8") as f: |
| sig_data = json.load(f) |
| for path, sig_list in sig_data.items(): |
| try: |
| signature_dict[path] = np.array(sig_list) |
| except Exception: |
| pass |
| except Exception: |
| pass |
| |
| |
| paths_list = [] |
| embeddings_list = [] |
| if isinstance(embedding_json, dict): |
| for p, emb in embedding_json.items(): |
| if isinstance(emb, dict): |
| maybe_emb = emb.get("embedding") or emb.get("embeddings") or emb.get("emb") |
| if maybe_emb is None: |
| continue |
| arr = np.asarray(maybe_emb, dtype=np.float32) |
| elif isinstance(emb, list): |
| arr = np.asarray(emb, dtype=np.float32) |
| else: |
| continue |
| paths_list.append(os.path.normpath(str(p))) |
| embeddings_list.append(arr) |
| elif isinstance(embedding_json, list): |
| for item in embedding_json: |
| if not isinstance(item, dict): |
| continue |
| p = item.get("path") or item.get("image_path") or item.get("file") or item.get("filename") or item.get("img_path") |
| emb = item.get("embeddings") or item.get("embedding") or item.get("features") or item.get("vector") or item.get("emb") |
| if p is None or emb is None: |
| continue |
| paths_list.append(os.path.normpath(str(p))) |
| embeddings_list.append(np.asarray(emb, dtype=np.float32)) |
| |
| if len(paths_list) == 0: |
| print("⚠ No reference images/embeddings found (this test harness may be running without data)") |
| |
| return [[] for _ in sprite_images_bytes], [[] for _ in sprite_images_bytes], [] |
| |
| ref_matrix = np.vstack(embeddings_list).astype(np.float32) |
| |
| |
| sprite_emb_list = [] |
| sprite_phash_list = [] |
| sprite_sig_list = [] |
| per_sprite_final_indices = [] |
| per_sprite_final_scores = [] |
| per_sprite_rerank_debug = [] |
| for i, sprite_bytes in enumerate(sprite_images_bytes): |
| sprite_pil = Image.open(sprite_bytes) |
| enhanced_sprite = process_image_cv2_from_pil(sprite_pil, scale=2) or sprite_pil |
| |
| |
| sprite_emb = get_dinov2_embedding_from_pil(preprocess_for_model(enhanced_sprite)) |
| sprite_emb = sprite_emb if sprite_emb is not None else np.zeros(ref_matrix.shape[1]) |
| sprite_emb_list.append(sprite_emb) |
| |
| sprite_hash_arr = preprocess_for_hash(enhanced_sprite) |
| sprite_phash = None |
| if sprite_hash_arr is not None: |
| try: sprite_phash = phash.encode_image(image_array=sprite_hash_arr) |
| except: pass |
| sprite_phash_list.append(sprite_phash) |
| |
| sprite_sig = None |
| embedding_results, phash_results, imgmatch_results, combined_results = run_query_search_flow( |
| query_b64=sprite_b64_clean[i], |
| processed_dir=BLOCKS_DIR, |
| embeddings_dict=embedding_json, |
| hash_dict=hash_data, |
| signature_obj_map=sig_data, |
| gis=gis, |
| phash=phash, |
| MAX_PHASH_BITS=64, |
| k=5 |
| ) |
| |
| rerank_result = choose_top_candidates(embedding_results, phash_results, imgmatch_results, |
| top_k=top_k, method_weights=method_weights, verbose=True) |
| per_sprite_rerank_debug.append(rerank_result) |
| |
| |
| final = None |
| if len(rerank_result["consensus_topk"]) > 0: |
| consensus = rerank_result["consensus_topk"] |
| best = max(consensus, key=lambda p: rerank_result["weighted_scores_full"].get(p, 0.0)) |
| final = best |
| else: |
| final = rerank_result["weighted_topk"][0][0] if rerank_result["weighted_topk"] else None |
| |
| |
| if final is not None and final in paths_list: |
| idx = paths_list.index(final) |
| score = rerank_result["weighted_scores_full"].get(final, 0.0) |
| per_sprite_final_indices.append([idx]) |
| per_sprite_final_scores.append([score]) |
| print(f"Sprite '{sprite_ids}' FINAL selected: {final} (index {idx}) score={score:.4f}") |
| else: |
| per_sprite_final_indices.append([]) |
| per_sprite_final_scores.append([]) |
| |
| return per_sprite_final_indices, per_sprite_final_scores, paths_list |
| |
|
|
| |
| per_sprite_matched_indices, per_sprite_scores, paths_list = hybrid_similarity_matching( |
| sprite_images_bytes, sprite_ids, min_similarity, top_k, method_weights=(0.5, 0.3, 0.2) |
| ) |
|
|
| |
| |
| |
| project_data = [] |
| backdrop_data = [] |
| copied_sprite_folders = set() |
| copied_backdrop_folders = set() |
|
|
| matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst}) |
| print("matched_indices------------------>",matched_indices) |
|
|
| import shutil |
| import json |
| import os |
| from pathlib import Path |
| |
| |
| sprite_base_p = Path(sprite_base_path).resolve(strict=False) |
| backdrop_base_p = Path(backdrop_base_path).resolve(strict=False) |
| project_folder_p = Path(project_folder) |
| project_folder_p.mkdir(parents=True, exist_ok=True) |
| |
| copied_sprite_folders = set() |
| copied_backdrop_folders = set() |
| |
| def display_like_windows_no_lead(p: Path) -> str: |
| """ |
| For human-readable logs only — convert Path to a string like: |
| "app\\blocks\\Backdrops\\Castle 2.sb3" (no leading slash). |
| """ |
| s = p.as_posix() |
| if s.startswith("/"): |
| s = s[1:] |
| return s.replace("/", "\\") |
| |
| def is_subpath(child: Path, parent: Path) -> bool: |
| """Robust membership test: is child under parent?""" |
| try: |
| |
| child.relative_to(parent) |
| return True |
| except Exception: |
| return False |
| |
| |
| matched_indices = sorted({idx for lst in per_sprite_matched_indices for idx in lst}) |
| print("matched_indices------------------>", matched_indices) |
| |
| for matched_idx in matched_indices: |
| |
| if not (0 <= matched_idx < len(paths_list)): |
| print(f" ⚠ matched_idx {matched_idx} out of range, skipping") |
| continue |
| |
| matched_image_path = paths_list[matched_idx] |
| matched_path_p = Path(matched_image_path).resolve(strict=False) |
| matched_folder_p = matched_path_p.parent |
| matched_filename = matched_path_p.name |
| |
| |
| matched_folder_display = display_like_windows_no_lead(matched_folder_p) |
| |
| print(f"Processing matched image: {matched_image_path}") |
| print(f" - Folder: {matched_folder_display}") |
| print(f" - Sprite path: {display_like_windows_no_lead(sprite_base_p)}") |
| print(f" - Backdrop path: {display_like_windows_no_lead(backdrop_base_p)}") |
| print(f" - Filename: {matched_filename}") |
| |
| |
| folder_key = matched_folder_p.as_posix() |
| |
| |
| if is_subpath(matched_folder_p, sprite_base_p) and folder_key not in copied_sprite_folders: |
| print(f"Processing SPRITE folder: {matched_folder_display}") |
| copied_sprite_folders.add(folder_key) |
| |
| sprite_json_path = matched_folder_p / "sprite.json" |
| print("sprite_json_path----------------------->", sprite_json_path) |
| print("copied sprite folder----------------------->", copied_sprite_folders) |
| if sprite_json_path.exists() and sprite_json_path.is_file(): |
| try: |
| with sprite_json_path.open("r", encoding="utf-8") as f: |
| sprite_info = json.load(f) |
| project_data.append(sprite_info) |
| print(f" ✓ Successfully read sprite.json from {matched_folder_display}") |
| except Exception as e: |
| print(f" ✗ Failed to read sprite.json in {matched_folder_display}: {repr(e)}") |
| else: |
| print(f" ⚠ No sprite.json in {matched_folder_display}") |
| |
| |
| try: |
| sprite_files = list(matched_folder_p.iterdir()) |
| except Exception as e: |
| sprite_files = [] |
| print(f" ✗ Failed to list files in {matched_folder_display}: {repr(e)}") |
| |
| print(f" Files in sprite folder: {[p.name for p in sprite_files]}") |
| for p in sprite_files: |
| fname = p.name |
| if fname in (matched_filename, "sprite.json"): |
| print(f" Skipping {fname} (matched image or sprite.json)") |
| continue |
| if p.is_file(): |
| dst = project_folder_p / fname |
| try: |
| shutil.copy2(str(p), str(dst)) |
| print(f" ✓ Copied sprite asset: {p} -> {dst}") |
| except Exception as e: |
| print(f" ✗ Failed to copy sprite asset {p}: {repr(e)}") |
| else: |
| print(f" Skipping {fname} (not a file)") |
| |
| |
| if is_subpath(matched_folder_p, backdrop_base_p) and folder_key not in copied_backdrop_folders: |
| print(f"Processing BACKDROP folder: {matched_folder_display}") |
| copied_backdrop_folders.add(folder_key) |
| print("backdrop_base_path----------------------->", display_like_windows_no_lead(backdrop_base_p)) |
| print("copied backdrop folder----------------------->", copied_backdrop_folders) |
| |
| |
| backdrop_src = matched_folder_p / matched_filename |
| backdrop_dst = project_folder_p / matched_filename |
| if backdrop_src.exists() and backdrop_src.is_file(): |
| try: |
| shutil.copy2(str(backdrop_src), str(backdrop_dst)) |
| print(f" ✓ Copied matched backdrop image: {backdrop_src} -> {backdrop_dst}") |
| except Exception as e: |
| print(f" ✗ Failed to copy matched backdrop image {backdrop_src}: {repr(e)}") |
| else: |
| print(f" ⚠ Matched backdrop source not found: {backdrop_src}") |
| |
| |
| try: |
| backdrop_files = list(matched_folder_p.iterdir()) |
| except Exception as e: |
| backdrop_files = [] |
| print(f" ✗ Failed to list files in {matched_folder_display}: {repr(e)}") |
| |
| print(f" Files in backdrop folder: {[p.name for p in backdrop_files]}") |
| for p in backdrop_files: |
| fname = p.name |
| if fname in (matched_filename, "project.json"): |
| print(f" Skipping {fname} (matched image or project.json)") |
| continue |
| if p.is_file(): |
| dst = project_folder_p / fname |
| try: |
| shutil.copy2(str(p), str(dst)) |
| print(f" ✓ Copied backdrop asset: {p} -> {dst}") |
| except Exception as e: |
| print(f" ✗ Failed to copy backdrop asset {p}: {repr(e)}") |
| else: |
| print(f" Skipping {fname} (not a file)") |
| |
| |
| pj = matched_folder_p / "project.json" |
| if pj.exists() and pj.is_file(): |
| try: |
| with pj.open("r", encoding="utf-8") as f: |
| bd_json = json.load(f) |
| stage_count = 0 |
| for tgt in bd_json.get("targets", []): |
| if tgt.get("isStage"): |
| backdrop_data.append(tgt) |
| stage_count += 1 |
| print(f" ✓ Successfully read project.json from {matched_folder_display}, found {stage_count} stage(s)") |
| except Exception as e: |
| print(f" ✗ Failed to read project.json in {matched_folder_display}: {repr(e)}") |
| else: |
| print(f" ⚠ No project.json in {matched_folder_display}") |
| |
| print("---") |
| |
| final_project = { |
| "targets": [], "monitors": [], "extensions": [], |
| "meta": { |
| "semver": "3.0.0", |
| "vm": "11.3.0", |
| "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" |
| } |
| } |
|
|
| |
| |
| for spr in project_data: |
| if not spr.get("isStage", False): |
| final_project["targets"].append(spr) |
| |
| if backdrop_data: |
| all_costumes, sounds = [], [] |
| seen_costumes = set() |
| for i, bd in enumerate(backdrop_data): |
| for costume in bd.get("costumes", []): |
| key = (costume.get("name"), costume.get("assetId")) |
| if key not in seen_costumes: |
| seen_costumes.add(key) |
| all_costumes.append(costume) |
| if i == 0: |
| sounds = bd.get("sounds", []) |
| stage_obj={ |
| "isStage": True, |
| "name": "Stage", |
| "objName": "Stage", |
| "variables": {}, |
| "lists": {}, |
| "broadcasts": {}, |
| "blocks": {}, |
| "comments": {}, |
| "currentCostume": 1 if len(all_costumes) > 1 else 0, |
| "costumes": all_costumes, |
| "sounds": sounds, |
| "volume": 100, |
| "layerOrder": 0, |
| "tempo": 60, |
| "videoTransparency": 50, |
| "videoState": "on", |
| "textToSpeechLanguage": None |
| } |
| final_project["targets"].insert(0, stage_obj) |
| else: |
| logger.warning("⚠️ No backdrop matched. Using default static backdrop.") |
| default_backdrop_path = BACKDROP_DIR / "cd21514d0531fdffb22204e0ec5ed84a.svg" |
| default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" |
| default_backdrop_sound = BACKDROP_DIR / "83a9787d4cb6f3b7632b4ddfebf74367.wav" |
| default_backdrop_sound_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" |
| try: |
| shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name)) |
| logger.info(f"✅ Default backdrop copied to project: {default_backdrop_name}") |
| shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name)) |
| logger.info(f"✅ Default backdrop sound copied to project: {default_backdrop_sound_name}") |
| except Exception as e: |
| logger.error(f"❌ Failed to copy default backdrop: {e}") |
| stage_obj={ |
| "isStage": True, |
| "name": "Stage", |
| "objName": "Stage", |
| "variables": {}, |
| "lists": {}, |
| "broadcasts": {}, |
| "blocks": {}, |
| "comments": {}, |
| "currentCostume": 0, |
| "costumes": [ |
| { |
| "assetId": default_backdrop_name.split(".")[0], |
| "name": "defaultBackdrop", |
| "md5ext": default_backdrop_name, |
| "dataFormat": "svg", |
| "rotationCenterX": 240, |
| "rotationCenterY": 180 |
| } |
| ], |
| "sounds": [ |
| { |
| "name": "pop", |
| "assetId": "83a9787d4cb6f3b7632b4ddfebf74367", |
| "dataFormat": "wav", |
| "format": "", |
| "rate": 48000, |
| "sampleCount": 1123, |
| "md5ext": "83a9787d4cb6f3b7632b4ddfebf74367.wav" |
| } |
| ], |
| "volume": 100, |
| "layerOrder": 0, |
| "tempo": 60, |
| "videoTransparency": 50, |
| "videoState": "on", |
| "textToSpeechLanguage": None |
| } |
| final_project["targets"].insert(0, stage_obj) |
|
|
| with open(project_json_path, 'w') as f: |
| json.dump(final_project, f, indent=2) |
|
|
| return project_json_path |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
|
|
|
|
| def convert_pdf_stream_to_images(pdf_stream: io.BytesIO, dpi=300): |
| |
| pdf_stream.seek(0) |
| |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf: |
| tmp_pdf.write(pdf_stream.read()) |
| tmp_pdf_path = tmp_pdf.name |
| |
| |
| images = convert_from_path(tmp_pdf_path, dpi=dpi) |
| return images |
|
|
| def delay_for_tpm_node(state: GameState): |
| logger.info("--- Running DelayForTPMNode ---") |
| time.sleep(10) |
| logger.info("Delay completed.") |
| return state |
|
|
| |
| workflow = StateGraph(GameState) |
| workflow.add_node("pseudo_generator", pseudo_generator_node) |
| workflow.add_node("Node_optimizer", node_optimizer) |
| workflow.add_node("layer_optimizer", layer_order_correction) |
| workflow.add_node("block_builder", overall_block_builder_node_2) |
| workflow.add_node("variable_initializer", variable_adder_node) |
| workflow.add_node("page_processed", processed_page_node) |
|
|
| workflow.set_entry_point("page_processed") |
| |
| def decide_next_step(state: GameState): |
| if state.get("processing", False): |
| return "pseudo_generator" |
| else: |
| return "layer_optimizer" |
|
|
| workflow.add_conditional_edges( |
| "page_processed", |
| decide_next_step, |
| { |
| "pseudo_generator": "pseudo_generator", |
| "layer_optimizer": "layer_optimizer" |
| } |
| ) |
| |
| workflow.add_edge("pseudo_generator", "Node_optimizer") |
| workflow.add_edge("Node_optimizer", "block_builder") |
| workflow.add_edge("block_builder", "variable_initializer") |
| workflow.add_edge("variable_initializer", "page_processed") |
| workflow.add_edge("layer_optimizer", END) |
|
|
| app_graph = workflow.compile() |
|
|
| |
| def upscale_image(image: Image.Image, scale: int = 2) -> Image.Image: |
| """ |
| Upscales a PIL image by a given scale factor. |
| """ |
| try: |
| width, height = image.size |
| new_size = (width * scale, height * scale) |
| upscaled_image = image.resize(new_size, Image.LANCZOS) |
| logger.info(f"✅ Upscaled image to {new_size}") |
| return upscaled_image |
| except Exception as e: |
| logger.error(f"❌ Error during image upscaling: {str(e)}") |
| return image |
|
|
| @log_execution_time |
| def create_sb3_archive(project_folder, project_id): |
| """ |
| Zips the project folder and renames it to an .sb3 file. |
| |
| Args: |
| project_folder (str): The path to the directory containing the project.json and assets. |
| project_id (str): The unique ID for the project, used for naming the .sb3 file. |
| |
| Returns: |
| str: The path to the created .sb3 file, or None if an error occurred. |
| """ |
| print(" --------------------------------------- create_sb3_archive INITIALIZE ---------------------------------------") |
| output_filename = GEN_PROJECT_DIR / project_id |
| print(" --------------------------------------- output_filename ---------------------------------------",output_filename) |
| zip_path = None |
| sb3_path = None |
| try: |
| zip_path = shutil.make_archive(output_filename, 'zip', root_dir=project_folder) |
| print(" --------------------------------------- zip_path_str ---------------------------------------", output_filename, project_folder) |
| logger.info(f"Project folder zipped to: {zip_path}") |
| |
| |
| sb3_path = f"{output_filename}.sb3" |
| os.rename(zip_path, sb3_path) |
| print(" --------------------------------------- rename paths ---------------------------------------", zip_path, sb3_path) |
| logger.info(f"Renamed {zip_path} to {sb3_path}") |
|
|
| return sb3_path |
| except Exception as e: |
| logger.error(f"Error creating SB3 archive for {project_id}: {e}") |
| |
| if zip_path and os.path.exists(zip_path): |
| os.remove(zip_path) |
| if sb3_path and os.path.exists(sb3_path): |
| os.remove(sb3_path) |
| return sb3_path |
|
|
| |
| |
| def save_pdf_to_generated_dir(pdf_stream: io.BytesIO, project_id: str) -> str: |
| """ |
| Copies the PDF at `pdf_stream` into GEN_PROJECT_DIR/project_id/, |
| renaming it to <project_id>.pdf. |
| |
| Args: |
| pdf_stream (io.BytesIO): Any existing stream to a PDF file. |
| project_id (str): Your unique project identifier. |
| |
| Returns: |
| str: Path to the copied PDF in the generated directory, |
| or None if something went wrong. |
| """ |
| |
| try: |
| |
| output_dir = GEN_PROJECT_DIR / project_id |
| output_dir.mkdir(parents=True, exist_ok=True) |
| print(f"\n--------------------------------output_dir {output_dir}") |
| |
| |
| target_pdf = output_dir / f"{project_id}.pdf" |
| print(f"\n--------------------------------target_pdf {target_pdf}") |
| |
| |
| |
| if isinstance(pdf_stream, io.BytesIO): |
| with open(target_pdf, "wb") as f: |
| f.write(pdf_stream.getbuffer()) |
| else: |
| shutil.copy2(pdf_stream, target_pdf) |
| print(f"Copied PDF from {pdf_stream} → {target_pdf}") |
| logger.info(f"Copied PDF from {pdf_stream} → {target_pdf}") |
| |
| |
|
|
| return str(target_pdf) |
|
|
| except Exception as e: |
| logger.error(f"Failed to save PDF to generated dir: {e}", exc_info=True) |
| return None |
|
|
| @app.route('/') |
| def index(): |
| return render_template('app_index.html') |
|
|
| @app.route("/download_sb3/<project_id>", methods=["GET"]) |
| def download_sb3(project_id): |
| sb3_path = GEN_PROJECT_DIR / f"{project_id}.sb3" |
| if not sb3_path.exists(): |
| return jsonify({"error": "Scratch project file not found"}), 404 |
|
|
| return send_file( |
| sb3_path, |
| as_attachment=True, |
| download_name=sb3_path.name |
| ) |
|
|
| @app.route("/download_pdf/<project_id>", methods=["GET"]) |
| def download_pdf(project_id): |
| pdf_path = GEN_PROJECT_DIR / project_id / f"{project_id}.pdf" |
| if not pdf_path.exists(): |
| return jsonify({"error": "Scratch project file not found"}), 404 |
|
|
| return send_file( |
| pdf_path, |
| as_attachment=True, |
| download_name=pdf_path.name |
| ) |
|
|
| @app.route("/download_sound/<sound_id>", methods=["GET"]) |
| def download_sound(sound_id): |
| sound_path = SOUND_DIR / f"{sound_id}.wav" |
| if not sound_path.exists(): |
| return jsonify({"error": "Scratch project sound file not found"}), 404 |
|
|
| return send_file( |
| sound_path, |
| as_attachment=True, |
| download_name=sound_path.name |
| ) |
|
|
| |
| @app.route('/process_pdf', methods=['POST']) |
| def process_pdf(): |
| try: |
| logger.info("Received request to process PDF.") |
| if 'pdf_file' not in request.files: |
| logger.warning("No PDF file found in request.") |
| return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400 |
|
|
| pdf_file = request.files['pdf_file'] |
| if pdf_file.filename == '': |
| return jsonify({"error": "Empty filename"}), 400 |
|
|
| |
| |
| |
| project_id = str(uuid.uuid4()).replace('-', '') |
| |
| project_folder = OUTPUT_DIR / project_id |
| |
| pdf_bytes = pdf_file.read() |
| pdf_stream = io.BytesIO(pdf_bytes) |
| logger.info(f"Saved uploaded PDF to: {pdf_stream}") |
|
|
| |
| start_time = time.time() |
| pdf= save_pdf_to_generated_dir(pdf_stream, project_id) |
| logger.info(f"Saved uploaded PDF to: {pdf_file}: {pdf}") |
| print("--------------------------------pdf_file_path---------------------",pdf_file,pdf_stream) |
| total_time = time.time() - start_time |
| print(f"-----------------------------Execution Time save_pdf_to_generated_dir() : {total_time}-----------------------------\n") |
| start_time = time.time() |
| |
| output_path = extract_images_from_pdf(pdf_stream,project_folder) |
| print(" --------------------------------------- zip_path_str ---------------------------------------", output_path) |
| total_time = time.time() - start_time |
| print(f"-----------------------------Execution Time extract_images_from_pdf() : {total_time}-----------------------------\n") |
| start_time = time.time() |
| project_output = similarity_matching(output_path, project_folder) |
| logger.info("Received request to process PDF.") |
| total_time = time.time() - start_time |
| print(f"-----------------------------Execution Time similarity_matching() : {total_time}-----------------------------\n") |
| |
| with open(project_output, 'r') as f: |
| project_skeleton = json.load(f) |
|
|
| if isinstance(pdf_stream, io.BytesIO): |
| images = convert_pdf_stream_to_images(pdf_stream, dpi=300) |
| else: |
| images = convert_from_path(pdf_stream, dpi=300) |
| |
| |
| initial_state_dict = { |
| "project_json": project_skeleton, |
| "description": "The pseudo code for the script", |
| "project_id": project_id, |
| "project_image": images, |
| "action_plan": {}, |
| "pseudo_code": {}, |
| "temporary_node": {}, |
| "processing":True, |
| "page_count": 0, |
| "temp_pseudo_code":[], |
| } |
| |
| final_state_dict = app_graph.invoke(initial_state_dict,config={"recursion_limit": 200}) |
| final_project_json = final_state_dict['project_json'] |
| |
| |
| |
| with open(project_output, "w") as f: |
| json.dump(final_project_json, f, indent=2) |
| logger.info(f"Final project JSON saved to {project_output}") |
|
|
| |
| sb3_file_path = create_sb3_archive(project_folder, project_id) |
| |
| if sb3_file_path: |
| logger.info(f"Successfully created SB3 file: {sb3_file_path}") |
| |
| download_url = f"https://prthm11-scratch-vision-game.hf.space/download_sb3/{project_id}" |
| pdf_url = f"https://prthm11-scratch-vision-game.hf.space/download_pdf/{project_id}" |
| print(f"DOWNLOAD_URL: {download_url}") |
| print(f"PDF_URL: {pdf_url}") |
| |
| return jsonify({ |
| "message": "✅ PDF processed successfully", |
| "output_json": "output_path", |
| "sprites": "result", |
| "project_output_json": "project_output", |
| "test_url": download_url |
| }) |
| else: |
| return jsonify({ |
| "message": "❌ Scanned images are not clear please retry!", |
| "isError": True, |
| "output_json": "output_path", |
| "sprites": "result", |
| "project_output_json": "project_output", |
| "test_url": download_url |
| }), 500 |
|
|
| except Exception as e: |
| logger.error(f"Error during processing the pdf workflow for project ID {project_id}: {e}", exc_info=True) |
| return jsonify({ |
| "message": "❌ Scanned images are not clear please retry!", |
| "isError": True, |
| "output_json": "output_path", |
| "sprites": "result", |
| "project_output_json": "project_output", |
| "test_url": "download_url" |
| }), 500 |
| |
| if __name__ == '__main__': |
| |
| app.run(host='0.0.0.0', port=7860, debug=True) |