import gradio as gr import pandas as pd from datasets import load_dataset, Dataset from fuzzywuzzy import process from rdkit import Chem from rdkit.Chem import AllChem, Draw import io import tempfile import base64 import os from google import genai from google.genai import types from reportlab.pdfgen import canvas from reportlab.lib.pagesizes import letter from reportlab.lib.styles import getSampleStyleSheet from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image from svglib.svglib import svg2rlg from huggingface_hub import HfApi # Load dataset dataset = load_dataset("smitathkr1/organic_reactions_corrected") df = dataset['train'].to_pandas() # Global variable to store the last AI fix for potential database update last_ai_fix = None # Precompute unique values for autocomplete reaction_names = df['corrected_name'].unique().tolist() all_reactants = [] all_products = [] for _, row in df.iterrows(): if pd.notna(row['general_reactants']): all_reactants.append(row['general_reactants']) if pd.notna(row['general_products']): all_products.append(row['general_products']) unique_reactants = list(set(all_reactants)) unique_products = list(set(all_products)) def generate_reaction_svg(name): if not name: return "Please provide a reaction name." # Find the reaction result = df[df['corrected_name'].str.lower() == name.lower()] if not result.empty: row = result.iloc[0] reactants_smiles = '.'.join(row['reactants_smiles']) products_smiles = '.'.join(row['products_smiles']) reaction_smiles = f"{reactants_smiles}>>{products_smiles}" try: rxn = AllChem.ReactionFromSmarts(reaction_smiles) if rxn: svg = Draw.ReactionToImage(rxn, useSVG=True) return svg else: return "Failed to parse reaction SMILES." except Exception as e: return f"Error generating SVG: {str(e)}" return "Reaction not found." def generate_all_reactions_pdf(): # Create temporary file temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') temp_file.close() doc = SimpleDocTemplate(temp_file.name, pagesize=letter) styles = getSampleStyleSheet() story = [] # Title title_style = styles['Title'] story.append(Paragraph("Organic Reactions Database", title_style)) story.append(Spacer(1, 12)) for idx, row in df.iterrows(): # Reaction header reaction_title = f"Reaction {idx+1}: {row['corrected_name']}" story.append(Paragraph(reaction_title, styles['Heading2'])) # Generate SVG for this reaction try: reactants_smiles = '.'.join([s for s in row['reactants_smiles'] if s is not None]) products_smiles = '.'.join([s for s in row['products_smiles'] if s is not None]) if reactants_smiles and products_smiles: reaction_smiles = f"{reactants_smiles}>>{products_smiles}" rxn = AllChem.ReactionFromSmarts(reaction_smiles) if rxn: svg_content = Draw.ReactionToImage(rxn, useSVG=True) # Save SVG to temp file svg_temp = tempfile.NamedTemporaryFile(delete=False, suffix='.svg') svg_temp.write(svg_content.encode('utf-8')) svg_temp.close() # Convert SVG to ReportLab drawing drawing = svg2rlg(svg_temp.name) if drawing: # Scale the drawing to fit drawing.width = 400 drawing.height = 150 drawing.scale(0.8, 0.8) story.append(drawing) story.append(Spacer(1, 12)) # Clean up temp file os.unlink(svg_temp.name) except Exception as e: # If SVG generation fails, just continue pass # Handle potential None values in SMILES reactants_smiles = [s for s in row['reactants_smiles'] if s is not None] reagents_smiles = [s for s in row['reagents_smiles'] if s is not None] products_smiles = [s for s in row['products_smiles'] if s is not None] # Content reactants = row['general_reactants'] if pd.notna(row['general_reactants']) else 'N/A' reagents = row['general_reagents'] if pd.notna(row['general_reagents']) else 'N/A' products = row['general_products'] if pd.notna(row['general_products']) else 'N/A' content = [ f"Reactants: {reactants}", f"Reactants SMILES: {', '.join(reactants_smiles) if reactants_smiles else 'None'}", f"Reagents: {reagents}", f"Reagents SMILES: {', '.join(reagents_smiles) if reagents_smiles else 'None'}", f"Products: {products}", f"Products SMILES: {', '.join(products_smiles) if products_smiles else 'None'}" ] for item in content: story.append(Paragraph(item, styles['Normal'])) story.append(Spacer(1, 12)) doc.build(story) return temp_file.name def search_by_reaction_name(query): if not query: return "Please enter a reaction name." # Exact match first result = df[df['corrected_name'].str.lower() == query.lower()] if not result.empty: row = result.iloc[0] reactants = row['general_reactants'] if pd.notna(row['general_reactants']) else 'N/A' products = row['general_products'] if pd.notna(row['general_products']) else 'N/A' reagents = row['general_reagents'] if pd.notna(row['general_reagents']) else 'N/A' return f"**{row['corrected_name']}**\n\n**Reactants:** {reactants}\n\n**Reagents:** {reagents}\n\n**Products:** {products}\n\n**Description:** {row['description'][:500]}..." # Fuzzy match matches = process.extract(query, reaction_names, limit=1) if matches and matches[0][1] > 80: best_match = matches[0][0] result = df[df['corrected_name'] == best_match] row = result.iloc[0] reactants = row['general_reactants'] if pd.notna(row['general_reactants']) else 'N/A' products = row['general_products'] if pd.notna(row['general_products']) else 'N/A' reagents = row['general_reagents'] if pd.notna(row['general_reagents']) else 'N/A' return f"**{row['corrected_name']}** (closest match)\n\n**Reactants:** {reactants}\n\n**Reagents:** {reagents}\n\n**Products:** {products}\n\n**Description:** {row['description'][:500]}..." return "No matching reaction found." def search_by_reactant(reactant): if not reactant: return "Please enter a reactant." matches = df[df['general_reactants'].str.lower().str.contains(reactant.lower(), na=False)] if not matches.empty: results = [] for _, row in matches.head(5).iterrows(): reactants = row['general_reactants'] if pd.notna(row['general_reactants']) else 'N/A' products = row['general_products'] if pd.notna(row['general_products']) else 'N/A' results.append(f"**{row['corrected_name']}**: {reactants} → {products}") return "\n\n".join(results) # Fuzzy match for autocorrection fuzzy_matches = process.extract(reactant, unique_reactants, limit=3) if fuzzy_matches and fuzzy_matches[0][1] > 70: closest = fuzzy_matches[0][0] matches = df[df['general_reactants'].str.lower().str.contains(closest.lower(), na=False)] if not matches.empty: results = [f"Did you mean '{closest}'?\n"] for _, row in matches.head(5).iterrows(): reactants = row['general_reactants'] if pd.notna(row['general_reactants']) else 'N/A' products = row['general_products'] if pd.notna(row['general_products']) else 'N/A' results.append(f"**{row['corrected_name']}**: {reactants} → {products}") return "\n\n".join(results) return "No reactions found with that reactant." def search_by_product(product): if not product: return "Please enter a product." matches = df[df['general_products'].str.lower().str.contains(product.lower(), na=False)] if not matches.empty: results = [] for _, row in matches.head(5).iterrows(): reactants = row['general_reactants'] if pd.notna(row['general_reactants']) else 'N/A' products = row['general_products'] if pd.notna(row['general_products']) else 'N/A' results.append(f"**{row['corrected_name']}**: {reactants} → {products}") return "\n\n".join(results) # Fuzzy match for autocorrection fuzzy_matches = process.extract(product, unique_products, limit=3) if fuzzy_matches and fuzzy_matches[0][1] > 70: closest = fuzzy_matches[0][0] matches = df[df['general_products'].str.lower().str.contains(closest.lower(), na=False)] if not matches.empty: results = [f"Did you mean '{closest}'?\n"] for _, row in matches.head(5).iterrows(): reactants = row['general_reactants'] if pd.notna(row['general_reactants']) else 'N/A' products = row['general_products'] if pd.notna(row['general_products']) else 'N/A' results.append(f"**{row['corrected_name']}**: {reactants} → {products}") return "\n\n".join(results) return "No reactions found with that product." def get_autocomplete_reactions(query): if not query: return reaction_names[:10] matches = process.extract(query, reaction_names, limit=10) return [m[0] for m in matches if m[1] > 60] def fix_reaction_with_gemini(reaction_name, api_key): if not api_key: return "Please provide a Gemini API key." try: # Find the reaction row index result = df[df['corrected_name'].str.lower() == reaction_name.lower()] if result.empty: return f"❌ Reaction '{reaction_name}' not found in database." row_index = result.index[0] client = genai.Client(api_key=api_key) prompt = f"""Please provide detailed information about the organic reaction named "{reaction_name}". Include the correct reaction name, reactants, reagents, products, byproducts, reaction conditions, mechanism, and description. Make sure to provide accurate chemical information.""" contents = [ types.Content( role="user", parts=[types.Part.from_text(text=prompt)], ), ] generate_content_config = types.GenerateContentConfig( thinking_config=types.ThinkingConfig(thinking_budget=-1), response_mime_type="application/json", response_schema=genai.types.Schema( type=genai.types.Type.OBJECT, required=["reaction name", "reactants", "reagents", "products", "byproducts", "conditions", "mechanism", "description"], properties={ "reaction name": genai.types.Schema(type=genai.types.Type.STRING), "reactants": genai.types.Schema( type=genai.types.Type.ARRAY, items=genai.types.Schema(type=genai.types.Type.STRING), ), "reagents": genai.types.Schema( type=genai.types.Type.ARRAY, items=genai.types.Schema(type=genai.types.Type.STRING), ), "products": genai.types.Schema( type=genai.types.Type.ARRAY, items=genai.types.Schema(type=genai.types.Type.STRING), ), "byproducts": genai.types.Schema( type=genai.types.Type.ARRAY, items=genai.types.Schema(type=genai.types.Type.STRING), ), "conditions": genai.types.Schema(type=genai.types.Type.STRING), "mechanism": genai.types.Schema(type=genai.types.Type.STRING), "description": genai.types.Schema(type=genai.types.Type.STRING), }, ), ) response_text = "" for chunk in client.models.generate_content_stream( model="gemini-2.5-pro", contents=contents, config=generate_content_config, ): response_text += chunk.text # Parse the JSON response import json gemini_data = json.loads(response_text) # Store the updated data globally for potential database update global last_ai_fix last_ai_fix = { 'reaction_name': reaction_name, 'row_index': row_index, 'updated_data': gemini_data, 'timestamp': str(pd.Timestamp.now()) } return f"✅ **AI Fix Completed for '{reaction_name}'**\n\n**Updated Data:**\n- **Name:** {gemini_data.get('reaction name', 'N/A')}\n- **Reactants:** {', '.join(gemini_data.get('reactants', []))}\n- **Reagents:** {', '.join(gemini_data.get('reagents', []))}\n- **Products:** {', '.join(gemini_data.get('products', []))}\n- **Description:** {gemini_data.get('description', '')[:200]}...\n\n💡 **To save this fix to the database, enter the admin password below and click 'Update Database'.**" except Exception as e: return f"❌ Error calling Gemini API: {str(e)}" def update_database_with_ai_fix(password): if password != "Vvn@#411037": return "❌ Incorrect password. Database update denied." global last_ai_fix if not last_ai_fix: return "❌ No recent AI fix to save. Please fix a reaction first." try: # Update the global df using the stored row index global df idx = last_ai_fix['row_index'] # Verify the row still exists if idx not in df.index: return f"❌ Row index {idx} not found in database. The data may have been modified." # Store original values for logging original_name = df.at[idx, 'corrected_name'] # Update the dataframe df.at[idx, 'corrected_name'] = last_ai_fix['updated_data'].get('reaction name', last_ai_fix['reaction_name']) df.at[idx, 'general_reactants'] = ', '.join(last_ai_fix['updated_data'].get('reactants', [])) df.at[idx, 'general_reagents'] = ', '.join(last_ai_fix['updated_data'].get('reagents', [])) df.at[idx, 'general_products'] = ', '.join(last_ai_fix['updated_data'].get('products', [])) df.at[idx, 'description'] = last_ai_fix['updated_data'].get('description', df.at[idx, 'description']) # Try to push to Hugging Face hf_token = os.getenv('trial1') if hf_token: try: # Convert back to Hugging Face dataset updated_dataset = Dataset.from_pandas(df) # Initialize HF API api = HfApi() # Push to Hugging Face updated_dataset.push_to_hub( "smitathkr1/organic_reactions_corrected", token=hf_token, commit_message=f"AI fix: Updated reaction '{original_name}' -> '{df.at[idx, 'corrected_name']}'" ) push_success = True except Exception as push_error: push_success = False push_error_msg = str(push_error) else: push_success = False push_error_msg = "trial1 not found in environment variables" # Log the update log_entry = f"[{last_ai_fix['timestamp']}] Database updated: '{original_name}' -> '{df.at[idx, 'corrected_name']}' | HF Push: {'Success' if push_success else 'Failed: ' + push_error_msg}\n" with open('database_updates.log', 'a') as f: f.write(log_entry) # Update the global reaction_names list in case the name changed global reaction_names reaction_names = df['corrected_name'].unique().tolist() # Clear the last fix last_ai_fix = None success_msg = "✅ **Database Updated Successfully!**\n\n" if push_success: success_msg += "The reaction has been permanently updated on Hugging Face and is now live!\n\n" else: success_msg += "The reaction has been updated in the current session.\n" success_msg += f"**Note:** Could not push to Hugging Face: {push_error_msg}\n" success_msg += "Please check that trial1 is set in space secrets.\n\n" success_msg += "Changes logged to 'database_updates.log'." return success_msg except Exception as e: return f"❌ Error updating database: {str(e)}" def get_autocomplete_reactants(query): if not query: return unique_reactants[:10] matches = process.extract(query, unique_reactants, limit=10) return [m[0] for m in matches if m[1] > 60] def get_autocomplete_products(query): if not query: return unique_products[:10] matches = process.extract(query, unique_products, limit=10) return [m[0] for m in matches if m[1] > 60] with gr.Blocks(title="Organic Reactions Search") as demo: gr.Markdown("# Organic Reactions Search API") gr.Markdown("Search through the organic reactions dataset by name, reactant, or product.") with gr.Tab("Search by Reaction Name"): reaction_input = gr.Dropdown(label="Reaction Name", choices=reaction_names, allow_custom_value=True) reaction_output = gr.Markdown(label="Result") reaction_btn = gr.Button("Search") reaction_btn.click(search_by_reaction_name, inputs=reaction_input, outputs=reaction_output) with gr.Tab("Search by Reactant"): reactant_input = gr.Dropdown(label="Reactant", choices=unique_reactants, allow_custom_value=True) reactant_output = gr.Markdown(label="Results") reactant_btn = gr.Button("Search") reactant_btn.click(search_by_reactant, inputs=reactant_input, outputs=reactant_output) with gr.Tab("View Reaction SVG"): svg_input = gr.Dropdown(label="Reaction Name", choices=reaction_names, allow_custom_value=True) svg_output = gr.HTML(label="Reaction SVG") svg_btn = gr.Button("Generate SVG") svg_btn.click(generate_reaction_svg, inputs=svg_input, outputs=svg_output) with gr.Tab("Search by Product"): product_input = gr.Dropdown(label="Product", choices=unique_products, allow_custom_value=True) product_output = gr.Markdown(label="Results") product_btn = gr.Button("Search") product_btn.click(search_by_product, inputs=product_input, outputs=product_output) with gr.Tab("Download All Reactions PDF"): gr.Markdown("Download a comprehensive PDF containing all 828 reactions with their names, reactants, reagents, products, and SMILES strings.") pdf_btn = gr.Button("Generate and Download PDF") pdf_output = gr.File(label="Download PDF") pdf_btn.click(generate_all_reactions_pdf, outputs=pdf_output) with gr.Tab("View All Reactions (Table)"): gr.Markdown("Browse all 828 reactions in a tabular format. Use the AI Fix section below to improve reaction data.") # AI Fix section with gr.Row(): api_key_input = gr.Textbox(label="Gemini API Key", type="password", placeholder="Enter your Gemini API key") reaction_to_fix = gr.Dropdown(label="Select Reaction to Fix", choices=reaction_names) fix_button = gr.Button("Fix with AI") ai_status = gr.Markdown(label="AI Fix Status") fix_button.click(fix_reaction_with_gemini, inputs=[reaction_to_fix, api_key_input], outputs=ai_status) # Database update section gr.Markdown("---") gr.Markdown("**Database Update (Admin Only):**") with gr.Row(): admin_password = gr.Textbox(label="Admin Password", type="password", placeholder="Enter admin password to update database") update_db_button = gr.Button("Update Database", variant="secondary") update_status = gr.Markdown(label="Update Status") update_db_button.click(update_database_with_ai_fix, inputs=[admin_password], outputs=update_status) gr.Markdown("---") gr.Markdown("**Database Table:**") # Create HTML table (read-only for browsing) def create_reactions_table(): html = """
| Reaction Name | Reactants | Reactants SMILES | Reagents | Reagents SMILES | Products | Products SMILES | Description |
|---|---|---|---|---|---|---|---|
| {reaction_name} | {reactants} | {reactants_smiles} | {reagents} | {reagents_smiles} | {products} | {products_smiles} | {description} |