Spaces:
Running
Running
| import os | |
| import uuid | |
| import base64 | |
| import cv2 | |
| import numpy as np | |
| import json | |
| from datetime import datetime | |
| from flask import current_app, request, jsonify, render_template, url_for, Response | |
| from .common import main_bp, get_db_connection, login_required, current_user | |
| from processing import crop_image_perspective, remove_color_from_image, create_pdf_from_full_images | |
| from redact import redact_pictures_in_image | |
| from strings import ROUTE_PROCESS_CROP_V2, METHOD_POST | |
# API key read once from the environment at import time; it is passed to
# redact_pictures_in_image() by the redaction stream below.
| NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY") | |
# True only when a non-empty key is configured; redaction_stream() refuses to
# run when this is False.
| NVIDIA_NIM_AVAILABLE = bool(NVIDIA_API_KEY) | |
def crop_interface_v2(session_id, image_index):
    """Render the v2 cropping page for a single page or a two-page spread.

    Args:
        session_id: Id of the session whose original images are being cropped.
        image_index: Page index in single-page mode. When the current user has
            ``two_page_crop`` enabled it is a pair index instead
            (0 = pages 0-1, 1 = pages 2-3, ...).

    Returns:
        The rendered ``cropv2.html`` template, or a ``(message, status)``
        tuple: 403 when the session does not belong to the current user,
        404 when the requested original page does not exist.
    """
    page_query = (
        "SELECT * FROM images WHERE session_id = ? AND image_index = ? "
        "AND image_type = 'original'"
    )
    conn = get_db_connection()
    try:
        session_owner = conn.execute(
            'SELECT user_id FROM sessions WHERE id = ?', (session_id,)
        ).fetchone()
        if not session_owner or session_owner['user_id'] != current_user.id:
            return "Unauthorized", 403

        two_page_mode = bool(getattr(current_user, 'two_page_crop', 0))
        # In two-page mode the left page of pair N is original page 2N.
        left_page_index = image_index * 2 if two_page_mode else image_index

        image_info = conn.execute(page_query, (session_id, left_page_index)).fetchone()
        if not image_info:
            return "Original page/image not found.", 404

        total_pages = conn.execute(
            "SELECT COUNT(*) FROM images WHERE session_id = ? AND image_type = 'original'",
            (session_id,),
        ).fetchone()[0]
        page_rows = conn.execute(
            "SELECT image_index, filename FROM images WHERE session_id = ? "
            "AND image_type = 'original' ORDER BY image_index ASC",
            (session_id,),
        ).fetchall()
        all_pages = [
            {'image_index': row['image_index'], 'filename': row['filename']}
            for row in page_rows
        ]

        context = {
            'session_id': session_id,
            'user_id': current_user.id,
            'image_index': image_index,
            'image_info': image_info,
            'total_pages': total_pages,
            'all_pages': all_pages,
            'two_page_mode': two_page_mode,
        }
        if two_page_mode:
            right_page_index = left_page_index + 1
            # The right page is optional: the last pair of an odd-length
            # document only has a left page.
            right_image = conn.execute(page_query, (session_id, right_page_index)).fetchone()
            context.update(
                right_image_info=dict(right_image) if right_image else None,
                # The template paginates over pairs; round up for odd counts.
                total_pages=(total_pages + 1) // 2,
                left_page_index=left_page_index,
                right_page_index=right_page_index,
            )
    finally:
        # Always release the connection, including when a query raises.
        conn.close()

    return render_template('cropv2.html', **context)
def _box_corner_points(box):
    """Return a box's corners as top-left, top-right, bottom-right, bottom-left."""
    left, top = box['x'], box['y']
    right, bottom = left + box['w'], top + box['h']
    return [
        {'x': left, 'y': top},
        {'x': right, 'y': top},
        {'x': right, 'y': bottom},
        {'x': left, 'y': bottom},
    ]


def _stack_vertically(top_img, bottom_img):
    """Place two crops one above the other on a white canvas, centred horizontally."""
    # NOTE(review): assumes both crops are 3-channel uint8 arrays, as the
    # canvas shape requires -- confirm against crop_image_perspective.
    h1, w1 = top_img.shape[:2]
    h2, w2 = bottom_img.shape[:2]
    max_width = max(w1, w2)
    canvas = np.full((h1 + h2, max_width, 3), 255, dtype=np.uint8)
    top_offset = (max_width - w1) // 2
    bottom_offset = (max_width - w2) // 2
    canvas[0:h1, top_offset:top_offset + w1] = top_img
    canvas[h1:h1 + h2, bottom_offset:bottom_offset + w2] = bottom_img
    return canvas


def _render_primary_box(conn, session_id, boxes_data, primary_box, temp_path):
    """Crop one primary box from the filtered page and stitch its partner crop, if any."""
    own_points = _box_corner_points(primary_box)
    source_info = primary_box.get('remote_stitch_source')

    if not source_info:
        # Same-page stitching: a child box points at its parent via 'stitch_to'.
        primary_crop = crop_image_perspective(temp_path, own_points)
        children = [box for box in boxes_data if box.get('stitch_to') == primary_box['id']]
        if not children:
            return primary_crop
        # Only the first child is stitched; any further children are ignored.
        child_crop = crop_image_perspective(temp_path, _box_corner_points(children[0]))
        return _stack_vertically(primary_crop, child_crop)

    # Cross-page stitching: the parent region lives on another original page.
    source_page_index, source_box = source_info['page_index'], source_info['box']
    if 'id' in source_box:
        source_img_row = conn.execute(
            "SELECT id, processed_filename FROM images WHERE session_id = ? AND box_id = ?",
            (session_id, str(source_box['id'])),
        ).fetchone()
        if source_img_row:
            # The source box is being merged into this crop, so drop its
            # stand-alone record.
            # NOTE(review): its processed file is left on disk -- confirm
            # whether it should be removed as well.
            conn.execute("DELETE FROM questions WHERE image_id = ?", (source_img_row['id'],))
            conn.execute("DELETE FROM images WHERE id = ?", (source_img_row['id'],))

    source_page_db = conn.execute(
        "SELECT filename FROM images WHERE session_id = ? AND image_index = ? "
        "AND image_type = 'original'",
        (session_id, source_page_index),
    ).fetchone()
    if source_page_db:
        source_path = os.path.join(current_app.config['UPLOAD_FOLDER'], source_page_db['filename'])
        if os.path.exists(source_path):
            parent_crop = crop_image_perspective(source_path, _box_corner_points(source_box))
            child_crop = crop_image_perspective(temp_path, own_points)
            return _stack_vertically(parent_crop, child_crop)

    # Source page unavailable: fall back to this page's crop alone.
    return crop_image_perspective(temp_path, own_points)


def process_crop_v2():
    """Crop the submitted boxes out of a page and store them as question images.

    Reads a JSON body with ``session_id``, ``image_index``, ``boxes`` and
    ``imageData`` (a data URL of the client-filtered page). Existing cropped
    images for the page are replaced. Boxes may be stitched to a child box on
    the same page (``stitch_to``) or to a box on another page
    (``remote_stitch_source``).

    Returns:
        JSON ``{'success': True, 'processed_count': n}`` on success, or a JSON
        ``{'error': ...}`` with status 400 (bad payload), 403 (not the session
        owner), 404 (original page missing) or 500 (processing failure).
    """
    data = request.json or {}
    missing = [key for key in ('session_id', 'image_index', 'boxes') if key not in data]
    if missing:
        return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400
    session_id, page_index, boxes_data = data['session_id'], data['image_index'], data['boxes']
    image_data_url = data.get('imageData')

    conn = get_db_connection()
    temp_path = None
    try:
        session_owner = conn.execute(
            'SELECT user_id FROM sessions WHERE id = ?', (session_id,)
        ).fetchone()
        if not session_owner or session_owner['user_id'] != current_user.id:
            return jsonify({'error': 'Unauthorized'}), 403

        page_info = conn.execute(
            "SELECT filename FROM images WHERE session_id = ? AND image_index = ? "
            "AND image_type = 'original'",
            (session_id, page_index),
        ).fetchone()
        if not page_info:
            return jsonify({'error': 'Original page not found'}), 404

        if not isinstance(image_data_url, str) or ',' not in image_data_url:
            return jsonify({'error': 'imageData must be a base64 data URL'}), 400

        processed_folder = current_app.config['PROCESSED_FOLDER']
        temp_path = os.path.join(processed_folder, f"temp_filtered_{page_info['filename']}")
        try:
            # Persist the filtered page so the crop helper can read it from disk.
            encoded = image_data_url.split(",", 1)[1]
            with open(temp_path, "wb") as f:
                f.write(base64.b64decode(encoded))

            # Replace any crops previously produced from this page.
            existing_cropped = conn.execute(
                "SELECT id, processed_filename FROM images WHERE session_id = ? "
                "AND filename = ? AND image_type = 'cropped'",
                (session_id, page_info['filename']),
            ).fetchall()
            for cropped_img in existing_cropped:
                if cropped_img['processed_filename']:
                    try:
                        os.remove(os.path.join(processed_folder, cropped_img['processed_filename']))
                    except OSError:
                        pass  # best effort: the file may already be gone
                conn.execute(
                    "DELETE FROM questions WHERE session_id = ? AND image_id = ?",
                    (session_id, cropped_img['id']),
                )
            conn.execute(
                "DELETE FROM images WHERE session_id = ? AND filename = ? AND image_type = 'cropped'",
                (session_id, page_info['filename']),
            )

            # Boxes on this page that another box on this page names as its
            # remote stitch source are consumed by that stitch, not emitted.
            local_source_ids = {
                box['remote_stitch_source']['box']['id']
                for box in boxes_data
                if box.get('remote_stitch_source')
                and box['remote_stitch_source'].get('page_index') == page_index
            }
            primary_boxes = [box for box in boxes_data if not box.get('stitch_to')]

            processed_boxes = []
            for i, primary_box in enumerate(primary_boxes):
                if primary_box['id'] in local_source_ids:
                    continue
                stitched_image = _render_primary_box(
                    conn, session_id, boxes_data, primary_box, temp_path
                )
                crop_filename = f"processed_{session_id}_page{page_index}_crop{i}.jpg"
                cv2.imwrite(os.path.join(processed_folder, crop_filename), stitched_image)
                processed_boxes.append({
                    'original_filename': page_info['filename'],
                    'processed_filename': crop_filename,
                    'box_id': str(primary_box['id']),
                    'question_number': primary_box.get('question_number'),
                    'status': primary_box.get('status'),
                    'marked_solution': primary_box.get('marked_solution'),
                    'actual_solution': primary_box.get('actual_solution'),
                })

            # New crops are appended after the session's highest image index.
            max_idx = conn.execute(
                'SELECT MAX(image_index) FROM images WHERE session_id = ?', (session_id,)
            ).fetchone()[0]
            next_idx = (max_idx if max_idx is not None else -1) + 1
            for i, p_box in enumerate(processed_boxes):
                conn.execute(
                    'INSERT INTO images (session_id, image_index, filename, original_name, '
                    'processed_filename, image_type, box_id) VALUES (?, ?, ?, ?, ?, ?, ?)',
                    (session_id, next_idx + i, p_box['original_filename'],
                     f"Page {page_index + 1} - Q{i + 1}", p_box['processed_filename'],
                     'cropped', p_box['box_id']),
                )
                img_id = conn.execute('SELECT last_insert_rowid()').fetchone()[0]
                # The 'status' key is always present (possibly None), so a
                # dict.get default would never apply; test for None instead.
                status = p_box['status'] if p_box['status'] is not None else 'unattempted'
                conn.execute(
                    "INSERT INTO questions (session_id, image_id, question_number, status, "
                    "marked_solution, actual_solution) VALUES (?, ?, ?, ?, ?, ?)",
                    (session_id, img_id, p_box.get('question_number'), status,
                     p_box.get('marked_solution'), p_box.get('actual_solution')),
                )
            conn.commit()
            return jsonify({'success': True, 'processed_count': len(processed_boxes)})
        except Exception as e:
            conn.rollback()
            return jsonify({'error': str(e)}), 500
    finally:
        conn.close()
        # Remove the temporary filtered page on success and on failure alike.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass  # never written, or already removed
def process_color_rm_batch():
    """Apply colour removal to every original page of a session.

    Reads a JSON body with ``session_id`` and the optional settings
    ``colors``, ``threshold``, ``bg_mode`` and ``region``, which are passed
    straight through to ``remove_color_from_image``. Each result is written to
    the processed folder and recorded as a ``color_rm`` image row; an existing
    row for the same page is updated rather than duplicated. Originals missing
    from disk are skipped.

    Returns:
        JSON ``{'success': True, 'count': n}``, or a JSON ``{'error': ...}``
        with status 403 (not the session owner) or 500 (processing failure).
    """
    # Tolerate an empty or absent JSON body instead of failing on None.get().
    data = request.json or {}
    session_id = data.get('session_id')
    colors = data.get('colors', [])
    threshold = data.get('threshold', 0.8)
    bg_mode = data.get('bg_mode', 'black')
    region = data.get('region')

    conn = get_db_connection()
    try:
        session_owner = conn.execute(
            'SELECT user_id FROM sessions WHERE id = ?', (session_id,)
        ).fetchone()
        if not session_owner or session_owner['user_id'] != current_user.id:
            return jsonify({'error': 'Unauthorized'}), 403

        original_images = conn.execute(
            "SELECT * FROM images WHERE session_id = ? AND image_type = 'original' "
            "ORDER BY image_index",
            (session_id,),
        ).fetchall()

        processed_count = 0
        try:
            for img in original_images:
                original_path = os.path.join(current_app.config['UPLOAD_FOLDER'], img['filename'])
                if not os.path.exists(original_path):
                    continue
                processed_img_cv = remove_color_from_image(
                    original_path, colors, threshold, bg_mode, region
                )
                processed_filename = (
                    f"color_rm_{session_id}_{img['image_index']}_"
                    f"{datetime.now().strftime('%H%M%S')}.png"
                )
                cv2.imwrite(
                    os.path.join(current_app.config['PROCESSED_FOLDER'], processed_filename),
                    processed_img_cv,
                )
                existing = conn.execute(
                    "SELECT id FROM images WHERE session_id = ? AND image_index = ? "
                    "AND image_type = 'color_rm'",
                    (session_id, img['image_index']),
                ).fetchone()
                if existing:
                    conn.execute(
                        "UPDATE images SET processed_filename = ?, filename = ? WHERE id = ?",
                        (processed_filename, img['filename'], existing['id']),
                    )
                else:
                    conn.execute(
                        'INSERT INTO images (session_id, image_index, filename, original_name, '
                        'processed_filename, image_type) VALUES (?, ?, ?, ?, ?, ?)',
                        (session_id, img['image_index'], img['filename'],
                         img['original_name'], processed_filename, 'color_rm'),
                    )
                processed_count += 1
            conn.commit()
            return jsonify({'success': True, 'count': processed_count})
        except Exception as e:
            # Discard partial row changes so a failed batch leaves the table
            # unchanged.
            conn.rollback()
            return jsonify({'error': str(e)}), 500
    finally:
        conn.close()
def color_rm_entry():
    """Serve the upload page that starts the colour-removal workflow."""
    upload_page = render_template('color_rm_upload.html')
    return upload_page
def color_rm_interface(session_id, image_index):
    """Render the colour-removal editor for one original page.

    Args:
        session_id: Id of the session that owns the page.
        image_index: Index of the original page to edit.

    Returns:
        The rendered ``color_rm.html`` template, or a ``(message, status)``
        tuple: 403 when the session does not belong to the current user,
        404 when the original page does not exist.
    """
    conn = get_db_connection()
    try:
        session_owner = conn.execute(
            'SELECT user_id FROM sessions WHERE id = ?', (session_id,)
        ).fetchone()
        if not session_owner or session_owner['user_id'] != current_user.id:
            return "Unauthorized", 403

        image_info = conn.execute(
            "SELECT * FROM images WHERE session_id = ? AND image_index = ? "
            "AND image_type = 'original'",
            (session_id, image_index),
        ).fetchone()
        if not image_info:
            return "Original page not found", 404

        total_pages = conn.execute(
            "SELECT COUNT(*) FROM images WHERE session_id = ? AND image_type = 'original'",
            (session_id,),
        ).fetchone()[0]
        # Copy the row into a plain dict before the connection is closed.
        image_dict = dict(image_info)
    finally:
        # Always release the connection, including when a query raises.
        conn.close()

    return render_template(
        'color_rm.html',
        session_id=session_id,
        user_id=current_user.id,
        image_index=image_index,
        image_info=image_dict,
        total_pages=total_pages,
    )
def process_color_rm():
    """Store a client-rendered colour-removed image for a single page.

    Reads a JSON body with ``session_id``, ``image_index`` and ``imageData``
    (a base64 data URL). The decoded bytes are written to the processed folder
    and recorded as a new ``color_rm`` image row.

    Returns:
        JSON ``{'success': True, 'filename': ..., 'url': ...}``, or a JSON
        ``{'error': ...}`` with status 400 (bad ``imageData``), 403 (not the
        session owner), 404 (original page missing) or 500 (write failure).
    """
    # Tolerate an empty or absent JSON body instead of failing on None.get().
    data = request.json or {}
    sid, idx, url = data.get('session_id'), data.get('image_index'), data.get('imageData')

    conn = get_db_connection()
    try:
        session_owner = conn.execute(
            'SELECT user_id FROM sessions WHERE id = ?', (sid,)
        ).fetchone()
        if not session_owner or session_owner['user_id'] != current_user.id:
            return jsonify({'error': 'Unauthorized'}), 403

        page_info = conn.execute(
            "SELECT filename, original_name FROM images WHERE session_id = ? "
            "AND image_index = ? AND image_type = 'original'",
            (sid, idx),
        ).fetchone()
        if not page_info:
            return jsonify({'error': 'Original page not found'}), 404

        if not isinstance(url, str) or ',' not in url:
            return jsonify({'error': 'imageData must be a base64 data URL'}), 400

        try:
            # A data URL is "<header>,<base64 payload>"; only the payload is needed.
            encoded = url.split(",", 1)[1]
            image_data = base64.b64decode(encoded)
            fname = f"color_rm_{sid}_{idx}_{datetime.now().strftime('%H%M%S')}.png"
            with open(os.path.join(current_app.config['PROCESSED_FOLDER'], fname), "wb") as f:
                f.write(image_data)
            # NOTE(review): this always inserts a new row, whereas
            # process_color_rm_batch updates an existing 'color_rm' row for the
            # page -- confirm whether duplicates per page are intended.
            conn.execute(
                'INSERT INTO images (session_id, image_index, filename, original_name, '
                'processed_filename, image_type) VALUES (?, ?, ?, ?, ?, ?)',
                (sid, idx, page_info['filename'], page_info['original_name'], fname, 'color_rm'),
            )
            conn.commit()
            return jsonify({
                'success': True,
                'filename': fname,
                'url': url_for('main.serve_processed_file', filename=fname),
            })
        except Exception as e:
            conn.rollback()
            return jsonify({'error': str(e)}), 500
    finally:
        conn.close()
def redact_status(session_id):
    """Show the redaction progress page for a session owned by the current user.

    Returns the rendered ``redact_status.html`` template, or
    ``("Unauthorized", 403)`` when the session is missing or belongs to
    another user.
    """
    db = get_db_connection()
    owner_row = db.execute(
        'SELECT user_id FROM sessions WHERE id = ?', (session_id,)
    ).fetchone()
    db.close()

    if owner_row and owner_row['user_id'] == current_user.id:
        return render_template('redact_status.html', session_id=session_id)
    return "Unauthorized", 403
def redaction_stream(session_id):
    """Stream redaction progress for a session as server-sent events.

    Each original page is passed to ``redact_pictures_in_image``, saved to the
    processed folder and recorded on its image row. The redacted pages are
    then assembled into one PDF, which is registered in ``generated_pdfs``.

    Events are JSON objects sent as ``data: ...`` frames:
    ``{'progress', 'message'}`` before each page and before PDF assembly,
    ``{'complete': True, 'download_url'}`` at the end, or ``{'error'}`` on any
    failure (including an unauthorized session or a missing API key).

    Args:
        session_id: Id of the session whose original pages are redacted.

    Returns:
        A ``text/event-stream`` response wrapping the event generator.
    """
    # Local import keeps this block self-contained; flask is already a
    # dependency of this module.
    from flask import stream_with_context

    def sse(payload):
        """Format one server-sent-event frame carrying a JSON payload."""
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        conn = get_db_connection()
        try:
            session_owner = conn.execute(
                'SELECT user_id FROM sessions WHERE id = ?', (session_id,)
            ).fetchone()
            if not session_owner or session_owner['user_id'] != current_user.id:
                yield sse({'error': 'Unauthorized'})
                return
            if not NVIDIA_NIM_AVAILABLE:
                yield sse({'error': 'NVIDIA API Key not configured.'})
                return

            images = conn.execute(
                "SELECT id, filename FROM images WHERE session_id = ? "
                "AND image_type = 'original' ORDER BY image_index",
                (session_id,),
            ).fetchall()
            if not images:
                yield sse({'error': 'No images found.'})
                return

            redacted_paths, source_names, total = [], [], len(images)
            for i, img_row in enumerate(images):
                # Progress is reported before the page is processed.
                yield sse({
                    'progress': int(((i + 1) / total) * 100),
                    'message': f'Redacting page {i + 1} of {total}...',
                })
                orig_path = os.path.join(current_app.config['UPLOAD_FOLDER'], img_row['filename'])
                if not os.path.exists(orig_path):
                    continue
                redacted_img = redact_pictures_in_image(orig_path, NVIDIA_API_KEY)
                proc_name = f"redacted_{img_row['filename']}"
                proc_path = os.path.join(current_app.config['PROCESSED_FOLDER'], proc_name)
                redacted_img.save(proc_path, 'PNG')
                redacted_paths.append(proc_path)
                source_names.append(img_row['filename'])
                # Commit per page so finished pages survive a later failure.
                conn.execute(
                    "UPDATE images SET processed_filename = ? WHERE id = ?",
                    (proc_name, img_row['id']),
                )
                conn.commit()

            yield sse({'progress': 100, 'message': 'Assembling final PDF...'})
            pdf_name = f"redacted_document_{session_id}.pdf"
            pdf_path = os.path.join(current_app.config['OUTPUT_FOLDER'], pdf_name)
            if not create_pdf_from_full_images(redacted_paths, pdf_path):
                raise Exception("Failed to create PDF.")

            session_info = conn.execute(
                'SELECT original_filename FROM sessions WHERE id = ?', (session_id,)
            ).fetchone()
            subject = f"Redacted - {session_info['original_filename'] if session_info else 'Document'}"
            conn.execute(
                'INSERT INTO generated_pdfs (session_id, filename, subject, tags, notes, '
                'source_filename, user_id) VALUES (?, ?, ?, ?, ?, ?, ?)',
                (session_id, pdf_name, subject, 'redacted', 'Automatically redacted.',
                 ", ".join(source_names), current_user.id),
            )
            conn.commit()
            yield sse({
                'complete': True,
                'download_url': url_for('main.download_file', filename=pdf_name),
            })
        except Exception as e:
            conn.rollback()
            yield sse({'error': str(e)})
        finally:
            # Runs on every exit path, including early returns and a client
            # disconnect that closes the generator.
            conn.close()

    # The generator body runs after this view has returned, so the request
    # context must be kept alive for current_app, current_user and url_for.
    return Response(stream_with_context(generate()), mimetype='text/event-stream')