| """Web application for docstrange document extraction.""" |
|
|
| import os |
| import sys |
| import json |
| import tempfile |
| import importlib.metadata |
| from pathlib import Path |
| from typing import Optional |
| from flask import Flask, request, jsonify, render_template, send_from_directory |
| from werkzeug.utils import secure_filename |
| from werkzeug.exceptions import RequestEntityTooLarge |
|
|
| from .extractor import DocumentExtractor |
| from .exceptions import ConversionError, UnsupportedFormatError, FileNotFoundError |
|
|
# Flask application object; every route in this module is registered on it.
app = Flask(__name__)
# Cap request bodies at 100 MiB (100 * 1024 * 1024 bytes). extract_document()
# reports RequestEntityTooLarge to the client as HTTP 413.
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
|
|
| |
# Process-wide, in-memory settings read and written by the request handlers in
# this module (API key and Nextcloud connection details). Nothing here is
# persisted by this file.
_settings = dict(
    api_key=None,
    nextcloud_url=None,
    nextcloud_user=None,
    nextcloud_password=None,
    nextcloud_verify_ssl=True,
)
|
|
| |
import urllib3

# Silence urllib3's InsecureRequestWarning for the whole process. The Nextcloud
# handlers in this module pass a caller-controlled ``verify`` flag to requests,
# so unverified HTTPS requests can occur.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
|
|
def check_gpu_availability():
    """Report whether PyTorch can see a CUDA device.

    Returns:
        The value of ``torch.cuda.is_available()``, or ``False`` when
        importing torch raises ``ImportError``.
    """
    has_cuda = False
    try:
        import torch
        has_cuda = torch.cuda.is_available()
    except ImportError:
        pass
    return has_cuda
|
|
def get_gpu_name():
    """Return the name of CUDA device 0, if there is one.

    Returns:
        ``torch.cuda.get_device_name(0)`` when torch imports and reports CUDA
        as available; otherwise ``None``.
    """
    device_name = None
    try:
        import torch
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name(0)
    except ImportError:
        pass
    return device_name
|
|
def download_models():
    """Run one throwaway extraction before the app starts serving.

    Builds a ``DocumentExtractor`` (with ``gpu=True`` when a GPU is detected),
    writes a small text file to a temporary path and extracts it. Presumably
    this forces any model download to happen up front -- that depends on
    ``DocumentExtractor`` internals not visible here.

    Any exception from the extraction is printed as a warning and otherwise
    ignored. The temporary file is removed afterwards.
    """
    print("Starting model download...")

    if not check_gpu_availability():
        print("GPU not available - using cloud processing")
        warmup_extractor = DocumentExtractor()
    else:
        print("GPU detected - downloading GPU models")
        warmup_extractor = DocumentExtractor(gpu=True)

    print("Downloading models...")

    # delete=False: the file must outlive the ``with`` so the extractor can
    # open it by path; it is removed in the ``finally`` below.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as sample_file:
        sample_file.write("Test document for model download.")
        sample_path = sample_file.name

    try:
        warmup_extractor.extract(sample_path)
        print("Model download completed successfully")
    except Exception as exc:
        # Best effort: a failed warm-up must not stop the server from starting.
        print(f"Model download warning: {exc}")
    finally:
        if os.path.exists(sample_path):
            os.unlink(sample_path)
|
|
def create_extractor_with_mode(processing_mode, api_key=None):
    """Build a ``DocumentExtractor`` for the requested processing mode.

    Args:
        processing_mode: ``'gpu'`` selects a GPU extractor; any other value
            builds the default extractor.
        api_key: Key to pass to the extractor. When falsy, the key stored in
            ``_settings`` is used instead.

    Returns:
        The constructed ``DocumentExtractor``.

    Raises:
        ValueError: ``processing_mode`` is ``'gpu'`` but no GPU is available.
    """
    effective_key = api_key or _settings.get('api_key')

    if processing_mode != 'gpu':
        return DocumentExtractor(api_key=effective_key)

    if check_gpu_availability():
        return DocumentExtractor(gpu=True, api_key=effective_key)

    raise ValueError("GPU mode selected but GPU is not available. Please install PyTorch with CUDA support.")
|
|
| |
# Module-level extractor built with default arguments. The only use visible in
# this file is get_supported_formats(); the extraction handlers build their own
# extractor through create_extractor_with_mode().
extractor = DocumentExtractor()
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template as the landing page."""
    page = render_template('index.html')
    return page
|
|
@app.route('/static/<path:filename>')
def static_files(filename):
    """Send ``filename`` from the ``static`` directory."""
    asset_response = send_from_directory('static', filename)
    return asset_response
|
|
@app.route('/api/extract', methods=['POST'])
def extract_document():
    """Extract an uploaded document and return it in the requested format.

    Multipart form fields:
        file: The document to process (required).
        output_format: ``markdown`` (default), ``html``, ``json``,
            ``flat-json``, ``csv`` or ``text``. Unknown values fall back to
            markdown.
        processing_mode: ``cloud`` (default) or ``gpu``.
        api_key: Optional; falls back to the key stored in ``_settings``.
        all_formats: ``'true'`` to return every format in one response.

    Returns:
        JSON ``{'success': True, 'content': ..., 'metadata': {...}}`` on
        success, or ``{'error': ...}`` with status 400, 413 or 500.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        upload = request.files['file']
        # ``not`` also covers a missing (None) filename, which would otherwise
        # make Path() raise further down.
        if not upload.filename:
            return jsonify({'error': 'No file selected'}), 400

        output_format = request.form.get('output_format', 'markdown')
        processing_mode = request.form.get('processing_mode', 'cloud')
        api_key = request.form.get('api_key') or _settings.get('api_key')
        return_all_formats = request.form.get('all_formats', 'false') == 'true'

        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        tmp_path = None
        try:
            # The temp file is created inside the try so that a failed save
            # still reaches the cleanup in ``finally``.
            with tempfile.NamedTemporaryFile(delete=False, suffix=Path(upload.filename).suffix) as tmp_file:
                tmp_path = tmp_file.name
                # Write through the open handle instead of re-opening the file
                # by name while it is still open.
                upload.save(tmp_file)

            result = extractor.extract(tmp_path)

            if return_all_formats:
                content = {
                    'markdown': result.extract_markdown(),
                    'html': result.extract_html(),
                    'json': result.extract_data(),
                    'text': result.extract_text()
                }
                # CSV is best effort here: a failure yields None for that key.
                try:
                    content['csv'] = result.extract_csv(include_all_tables=True)
                except Exception:
                    content['csv'] = None
            elif output_format == 'html':
                content = result.extract_html()
            elif output_format in ('json', 'flat-json'):
                content = result.extract_data()
            elif output_format == 'csv':
                try:
                    content = result.extract_csv(include_all_tables=True)
                except Exception as e:
                    content = f"CSV extraction failed: {str(e)}"
            elif output_format == 'text':
                content = result.extract_text()
            else:
                # 'markdown' and any unrecognised format.
                content = result.extract_markdown()

            metadata = {
                'file_type': Path(upload.filename).suffix.lower(),
                'file_name': upload.filename,
                'file_size': os.path.getsize(tmp_path),
                'pages_processed': getattr(result, 'pages_processed', 1),
                'processing_time': getattr(result, 'processing_time', 0),
                'output_format': output_format,
                'processing_mode': processing_mode,
                'tables_found': len(getattr(result, 'tables', [])),
                'images_found': len(getattr(result, 'images', []))
            }

            return jsonify({
                'success': True,
                'content': content,
                'metadata': metadata
            })

        finally:
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except RequestEntityTooLarge:
        return jsonify({'error': 'File too large. Maximum size is 100MB.'}), 413
    except UnsupportedFormatError as e:
        return jsonify({'error': f'Unsupported file format: {str(e)}'}), 400
    except ConversionError as e:
        return jsonify({'error': f'Conversion error: {str(e)}'}), 500
    except Exception as e:
        return jsonify({'error': f'Unexpected error: {str(e)}'}), 500
|
|
@app.route('/api/supported-formats')
def get_supported_formats():
    """Return the module-level extractor's supported formats as JSON."""
    return jsonify({'formats': extractor.get_supported_formats()})
|
|
@app.route('/api/health')
def health_check():
    """Return a fixed status payload for health probes."""
    payload = {'status': 'healthy', 'version': '1.0.0'}
    return jsonify(payload)
|
|
@app.route('/api/system-info')
def get_system_info():
    """Describe the runtime: GPU state, Python and package versions, modes.

    Returns:
        JSON with ``gpu_available``, ``gpu_name``, ``python_version``,
        ``docstrange_version`` and a ``processing_modes`` map for ``cloud``
        and ``gpu``.
    """
    has_gpu = check_gpu_availability()
    device_name = get_gpu_name()

    # Fall back to a fixed version string when the installed package metadata
    # cannot be read.
    try:
        package_version = importlib.metadata.version('docstrange')
    except Exception:
        package_version = '1.1.8'

    if has_gpu:
        gpu_description = 'Process locally using GPU. Fastest local processing, requires CUDA.'
    else:
        gpu_description = 'GPU not available. Install PyTorch with CUDA support.'

    py = sys.version_info
    cloud_mode = {
        'available': True,
        'description': 'Process using cloud API. Fast and requires no local setup.'
    }
    gpu_mode = {
        'available': has_gpu,
        'description': gpu_description
    }

    return jsonify({
        'gpu_available': has_gpu,
        'gpu_name': device_name,
        'python_version': f'{py.major}.{py.minor}.{py.micro}',
        'docstrange_version': package_version,
        'processing_modes': {
            'cloud': cloud_mode,
            'gpu': gpu_mode
        }
    })
|
|
@app.route('/api/settings/api-key', methods=['POST'])
def save_api_key():
    """Store an API key in the in-memory settings.

    Request JSON: ``{"api_key": "<key>"}``.

    Returns:
        ``{'success': True, ...}`` once the stripped key is stored, or
        ``{'error': ...}`` with status 400 when the body is not a JSON object,
        the key is missing, is not a string, or is blank.
    """
    data = request.get_json()
    # A JSON array or scalar body has no ``api_key`` member; treat it like a
    # missing key rather than failing on the lookup below.
    if not isinstance(data, dict) or 'api_key' not in data:
        return jsonify({'error': 'API key is required'}), 400

    raw_key = data['api_key']
    # null / numbers / objects have no .strip(); reject them explicitly.
    if not isinstance(raw_key, str):
        return jsonify({'error': 'API key must be a string'}), 400

    api_key = raw_key.strip()
    if not api_key:
        return jsonify({'error': 'API key cannot be empty'}), 400

    _settings['api_key'] = api_key
    return jsonify({'success': True, 'message': 'API key saved successfully'})
|
|
@app.route('/api/settings/api-key', methods=['GET'])
def get_api_key_status():
    """Report whether an API key is stored, without revealing the key."""
    stored_key = _settings.get('api_key')
    return jsonify({'has_api_key': stored_key is not None})
|
|
@app.route('/api/settings/api-key', methods=['DELETE'])
def delete_api_key():
    """Clear the stored API key by resetting it to ``None``."""
    _settings.update(api_key=None)
    confirmation = {'success': True, 'message': 'API key removed'}
    return jsonify(confirmation)
|
|
@app.route('/api/erpnext/extract', methods=['POST'])
def erpnext_extract():
    """ERPNext API endpoint for document extraction.

    Accepts the document either as a URL to download or as base64 content,
    runs extraction, and returns the result in the requested format. When
    both ``file_content`` and ``file_url`` are given, ``file_content`` wins.

    Request JSON:
    {
        "file_url": "https://example.com/invoice.pdf",  # OR
        "file_content": "base64_encoded_content",
        "file_name": "invoice.pdf",
        "output_format": "markdown|json|csv|html|text",
        "processing_mode": "cloud|gpu",
        "api_key": "optional_api_key"
    }

    Returns:
        JSON ``{'success': True, 'data': ..., 'format': ..., 'metadata': {...}}``
        on success, ``{'error': ...}`` with status 400 for bad input or a
        failed download, or status 500 for any other failure.
    """
    try:
        # silent=True: a missing or malformed JSON body becomes the 400 below
        # instead of an exception that the outer handler reports as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({'error': 'JSON body is required'}), 400

        file_url = data.get('file_url')
        file_content = data.get('file_content')
        file_name = data.get('file_name', 'document.pdf')
        output_format = data.get('output_format', 'markdown')
        processing_mode = data.get('processing_mode', 'cloud')
        api_key = data.get('api_key') or _settings.get('api_key')

        if not file_url and not file_content:
            return jsonify({'error': 'Either file_url or file_content is required'}), 400

        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        import base64
        import requests as http_requests

        if file_content:
            try:
                file_bytes = base64.b64decode(file_content)
            except (ValueError, TypeError):
                # binascii.Error is a ValueError; TypeError covers non-string input.
                return jsonify({'error': 'Invalid base64 content'}), 400
        else:
            # SECURITY NOTE(review): file_url is caller-supplied and fetched
            # from this server with no host restriction and no size limit, so
            # it can reach internal addresses. Restrict who may call this
            # endpoint or add an allow-list if that matters for deployment.
            try:
                response = http_requests.get(file_url, timeout=60)
                response.raise_for_status()
                file_bytes = response.content
            except Exception as e:
                return jsonify({'error': f'Failed to download file: {str(e)}'}), 400

        suffix = Path(file_name).suffix or '.pdf'
        tmp_path = None
        try:
            # Created inside the try so a failed write is still cleaned up.
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                tmp_path = tmp_file.name
                tmp_file.write(file_bytes)

            result = extractor.extract(tmp_path)

            if output_format == 'html':
                content = result.extract_html()
            elif output_format == 'json':
                content = result.extract_data()
            elif output_format == 'csv':
                content = result.extract_csv(include_all_tables=True)
            elif output_format == 'text':
                content = result.extract_text()
            else:
                # 'markdown' and any unrecognised format.
                content = result.extract_markdown()

            return jsonify({
                'success': True,
                'data': content,
                'format': output_format,
                'metadata': {
                    'file_name': file_name,
                    'file_size': len(file_bytes),
                    'pages_processed': getattr(result, 'pages_processed', 1),
                    'processing_time': getattr(result, 'processing_time', 0),
                    'processing_mode': processing_mode
                }
            })

        finally:
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except Exception as e:
        return jsonify({'error': f'ERPNext extraction error: {str(e)}'}), 500
|
|
@app.route('/api/nextcloud/settings', methods=['POST'])
def save_nextcloud_settings():
    """Store Nextcloud connection settings in the in-memory settings.

    Request JSON keys: ``url``, ``user``, ``password``, ``verify_ssl``.
    Missing or null ``url``/``user``/``password`` are stored as ``''``;
    ``verify_ssl`` defaults to ``True``. Trailing slashes are stripped from
    the URL.

    Returns:
        ``{'success': True, ...}``, or ``{'error': ...}`` with status 400 when
        the body is not a non-empty JSON object or ``url`` is not a string.
    """
    data = request.get_json()
    if not isinstance(data, dict) or not data:
        return jsonify({'error': 'JSON body is required'}), 400

    # ``or ''`` also covers an explicit JSON null, which .get()'s default
    # would not.
    url = data.get('url') or ''
    if not isinstance(url, str):
        return jsonify({'error': 'url must be a string'}), 400

    _settings['nextcloud_url'] = url.rstrip('/')
    _settings['nextcloud_user'] = data.get('user') or ''
    _settings['nextcloud_password'] = data.get('password') or ''
    _settings['nextcloud_verify_ssl'] = data.get('verify_ssl', True)

    return jsonify({'success': True, 'message': 'Nextcloud settings saved'})
|
|
@app.route('/api/nextcloud/test', methods=['POST'])
def test_nextcloud_connection():
    """Test a Nextcloud WebDAV connection with a depth-0 PROPFIND.

    Optional request JSON keys ``url``, ``user``, ``password`` and
    ``verify_ssl`` override the stored settings; anything omitted falls back
    to ``_settings``.

    Returns:
        ``{'success': True, ...}`` when the server answers 200 or 207,
        otherwise ``{'success': False, 'error': ...}`` with status 400.
    """
    try:
        # silent=True so an empty or non-JSON body really does fall back to
        # the stored settings instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        url = data.get('url') or _settings.get('nextcloud_url')
        user = data.get('user') or _settings.get('nextcloud_user')
        password = data.get('password') or _settings.get('nextcloud_password')
        verify_ssl = data.get('verify_ssl', _settings.get('nextcloud_verify_ssl', True))

        if not url or not user or not password:
            return jsonify({'success': False, 'error': 'URL, username and password are required'}), 400

        import requests as http_requests
        from urllib.parse import quote

        # Percent-encode the user name so characters such as '#', '?' or '/'
        # cannot change the structure of the URL.
        webdav_url = url.rstrip('/') + '/remote.php/dav/files/' + quote(user, safe='')
        response = http_requests.request(
            'PROPFIND',
            webdav_url,
            auth=(user, password),
            headers={'Depth': '0'},
            timeout=10,
            verify=verify_ssl
        )

        if response.status_code in (200, 207):
            return jsonify({'success': True, 'message': 'Connection successful', 'url': url.rstrip('/')})
        return jsonify({'success': False, 'error': f'Connection failed: HTTP {response.status_code}'}), 400
    except Exception as e:
        return jsonify({'success': False, 'error': f'Connection failed: {str(e)}'}), 400
|
|
@app.route('/api/nextcloud/browse', methods=['POST'])
def nextcloud_browse():
    """List a Nextcloud folder via a depth-1 WebDAV PROPFIND.

    Request JSON keys: ``path`` (default ``'/'``) plus optional ``url``,
    ``user``, ``password`` and ``verify_ssl`` overrides for the stored
    settings.

    Returns:
        ``{'success': True, 'path': ..., 'files': [...]}`` with folders sorted
        before files and each group ordered by lower-cased name. Entry paths
        are relative to the user's files root. Errors are returned as
        ``{'error': ...}`` with status 400, 500 or 502.
    """
    try:
        # silent=True turns a missing or malformed body into the 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({'error': 'JSON body is required'}), 400

        url = data.get('url') or _settings.get('nextcloud_url')
        user = data.get('user') or _settings.get('nextcloud_user')
        password = data.get('password') or _settings.get('nextcloud_password')
        verify_ssl = data.get('verify_ssl', _settings.get('nextcloud_verify_ssl', True))
        path = data.get('path', '/')
        if path is None:
            # An explicit JSON null means "no path given".
            path = '/'

        if not url or not user or not password:
            return jsonify({'error': 'Nextcloud credentials not configured'}), 400

        import requests as http_requests
        import xml.etree.ElementTree as ET
        from urllib.parse import quote, unquote

        base = url.rstrip('/')
        clean_path = path if path.startswith('/') else '/' + path
        # Undecoded form; compared against the decoded hrefs from the server.
        user_files_prefix = '/remote.php/dav/files/' + user
        # Percent-encode user and path so names containing '#', '?' or '%'
        # address the intended resource.
        webdav_url = base + '/remote.php/dav/files/' + quote(user, safe='') + quote(clean_path)

        response = http_requests.request(
            'PROPFIND',
            webdav_url,
            auth=(user, password),
            headers={'Depth': '1'},
            timeout=15,
            verify=verify_ssl
        )

        if response.status_code not in (200, 207):
            return jsonify({'error': f'Failed to browse: HTTP {response.status_code}'}), 400

        try:
            root = ET.fromstring(response.text)
        except ET.ParseError:
            # An unparseable body is an upstream failure, not an empty folder.
            return jsonify({'error': 'Failed to parse WebDAV response from Nextcloud'}), 502

        ns = {'d': 'DAV:'}
        requested_rel = clean_path.rstrip('/')
        files = []

        for resp in root.findall('.//d:response', ns):
            href = resp.find('d:href', ns)
            if href is None or not href.text:
                continue

            decoded_href = unquote(href.text)

            # Search for the user prefix anywhere in the href rather than only
            # at the start, so an install served under a sub-path (for example
            # /nextcloud/remote.php/...) is handled too.
            marker = decoded_href.find(user_files_prefix)
            if marker != -1:
                relative_path = decoded_href[marker + len(user_files_prefix):]
                if not relative_path.startswith('/'):
                    relative_path = '/' + relative_path
                # The listing includes the requested folder itself; skip it.
                if relative_path.rstrip('/') == requested_rel:
                    continue
            else:
                relative_path = decoded_href

            propstat = resp.find('d:propstat', ns)
            prop = propstat.find('d:prop', ns) if propstat is not None else None
            if prop is None:
                continue

            resourcetype = prop.find('d:resourcetype', ns)
            is_collection = resourcetype is not None and resourcetype.find('d:collection', ns) is not None

            display_name = relative_path.rstrip('/').split('/')[-1]
            if not display_name:
                display_name = 'Root'

            if is_collection:
                files.append({
                    'type': 'folder',
                    'name': display_name,
                    'path': relative_path,
                    'size': None,
                    'modified': None
                })
                continue

            content_length = prop.find('d:getcontentlength', ns)
            content_type = prop.find('d:getcontenttype', ns)
            last_modified = prop.find('d:getlastmodified', ns)
            files.append({
                'type': 'file',
                'name': display_name,
                'path': relative_path,
                'size': int(content_length.text) if content_length is not None and content_length.text else None,
                'content_type': content_type.text if content_type is not None and content_type.text else None,
                'modified': last_modified.text if last_modified is not None and last_modified.text else None
            })

        # Folders first (False sorts before True), then case-insensitive name.
        files.sort(key=lambda f: (f['type'] != 'folder', f['name'].lower()))

        return jsonify({'success': True, 'path': path, 'files': files})

    except Exception as e:
        return jsonify({'error': f'Browse error: {str(e)}'}), 500
|
|
@app.route('/api/nextcloud/download', methods=['POST'])
def nextcloud_download():
    """Download a file from Nextcloud over WebDAV and extract it.

    Request JSON keys: ``path`` (required), ``output_format`` (default
    ``markdown``), ``processing_mode`` (default ``cloud``), ``api_key``, plus
    optional ``url``, ``user``, ``password`` and ``verify_ssl`` overrides for
    the stored settings.

    Returns:
        ``{'success': True, 'content': ..., 'format': ..., 'metadata': {...}}``.
        For ``output_format == 'json'`` the content is a JSON-encoded string.
        Errors are returned as ``{'error': ...}`` with status 400 or 500.
    """
    try:
        # silent=True turns a missing or malformed body into the 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({'error': 'JSON body is required'}), 400

        url = data.get('url') or _settings.get('nextcloud_url')
        user = data.get('user') or _settings.get('nextcloud_user')
        password = data.get('password') or _settings.get('nextcloud_password')
        verify_ssl = data.get('verify_ssl', _settings.get('nextcloud_verify_ssl', True))
        file_path = data.get('path')
        output_format = data.get('output_format', 'markdown')
        processing_mode = data.get('processing_mode', 'cloud')
        api_key = data.get('api_key') or _settings.get('api_key')

        if not url or not user or not password or not file_path:
            return jsonify({'error': 'Missing required parameters'}), 400

        import requests as http_requests
        from urllib.parse import quote

        base = url.rstrip('/')
        clean_path = file_path if file_path.startswith('/') else '/' + file_path
        # Percent-encode user and path so names containing '#', '?' or '%'
        # address the intended file.
        webdav_url = base + '/remote.php/dav/files/' + quote(user, safe='') + quote(clean_path)
        # No stream=True: the body is read in full below anyway, and a
        # streamed response that is returned from early without being read or
        # closed keeps its connection checked out.
        response = http_requests.get(
            webdav_url,
            auth=(user, password),
            timeout=60,
            verify=verify_ssl
        )

        if response.status_code != 200:
            return jsonify({'error': f'Failed to download file: HTTP {response.status_code}'}), 400

        file_name = file_path.rstrip('/').split('/')[-1]
        file_bytes = response.content

        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        suffix = Path(file_name).suffix or '.pdf'
        tmp_path = None
        try:
            # Created inside the try so a failed write is still cleaned up.
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                tmp_path = tmp_file.name
                tmp_file.write(file_bytes)

            result = extractor.extract(tmp_path)

            if output_format == 'html':
                content = result.extract_html()
            elif output_format == 'json':
                # This endpoint returns JSON output as a pretty-printed string.
                content = json.dumps(result.extract_data(), indent=2)
            elif output_format == 'csv':
                content = result.extract_csv(include_all_tables=True)
            elif output_format == 'text':
                content = result.extract_text()
            else:
                # 'markdown' and any unrecognised format.
                content = result.extract_markdown()

            return jsonify({
                'success': True,
                'content': content,
                'format': output_format,
                'metadata': {
                    'file_name': file_name,
                    'file_path': file_path,
                    'file_size': len(file_bytes),
                    'pages_processed': getattr(result, 'pages_processed', 1),
                    'processing_time': getattr(result, 'processing_time', 0),
                    'processing_mode': processing_mode
                }
            })

        finally:
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except Exception as e:
        return jsonify({'error': f'Download/processing error: {str(e)}'}), 500
|
|
|
|
| |
|
|
| |
# In-memory log of successful batch extractions: batch_extract() appends one
# entry per successfully processed file and get_extraction_history() returns
# the list as-is.
# NOTE(review): nothing in this file trims the list, so it grows for the
# lifetime of the process.
_extraction_history = []
|
|
@app.route('/api/preview-file', methods=['POST'])
def preview_file():
    """Return metadata and a light preview for an upload, without extraction.

    Multipart form field ``file`` is required.

    Returns:
        ``{'success': True, 'preview': {...}}`` with name, size, type and MIME
        type. PDFs and common image types are flagged as previewable; for
        ``.txt``/``.md``/``.csv`` the first 1000 characters are included as
        ``text_preview``. Errors are ``{'error': ...}`` with status 400 or 500.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        upload = request.files['file']
        # ``not`` also covers a missing (None) filename.
        if not upload.filename:
            return jsonify({'error': 'No file selected'}), 400

        file_type = Path(upload.filename).suffix.lower()
        tmp_path = None
        try:
            # Created inside the try so a failed save is still cleaned up, and
            # written through the open handle instead of re-opening by name.
            with tempfile.NamedTemporaryFile(delete=False, suffix=Path(upload.filename).suffix) as tmp_file:
                tmp_path = tmp_file.name
                upload.save(tmp_file)

            file_size = os.path.getsize(tmp_path)

            preview_data = {
                'file_name': upload.filename,
                'file_size': file_size,
                'file_size_human': format_file_size(file_size),
                'file_type': file_type,
                'mime_type': upload.content_type,
                'preview_url': None,
                'is_previewable': False
            }

            if file_type == '.pdf':
                preview_data['is_previewable'] = True
                preview_data['preview_type'] = 'pdf'
            elif file_type in ('.jpg', '.jpeg', '.png', '.gif', '.webp'):
                preview_data['is_previewable'] = True
                preview_data['preview_type'] = 'image'
            elif file_type in ('.txt', '.md', '.csv'):
                # errors='ignore': undecodable bytes are dropped, not fatal.
                with open(tmp_path, 'r', encoding='utf-8', errors='ignore') as f:
                    preview_data['text_preview'] = f.read(1000)
                preview_data['is_previewable'] = True
                preview_data['preview_type'] = 'text'

            return jsonify({
                'success': True,
                'preview': preview_data
            })

        finally:
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)

    except Exception as e:
        return jsonify({'error': f'Preview error: {str(e)}'}), 500
|
|
|
|
@app.route('/api/batch-extract', methods=['POST'])
def batch_extract():
    """Extract several uploaded files in one request.

    Multipart form fields:
        files: One or more documents; entries without a filename are skipped.
        output_format: ``markdown`` (default), ``html``, ``json``, ``csv`` or
            ``text``. Unknown values fall back to markdown.
        processing_mode: ``cloud`` (default) or ``gpu``.
        api_key: Optional; falls back to the key stored in ``_settings``.

    A failure on one file is recorded in that file's result and does not stop
    the batch. Each success is also appended to ``_extraction_history``.

    Returns:
        ``{'success': True, 'total': ..., 'processed': ..., 'results': [...]}``,
        or ``{'error': ...}`` with status 400 or 500.
    """
    from datetime import datetime

    try:
        files = request.files.getlist('files')
        if not files:
            return jsonify({'error': 'No files provided'}), 400

        output_format = request.form.get('output_format', 'markdown')
        processing_mode = request.form.get('processing_mode', 'cloud')
        api_key = request.form.get('api_key') or _settings.get('api_key')

        try:
            extractor = create_extractor_with_mode(processing_mode, api_key)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        results = []
        total_files = sum(1 for f in files if f.filename)
        processed = 0

        for upload in files:
            if not upload.filename:
                continue

            # Counts attempted files, whether they succeed or fail.
            processed += 1
            try:
                tmp_path = None
                try:
                    # Created inside the try so a failed save is still cleaned
                    # up, and written through the open handle.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=Path(upload.filename).suffix) as tmp_file:
                        tmp_path = tmp_file.name
                        upload.save(tmp_file)

                    result = extractor.extract(tmp_path)

                    if output_format == 'html':
                        content = result.extract_html()
                    elif output_format == 'json':
                        content = result.extract_data()
                    elif output_format == 'csv':
                        # Previously 'csv' silently produced markdown here,
                        # unlike the single-file endpoints.
                        content = result.extract_csv(include_all_tables=True)
                    elif output_format == 'text':
                        content = result.extract_text()
                    else:
                        # 'markdown' and any unrecognised format.
                        content = result.extract_markdown()

                    results.append({
                        'file_name': upload.filename,
                        'status': 'success',
                        'content': content,
                        'metadata': {
                            'file_size': os.path.getsize(tmp_path),
                            'processing_time': getattr(result, 'processing_time', 0)
                        }
                    })

                    _extraction_history.append({
                        'timestamp': datetime.now().isoformat(),
                        'file_name': upload.filename,
                        'status': 'success',
                        'format': output_format
                    })

                finally:
                    if tmp_path and os.path.exists(tmp_path):
                        os.unlink(tmp_path)

            except Exception as e:
                results.append({
                    'file_name': upload.filename,
                    'status': 'error',
                    'error': str(e)
                })

        return jsonify({
            'success': True,
            'total': total_files,
            'processed': processed,
            'results': results
        })

    except Exception as e:
        return jsonify({'error': f'Batch extraction error: {str(e)}'}), 500
|
|
|
|
@app.route('/api/extraction-history', methods=['GET'])
def get_extraction_history():
    """Return the in-memory extraction history and its length as JSON."""
    history = _extraction_history
    return jsonify({
        'success': True,
        'history': history,
        'total': len(history)
    })
|
|
|
|
@app.route('/api/export-result', methods=['POST'])
def export_result():
    """Return previously extracted content as a downloadable attachment.

    Request JSON keys:
        content: The content to export (required, non-empty).
        format: ``markdown`` (default), ``html``, ``json``, ``csv`` or ``text``.
        file_name: Base name for the download (default ``document``).

    Returns:
        A response with the matching MIME type and a ``Content-Disposition``
        attachment header, or ``{'error': ...}`` with status 400 or 500.
        String content is sent as-is; anything else is JSON-encoded.
    """
    try:
        # silent=True turns a missing or malformed body into the 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'JSON body is required'}), 400

        content = data.get('content')
        export_format = data.get('format', 'markdown')
        file_name = data.get('file_name', 'document')

        if not content:
            return jsonify({'error': 'No content provided'}), 400

        from flask import Response

        # export format -> (MIME type, file extension)
        export_types = {
            'markdown': ('text/markdown', 'md'),
            'html': ('text/html', 'html'),
            'json': ('application/json', 'json'),
            'csv': ('text/csv', 'csv'),
            'text': ('text/plain', 'txt'),
        }
        # isinstance guard: an unhashable value must not fail the dict lookup.
        target = export_types.get(export_format) if isinstance(export_format, str) else None
        if target is None:
            return jsonify({'error': f'Unsupported export format: {export_format}'}), 400
        mimetype, extension = target

        # file_name is caller-supplied and ends up in a response header;
        # reduce it to a safe file name before interpolating it.
        safe_name = secure_filename(str(file_name)) or 'document'

        # Lists and other non-string JSON values are encoded too, rather than
        # being handed to Response unserialised.
        body = content if isinstance(content, str) else json.dumps(content, indent=2)

        return Response(
            body,
            mimetype=mimetype,
            headers={'Content-Disposition': f'attachment; filename={safe_name}.{extension}'}
        )

    except Exception as e:
        return jsonify({'error': f'Export error: {str(e)}'}), 500
|
|
|
|
@app.route('/api/api-usage', methods=['GET'])
def get_api_usage():
    """Return API usage statistics for cloud mode.

    The figures are fixed values written in this function; nothing here
    measures real usage.
    """
    usage = dict(
        calls_today=0,
        calls_this_month=0,
        limit_per_month=10000,
        remaining=10000,
        reset_date='end of month',
    )
    return jsonify({'success': True, 'usage': usage})
|
|
|
|
def format_file_size(size_bytes):
    """Format a byte count as a human-readable string.

    The value is scaled by powers of 1024 and rounded to two decimals, e.g.
    ``1536`` -> ``"1.5 KB"``. Sizes beyond the largest unit stay in TB
    (``1024 ** 5`` -> ``"1024.0 TB"``) instead of raising ``IndexError``.

    Args:
        size_bytes: Non-negative size in bytes.

    Returns:
        The formatted size; ``"0 B"`` for zero.

    Raises:
        ValueError: ``size_bytes`` is negative.
    """
    if size_bytes == 0:
        return "0 B"
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")

    size_names = ("B", "KB", "MB", "GB", "TB")
    value = float(size_bytes)
    unit = 0
    # Repeated division avoids relying on a floating-point logarithm to pick
    # the unit, and the bound keeps the index inside ``size_names``.
    while value >= 1024 and unit < len(size_names) - 1:
        value /= 1024
        unit += 1
    return f"{round(value, 2)} {size_names[unit]}"
|
|
|
|
def run_web_app(host='0.0.0.0', port=8000, debug=False):
    """Start the Flask server, running the model warm-up first when a GPU exists.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        debug: Passed through to ``app.run``.
    """
    print("Checking GPU availability...")

    if not check_gpu_availability():
        print("GPU not available - starting in cloud mode only")
        print("To enable GPU, install PyTorch with CUDA: pip install torch --index-url https://download.pytorch.org/whl/cu118")
    else:
        print("GPU detected - proceeding with model download...")
        print("Downloading models before starting the web interface...")
        download_models()

    print(f"Starting docstrange web interface at http://{host}:{port}")
    print("Press Ctrl+C to stop the server")
    app.run(host=host, port=port, debug=debug)
|
|
if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which can execute
    # code from the browser. Bind to loopback here instead of the
    # run_web_app() default of 0.0.0.0 so it is not reachable from the network.
    run_web_app(host='127.0.0.1', debug=True)