"""Web application for docstrange document extraction."""

import base64
import importlib.metadata
import json
import os
import sys
import tempfile
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import quote, unquote

import urllib3
from flask import Flask, Response, request, jsonify, render_template, send_from_directory
from werkzeug.utils import secure_filename
from werkzeug.exceptions import RequestEntityTooLarge

from .extractor import DocumentExtractor
from .exceptions import ConversionError, UnsupportedFormatError, FileNotFoundError

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max file size

# Global settings storage (process-wide, in memory only; lost on restart)
_settings = {
    'api_key': None,
    'nextcloud_url': None,
    'nextcloud_user': None,
    'nextcloud_password': None,
    'nextcloud_verify_ssl': True
}

# Suppress urllib3's InsecureRequestWarning so that requests made with
# verify=False (self-signed Nextcloud certificates) do not spam the log.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Path segment under which Nextcloud exposes a user's files over WebDAV.
_WEBDAV_FILES_PREFIX = '/remote.php/dav/files/'

# Upper bound on the in-memory extraction history so a long-running server
# does not grow without limit.
_MAX_HISTORY_ENTRIES = 1000

# export format -> (mimetype, file extension) used by /api/export-result
_EXPORT_TYPES = {
    'markdown': ('text/markdown', 'md'),
    'html': ('text/html', 'html'),
    'json': ('application/json', 'json'),
    'csv': ('text/csv', 'csv'),
    'text': ('text/plain', 'txt'),
}


def check_gpu_availability():
    """Check if GPU is available for processing.

    Returns:
        True when torch is importable and reports CUDA as available,
        False otherwise (including when torch is not installed).
    """
    try:
        import torch
        return torch.cuda.is_available()
    except ImportError:
        return False


def get_gpu_name():
    """Get the name of the available GPU.

    Returns:
        The name of CUDA device 0, or None when torch is missing or CUDA is
        not available.
    """
    try:
        import torch
        if torch.cuda.is_available():
            return torch.cuda.get_device_name(0)
    except ImportError:
        pass
    return None


def download_models():
    """Download models synchronously before starting the app.

    Runs one extraction on a throw-away text file so that the extractor
    fetches whatever it needs up front. Failures are reported on stdout but
    never raised, so the web server can still start.
    """
    print("Starting model download...")

    # Check GPU availability
    gpu_available = check_gpu_availability()

    if gpu_available:
        print("GPU detected - downloading GPU models")
        # Download GPU models
        extractor = DocumentExtractor(gpu=True)
    else:
        print("GPU not available - using cloud processing")
        # Use cloud processing when GPU is not available
        extractor = DocumentExtractor()

    # Test extraction to trigger model downloads
    print("Downloading models...")

    # Create a simple test file to trigger model downloads
    test_content = "Test document for model download."
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as tmp_file:
        tmp_file.write(test_content)
        test_file_path = tmp_file.name

    try:
        # This will trigger model downloads; the result itself is not needed
        extractor.extract(test_file_path)
        print("Model download completed successfully")
    except Exception as e:
        print(f"Model download warning: {e}")
        # Don't fail completely, just log the warning
    finally:
        # Clean up test file
        if os.path.exists(test_file_path):
            os.unlink(test_file_path)


def create_extractor_with_mode(processing_mode: str, api_key: Optional[str] = None):
    """Create DocumentExtractor with proper error handling for processing mode.

    Args:
        processing_mode: 'gpu' for local GPU processing; any other value
            selects cloud mode.
        api_key: Optional API key; falls back to the key stored in the
            global settings when empty.

    Returns:
        A configured DocumentExtractor.

    Raises:
        ValueError: If GPU mode is requested but no GPU is available.
    """
    if processing_mode == 'gpu':
        if not check_gpu_availability():
            raise ValueError("GPU mode selected but GPU is not available. Please install PyTorch with CUDA support.")
        return DocumentExtractor(gpu=True, api_key=api_key or _settings.get('api_key'))
    else:  # cloud mode (default)
        return DocumentExtractor(api_key=api_key or _settings.get('api_key'))


# Initialize the document extractor
extractor = DocumentExtractor()


def _webdav_files_url(base_url, user, path=''):
    """Build a percent-encoded WebDAV URL for *path* in *user*'s files.

    The user name and path are quoted so that characters such as '#', '?'
    or '%' in file names are sent as part of the path instead of being
    interpreted as URL syntax.
    """
    return base_url.rstrip('/') + quote(_WEBDAV_FILES_PREFIX + user + path)


def _record_history(file_name, output_format):
    """Append a successful extraction to the bounded in-memory history."""
    _extraction_history.append({
        'timestamp': datetime.now().isoformat(),
        'file_name': file_name,
        'status': 'success',
        'format': output_format
    })
    overflow = len(_extraction_history) - _MAX_HISTORY_ENTRIES
    if overflow > 0:
        # Drop the oldest entries in place so existing references stay valid.
        del _extraction_history[:overflow]


@app.route('/')
def index():
    """Serve the main page."""
    return render_template('index.html')


# The rule must declare the <path:filename> variable: the view takes a
# `filename` argument and would raise TypeError on every request otherwise.
@app.route('/static/<path:filename>')
def static_files(filename):
    """Serve static files."""
    return send_from_directory('static', filename)


@app.route('/api/extract', methods=['POST'])
def extract_document():
    """API endpoint for document extraction with multi-format support.

    Expects a multipart form with a 'file' part and optional 'output_format',
    'processing_mode', 'api_key' and 'all_formats' fields. Returns a JSON
    object with 'success', 'content' and 'metadata', or an 'error' message
    with an appropriate HTTP status.
    """
    try:
        # Check if file was uploaded
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400

        # Get parameters
        output_format = request.form.get('output_format', 'markdown')
        processing_mode = request.form.get('processing_mode', 'cloud')
        api_key = request.form.get('api_key') or _settings.get('api_key')
        return_all_formats = request.form.get('all_formats', 'false') == 'true'

        # Create extractor based on processing mode
        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        # Save uploaded file temporarily
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as tmp_file:
            file.save(tmp_file.name)
            tmp_path = tmp_file.name

        try:
            # Extract content
            result = extractor.extract(tmp_path)

            # If requesting all formats, return dict with all formats
            if return_all_formats:
                content = {
                    'markdown': result.extract_markdown(),
                    'html': result.extract_html(),
                    'json': result.extract_data(),
                    'text': result.extract_text()
                }
                # Generate CSV if possible
                try:
                    content['csv'] = result.extract_csv(include_all_tables=True)
                except Exception:
                    content['csv'] = None
            else:
                # Convert to requested format
                if output_format == 'markdown':
                    content = result.extract_markdown()
                elif output_format == 'html':
                    content = result.extract_html()
                elif output_format == 'json':
                    content = result.extract_data()
                elif output_format == 'csv':
                    try:
                        content = result.extract_csv(include_all_tables=True)
                    except Exception as e:
                        content = f"CSV extraction failed: {str(e)}"
                elif output_format == 'flat-json':
                    content = result.extract_data()
                elif output_format == 'text':
                    content = result.extract_text()
                else:
                    # Unknown formats fall back to markdown
                    content = result.extract_markdown()

            # Get metadata
            metadata = {
                'file_type': Path(file.filename).suffix.lower(),
                'file_name': file.filename,
                'file_size': os.path.getsize(tmp_path),
                'pages_processed': getattr(result, 'pages_processed', 1),
                'processing_time': getattr(result, 'processing_time', 0),
                'output_format': output_format,
                'processing_mode': processing_mode,
                'tables_found': len(getattr(result, 'tables', [])),
                'images_found': len(getattr(result, 'images', []))
            }

            return jsonify({
                'success': True,
                'content': content,
                'metadata': metadata
            })

        finally:
            # Clean up temporary file
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except RequestEntityTooLarge:
        return jsonify({'error': 'File too large. Maximum size is 100MB.'}), 413
    except UnsupportedFormatError as e:
        return jsonify({'error': f'Unsupported file format: {str(e)}'}), 400
    except ConversionError as e:
        return jsonify({'error': f'Conversion error: {str(e)}'}), 500
    except Exception as e:
        return jsonify({'error': f'Unexpected error: {str(e)}'}), 500


@app.route('/api/supported-formats')
def get_supported_formats():
    """Get list of supported file formats."""
    formats = extractor.get_supported_formats()
    return jsonify({'formats': formats})


@app.route('/api/health')
def health_check():
    """Health check endpoint."""
    return jsonify({'status': 'healthy', 'version': '1.0.0'})


@app.route('/api/system-info')
def get_system_info():
    """Get system information including GPU availability."""
    gpu_available = check_gpu_availability()
    gpu_name = get_gpu_name()

    # Get docstrange version; fall back to a fixed string when the package
    # metadata cannot be read (e.g. running from a source checkout).
    try:
        ds_version = importlib.metadata.version('docstrange')
    except Exception:
        ds_version = '1.1.8'

    # Get additional system info
    system_info = {
        'gpu_available': gpu_available,
        'gpu_name': gpu_name,
        'python_version': f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}',
        'docstrange_version': ds_version,
        'processing_modes': {
            'cloud': {
                'available': True,
                'description': 'Process using cloud API. Fast and requires no local setup.'
            },
            'gpu': {
                'available': gpu_available,
                'description': 'Process locally using GPU. Fastest local processing, requires CUDA.' if gpu_available else 'GPU not available. Install PyTorch with CUDA support.'
            }
        }
    }

    return jsonify(system_info)


@app.route('/api/settings/api-key', methods=['POST'])
def save_api_key():
    """Save API key to session settings.

    Expects a JSON object with a non-empty string 'api_key'.
    """
    data = request.get_json()
    if not isinstance(data, dict) or 'api_key' not in data:
        return jsonify({'error': 'API key is required'}), 400

    api_key = data['api_key']
    # Reject null / numeric values instead of crashing on .strip()
    if not isinstance(api_key, str):
        return jsonify({'error': 'API key must be a string'}), 400

    api_key = api_key.strip()
    if not api_key:
        return jsonify({'error': 'API key cannot be empty'}), 400

    _settings['api_key'] = api_key
    return jsonify({'success': True, 'message': 'API key saved successfully'})


@app.route('/api/settings/api-key', methods=['GET'])
def get_api_key_status():
    """Get API key status (does not return the key for security)."""
    has_key = _settings.get('api_key') is not None
    return jsonify({'has_api_key': has_key})


@app.route('/api/settings/api-key', methods=['DELETE'])
def delete_api_key():
    """Delete saved API key."""
    _settings['api_key'] = None
    return jsonify({'success': True, 'message': 'API key removed'})


@app.route('/api/erpnext/extract', methods=['POST'])
def erpnext_extract():
    """ERPNext API endpoint for document extraction.

    Integrates with ERPNext by accepting file URLs or base64 content and
    returning structured JSON/Markdown suitable for ERPNext doctypes.

    Request JSON:
    {
        "file_url": "https://example.com/invoice.pdf",  # OR
        "file_content": "base64_encoded_content",
        "file_name": "invoice.pdf",
        "output_format": "markdown|json|csv|html",
        "processing_mode": "cloud|gpu",
        "api_key": "optional_api_key"
    }
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({'error': 'JSON body is required'}), 400

        file_url = data.get('file_url')
        file_content = data.get('file_content')
        file_name = data.get('file_name', 'document.pdf')
        output_format = data.get('output_format', 'markdown')
        processing_mode = data.get('processing_mode', 'cloud')
        api_key = data.get('api_key') or _settings.get('api_key')

        if not file_url and not file_content:
            return jsonify({'error': 'Either file_url or file_content is required'}), 400

        # Create extractor based on processing mode
        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        # Get file content from URL or base64
        import requests as http_requests

        if file_content:
            # Decode base64 content (takes precedence over file_url)
            try:
                file_bytes = base64.b64decode(file_content)
            except Exception:
                return jsonify({'error': 'Invalid base64 content'}), 400
        elif file_url:
            # Download from URL
            # NOTE(review): file_url is caller-controlled and fetched
            # server-side without any host allow-list; restrict it if this
            # endpoint is reachable by untrusted clients (SSRF).
            try:
                response = http_requests.get(file_url, timeout=60)
                response.raise_for_status()
                file_bytes = response.content
            except Exception as e:
                return jsonify({'error': f'Failed to download file: {str(e)}'}), 400

        # Save to temporary file
        suffix = Path(file_name).suffix or '.pdf'
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(file_bytes)
            tmp_path = tmp_file.name

        try:
            # Extract content
            result = extractor.extract(tmp_path)

            # Convert to requested format
            if output_format == 'markdown':
                content = result.extract_markdown()
            elif output_format == 'html':
                content = result.extract_html()
            elif output_format == 'json':
                content = result.extract_data()
            elif output_format == 'csv':
                content = result.extract_csv(include_all_tables=True)
            elif output_format == 'text':
                content = result.extract_text()
            else:
                content = result.extract_markdown()

            # ERPNext-friendly response
            return jsonify({
                'success': True,
                'data': content,
                'format': output_format,
                'metadata': {
                    'file_name': file_name,
                    'file_size': len(file_bytes),
                    'pages_processed': getattr(result, 'pages_processed', 1),
                    'processing_time': getattr(result, 'processing_time', 0),
                    'processing_mode': processing_mode
                }
            })

        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except Exception as e:
        return jsonify({'error': f'ERPNext extraction error: {str(e)}'}), 500


@app.route('/api/nextcloud/settings', methods=['POST'])
def save_nextcloud_settings():
    """Save Nextcloud connection settings.

    Expects a JSON object with optional 'url', 'user', 'password' and
    'verify_ssl' keys; missing or null text values are stored as ''.
    """
    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'JSON body is required'}), 400

    # `or ''` guards against explicit nulls, which would otherwise crash
    # on .rstrip().
    _settings['nextcloud_url'] = str(data.get('url') or '').rstrip('/')
    _settings['nextcloud_user'] = data.get('user', '')
    _settings['nextcloud_password'] = data.get('password', '')
    _settings['nextcloud_verify_ssl'] = data.get('verify_ssl', True)

    return jsonify({'success': True, 'message': 'Nextcloud settings saved'})


@app.route('/api/nextcloud/test', methods=['POST'])
def test_nextcloud_connection():
    """Test Nextcloud WebDAV connection.

    Credentials are taken from the request body and fall back to the saved
    settings. Issues a Depth-0 PROPFIND on the user's files root.
    """
    try:
        data = request.get_json() or {}

        url = data.get('url') or _settings.get('nextcloud_url')
        user = data.get('user') or _settings.get('nextcloud_user')
        password = data.get('password') or _settings.get('nextcloud_password')
        verify_ssl = data.get('verify_ssl', _settings.get('nextcloud_verify_ssl', True))

        if not url or not user or not password:
            return jsonify({'success': False, 'error': 'URL, username and password are required'}), 400

        import requests as http_requests

        # Test WebDAV connection
        webdav_url = _webdav_files_url(url, user)

        response = http_requests.request(
            'PROPFIND',
            webdav_url,
            auth=(user, password),
            headers={'Depth': '0'},
            timeout=10,
            verify=verify_ssl
        )

        if response.status_code in [200, 207]:
            return jsonify({'success': True, 'message': 'Connection successful', 'url': url.rstrip('/')})
        else:
            return jsonify({'success': False, 'error': f'Connection failed: HTTP {response.status_code}'}), 400

    except Exception as e:
        return jsonify({'success': False, 'error': f'Connection failed: {str(e)}'}), 400


@app.route('/api/nextcloud/browse', methods=['POST'])
def nextcloud_browse():
    """Browse Nextcloud files via WebDAV.

    Issues a Depth-1 PROPFIND on the requested path and returns the direct
    children as a list of folder/file entries (folders first, then by name).
    Paths in the response are URL-decoded and relative to the user's files
    root, so they can be passed back to this endpoint or to the download
    endpoint unchanged.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({'error': 'JSON body is required'}), 400

        url = data.get('url') or _settings.get('nextcloud_url')
        user = data.get('user') or _settings.get('nextcloud_user')
        password = data.get('password') or _settings.get('nextcloud_password')
        verify_ssl = data.get('verify_ssl', _settings.get('nextcloud_verify_ssl', True))
        path = data.get('path', '/')

        if not url or not user or not password:
            return jsonify({'error': 'Nextcloud credentials not configured'}), 400

        import requests as http_requests

        # Build WebDAV URL properly (percent-encoded)
        clean_path = path if path.startswith('/') else '/' + path
        webdav_url = _webdav_files_url(url, user, clean_path)

        response = http_requests.request(
            'PROPFIND',
            webdav_url,
            auth=(user, password),
            headers={'Depth': '1'},
            timeout=15,
            verify=verify_ssl
        )

        if response.status_code not in [200, 207]:
            return jsonify({'error': f'Failed to browse: HTTP {response.status_code}'}), 400

        # Parse WebDAV XML response
        files = []
        try:
            ns = {'d': 'DAV:'}
            root = ET.fromstring(response.text)

            # href looks like: /remote.php/dav/files/admin/path/to/file
            # (possibly preceded by a web-root such as /nextcloud)
            user_files_prefix = _WEBDAV_FILES_PREFIX + user
            current_relative = clean_path.rstrip('/')

            for resp in root.findall('.//d:response', ns):
                href = resp.find('d:href', ns)
                if href is None or not href.text:
                    continue

                # URL-decode the href
                decoded_href = unquote(href.text)

                # Extract the path relative to the user's files directory.
                # Searching (rather than startswith) tolerates servers that
                # are installed under a sub-path.
                marker = decoded_href.find(user_files_prefix)
                if marker != -1:
                    relative_path = decoded_href[marker + len(user_files_prefix):]
                    if not relative_path.startswith('/'):
                        relative_path = '/' + relative_path
                    # Skip the current directory itself
                    if relative_path.rstrip('/') == current_relative:
                        continue
                else:
                    relative_path = decoded_href

                propstat = resp.find('d:propstat', ns)
                prop = propstat.find('d:prop', ns) if propstat is not None else None
                if prop is None:
                    continue

                resourcetype = prop.find('d:resourcetype', ns)
                is_collection = resourcetype is not None and resourcetype.find('d:collection', ns) is not None

                content_length = prop.find('d:getcontentlength', ns)
                content_type = prop.find('d:getcontenttype', ns)
                last_modified = prop.find('d:getlastmodified', ns)

                # Get display name from path
                display_name = relative_path.rstrip('/').split('/')[-1]
                if not display_name:
                    display_name = 'Root'

                if is_collection:
                    files.append({
                        'type': 'folder',
                        'name': display_name,
                        'path': relative_path,
                        'size': None,
                        'modified': None
                    })
                else:
                    files.append({
                        'type': 'file',
                        'name': display_name,
                        'path': relative_path,
                        'size': int(content_length.text) if content_length is not None and content_length.text else None,
                        'content_type': content_type.text if content_type is not None and content_type.text else None,
                        'modified': last_modified.text if last_modified is not None and last_modified.text else None
                    })
        except ET.ParseError as e:
            # Best effort: keep returning whatever was parsed (normally an
            # empty listing), but leave a trace instead of failing silently.
            app.logger.warning('Could not parse WebDAV response for %s: %s', path, e)

        # Sort: folders first, then by name
        files.sort(key=lambda f: (f['type'] != 'folder', f['name'].lower()))

        return jsonify({'success': True, 'path': path, 'files': files})

    except Exception as e:
        return jsonify({'error': f'Browse error: {str(e)}'}), 500


@app.route('/api/nextcloud/download', methods=['POST'])
def nextcloud_download():
    """Download a file from Nextcloud and process it.

    Fetches the file at 'path' over WebDAV, runs it through the extractor
    and returns the content in the requested format. For 'json' the
    extracted data is returned as a pretty-printed JSON string.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({'error': 'JSON body is required'}), 400

        url = data.get('url') or _settings.get('nextcloud_url')
        user = data.get('user') or _settings.get('nextcloud_user')
        password = data.get('password') or _settings.get('nextcloud_password')
        verify_ssl = data.get('verify_ssl', _settings.get('nextcloud_verify_ssl', True))
        file_path = data.get('path')
        output_format = data.get('output_format', 'markdown')
        processing_mode = data.get('processing_mode', 'cloud')
        api_key = data.get('api_key') or _settings.get('api_key')

        if not url or not user or not password or not file_path:
            return jsonify({'error': 'Missing required parameters'}), 400

        import requests as http_requests

        # Build WebDAV URL properly (percent-encoded)
        clean_path = file_path if file_path.startswith('/') else '/' + file_path
        webdav_url = _webdav_files_url(url, user, clean_path)

        response = http_requests.get(
            webdav_url,
            auth=(user, password),
            timeout=60,
            stream=True,
            verify=verify_ssl
        )

        if response.status_code != 200:
            return jsonify({'error': f'Failed to download file: HTTP {response.status_code}'}), 400

        # Get file name from path
        file_name = file_path.rstrip('/').split('/')[-1]
        file_bytes = response.content

        # Create extractor
        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        # Save to temp file and process
        suffix = Path(file_name).suffix or '.pdf'
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(file_bytes)
            tmp_path = tmp_file.name

        try:
            result = extractor.extract(tmp_path)

            if output_format == 'markdown':
                content = result.extract_markdown()
            elif output_format == 'html':
                content = result.extract_html()
            elif output_format == 'json':
                content = result.extract_data()
                content = json.dumps(content, indent=2)
            elif output_format == 'csv':
                content = result.extract_csv(include_all_tables=True)
            elif output_format == 'text':
                content = result.extract_text()
            else:
                content = result.extract_markdown()

            return jsonify({
                'success': True,
                'content': content,
                'format': output_format,
                'metadata': {
                    'file_name': file_name,
                    'file_path': file_path,
                    'file_size': len(file_bytes),
                    'pages_processed': getattr(result, 'pages_processed', 1),
                    'processing_time': getattr(result, 'processing_time', 0),
                    'processing_mode': processing_mode
                }
            })

        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except Exception as e:
        return jsonify({'error': f'Download/processing error: {str(e)}'}), 500


# ===== BATCH PROCESSING & PREVIEW ENDPOINTS =====

# Extraction history storage (in-memory for now, capped at
# _MAX_HISTORY_ENTRIES entries)
_extraction_history = []


@app.route('/api/preview-file', methods=['POST'])
def preview_file():
    """Preview uploaded file metadata and basic info without full extraction.

    Returns name, size, type and, for plain-text formats, the first 1000
    characters of the file.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400

        # Save temporarily to get file info
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as tmp_file:
            file.save(tmp_file.name)
            tmp_path = tmp_file.name

        try:
            # Get file metadata
            file_size = os.path.getsize(tmp_path)
            file_type = Path(file.filename).suffix.lower()

            # Get basic file info
            preview_data = {
                'file_name': file.filename,
                'file_size': file_size,
                'file_size_human': format_file_size(file_size),
                'file_type': file_type,
                'mime_type': file.content_type,
                'preview_url': None,
                'is_previewable': False
            }

            # Generate preview for supported types
            if file_type in ['.pdf']:
                preview_data['is_previewable'] = True
                preview_data['preview_type'] = 'pdf'
            elif file_type in ['.jpg', '.jpeg', '.png', '.gif', '.webp']:
                preview_data['is_previewable'] = True
                preview_data['preview_type'] = 'image'
            elif file_type in ['.txt', '.md', '.csv']:
                # Read the first 1000 characters for the text preview
                with open(tmp_path, 'r', encoding='utf-8', errors='ignore') as f:
                    preview_data['text_preview'] = f.read(1000)
                preview_data['is_previewable'] = True
                preview_data['preview_type'] = 'text'

            return jsonify({
                'success': True,
                'preview': preview_data
            })

        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except Exception as e:
        return jsonify({'error': f'Preview error: {str(e)}'}), 500


@app.route('/api/batch-extract', methods=['POST'])
def batch_extract():
    """Extract multiple files with progress tracking.

    Each uploaded file is processed independently; a failure on one file is
    reported in its result entry and does not abort the batch.
    """
    try:
        files = request.files.getlist('files')
        if not files or len(files) == 0:
            return jsonify({'error': 'No files provided'}), 400

        output_format = request.form.get('output_format', 'markdown')
        processing_mode = request.form.get('processing_mode', 'cloud')
        api_key = request.form.get('api_key') or _settings.get('api_key')

        # Create extractor
        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        results = []
        total_files = len([f for f in files if f.filename])
        processed = 0

        for file in files:
            if not file.filename:
                continue

            processed += 1
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as tmp_file:
                    file.save(tmp_file.name)
                    tmp_path = tmp_file.name

                try:
                    result = extractor.extract(tmp_path)

                    # Get content in requested format
                    if output_format == 'markdown':
                        content = result.extract_markdown()
                    elif output_format == 'html':
                        content = result.extract_html()
                    elif output_format == 'json':
                        content = result.extract_data()
                    elif output_format == 'text':
                        content = result.extract_text()
                    else:
                        content = result.extract_markdown()

                    file_result = {
                        'file_name': file.filename,
                        'status': 'success',
                        'content': content,
                        'metadata': {
                            'file_size': os.path.getsize(tmp_path),
                            'processing_time': getattr(result, 'processing_time', 0)
                        }
                    }
                    results.append(file_result)

                    # Add to history
                    _record_history(file.filename, output_format)

                finally:
                    if os.path.exists(tmp_path):
                        os.unlink(tmp_path)

            except Exception as e:
                results.append({
                    'file_name': file.filename,
                    'status': 'error',
                    'error': str(e)
                })

        return jsonify({
            'success': True,
            'total': total_files,
            'processed': processed,
            'results': results
        })

    except Exception as e:
        return jsonify({'error': f'Batch extraction error: {str(e)}'}), 500


@app.route('/api/extraction-history', methods=['GET'])
def get_extraction_history():
    """Get extraction history."""
    return jsonify({
        'success': True,
        'history': _extraction_history,
        'total': len(_extraction_history)
    })


@app.route('/api/export-result', methods=['POST'])
def export_result():
    """Export extraction result in different formats.

    Expects a JSON object with 'content', an optional 'format' (markdown,
    html, json, csv or text) and an optional 'file_name'. Responds with the
    content as a file download.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'JSON body is required'}), 400

        content = data.get('content')
        export_format = data.get('format', 'markdown')
        file_name = data.get('file_name', 'document')

        if not content:
            return jsonify({'error': 'No content provided'}), 400

        export_type = _EXPORT_TYPES.get(export_format) if isinstance(export_format, str) else None
        if export_type is None:
            return jsonify({'error': f'Unsupported export format: {export_format}'}), 400
        mimetype, extension = export_type

        # The file name ends up in a response header, so strip anything that
        # could break out of it (quotes, CR/LF, path separators).
        safe_name = secure_filename(str(file_name)) or 'document'

        # Strings are sent as-is; structured content is serialised as JSON.
        body = content if isinstance(content, str) else json.dumps(content, indent=2)

        # Create response with appropriate headers
        return Response(
            body,
            mimetype=mimetype,
            headers={'Content-Disposition': f'attachment; filename={safe_name}.{extension}'}
        )

    except Exception as e:
        return jsonify({'error': f'Export error: {str(e)}'}), 500


@app.route('/api/api-usage', methods=['GET'])
def get_api_usage():
    """Get API usage statistics for cloud mode."""
    # This would integrate with actual API usage tracking
    # For now, return placeholder data
    return jsonify({
        'success': True,
        'usage': {
            'calls_today': 0,
            'calls_this_month': 0,
            'limit_per_month': 10000,
            'remaining': 10000,
            'reset_date': 'end of month'
        }
    })


def format_file_size(size_bytes):
    """Format file size in human-readable format.

    Args:
        size_bytes: Size in bytes.

    Returns:
        A string such as "0 B", "500.0 B" or "1.5 KB". Values of 1024 TB
        and above are expressed in TB rather than failing.
    """
    if size_bytes == 0:
        return "0 B"
    size_names = ["B", "KB", "MB", "GB", "TB"]
    value = float(size_bytes)
    index = 0
    # Dividing by 1024 is exact in binary floating point, so this avoids
    # the rounding issues of math.log() at unit boundaries; the index is
    # clamped to the largest known unit.
    while abs(value) >= 1024 and index < len(size_names) - 1:
        value /= 1024
        index += 1
    return f"{round(value, 2)} {size_names[index]}"


def run_web_app(host='0.0.0.0', port=8000, debug=False):
    """Run the web application.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        debug: Enable Flask debug mode. Never combine this with a publicly
            reachable host: the interactive debugger allows code execution.
    """
    # Check GPU availability before starting the server
    print("Checking GPU availability...")
    gpu_available = check_gpu_availability()

    if gpu_available:
        print("GPU detected - proceeding with model download...")
        print("Downloading models before starting the web interface...")
        download_models()
    else:
        print("GPU not available - starting in cloud mode only")
        print("To enable GPU, install PyTorch with CUDA: pip install torch --index-url https://download.pytorch.org/whl/cu118")

    print(f"Starting docstrange web interface at http://{host}:{port}")
    print("Press Ctrl+C to stop the server")

    app.run(host=host, port=port, debug=debug)


if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger, so bind to
    # localhost only when launched directly as a script.
    run_web_app(host='127.0.0.1', debug=True)