# Source: Hugging Face Space "app.py" by PRSHNTKUMR (commit 8304130, verified)
"""
Complete Hugging Face Chemical Analyzer with MolScribe Integration
Production-ready system with Google Vision, Claude analysis, and N8N HTTP integration
"""
import gradio as gr
import json
import requests
import os
import io
import re
import tempfile
import base64
import concurrent.futures
import time
import threading
from typing import Dict, Optional, List, Tuple, Union, Any
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter
import numpy as np
import pandas as pd
from datetime import datetime
import logging
from functools import wraps
import traceback
import hashlib
# Enhanced logging setup for Hugging Face
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
class DependencyManager:
    """Manages optional dependencies with fallback support for Hugging Face.

    Probes each optional library exactly once (at construction) and records
    a boolean availability flag per feature in ``available_deps``.
    """

    def __init__(self):
        # Maps dependency name -> bool availability flag.
        self.available_deps: Dict[str, bool] = {}
        # Reserved for future fallback bookkeeping (currently unused).
        self.fallback_options: Dict[str, str] = {}
        self.detect_all_dependencies()

    @staticmethod
    def _can_import(module_name: str) -> bool:
        """Return True if *module_name* is importable, False otherwise."""
        import importlib
        try:
            importlib.import_module(module_name)
            return True
        except ImportError:
            return False

    def _probe(self, dep_key: str, modules: List[str], ok_msg: str,
               fail_msg: str, fail_level: int = logging.WARNING) -> bool:
        """Record availability of *dep_key* based on importability of *modules*.

        Logs *ok_msg* at INFO on success, *fail_msg* at *fail_level* otherwise.
        """
        available = all(self._can_import(m) for m in modules)
        self.available_deps[dep_key] = available
        if available:
            logger.info(ok_msg)
        else:
            logger.log(fail_level, fail_msg)
        return available

    def detect_all_dependencies(self):
        """Detect all optional dependencies with HF-compatible fallbacks."""
        # PDF processing: try the preferred backend first, then fallbacks;
        # stop at the first one that imports (mirrors the old nested try/except).
        self.available_deps['pdf_processing'] = False
        pdf_backends = [
            ('fitz', 'pymupdf', "βœ… PyMuPDF available"),
            ('PyPDF2', 'pypdf2', "βœ… PyPDF2 available as fallback"),
            ('pdfplumber', 'pdfplumber', "βœ… pdfplumber available as fallback"),
        ]
        for module_name, dep_key, message in pdf_backends:
            if self._can_import(module_name):
                self.available_deps[dep_key] = True
                self.available_deps['pdf_processing'] = True
                logger.info(message)
                break
        else:
            logger.warning("⚠️ No PDF processing libraries available")
        # Google APIs
        self._probe('google_drive',
                    ['googleapiclient.discovery', 'google.oauth2.service_account'],
                    "βœ… Google Drive API available",
                    "⚠️ Google Drive API not available")
        self._probe('google_vision', ['google.cloud.vision'],
                    "βœ… Google Vision API available",
                    "⚠️ Google Vision API not available")
        # Chemical analysis tools
        self._probe('rdkit', ['rdkit.Chem'],
                    "βœ… RDKit available",
                    "⚠️ RDKit not available")
        # MolScribe
        self._probe('molscribe', ['molscribe'],
                    "βœ… MolScribe available",
                    "⚠️ MolScribe not available")
        # Essential dependencies: only log (at ERROR) when they are missing,
        # matching the original behavior of no success message for these.
        for dep_key, module_name, fail_msg in [
            ('pillow', 'PIL.Image', "❌ Pillow required"),
            ('requests', 'requests', "❌ Requests required"),
        ]:
            ok = self._can_import(module_name)
            self.available_deps[dep_key] = ok
            if not ok:
                logger.error(fail_msg)

    def is_available(self, dependency: str) -> bool:
        """Return True if *dependency* was detected at startup."""
        return self.available_deps.get(dependency, False)

    def get_status(self) -> Dict:
        """Return a snapshot copy of the availability map."""
        return self.available_deps.copy()
# Initialize dependency manager (module-level singleton; probes optional
# libraries exactly once at import time).
deps = DependencyManager()
class HuggingFaceConfig:
    """Configuration manager for Hugging Face deployment.

    Reads API keys and secrets from environment variables (HF Spaces
    secrets) and, when Google credentials arrive as a JSON blob,
    materializes them to a file for the Google client libraries.
    """

    def __init__(self):
        self.setup_environment()

    def setup_environment(self):
        """Setup environment variables and secrets."""
        # Hugging Face Spaces secrets
        self.claude_api_key = os.getenv('CLAUDE_API_KEY', '')
        self.google_credentials = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON', '')
        self.n8n_webhook_secret = os.getenv('N8N_WEBHOOK_SECRET', 'default_secret')
        # Google client libraries expect a file path, so persist the JSON
        # secret to /tmp and point GOOGLE_APPLICATION_CREDENTIALS at it.
        # NOTE(review): /tmp is typically world-readable; acceptable on an
        # isolated HF Space but confirm for other deployments.
        if self.google_credentials:
            try:
                credentials_data = json.loads(self.google_credentials)
                with open('/tmp/google_credentials.json', 'w') as f:
                    json.dump(credentials_data, f)
                os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/tmp/google_credentials.json'
                logger.info("βœ… Google credentials configured from HF secrets")
            except Exception as e:
                logger.warning(f"⚠️ Google credentials setup failed: {e}")

    def validate_api_key(self, api_key: str) -> bool:
        """Return True if *api_key* looks like a Claude key (sk-ant-..., >20 chars).

        Bug fix: the previous implementation returned the falsy string itself
        (e.g. '') instead of a bool when *api_key* was empty.
        """
        return bool(api_key and api_key.startswith('sk-ant-') and len(api_key) > 20)
config = HuggingFaceConfig()
class GoogleVisionImageFilter:
    """Google Vision API integration with HF-compatible error handling.

    Classifies extracted page regions as chemical structures vs text/tables
    so downstream structure recognition only runs on likely structures.
    Falls back to "assume chemical structure" whenever Vision is unavailable
    or fails, so nothing is silently dropped.
    """

    def __init__(self, debug_mode: bool = True):
        self.available = deps.is_available('google_vision')
        self.debug_mode = debug_mode
        self.debug_log = []  # in-memory trace surfaced to the UI
        self.client = None   # vision.ImageAnnotatorClient once configured
        self.setup_vision_client()

    def log_debug(self, message: str):
        """Append a timestamped message to the debug trace (and the logger)."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_message = f"[{timestamp}] {message}"
        if self.debug_mode:
            logger.info(f"[VISION] {formatted_message}")
        self.debug_log.append(formatted_message)

    def setup_vision_client(self):
        """Initialize Google Vision client for HF."""
        if not self.available:
            self.log_debug("❌ Google Vision API not available")
            return
        try:
            from google.cloud import vision
            # Check for credentials
            creds_path = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '/tmp/google_credentials.json')
            if os.path.exists(creds_path):
                self.client = vision.ImageAnnotatorClient()
                self.log_debug("βœ… Google Vision client initialized")
            else:
                self.log_debug(f"⚠️ Vision credentials not found")
                self.available = False
        except Exception as e:
            self.log_debug(f"❌ Vision client setup failed: {e}")
            self.available = False

    def analyze_image_content(self, image: Image.Image) -> Dict:
        """Analyze image to determine if it contains chemical structures.

        Returns a dict with 'is_chemical_structure', 'confidence' and
        'content_type'; defaults to "process it anyway" on any failure.
        """
        if not self.available or not self.client:
            # Fallback: assume chemical structure if Vision unavailable
            return {
                'is_chemical_structure': True,
                'confidence': 0.5,
                'content_type': 'unknown',
                'error': 'Google Vision not available - processing anyway'
            }
        try:
            # Convert PIL image to PNG bytes for the Vision API
            img_byte_arr = io.BytesIO()
            if image.mode != 'RGB':
                image = image.convert('RGB')
            image.save(img_byte_arr, format='PNG')
            img_byte_arr = img_byte_arr.getvalue()
            from google.cloud import vision
            vision_image = vision.Image(content=img_byte_arr)
            # Perform multiple detection types
            text_response = self.client.text_detection(image=vision_image)
            texts = text_response.text_annotations
            objects_response = self.client.object_localization(image=vision_image)
            objects = objects_response.localized_object_annotations
            label_response = self.client.label_detection(image=vision_image)
            labels = label_response.label_annotations
            # Analyze results
            analysis = self._analyze_vision_results(texts, objects, labels, image.size)
            self.log_debug(f"Vision analysis: {analysis['content_type']} (confidence: {analysis['confidence']:.2f})")
            return analysis
        except Exception as e:
            self.log_debug(f"❌ Vision analysis failed: {e}")
            return {
                'is_chemical_structure': True,  # Default to processing
                'confidence': 0.5,
                'content_type': 'unknown',
                'error': str(e)
            }

    def _analyze_vision_results(self, texts, objects, labels, image_size) -> Dict:
        """Analyze Vision API results to classify content.

        Accumulates "structure-like" vs "text/table-like" evidence from OCR
        text, detected objects, labels and the aspect ratio, then returns the
        winning classification with confidence clamped to [0.1, 0.9].
        """
        width, height = image_size
        image_area = width * height
        text_score = 0.0
        structure_score = 0.0
        # Bug fix: initialize these up front so 'analysis_details' is
        # well-defined even when no text was detected (the old code relied
        # on a fragile locals().get(...) lookup).
        text_density = 0.0
        chemical_keywords = 0
        table_indicators = 0
        # Analyze text detection for chemical vs table content
        if texts:
            total_text_area = 0
            for text in texts[1:]:  # Skip first annotation (the full text)
                vertices = text.bounding_poly.vertices
                if len(vertices) >= 4:
                    text_width = abs(vertices[2].x - vertices[0].x)
                    text_height = abs(vertices[2].y - vertices[0].y)
                    total_text_area += text_width * text_height
                text_content = text.description.lower()
                # Chemical structure indicators
                if any(chem in text_content for chem in [
                    'scheme', 'figure', 'compound', 'synthesis', 'reaction',
                    'mol', 'structure', 'formula'
                ]):
                    chemical_keywords += 1
                # Table/data indicators
                if any(table in text_content for table in [
                    'table', 'yield', '%', 'mp', 'melting', 'nmr', 'ir', 'ms',
                    'data', 'result', 'analysis'
                ]):
                    table_indicators += 1
            text_density = total_text_area / image_area if image_area > 0 else 0
            # High text density suggests tables/data
            if text_density > 0.3:
                text_score += 0.4
            if table_indicators > chemical_keywords and table_indicators > 2:
                text_score += 0.3
        # Analyze object detection
        for obj in objects:
            obj_name = obj.name.lower()
            if any(diagram_term in obj_name for diagram_term in [
                'diagram', 'chart', 'figure', 'drawing', 'illustration'
            ]):
                structure_score += 0.2
        # Analyze labels (weighted by the label's own confidence score)
        for label in labels:
            label_name = label.description.lower()
            confidence = label.score
            # Chemical structure indicators
            if any(chem_label in label_name for chem_label in [
                'diagram', 'drawing', 'line art', 'figure', 'illustration',
                'chemistry', 'molecule', 'formula'
            ]):
                structure_score += confidence * 0.3
            # Text/table indicators
            if any(text_label in label_name for text_label in [
                'text', 'document', 'table', 'data', 'spreadsheet'
            ]):
                text_score += confidence * 0.2
        # Aspect ratio: near-square regions lean "structure", very wide/tall
        # regions lean "text block or table".
        aspect_ratio = width / height if height > 0 else 1
        if 0.3 <= aspect_ratio <= 3.0:
            structure_score += 0.1
        if aspect_ratio > 4.0 or aspect_ratio < 0.25:
            text_score += 0.2
        # Final classification (ties go to text_or_table)
        if structure_score > text_score:
            is_chemical = True
            confidence = min(structure_score, 0.9)
            content_type = 'chemical_structure'
        else:
            is_chemical = False
            confidence = min(text_score, 0.9)
            content_type = 'text_or_table'
        confidence = max(confidence, 0.1)
        return {
            'is_chemical_structure': is_chemical,
            'confidence': confidence,
            'content_type': content_type,
            'text_score': text_score,
            'structure_score': structure_score,
            'analysis_details': {
                'text_density': text_density,
                'chemical_keywords': chemical_keywords,
                'table_indicators': table_indicators,
                'aspect_ratio': aspect_ratio
            }
        }
class MolScribeAnalyzer:
    """MolScribe-based chemical structure recognition optimized for HF.

    Wraps MolScribe prediction with image preprocessing, timeout control,
    SMILES sanity checks, optional RDKit validation, and usage statistics.
    """

    def __init__(self, debug_mode: bool = True):
        self.available = deps.is_available('molscribe')
        self.debug_mode = debug_mode
        self.debug_log = []
        # Running counters surfaced in the UI / logs.
        self.stats = {
            'total_predictions': 0,
            'successful_predictions': 0,
            'failed_predictions': 0,
            'avg_processing_time': 0.0
        }
        # Lazily-built MolScribe model, cached across predictions.
        self._model = None
        self.setup_molscribe()

    def log_debug(self, message: str):
        """Append a timestamped message to the debug trace (and the logger)."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_message = f"[{timestamp}] {message}"
        if self.debug_mode:
            logger.info(f"[MOLSCRIBE] {formatted_message}")
        self.debug_log.append(formatted_message)

    def setup_molscribe(self):
        """Initialize and smoke-test MolScribe with a synthetic benzene drawing."""
        if not self.available:
            self.log_debug("❌ MolScribe not available")
            return
        try:
            import molscribe
            self.log_debug("βœ… MolScribe imported successfully")
            # Test with a simple image: draw a hexagon (benzene skeleton)
            test_image = Image.new('RGB', (200, 200), 'white')
            draw = ImageDraw.Draw(test_image)
            center_x, center_y = 100, 100
            radius = 50
            points = []
            for i in range(6):
                angle = i * 60 * np.pi / 180
                x = center_x + radius * np.cos(angle)
                y = center_y + radius * np.sin(angle)
                points.append((x, y))
            for i in range(6):
                start = points[i]
                end = points[(i + 1) % 6]
                draw.line([start, end], fill='black', width=3)
            self.log_debug("Testing MolScribe with benzene structure...")
            test_result = self._molscribe_predict(test_image, timeout_seconds=10)
            if test_result.get('success'):
                self.log_debug("βœ… MolScribe test successful")
            else:
                self.log_debug(f"⚠️ MolScribe test returned: {test_result}")
        except Exception as e:
            self.log_debug(f"❌ MolScribe setup failed: {e}")
            self.available = False

    def recognize_structure_with_timeout(self, image: Image.Image, timeout_seconds: int = 30) -> Dict:
        """Recognize structure with HF-compatible timeout.

        Runs _molscribe_predict in a worker thread and gives up after
        *timeout_seconds*. The worker thread itself cannot be killed, so the
        executor is shut down with wait=False to let it finish in background.
        """
        self.stats['total_predictions'] += 1
        self.log_debug(f"Starting MolScribe recognition (attempt #{self.stats['total_predictions']})")
        if not self.available:
            self.stats['failed_predictions'] += 1
            return {
                'success': False,
                'error': 'MolScribe not available. Install with: pip install MolScribe',
                'method': 'MolScribe'
            }
        try:
            # Preprocess image for optimal recognition
            processed_image = self._preprocess_image(image)
            start_time = time.time()
            # Bug fix: the old `with ThreadPoolExecutor()` block joined the
            # worker thread on exit, so a "timeout" still blocked until the
            # prediction finished. shutdown(wait=False) returns immediately.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(self._molscribe_predict, processed_image, timeout_seconds)
                try:
                    result = future.result(timeout=timeout_seconds)
                except concurrent.futures.TimeoutError:
                    future.cancel()
                    self.stats['failed_predictions'] += 1
                    self.log_debug(f"❌ MolScribe timeout after {timeout_seconds} seconds")
                    return {
                        'success': False,
                        'error': f'MolScribe processing timeout ({timeout_seconds}s)',
                        'method': 'MolScribe'
                    }
            finally:
                executor.shutdown(wait=False)
            processing_time = time.time() - start_time
            if result.get('success'):
                self.stats['successful_predictions'] += 1
                result['processing_time'] = processing_time
                self.log_debug(f"βœ… Structure recognized in {processing_time:.2f}s")
            else:
                self.stats['failed_predictions'] += 1
                self.log_debug(f"❌ Recognition failed: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            self.stats['failed_predictions'] += 1
            error_msg = f'MolScribe recognition failed: {str(e)}'
            self.log_debug(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'method': 'MolScribe'
            }

    def _molscribe_predict(self, image: Image.Image, timeout_seconds: int) -> Dict:
        """Core MolScribe prediction with error handling.

        Saves *image* to a temp PNG, runs the (cached) MolScribe model on it,
        then sanity-checks and optionally RDKit-validates the SMILES.
        """
        try:
            import molscribe
            from molscribe import MolScribe
            from huggingface_hub import hf_hub_download
            # Save image to temporary file
            with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file:
                if image.mode != 'RGB':
                    image = image.convert('RGB')
                image.save(tmp_file.name, 'PNG')
                temp_path = tmp_file.name
            try:
                # Perf fix: download the checkpoint and build the model once,
                # then reuse it (previously re-instantiated on every call).
                model = getattr(self, '_model', None)
                if model is None:
                    ckpt_path = hf_hub_download('yujieq/MolScribe', 'swin_base_char_aux_1m.pth')
                    model = MolScribe(ckpt_path)
                    self._model = model
                # NOTE(review): treats predict_image() output as a plain SMILES
                # string — confirm against the installed MolScribe version.
                smiles_result = model.predict_image(temp_path)
                # Validate result
                if smiles_result and isinstance(smiles_result, str) and len(smiles_result.strip()) > 0:
                    smiles = smiles_result.strip()
                    # Check for garbage SMILES
                    if self._is_garbage_smiles(smiles):
                        return {
                            'success': False,
                            'error': 'MolScribe produced garbage SMILES (likely not a chemical structure)',
                            'raw_smiles': smiles[:100] + '...' if len(smiles) > 100 else smiles
                        }
                    # Validate with RDKit if available
                    is_valid = True
                    validation_error = None
                    canonical_smiles = smiles
                    if deps.is_available('rdkit'):
                        try:
                            from rdkit import Chem
                            mol = Chem.MolFromSmiles(smiles)
                            if mol is not None:
                                canonical_smiles = Chem.MolToSmiles(mol)
                                is_valid = True
                            else:
                                is_valid = False
                                validation_error = "Invalid SMILES structure"
                        except Exception as e:
                            is_valid = False
                            validation_error = str(e)
                    return {
                        'success': True,
                        'smiles': smiles,
                        'canonical_smiles': canonical_smiles,
                        'is_valid': is_valid,
                        'validation_error': validation_error,
                        'method': 'MolScribe'
                    }
                else:
                    return {
                        'success': False,
                        'error': 'No structure detected by MolScribe',
                        'method': 'MolScribe'
                    }
            finally:
                # Best-effort temp-file cleanup; never mask the real result.
                if os.path.exists(temp_path):
                    try:
                        os.remove(temp_path)
                    except OSError:  # was a bare except: don't swallow SystemExit/KeyboardInterrupt
                        pass
        except Exception as e:
            return {
                'success': False,
                'error': f'MolScribe prediction failed: {str(e)}',
                'method': 'MolScribe'
            }

    def _preprocess_image(self, image: Image.Image) -> Image.Image:
        """Enhanced image preprocessing for MolScribe.

        Boosts contrast/sharpness and rescales so the short side is at least
        200 px and the long side at most 1024 px. Returns the input unchanged
        on any failure.
        """
        try:
            if image.mode != 'RGB':
                image = image.convert('RGB')
            # Enhance contrast and sharpness
            enhancer = ImageEnhance.Contrast(image)
            image = enhancer.enhance(1.3)
            enhancer = ImageEnhance.Sharpness(image)
            image = enhancer.enhance(1.2)
            # Resize intelligently for MolScribe
            width, height = image.size
            min_size = 200
            max_size = 1024
            if width < min_size or height < min_size:
                scale_factor = min_size / min(width, height)
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)
                image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            elif width > max_size or height > max_size:
                scale_factor = max_size / max(width, height)
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)
                image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            return image
        except Exception as e:
            logger.warning(f"Image preprocessing failed: {e}")
            return image

    def _is_garbage_smiles(self, smiles: str) -> bool:
        """Check if SMILES is garbage (repetitive patterns, too long, etc.)."""
        # Check for excessively long SMILES
        if len(smiles) > 1000:
            return True
        # Check for repetitive patterns
        if any(pattern in smiles for pattern in [
            'CC#CC#CC#CC#', 'CCCCCCCCCCCCCCCCCCCC',
            '111111111', '222222222', '333333333'
        ]):
            return True
        # Check for too many dots (disconnected fragments)
        if smiles.count('.') > 10:
            return True
        # Check for obvious errors: unbalanced brackets
        if smiles.count('(') != smiles.count(')'):
            return True
        if smiles.count('[') != smiles.count(']'):
            return True
        return False
class ClaudeDocumentAnalyzer:
    """Claude-powered document analysis for chemical literature.

    Sends extracted document text to the Anthropic Messages API and parses
    the structured JSON reply describing where chemical structures and data
    tables are likely located. Results are cached per document prefix.
    """

    def __init__(self, debug_mode: bool = True):
        self.debug_mode = debug_mode
        self.debug_log = []
        # Cache keyed by MD5 of the first 1000 chars of the document text.
        self.analysis_cache = {}

    def log_debug(self, message: str):
        """Append a timestamped message to the debug trace (and the logger)."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_message = f"[{timestamp}] {message}"
        if self.debug_mode:
            logger.info(f"[CLAUDE] {formatted_message}")
        self.debug_log.append(formatted_message)

    def analyze_document_structure(self, text_content: str, api_key: str) -> Dict:
        """Analyze document using Claude to identify chemical content locations.

        Returns a success dict (document_type, structure_locations, ...) or a
        dict with an 'error' key; never raises.
        """
        self.log_debug("Starting Claude document structure analysis...")
        if not config.validate_api_key(api_key):
            return {
                'error': 'Valid Claude API key required (sk-ant-...)',
                'regions_to_process': [],
                'document_type': 'unknown'
            }
        try:
            # Create cache key from the document prefix
            cache_key = hashlib.md5(text_content[:1000].encode()).hexdigest()
            if cache_key in self.analysis_cache:
                self.log_debug("Using cached analysis")
                return self.analysis_cache[cache_key]
            # Create structured prompt
            analysis_prompt = self._create_analysis_prompt(text_content)
            headers = {
                'Content-Type': 'application/json',
                'x-api-key': api_key.strip(),
                'anthropic-version': '2023-06-01'
            }
            payload = {
                'model': 'claude-3-5-sonnet-20241022',
                'max_tokens': 2000,
                'messages': [{'role': 'user', 'content': analysis_prompt}]
            }
            self.log_debug("Sending document to Claude for analysis...")
            response = requests.post(
                'https://api.anthropic.com/v1/messages',
                headers=headers,
                json=payload,
                timeout=60
            )
            if response.status_code == 200:
                result = response.json()
                analysis_text = result['content'][0]['text']
                # Parse Claude's structured response (guaranteed to contain
                # all keys read below, see _parse_claude_response)
                parsed_analysis = self._parse_claude_response(analysis_text)
                # Cache the result
                self.analysis_cache[cache_key] = {
                    'success': True,
                    'document_type': parsed_analysis['document_type'],
                    'structure_locations': parsed_analysis['structure_locations'],
                    'data_tables': parsed_analysis['data_tables'],
                    'chemical_entities': parsed_analysis['chemical_entities'],
                    'processing_priority': parsed_analysis['processing_priority'],
                    'claude_analysis': analysis_text,
                    'tokens_used': result.get('usage', {}).get('output_tokens', 'unknown')
                }
                self.log_debug(f"Claude identified {len(parsed_analysis['structure_locations'])} potential structure locations")
                return self.analysis_cache[cache_key]
            else:
                error_msg = f'Claude API error {response.status_code}: {response.text[:200]}'
                self.log_debug(f"Claude API failed: {error_msg}")
                return {
                    'error': error_msg,
                    'regions_to_process': [],
                    'document_type': 'unknown'
                }
        except Exception as e:
            error_msg = f'Claude analysis failed: {str(e)}'
            self.log_debug(error_msg)
            return {
                'error': error_msg,
                'regions_to_process': [],
                'document_type': 'unknown'
            }

    def _create_analysis_prompt(self, text_content: str) -> str:
        """Create structured prompt for Claude document analysis (first 6000 chars)."""
        prompt = f"""Analyze this chemistry research document and provide a structured analysis to guide automated chemical structure recognition.
Document Text:
{text_content[:6000]}
Please provide a JSON response with the following structure:
{{
"document_type": "research_paper" | "review" | "patent" | "thesis" | "other",
"structure_locations": [
{{
"type": "reaction_scheme" | "individual_structure" | "mechanism",
"keywords": ["scheme", "figure", "compound"],
"page_likely": 1,
"description": "Brief description of what structure is expected",
"priority": "high" | "medium" | "low"
}}
],
"data_tables": [
{{
"type": "yields" | "properties" | "spectral_data" | "references",
"keywords": ["table", "yield", "melting point"],
"should_skip": true,
"page_likely": 2
}}
],
"chemical_entities": {{
"main_compounds": ["compound names found"],
"reagents": ["reagent names"],
"solvents": ["solvent names"],
"catalysts": ["catalyst names"]
}},
"processing_priority": [
"Focus on Scheme 1 - main reaction",
"Look for individual product structures",
"Skip data tables and references"
]
}}
Focus on identifying:
1. Actual chemical structure diagrams vs data tables
2. Reaction schemes vs individual compounds
3. Main synthetic routes vs supporting data
4. What should be processed vs what should be skipped
Respond only with valid JSON - no additional text."""
        return prompt

    def _parse_claude_response(self, response_text: str) -> Dict:
        """Parse Claude's JSON response with fallback parsing."""
        try:
            # Try to extract JSON from response
            json_start = response_text.find('{')
            json_end = response_text.rfind('}') + 1
            if json_start >= 0 and json_end > json_start:
                json_str = response_text[json_start:json_end]
                parsed = json.loads(json_str)
                # Validate required fields
                required_fields = ['document_type', 'structure_locations', 'data_tables', 'chemical_entities']
                if all(field in parsed for field in required_fields):
                    # Bug fix: 'processing_priority' is not a required field but
                    # analyze_document_structure reads it unconditionally, which
                    # previously raised KeyError on otherwise-valid replies.
                    parsed.setdefault('processing_priority', [])
                    return parsed
            # Fallback parsing if JSON fails
            self.log_debug("JSON parsing failed, using fallback text analysis")
            return self._fallback_text_analysis(response_text)
        except Exception as e:
            self.log_debug(f"Response parsing failed: {e}, using fallback")
            return self._fallback_text_analysis(response_text)

    def _fallback_text_analysis(self, text: str) -> Dict:
        """Fallback analysis when JSON parsing fails: generic best-guess plan."""
        return {
            'document_type': 'research_paper',
            'structure_locations': [
                {
                    'type': 'reaction_scheme',
                    'keywords': ['scheme', 'synthesis'],
                    'page_likely': 1,
                    'description': 'Main reaction scheme',
                    'priority': 'high'
                }
            ],
            'data_tables': [
                {
                    'type': 'yields',
                    'keywords': ['table', 'yield', 'melting'],
                    'should_skip': True,
                    'page_likely': 2
                }
            ],
            'chemical_entities': {
                'main_compounds': [],
                'reagents': [],
                'solvents': [],
                'catalysts': []
            },
            'processing_priority': ['Focus on chemical structure diagrams', 'Skip data tables']
        }
class GoogleDriveManager:
    """Google Drive integration for Hugging Face with service_account.json.

    Builds a read-only Drive v3 client from a service-account file (searched
    in several locations) or from the JSON secret exposed by HF Spaces.
    """

    def __init__(self):
        self.service = None
        self.setup_service()

    def setup_service(self):
        """Initialize Google Drive service using service_account.json"""
        if not deps.is_available('google_drive'):
            logger.warning("Google Drive libraries not available")
            return
        try:
            from googleapiclient.discovery import build
            from google.oauth2.service_account import Credentials
            readonly_scope = ['https://www.googleapis.com/auth/drive.readonly']
            # Probe the usual locations for a credentials file, in order.
            candidate_paths = (
                'service_account.json',
                '/app/service_account.json',
                '/tmp/service_account.json',
                os.path.join(os.getcwd(), 'service_account.json'),
            )
            creds_file = next((p for p in candidate_paths if os.path.exists(p)), None)
            if creds_file is not None:
                credentials = Credentials.from_service_account_file(
                    creds_file, scopes=readonly_scope)
                self.service = build('drive', 'v3', credentials=credentials)
                logger.info(f"Google Drive service initialized using {creds_file}")
            elif config.google_credentials:
                # Fall back to the JSON blob provided as an HF Spaces secret.
                credentials = Credentials.from_service_account_info(
                    json.loads(config.google_credentials), scopes=readonly_scope)
                self.service = build('drive', 'v3', credentials=credentials)
                logger.info("Google Drive service initialized from HF secret")
            else:
                logger.warning("service_account.json not found in any expected location")
        except Exception as e:
            logger.error(f"Google Drive setup failed: {e}")

    def download_file(self, file_id: str) -> Dict:
        """Download file from Google Drive"""
        if not self.service:
            return {'error': 'Google Drive service not available'}
        try:
            # Fetch metadata first so the log line can name the file.
            file_info = self.service.files().get(fileId=file_id).execute()
            from googleapiclient.http import MediaIoBaseDownload
            request = self.service.files().get_media(fileId=file_id)
            buffer = io.BytesIO()
            downloader = MediaIoBaseDownload(buffer, request)
            finished = False
            while not finished:
                _status, finished = downloader.next_chunk()
            buffer.seek(0)
            payload = buffer.getvalue()
            logger.info(f"Downloaded file: {file_info['name']} ({len(payload)} bytes)")
            return {
                'success': True,
                'file_name': file_info['name'],
                'file_size': len(payload),
                'content': payload,
                'metadata': file_info,
            }
        except Exception as e:
            error_msg = f'Download failed: {str(e)}'
            logger.error(error_msg)
            return {'error': error_msg}
class PDFProcessor:
    """PDF processor with dual support (PyMuPDF/PyPDF2/pdfplumber).

    Extracts page text with whichever backend is installed; only the PyMuPDF
    path also renders candidate structure regions to images and runs them
    through the Google Vision filter. Fallback backends are text-only.
    """

    def __init__(self, vision_filter, debug_mode: bool = True):
        # vision_filter: GoogleVisionImageFilter used to drop non-structure regions.
        self.vision_filter = vision_filter
        self.debug_mode = debug_mode
        self.debug_log = []
        self.available_processors = self._detect_pdf_processors()

    def _detect_pdf_processors(self):
        """Detect available PDF processing libraries"""
        processors = []
        if deps.is_available('pymupdf'):
            processors.append('pymupdf')
        if deps.is_available('pypdf2'):
            processors.append('pypdf2')
        if deps.is_available('pdfplumber'):
            processors.append('pdfplumber')
        logger.info(f"Available PDF processors: {processors}")
        return processors

    def log_debug(self, message: str):
        # Mirror debug messages to the logger and keep them for the UI.
        if self.debug_mode:
            logger.info(f"[PDF] {message}")
        self.debug_log.append(message)

    def extract_content_with_vision_filtering(self, pdf_bytes: bytes, claude_analysis: Dict = None) -> Dict:
        """Extract content using best available PDF processor + Vision filtering.

        Tries pymupdf, then pypdf2, then pdfplumber; returns the first
        backend's result, or an error dict if all fail.
        """
        if not self.available_processors:
            return {'error': 'No PDF processing libraries available'}
        self.debug_log = []
        self.log_debug("Starting Vision-enhanced PDF content extraction")
        # Try processors in order of preference
        for processor in ['pymupdf', 'pypdf2', 'pdfplumber']:
            if processor in self.available_processors:
                try:
                    if processor == 'pymupdf':
                        return self._extract_with_pymupdf(pdf_bytes, claude_analysis)
                    elif processor == 'pypdf2':
                        return self._extract_with_pypdf2(pdf_bytes, claude_analysis)
                    elif processor == 'pdfplumber':
                        return self._extract_with_pdfplumber(pdf_bytes, claude_analysis)
                except Exception as e:
                    self.log_debug(f"{processor} failed: {e}, trying next processor")
                    continue
        return {'error': 'All PDF processors failed'}

    def _extract_with_pymupdf(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict:
        """Extract using PyMuPDF (preferred method)"""
        import fitz
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        self.log_debug(f"Opened PDF with PyMuPDF: {len(doc)} pages")
        results = {
            'text_content': '',
            'images': [],
            'page_count': len(doc),
            'extraction_stats': {},
            'debug_info': [],
            'processor_used': 'pymupdf'
        }
        all_text = []
        total_images = 0
        vision_filtered = 0
        for page_num in range(min(len(doc), 10)):  # Limit to 10 pages for HF
            page = doc[page_num]
            self.log_debug(f"Processing page {page_num + 1}")
            # Extract text
            page_text = page.get_text()
            all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}")
            # Find image regions
            regions = self._identify_regions_pymupdf(page, page_text, page_num, claude_analysis)
            for region_idx, region in enumerate(regions[:3]):  # Limit to 3 per page
                try:
                    # Render region
                    mat = fitz.Matrix(2.0, 2.0)  # 2x zoom
                    clip = fitz.Rect(region['bbox'])
                    if clip.width > 50 and clip.height > 50:
                        pix = page.get_pixmap(matrix=mat, clip=clip)
                        img_data = pix.tobytes("png")
                        pil_image = Image.open(io.BytesIO(img_data))
                        # Vision API filtering: skip regions classified as text/tables
                        if self.vision_filter.available:
                            vision_analysis = self.vision_filter.analyze_image_content(pil_image)
                            if not vision_analysis['is_chemical_structure']:
                                vision_filtered += 1
                                self.log_debug(f"Vision filtered out region {region_idx}: {vision_analysis['content_type']}")
                                continue
                        results['images'].append({
                            'page': page_num + 1,
                            'index': region_idx,
                            'size': pil_image.size,
                            'image': pil_image,
                            'filename': f"page_{page_num+1}_region_{region_idx+1}.png",
                            'type': region.get('type', 'structure'),
                            'description': region.get('description', 'Chemical structure'),
                            'vision_analysis': vision_analysis if self.vision_filter.available else None
                        })
                        total_images += 1
                        pix = None  # Free memory
                except Exception as e:
                    self.log_debug(f"Error processing region {region_idx}: {e}")
        results['text_content'] = '\n'.join(all_text)
        # NOTE(review): 'pages_processed' reports len(doc) even though only the
        # first 10 pages are scanned above — confirm this is intended.
        results['extraction_stats'] = {
            'pages_processed': len(doc),
            'total_regions': total_images + vision_filtered,
            'vision_approved': total_images,
            'vision_filtered': vision_filtered,
            'text_length': len(results['text_content'])
        }
        results['debug_info'] = self.debug_log.copy()
        doc.close()
        return results

    def _extract_with_pypdf2(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict:
        """Extract using PyPDF2 (fallback method, text only)"""
        import PyPDF2
        pdf_file = io.BytesIO(pdf_bytes)
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        self.log_debug(f"Opened PDF with PyPDF2: {len(pdf_reader.pages)} pages")
        # Extract text only (no images with PyPDF2)
        all_text = []
        for page_num, page in enumerate(pdf_reader.pages[:10]):
            page_text = page.extract_text()
            all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}")
        return {
            'text_content': '\n'.join(all_text),
            'images': [],  # PyPDF2 doesn't extract images well
            'page_count': len(pdf_reader.pages),
            'extraction_stats': {
                'pages_processed': len(pdf_reader.pages),
                'text_length': len('\n'.join(all_text)),
                'images_extracted': 0
            },
            'processor_used': 'pypdf2',
            'note': 'PyPDF2 used - text only, no image extraction'
        }

    def _extract_with_pdfplumber(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict:
        """Extract using pdfplumber (alternative fallback, text only)"""
        import pdfplumber
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            self.log_debug(f"Opened PDF with pdfplumber: {len(pdf.pages)} pages")
            all_text = []
            for page_num, page in enumerate(pdf.pages[:10]):
                # extract_text() can return None for image-only pages
                page_text = page.extract_text() or ""
                all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}")
            return {
                'text_content': '\n'.join(all_text),
                'images': [],  # pdfplumber focuses on text
                'page_count': len(pdf.pages),
                'extraction_stats': {
                    'pages_processed': len(pdf.pages),
                    'text_length': len('\n'.join(all_text)),
                    'images_extracted': 0
                },
                'processor_used': 'pdfplumber',
                'note': 'pdfplumber used - text extraction only'
            }

    def _identify_regions_pymupdf(self, page, page_text: str, page_num: int, claude_analysis: Dict) -> List[Dict]:
        """Identify regions using PyMuPDF.

        Scans text blocks for chemistry keywords and expands each hit's
        bounding box (structures usually sit below/around their caption).
        Falls back to a single center-of-page region when nothing matches.
        """
        regions = []
        try:
            # Get text blocks
            blocks = page.get_text("dict")
            for block in blocks.get("blocks", []):
                if "lines" in block:
                    block_text = ""
                    bbox = block.get("bbox", [0, 0, 0, 0])
                    for line in block["lines"]:
                        for span in line.get("spans", []):
                            block_text += span.get("text", "") + " "
                    block_text_lower = block_text.lower()
                    # Look for chemical structure indicators
                    chemical_keywords = ['scheme', 'figure', 'compound', 'structure', 'reaction']
                    if any(keyword in block_text_lower for keyword in chemical_keywords):
                        # Expand bbox for potential structure (clamped to page bounds)
                        expanded_bbox = [
                            max(0, bbox[0] - 100),
                            max(0, bbox[1] - 150),
                            min(page.rect.width, bbox[2] + 100),
                            min(page.rect.height, bbox[3] + 300)
                        ]
                        regions.append({
                            'bbox': expanded_bbox,
                            'text': block_text.strip(),
                            'type': 'structure',
                            'description': 'Potential chemical structure',
                            'confidence': 0.7
                        })
        except Exception as e:
            self.log_debug(f"Error identifying regions: {e}")
        # Add center region if no specific regions found
        if not regions:
            page_width = page.rect.width
            page_height = page.rect.height
            regions.append({
                'bbox': [page_width * 0.1, page_height * 0.2, page_width * 0.9, page_height * 0.7],
                'text': 'center_focus',
                'type': 'structure',
                'description': 'Center focus area',
                'confidence': 0.5
            })
        return regions
class RDKitAnalyzer:
    """RDKit molecular analysis with HF compatibility.

    RDKit is imported lazily inside ``analyze_molecule`` so the class can be
    constructed even when the dependency is missing; availability is probed
    once at init time via the module-level ``deps`` DependencyManager.
    """

    def __init__(self):
        # Whether RDKit was detected by the DependencyManager at startup.
        self.available = deps.is_available('rdkit')

    def analyze_molecule(self, smiles: str) -> Dict:
        """Compute physicochemical descriptors and drug-likeness for a SMILES.

        Args:
            smiles: SMILES string of the molecule to analyze.

        Returns:
            A dict of descriptors plus a Lipinski assessment on success, or
            ``{'error': ...}`` when RDKit is unavailable, the SMILES is
            invalid, or descriptor calculation fails.
        """
        if not self.available:
            return {'error': 'RDKit not available'}
        try:
            from rdkit import Chem
            from rdkit.Chem import Descriptors, Crippen, rdMolDescriptors

            mol = Chem.MolFromSmiles(smiles)
            if not mol:
                return {'error': f'Invalid SMILES: {smiles}'}

            # Compute each descriptor exactly once and reuse the values for
            # the Lipinski check below (the previous version recomputed
            # MolWt/MolLogP/NumHDonors/NumHAcceptors a second time).
            mw = Descriptors.MolWt(mol)
            logp = Crippen.MolLogP(mol)
            hbd = Descriptors.NumHDonors(mol)
            hba = Descriptors.NumHAcceptors(mol)

            properties = {
                'smiles_input': smiles,
                'canonical_smiles': Chem.MolToSmiles(mol),
                'molecular_formula': rdMolDescriptors.CalcMolFormula(mol),
                'molecular_weight': round(mw, 2),
                'exact_mass': round(Descriptors.ExactMolWt(mol), 4),
                'logp': round(logp, 2),
                'tpsa': round(Descriptors.TPSA(mol), 2),
                'hbd': hbd,
                'hba': hba,
                'heavy_atoms': Descriptors.HeavyAtomCount(mol),
                'rotatable_bonds': Descriptors.NumRotatableBonds(mol),
                'aromatic_rings': Descriptors.NumAromaticRings(mol),
                'ring_count': Descriptors.RingCount(mol),
                'formal_charge': Chem.rdmolops.GetFormalCharge(mol),
            }

            # Lipinski "rule of five": MW > 500, logP > 5, HBD > 5, HBA > 10.
            lipinski_violations = sum([mw > 500, logp > 5, hbd > 5, hba > 10])
            properties.update({
                'lipinski_violations': lipinski_violations,
                'lipinski_compliant': lipinski_violations <= 1,
                'drug_likeness': (
                    'High' if lipinski_violations <= 1
                    else 'Medium' if lipinski_violations <= 2
                    else 'Low'
                ),
            })
            return properties
        except Exception as e:
            return {'error': f'RDKit analysis failed: {str(e)}'}
class N8NAPIHandler:
    """N8N HTTP node integration for automated workflows.

    Validates incoming N8N-style requests, runs the complete analysis
    pipeline (Google Drive download -> Claude document analysis -> Vision
    filtering -> MolScribe recognition -> RDKit analysis) and tracks
    per-request status in ``processing_status``.
    """

    def __init__(self):
        # Maps request_id -> status dict (status, start/end times, result).
        self.processing_status = {}
        # Monotonically increasing counter used to build unique request ids.
        self.request_counter = 0

    def validate_request(self, request_data: Dict) -> Tuple[bool, str]:
        """Validate an N8N HTTP request payload.

        Returns:
            ``(True, "Valid request")`` when ``file_id`` is present and the
            Claude API key is well-formed, otherwise ``(False, reason)``.
        """
        try:
            # Check for required fields.
            if 'file_id' not in request_data:
                return False, "Missing required field: file_id"
            if 'claude_api_key' not in request_data:
                return False, "Missing required field: claude_api_key"
            # Validate Claude API key format (`config` is the module-level
            # application configuration object).
            if not config.validate_api_key(request_data['claude_api_key']):
                return False, "Invalid Claude API key format"
            return True, "Valid request"
        except Exception as e:
            return False, f"Request validation error: {str(e)}"

    def process_n8n_request(self, request_data: Dict) -> Dict:
        """Process an N8N HTTP request and return a JSON-serializable dict.

        The response always carries ``success``, ``request_id`` and
        ``timestamp`` keys; on success, the pipeline output is under ``data``.
        """
        # Initialized up front so the except handler can always reference it
        # (the previous version probed locals(), which is fragile).
        request_id = 'unknown'
        try:
            # Generate a unique request ID.
            self.request_counter += 1
            request_id = f"n8n_req_{self.request_counter}_{int(time.time())}"

            # Reject malformed requests early.
            is_valid, validation_message = self.validate_request(request_data)
            if not is_valid:
                return {
                    'success': False,
                    'request_id': request_id,
                    'error': validation_message,
                    'timestamp': datetime.now().isoformat()
                }

            file_id = request_data['file_id']
            claude_api_key = request_data['claude_api_key']
            doc_name = request_data.get('doc_name', 'N8N Document')
            workflow_id = request_data.get('workflow_id', request_id)

            # Record the in-flight request so status queries can report on it.
            self.processing_status[request_id] = {
                'status': 'processing',
                'start_time': datetime.now().isoformat(),
                'file_id': file_id,
                'doc_name': doc_name,
                'workflow_id': workflow_id
            }

            # Run the complete pipeline.
            result = self._run_complete_pipeline(file_id, claude_api_key, doc_name, request_id)

            # Update status bookkeeping with outcome and end time.
            self.processing_status[request_id].update({
                'status': 'completed' if result.get('status') == 'success' else 'failed',
                'end_time': datetime.now().isoformat(),
                'result': result
            })

            # Wrap the pipeline result in an N8N-friendly envelope.
            if result.get('status') == 'success':
                return {
                    'success': True,
                    'request_id': request_id,
                    'workflow_id': workflow_id,
                    'data': result,
                    'processing_time': self._calculate_processing_time(request_id),
                    'timestamp': datetime.now().isoformat()
                }
            return {
                'success': False,
                'request_id': request_id,
                'workflow_id': workflow_id,
                'error': result.get('error', 'Unknown processing error'),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'request_id': request_id,
                'error': f'Processing failed: {str(e)}',
                'timestamp': datetime.now().isoformat()
            }

    def _calculate_processing_time(self, request_id: str) -> str:
        """Return the wall-clock duration of a request as ``"X.XX seconds"``.

        Falls back to ``"Unknown"`` when the request is missing or its
        timestamps are absent or unparseable.
        """
        try:
            status = self.processing_status.get(request_id, {})
            start_time = status.get('start_time')
            end_time = status.get('end_time')
            if start_time and end_time:
                # Tolerate trailing 'Z' (Zulu/UTC) suffixes, which
                # datetime.fromisoformat() rejects on older Pythons.
                start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00') if start_time.endswith('Z') else start_time)
                end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00') if end_time.endswith('Z') else end_time)
                duration = (end_dt - start_dt).total_seconds()
                return f"{duration:.2f} seconds"
            return "Unknown"
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are not swallowed.
            return "Unknown"

    def _run_complete_pipeline(self, file_id: str, claude_api_key: str, doc_name: str, request_id: str) -> Dict:
        """Run download -> extraction -> Claude -> Vision -> MolScribe -> RDKit.

        Returns a ``{'status': 'success', ...}`` summary dict, or
        ``{'status': 'error', 'step': ..., 'error': ...}`` naming the stage
        that failed.
        """
        try:
            # Step 1: Download from Google Drive.
            download_result = gdrive.download_file(file_id)
            if 'error' in download_result:
                return {
                    'status': 'error',
                    'step': 'download',
                    'error': f'Download failed: {download_result["error"]}'
                }

            # Step 2: Extract text for Claude analysis.
            # NOTE(review): the same PDF is processed again in step 4 with the
            # Claude guidance; confirm this first pass is intended for text only.
            pdf_content = pdf_processor.extract_content_with_vision_filtering(download_result['content'])
            if 'error' in pdf_content:
                return {
                    'status': 'error',
                    'step': 'text_extraction',
                    'error': f'Text extraction failed: {pdf_content["error"]}'
                }

            # Step 3: Claude document structure analysis.
            claude_analysis = claude_analyzer.analyze_document_structure(
                pdf_content['text_content'],
                claude_api_key.strip()
            )

            # Step 4: Vision-enhanced PDF processing guided by Claude's output.
            vision_enhanced_content = pdf_processor.extract_content_with_vision_filtering(
                download_result['content'],
                claude_analysis
            )

            # Step 5: MolScribe analysis on Vision-approved images (max 5).
            molscribe_results = []
            if vision_enhanced_content['images']:
                for img_data in vision_enhanced_content['images'][:5]:  # Limit to 5 images
                    molscribe_result = molscribe_analyzer.recognize_structure_with_timeout(
                        img_data['image'],
                        timeout_seconds=30
                    )
                    molscribe_results.append({
                        'image_info': {
                            'page': img_data['page'],
                            'size': img_data['size'],
                            'type': img_data.get('type', 'structure'),
                            'description': img_data.get('description', 'Chemical structure'),
                            'vision_approved': True
                        },
                        'result': molscribe_result
                    })

            # Step 6: RDKit analysis of every successfully recognized SMILES.
            rdkit_results = []
            for molscribe_result in molscribe_results:
                if molscribe_result['result'].get('success') and molscribe_result['result'].get('smiles'):
                    smiles = molscribe_result['result']['smiles']
                    rdkit_analysis = rdkit_analyzer.analyze_molecule(smiles)
                    rdkit_results.append({
                        'source': 'molscribe_vision_approved',
                        'smiles': smiles,
                        'analysis': rdkit_analysis,
                        'image_info': molscribe_result['image_info']
                    })

            # Compile the aggregate result for the N8N response envelope.
            result = {
                'status': 'success',
                'request_id': request_id,
                'file_info': {
                    'file_id': file_id,
                    'doc_name': doc_name,
                    'file_size_kb': round(download_result['file_size'] / 1024, 1)
                },
                'claude_analysis': {
                    'success': claude_analysis.get('success', False),
                    'document_type': claude_analysis.get('document_type', 'unknown'),
                    'structure_locations_identified': len(claude_analysis.get('structure_locations', [])),
                    'processing_guidance': claude_analysis.get('processing_priority', [])
                },
                'vision_enhanced_analysis': {
                    'pages': vision_enhanced_content['page_count'],
                    'vision_approved_regions': len(vision_enhanced_content['images']),
                    'processor_used': vision_enhanced_content.get('processor_used', 'unknown')
                },
                'molscribe_analysis': {
                    'images_processed': len(molscribe_results),
                    'structures_recognized': len([r for r in molscribe_results if r['result'].get('success')]),
                    'results': molscribe_results
                },
                'rdkit_analysis': {
                    'molecules_analyzed': len(rdkit_results),
                    'results': rdkit_results
                },
                'processing_summary': {
                    'total_structures_found': len([r for r in molscribe_results if r['result'].get('success')]),
                    'valid_molecules': len([r for r in rdkit_results if 'error' not in r['analysis']]),
                    'pipeline_components': ['Google Drive', 'Claude Analysis', 'Google Vision', 'MolScribe', 'RDKit']
                }
            }
            return result
        except Exception as e:
            return {
                'status': 'error',
                'error': f'Pipeline processing failed: {str(e)}',
                'request_id': request_id
            }
# Initialize all components
# Module-level singletons shared by the Gradio UI callbacks and the N8N
# request handler (debug_mode=True enables verbose per-stage logging).
gdrive = GoogleDriveManager()
claude_analyzer = ClaudeDocumentAnalyzer(debug_mode=True)
vision_filter = GoogleVisionImageFilter(debug_mode=True)
# The PDF processor uses the Vision filter to vet extracted image regions.
pdf_processor = PDFProcessor(vision_filter, debug_mode=True)
rdkit_analyzer = RDKitAnalyzer()
molscribe_analyzer = MolScribeAnalyzer(debug_mode=True)
n8n_handler = N8NAPIHandler()
# Core Functions for Gradio Interface
def test_system_status():
    """Assemble a readiness report for every system component.

    Returns a dict summarizing dependency detection, service connectivity,
    and the feature set; rendered as JSON in the System Status tab.
    """
    feature_list = [
        'Google Drive integration with service_account.json',
        'Claude document structure analysis',
        'Google Vision image/text filtering',
        'MolScribe structure recognition',
        'RDKit molecular analysis',
        'N8N HTTP node integration',
        'Dual dependency support',
    ]
    report = {'dependency_status': deps.get_status()}
    report['google_drive_service'] = 'connected' if gdrive.service else 'not connected'
    report['google_vision_client'] = 'connected' if vision_filter.available else 'not connected'
    report['pdf_processors'] = pdf_processor.available_processors
    report['molscribe_ready'] = molscribe_analyzer.available
    report['rdkit_ready'] = rdkit_analyzer.available
    # Claude and the N8N handler have no external preconditions to probe.
    report['claude_analyzer_ready'] = True
    report['n8n_api_ready'] = True
    report['system_type'] = 'Hugging Face Chemical Analyzer with MolScribe'
    report['key_features'] = feature_list
    return report
def analyze_single_image_molscribe(image: Image.Image) -> str:
    """Classify one image with Vision, then run MolScribe/RDKit if approved.

    Returns a JSON string containing the Vision verdict, the (optional)
    MolScribe recognition result, RDKit properties for any recognized
    SMILES, and cumulative MolScribe statistics.
    """
    if image is None:
        return json.dumps({'status': 'error', 'error': 'No image provided'}, indent=2)

    # Gate on the Vision classification before spending MolScribe time.
    vision_analysis = vision_filter.analyze_image_content(image)
    result = {
        'vision_analysis': vision_analysis,
        'molscribe_processed': False,
        'molscribe_result': None,
    }

    if not vision_analysis['is_chemical_structure']:
        result['message'] = f"Vision API classified this as '{vision_analysis['content_type']}' - skipping MolScribe processing"
    else:
        recognition = molscribe_analyzer.recognize_structure_with_timeout(image, timeout_seconds=30)
        result['molscribe_processed'] = True
        result['molscribe_result'] = recognition
        # Only attempt RDKit descriptors when recognition produced a SMILES.
        if recognition.get('success') and recognition.get('smiles'):
            result['rdkit_analysis'] = rdkit_analyzer.analyze_molecule(recognition['smiles'])

    return json.dumps({
        'status': 'success',
        'analysis_result': result,
        'molscribe_stats': molscribe_analyzer.stats,
    }, indent=2)
def process_complete_pipeline(file_id: str, claude_api_key: str) -> str:
    """Run the Drive->Claude->Vision->MolScribe->RDKit pipeline as JSON.

    Thin wrapper that validates inputs, builds an N8N-shaped request, and
    delegates to the shared N8N handler. Returns a JSON string either way.
    """
    if not file_id:
        return json.dumps({'status': 'error', 'error': 'File ID is required'}, indent=2)
    if not config.validate_api_key(claude_api_key):
        return json.dumps({'status': 'error', 'error': 'Valid Claude API key required'}, indent=2)

    # Reuse the N8N request format so direct UI calls and HTTP calls share
    # one code path.
    request_payload = {
        'file_id': file_id,
        'claude_api_key': claude_api_key,
        'doc_name': 'Direct Pipeline Request',
        'workflow_id': f'direct_{int(time.time())}',
    }
    try:
        response = n8n_handler.process_n8n_request(request_payload)
        return json.dumps(response, indent=2)
    except Exception as e:
        return json.dumps({
            'success': False,
            'error': f'Pipeline processing failed: {str(e)}',
        }, indent=2)
def process_pdf_direct(pdf_file, claude_api_key: str) -> str:
    """Process a directly uploaded PDF through Claude, Vision, MolScribe and RDKit.

    Args:
        pdf_file: Uploaded file handle from the Gradio File component.
        claude_api_key: Anthropic API key used for document analysis.

    Returns:
        A JSON string: a success payload with per-stage results, or an
        error payload describing the failing stage.
    """
    if pdf_file is None:
        return json.dumps({'status': 'error', 'error': 'No PDF file provided'}, indent=2)
    if not config.validate_api_key(claude_api_key):
        return json.dumps({'status': 'error', 'error': 'Valid Claude API key required'}, indent=2)
    try:
        # Read PDF bytes
        # NOTE(review): assumes the Gradio File component yields an object
        # with .read() and .name; newer Gradio versions default to handing
        # back a filepath string — confirm against the gr.File settings.
        pdf_bytes = pdf_file.read()
        # Extract text for Claude analysis (first pass, no Claude guidance yet)
        pdf_content = pdf_processor.extract_content_with_vision_filtering(pdf_bytes)
        if 'error' in pdf_content:
            return json.dumps({
                'status': 'error',
                'step': 'text_extraction',
                'error': f'Text extraction failed: {pdf_content["error"]}'
            }, indent=2)
        # Claude document analysis
        claude_analysis = claude_analyzer.analyze_document_structure(
            pdf_content['text_content'],
            claude_api_key.strip()
        )
        # Vision-enhanced processing (second pass, guided by Claude's output)
        vision_enhanced_content = pdf_processor.extract_content_with_vision_filtering(
            pdf_bytes,
            claude_analysis
        )
        # MolScribe analysis on Vision-approved image regions
        molscribe_results = []
        if vision_enhanced_content['images']:
            for img_data in vision_enhanced_content['images'][:3]:  # Limit for demo
                molscribe_result = molscribe_analyzer.recognize_structure_with_timeout(
                    img_data['image'],
                    timeout_seconds=20
                )
                molscribe_results.append({
                    'image_info': {
                        'page': img_data['page'],
                        'size': img_data['size'],
                        'description': img_data.get('description', 'Chemical structure')
                    },
                    'result': molscribe_result
                })
        # RDKit analysis of each successfully recognized SMILES
        rdkit_results = []
        for molscribe_result in molscribe_results:
            if molscribe_result['result'].get('success') and molscribe_result['result'].get('smiles'):
                smiles = molscribe_result['result']['smiles']
                rdkit_analysis = rdkit_analyzer.analyze_molecule(smiles)
                rdkit_results.append({
                    'smiles': smiles,
                    'analysis': rdkit_analysis
                })
        # Aggregate per-stage summaries for the UI
        result = {
            'status': 'success',
            'file_name': pdf_file.name,
            'claude_analysis': {
                'success': claude_analysis.get('success', False),
                'document_type': claude_analysis.get('document_type', 'unknown')
            },
            'vision_analysis': {
                'pages': vision_enhanced_content['page_count'],
                'vision_approved_regions': len(vision_enhanced_content['images']),
                'processor_used': vision_enhanced_content.get('processor_used', 'unknown')
            },
            'molscribe_analysis': {
                'images_processed': len(molscribe_results),
                'structures_recognized': len([r for r in molscribe_results if r['result'].get('success')]),
                'results': molscribe_results
            },
            'rdkit_analysis': {
                'molecules_analyzed': len(rdkit_results),
                'results': rdkit_results
            }
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({
            'status': 'error',
            'error': f'PDF processing failed: {str(e)}'
        }, indent=2)
def test_n8n_integration(file_id: str, claude_api_key: str, workflow_id: str = None) -> str:
"""Test N8N HTTP integration"""
if not file_id:
return json.dumps({
'success': False,
'error': 'File ID is required for testing'
}, indent=2)
if not config.validate_api_key(claude_api_key):
return json.dumps({
'success': False,
'error': 'Valid Claude API key required'
}, indent=2)
# Create N8N-style request
n8n_request = {
'file_id': file_id,
'claude_api_key': claude_api_key,
'doc_name': 'N8N Test Document',
'workflow_id': workflow_id or f'test_workflow_{int(time.time())}'
}
# Process through N8N handler
result = n8n_handler.process_n8n_request(n8n_request)
return json.dumps({
'n8n_integration_test': True,
'request_sent': n8n_request,
'response_received': result,
'test_status': 'success' if result.get('success') else 'failed'
}, indent=2)
# Create Gradio Interface
# Five tabs wire the module-level pipeline functions to UI widgets; each
# tab's button click dispatches to one callback and renders its JSON output.
with gr.Blocks(title="Hugging Face Chemical Analyzer", theme=gr.themes.Soft()) as app:
    gr.HTML("""
    <h1 style='text-align: center; color: #2563eb;'>πŸ§ͺ Hugging Face Chemical Analyzer with MolScribe</h1>
    <p style='text-align: center;'>Claude Analysis + Google Vision + MolScribe + RDKit + N8N HTTP Integration</p>
    <p style='text-align: center; color: #28a745;'><strong>Production Ready:</strong> N8N HTTP nodes, Google Drive, Dual dependency support</p>
    """)
    with gr.Tabs():
        # System Status Tab: dumps test_system_status() as pretty JSON.
        with gr.TabItem("πŸ”§ System Status"):
            gr.HTML("""
            <h3>System Component Status</h3>
            <p>Check all dependencies and integrations...</p>
            """)
            status_btn = gr.Button("πŸ”¬ Check System Status", variant="primary")
            status_output = gr.Code(label="System Status", language="json")
            # Lambda wraps the dict-returning function for JSON display.
            status_btn.click(
                lambda: json.dumps(test_system_status(), indent=2),
                outputs=[status_output]
            )
        # Single Image Analysis: Vision gate -> MolScribe -> RDKit.
        with gr.TabItem("πŸ‘οΈ Vision + MolScribe Analysis"):
            gr.HTML("""
            <h3>Single Image Analysis</h3>
            <p>Upload an image: Vision API checks β†’ MolScribe processes β†’ RDKit analyzes</p>
            """)
            with gr.Row():
                with gr.Column():
                    image_input = gr.Image(type="pil", label="Chemical Structure Image")
                    image_btn = gr.Button("πŸ” Analyze with MolScribe", variant="primary")
                with gr.Column():
                    image_output = gr.Code(label="Analysis Results", language="json")
            image_btn.click(
                analyze_single_image_molscribe,
                inputs=[image_input],
                outputs=[image_output]
            )
        # Google Drive Pipeline: full end-to-end run from a Drive file id.
        with gr.TabItem("πŸš€ Google Drive Pipeline"):
            gr.HTML("""
            <h3>Complete Google Drive Pipeline</h3>
            <p>Full pipeline: Download β†’ Claude analyzes β†’ Vision filters β†’ MolScribe processes β†’ RDKit analyzes</p>
            """)
            with gr.Row():
                with gr.Column():
                    drive_file_id = gr.Textbox(
                        label="Google Drive File ID",
                        placeholder="Enter Google Drive file ID",
                        info="Make sure file is shared or accessible by service account"
                    )
                    # Pre-filled from the module-level config when available.
                    drive_claude_key = gr.Textbox(
                        label="Claude API Key",
                        type="password",
                        placeholder="sk-ant-...",
                        value=config.claude_api_key
                    )
                    drive_btn = gr.Button("πŸš€ Run Complete Pipeline", variant="primary")
                with gr.Column():
                    drive_output = gr.Code(label="Pipeline Results", language="json")
            drive_btn.click(
                process_complete_pipeline,
                inputs=[drive_file_id, drive_claude_key],
                outputs=[drive_output]
            )
        # Direct PDF Upload: same pipeline without the Drive download step.
        with gr.TabItem("πŸ“„ Direct PDF Analysis"):
            gr.HTML("""
            <h3>Direct PDF Upload & Analysis</h3>
            <p>Upload PDF directly for immediate processing</p>
            """)
            with gr.Row():
                with gr.Column():
                    pdf_input = gr.File(label="PDF File", file_types=[".pdf"])
                    pdf_claude_key = gr.Textbox(
                        label="Claude API Key",
                        type="password",
                        placeholder="sk-ant-...",
                        value=config.claude_api_key
                    )
                    pdf_btn = gr.Button("πŸ“‘ Analyze PDF", variant="primary")
                with gr.Column():
                    pdf_output = gr.Code(label="PDF Analysis Results", language="json")
            pdf_btn.click(
                process_pdf_direct,
                inputs=[pdf_input, pdf_claude_key],
                outputs=[pdf_output]
            )
        # N8N HTTP Integration Testing: round-trips a request through the
        # same handler that external N8N workflows would hit.
        with gr.TabItem("πŸ”— N8N HTTP Integration"):
            gr.HTML("""
            <h3>N8N HTTP Node Integration</h3>
            <p>Test N8N HTTP integration and get API documentation</p>
            """)
            with gr.Row():
                with gr.Column():
                    n8n_file_id = gr.Textbox(
                        label="Google Drive File ID",
                        placeholder="Test file ID for N8N integration"
                    )
                    n8n_claude_key = gr.Textbox(
                        label="Claude API Key",
                        type="password",
                        placeholder="sk-ant-...",
                        value=config.claude_api_key
                    )
                    n8n_workflow_id = gr.Textbox(
                        label="Workflow ID (Optional)",
                        placeholder="test_workflow_123"
                    )
                    n8n_test_btn = gr.Button("πŸ§ͺ Test N8N Integration", variant="primary")
                with gr.Column():
                    n8n_output = gr.Code(label="N8N Integration Test Results", language="json")
            n8n_test_btn.click(
                test_n8n_integration,
                inputs=[n8n_file_id, n8n_claude_key, n8n_workflow_id],
                outputs=[n8n_output]
            )
            # Static documentation for configuring the N8N HTTP node.
            gr.HTML("""
            <h4>N8N HTTP Node Configuration:</h4>
            <div style='background: #f8f9fa; padding: 15px; border-radius: 5px; margin: 10px 0;'>
            <strong>Method:</strong> POST<br>
            <strong>URL:</strong> <code>https://your-hf-space.hf.space/api/analyze</code><br>
            <strong>Content-Type:</strong> application/json<br>
            <strong>Body (JSON):</strong>
            <pre>{
            "file_id": "{{ $json.file_id }}",
            "claude_api_key": "{{ $json.claude_api_key }}",
            "doc_name": "{{ $json.doc_name }}",
            "workflow_id": "{{ $json.workflow_id }}"
            }</pre>
            </div>
            <h4>Available API Endpoints:</h4>
            <ul>
            <li><strong>POST /api/analyze</strong> - Main analysis endpoint</li>
            <li><strong>GET /api/status/&lt;request_id&gt;</strong> - Check processing status</li>
            <li><strong>GET /api/health</strong> - Health check</li>
            <li><strong>POST /api/test</strong> - Test connectivity</li>
            </ul>
            <h4>N8N Workflow Example:</h4>
            <ol>
            <li><strong>HTTP Request Node:</strong> POST to /api/analyze with document data</li>
            <li><strong>Wait Node (Optional):</strong> Brief pause for processing</li>
            <li><strong>Code Node:</strong> Extract results from response.data</li>
            <li><strong>Switch Node:</strong> Route based on success/failure</li>
            <li><strong>Further Processing:</strong> Use extracted SMILES, molecular data, etc.</li>
            </ol>
            """)
    # Status footer: connectivity summary rendered once at app build time.
    gr.HTML(f"""
    <div style='text-align: center; margin-top: 20px; color: #666; font-size: 0.9em;'>
    <p><strong>Google Drive:</strong> {'βœ… Connected' if gdrive.service else '⚠️ Needs service_account.json'}</p>
    <p><strong>Google Vision:</strong> {'βœ… Connected' if vision_filter.available else '⚠️ Needs credentials'}</p>
    <p><strong>PDF Processing:</strong> {', '.join(pdf_processor.available_processors) if pdf_processor.available_processors else '❌ No processors'}</p>
    <p><strong>MolScribe:</strong> {'βœ… Available' if molscribe_analyzer.available else '❌ Not Available'}</p>
    <p><strong>RDKit:</strong> {'βœ… Available' if rdkit_analyzer.available else '❌ Not Available'}</p>
    <p><strong>N8N HTTP API:</strong> βœ… Ready for HTTP nodes</p>
    <p><strong>Deployment:</strong> βœ… Hugging Face Spaces compatible with N8N integration</p>
    </div>
    """)
if __name__ == "__main__":
    # For Hugging Face Spaces - just run Gradio
    # N8N can call the Gradio API directly
    app.launch(
        server_name="0.0.0.0",  # bind all interfaces so the Space is reachable
        server_port=7860,       # port Hugging Face Spaces expects
        share=False,            # no tunnel needed on Spaces
        show_error=True
    )