""" Drug Stability & Compatibility Analysis Platform ================================================= Simplified LLM-Driven Architecture Features: - Smart routing: Auto-detect analysis type from input - Compatibility Analysis: SMILES + Excipient → ProfessionalAnalyzer - Stability Analysis: File + Goal → LLM-driven analysis - Universal Analysis: Natural language questions → LLM response - User Authentication: Email registration, admin dashboard Design Principles: - Low barrier: Users describe what they want, system figures out how - LLM-driven: LLM understands intent, chooses methods, generates reports - Preserve expertise: Professional pharmaceutical knowledge in prompts """ import os import sys import tempfile from pathlib import Path from datetime import datetime from typing import Optional, Tuple, List, Dict, Any import streamlit as st # Add project root to path PROJECT_ROOT = Path(__file__).parent sys.path.insert(0, str(PROJECT_ROOT)) # Import auth module try: from utils.auth import ( register_user, login_user, is_admin, get_all_users, get_default_llm_config, set_default_llm_config ) AUTH_AVAILABLE = True except ImportError: AUTH_AVAILABLE = False # ============================================================================= # Page Config # ============================================================================= st.set_page_config( page_title="Pharma K 药物制剂相容性与稳定性分析专家系统", page_icon="🧪", layout="wide", initial_sidebar_state="expanded" ) # ============================================================================= # Custom CSS (Nordic Minimalism) # ============================================================================= def load_css(): css_path = Path(__file__).parent / "assets" / "style.css" if css_path.exists(): with open(css_path, "r", encoding="utf-8") as f: st.markdown(f"", unsafe_allow_html=True) load_css() # ============================================================================= # Initialize Components (Lazy Loading) # ============================================================================= @st.cache_resource def get_model_invoker(): """Get or create ModelInvoker instance.""" from layers.model_invoker import ModelInvoker return ModelInvoker() @st.cache_resource def get_professional_analyzer(): """Get or create ProfessionalAnalyzer for compatibility analysis.""" try: from layers.professional_analyzer import ProfessionalAnalyzer return ProfessionalAnalyzer() except ImportError as e: st.warning(f"ProfessionalAnalyzer not available: {e}") return None @st.cache_resource def get_molecule_renderer(): """Get or create MoleculeRenderer.""" try: from utils.molecule_renderer import MoleculeRenderer return MoleculeRenderer() except ImportError: return None # ============================================================================= # Smart Router # ============================================================================= def detect_analysis_type( smiles: str, excipient: str, goal: str, files: List ) -> str: """ Automatically detect which analysis mode to use. Returns: "compatibility" | "stability" | "general" | "none" """ has_smiles = bool(smiles and smiles.strip()) has_excipient = bool(excipient and excipient.strip()) has_files = bool(files) has_goal = bool(goal and goal.strip()) # Compatibility: SMILES + Excipient if has_smiles and has_excipient: return "compatibility" # Stability: Files + Goal if has_files and has_goal: return "stability" # General: Just a question if has_goal and not has_files and not has_smiles: return "general" return "none" # ============================================================================= # Report Branding Wrapper # ============================================================================= def strip_outer_containers(content: str) -> str: """ Remove outer HTML containers from LLM output to avoid nested boxes. IMPORTANT: Be careful not to remove actual content! """ import re # Remove and wrappers if present content = re.sub(r']*>', '', content, flags=re.IGNORECASE) content = re.sub(r']*>', '', content, flags=re.IGNORECASE) content = re.sub(r'', '', content, flags=re.IGNORECASE) content = re.sub(r'.*?', '', content, flags=re.IGNORECASE | re.DOTALL) content = re.sub(r']*>', '', content, flags=re.IGNORECASE) content = re.sub(r'', '', content, flags=re.IGNORECASE) # Remove only specific header-style divs (short ones, not content-heavy) # Only match if the div is short (less than 200 chars including tags) def remove_short_header_divs(match): full_match = match.group(0) if len(full_match) < 300: # Only remove if it's a short header return '' return full_match # Keep longer content divs content = re.sub( r']*(?:background:\s*linear-gradient)[^>]*>.*?', remove_short_header_divs, content, flags=re.IGNORECASE | re.DOTALL ) return content.strip() def wrap_report_with_branding(content: str, report_type: str = "分析") -> str: """ Wrap report content with Pharma K branding header and footer. First strips any outer containers from the content. Uses Google Fonts for better Chinese character support in PDF. """ from datetime import datetime # Strip nested containers clean_content = strip_outer_containers(content) branded_html = f""" Pharma K - {report_type}报告
药物制剂相容性与稳定性分析专家系统
{report_type}报告
{datetime.now().strftime('%Y-%m-%d')}
{clean_content}
Pharma K Analysis System
AI 辅助生成 · 仅供科研参考
© {datetime.now().year} All Rights Reserved
""" return branded_html def generate_pdf_from_html(html_content: str) -> bytes: """ Generate PDF from HTML content. Uses pdfkit/wkhtmltopdf if available, otherwise returns None. Pre-processes HTML to show static charts instead of Plotly JavaScript charts. """ import re # Pre-process: For PDF, hide Plotly charts and show static images # Replace display:block with display:none for Plotly charts pdf_html = re.sub(r'class="chart-plotly"\s*style="display:block;"', 'class="chart-plotly" style="display:none;"', html_content) # Replace display:none with display:block for static charts pdf_html = re.sub(r'class="chart-static"\s*style="display:none;"', 'class="chart-static" style="display:block;"', pdf_html) # Remove Plotly JavaScript to reduce PDF size pdf_html = re.sub(r']*plotly[^>]*>.*?', '', pdf_html, flags=re.DOTALL | re.IGNORECASE) try: # Try using pdfkit (requires wkhtmltopdf installed) import pdfkit pdf_bytes = pdfkit.from_string(pdf_html, False, options={ 'encoding': 'UTF-8', 'page-size': 'A4', 'margin-top': '10mm', 'margin-bottom': '10mm', 'margin-left': '10mm', 'margin-right': '10mm', 'enable-local-file-access': None, }) return pdf_bytes except: pass try: # Try using weasyprint from weasyprint import HTML import io pdf_buffer = io.BytesIO() HTML(string=pdf_html).write_pdf(pdf_buffer) return pdf_buffer.getvalue() except: pass # If no PDF library available, return None return None # ============================================================================= # SVG Removal & Chart Data Parser # ============================================================================= def remove_llm_svg(html_content: str) -> str: """ Remove any SVG elements generated by LLM (they often render incorrectly). These should be replaced by CHART_DATA JSON which is then rendered by Plotly. """ import re # Remove ... blocks (LLM-generated charts) html_content = re.sub(r']*>.*?', '', html_content, flags=re.DOTALL | re.IGNORECASE) return html_content # Chart Data Parser & Renderer # ============================================================================= def parse_and_render_charts(html_content: str) -> str: """ Parse chart data from LLM output and replace with Plotly charts. Uses Plotly for browser-based rendering with proper Chinese font support. Falls back to matplotlib if Plotly is not available. """ import re import json def render_chart_plotly(chart_data: dict) -> str: """ Render a chart using Plotly for screen display AND matplotlib for PDF. Uses CSS media queries to show appropriate version. """ try: import plotly.graph_objects as go fig = go.Figure() colors = ['#2d8bb8', '#57c5b6', '#1a5f7a', '#f39c12', '#e74c3c'] for i, series in enumerate(chart_data.get('series', [])): data = series.get('data', []) if not data: continue # Filter out invalid data points valid_data = [(x, y) for x, y in data if isinstance(x, (int, float)) and isinstance(y, (int, float))] if not valid_data: continue x_vals = [p[0] for p in valid_data] y_vals = [p[1] for p in valid_data] fig.add_trace(go.Scatter( x=x_vals, y=y_vals, mode='lines+markers+text', name=series.get('name', f'系列{i+1}'), line=dict(color=colors[i % len(colors)], width=2), marker=dict(size=8), text=[f'{y}%' for y in y_vals], textposition='top center', textfont=dict(size=10, color=colors[i % len(colors)]) )) fig.update_layout( title=dict( text=chart_data.get('title', '数据趋势图'), font=dict(size=16, color='#1a5f7a'), x=0.5 ), xaxis_title=chart_data.get('x_label', '时间'), yaxis_title=chart_data.get('y_label', '含量 (%)'), font=dict(family='Noto Sans SC, Microsoft YaHei, SimHei, sans-serif'), plot_bgcolor='white', paper_bgcolor='white', xaxis=dict(gridcolor='#e0e0e0', gridwidth=1), yaxis=dict(gridcolor='#e0e0e0', gridwidth=1), showlegend=len(chart_data.get('series', [])) > 1, margin=dict(l=60, r=40, t=60, b=60), height=400 ) # Generate Plotly HTML for screen display plotly_html = fig.to_html(full_html=False, include_plotlyjs='cdn') # Also generate static image for PDF using matplotlib static_img_html = render_chart_matplotlib(chart_data) # Combine both with CSS media queries # Screen: show Plotly interactive, hide static # Print/PDF: hide Plotly, show static image combined_html = f'''
{plotly_html}
''' return combined_html except ImportError: # Plotly not available, try matplotlib return render_chart_matplotlib(chart_data) except Exception as e: return f'
图表生成错误: {str(e)}
' def render_chart_matplotlib(chart_data: dict) -> str: """ Render chart using matplotlib with embedded Chinese font support. Uses embedded font file in fonts/ directory for HuggingFace compatibility. """ try: import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import matplotlib.font_manager as fm import io import base64 from pathlib import Path # Try to load embedded Chinese font font_path = Path(__file__).parent / 'fonts' / 'NotoSansSC-Regular.otf' chinese_font = None if font_path.exists(): try: chinese_font = fm.FontProperties(fname=str(font_path)) except: pass fig, ax = plt.subplots(figsize=(8, 5), facecolor='white') colors = ['#2d8bb8', '#57c5b6', '#1a5f7a', '#f39c12', '#e74c3c'] for i, series in enumerate(chart_data.get('series', [])): data = series.get('data', []) if not data: continue valid_data = [(x, y) for x, y in data if isinstance(x, (int, float)) and isinstance(y, (int, float))] if not valid_data: continue x_vals = [p[0] for p in valid_data] y_vals = [p[1] for p in valid_data] color = colors[i % len(colors)] # Use Chinese name if font available, otherwise translate series_name = series.get('name', f'Series {i+1}') if not chinese_font: # Comprehensive Chinese to English translation series_name = (series_name .replace('长期条件', 'Long-term') .replace('加速条件', 'Accelerated') .replace('处方', 'Formula ') .replace('批次', 'Batch ') .replace('实测数据', 'Measured') .replace('预测趋势', 'Predicted') .replace('预测', 'Predicted') .replace('趋势', 'Trend') .replace('实测', 'Measured') .replace('杂质', 'Impurity') .replace('含量', 'Content') .replace('总杂', 'Total Impurity') .replace('线', '')) # Remove any remaining Chinese characters using regex series_name = re.sub(r'[\u4e00-\u9fff]+', '', series_name).strip() ax.plot(x_vals, y_vals, 'o-', color=color, linewidth=2, markersize=8, label=series_name) # Set labels - use Chinese if font available if chinese_font: xlabel = chart_data.get('x_label', '时间 (月)') ylabel = chart_data.get('y_label', '含量 (%)') title = chart_data.get('title', '数据趋势图') ax.set_xlabel(xlabel, fontsize=11, fontproperties=chinese_font) ax.set_ylabel(ylabel, fontsize=11, fontproperties=chinese_font) ax.set_title(title, fontsize=14, fontweight='bold', color='#1a5f7a', fontproperties=chinese_font) if len(chart_data.get('series', [])) > 1: ax.legend(loc='best', framealpha=0.9, prop=chinese_font) else: ax.set_xlabel('Time (Month)', fontsize=11) ax.set_ylabel('Content (%)', fontsize=11) ax.set_title('Trend Chart', fontsize=14, fontweight='bold', color='#1a5f7a') if len(chart_data.get('series', [])) > 1: ax.legend(loc='best', framealpha=0.9) ax.grid(True, linestyle='--', alpha=0.5) plt.tight_layout() buf = io.BytesIO() fig.savefig(buf, format='png', dpi=150, bbox_inches='tight', facecolor='white') buf.seek(0) img_base64 = base64.b64encode(buf.getvalue()).decode('utf-8') plt.close(fig) return f'''
Chart
''' except Exception as e: return f'
图表生成错误: {str(e)}
' def clean_json_string(json_str: str) -> str: """Clean JSON string by removing comments and fixing common issues.""" json_str = re.sub(r'//[^\n]*', '', json_str) json_str = re.sub(r',\s*([}\]])', r'\1', json_str) last_brace = max(json_str.rfind('}'), json_str.rfind(']')) if last_brace > 0: json_str = json_str[:last_brace + 1] return json_str.strip() # Strategy 1: Find tags pattern1 = r'\s*(.*?)\s*' for match in re.finditer(pattern1, html_content, re.DOTALL | re.IGNORECASE): json_str = clean_json_string(match.group(1)) try: chart_data = json.loads(json_str) chart_html = render_chart_plotly(chart_data) html_content = html_content.replace(match.group(0), chart_html) except json.JSONDecodeError: error_html = '
图表数据格式错误
' html_content = html_content.replace(match.group(0), error_html) # Strategy 2: Find raw JSON with chart structure pattern2 = r'\{\s*"title"\s*:\s*"[^"]*"[^}]*"series"\s*:\s*\[.*?\]\s*\}' for match in re.finditer(pattern2, html_content, re.DOTALL): json_str = clean_json_string(match.group(0)) try: chart_data = json.loads(json_str) if 'series' in chart_data and 'title' in chart_data: chart_html = render_chart_plotly(chart_data) html_content = html_content.replace(match.group(0), chart_html) except json.JSONDecodeError: pass return html_content # ============================================================================= # Analysis Functions # ============================================================================= def run_compatibility_analysis( smiles: str, excipient: str, api_key: str, provider: str, progress_callback=None ) -> Tuple[str, Optional[str]]: """ Run drug-excipient compatibility analysis using ProfessionalAnalyzer. Restored from original app.deprecated.py. """ analyzer = get_professional_analyzer() if not analyzer: return "
ProfessionalAnalyzer 未加载,请检查 professional_analyzer.py
", None # Set LLM provider if api_key and provider: analyzer.model_invoker.set_provider(provider, api_key) # Run analysis try: result = analyzer.analyze( smiles=smiles.strip(), excipient_name=excipient.strip(), api_name=smiles[:20] + "..." if len(smiles) > 20 else smiles, progress_callback=progress_callback ) if not result.get("success"): return f"
分析失败: {result.get('error', 'Unknown error')}
", None # Generate HTML report html_report = analyzer.format_html_report( analysis_result=result, api_name=smiles[:30], excipient_name=excipient.strip() ) return html_report, None except Exception as e: import traceback traceback.print_exc() return f"
分析出错: {str(e)}
", None def run_stability_analysis( goal: str, files: List, api_key: str, provider: str ) -> str: """ Run stability analysis using LLM-driven approach. Simplified: LLM receives data + goal → LLM outputs report. """ model_invoker = get_model_invoker() # Set LLM provider if api_key and provider: model_invoker.set_provider(provider, api_key) # Parse files to text from utils.file_parsers import parse_file all_text = "" temp_paths = [] for uploaded_file in files: try: suffix = Path(uploaded_file.name).suffix with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp.write(uploaded_file.getvalue()) tmp_path = tmp.name temp_paths.append(tmp_path) content = parse_file(tmp_path) if content: all_text += f"\n=== 文件: {uploaded_file.name} ===\n{content}\n" except Exception as e: st.warning(f"文件解析警告: {uploaded_file.name} - {str(e)}") if not all_text.strip(): return "
无法从上传文件中提取数据
" # Truncate if too long max_chars = 15000 if len(all_text) > max_chars: all_text = all_text[:max_chars] + "\n... [内容已截断]" # LLM-driven analysis prompt - Dynamic response to user needs system_prompt = """你是资深的药物稳定性分析专家(Ph.D.级别)。你的核心任务是: 1. 准确理解用户的分析需求 2. 深入分析提供的稳定性数据 3. 针对用户的具体问题给出专业、详细的回答 【专业分析能力】 你具备以下分析能力,根据用户需求选择性使用: - 动力学分析:零级/一级动力学拟合,计算k值、R²、货架期预测 - 批次比较:对比不同批次的稳定性表现,识别最优/最差批次 - 趋势判断:分析杂质变化趋势,识别异常数据点 - 风险评估:基于ICH Q1E原则评估稳定性风险 - 配方筛选:根据稳定性数据推荐最佳配方 【稳定性预测流程 - 当用户需要预测时必须遵循】 当用户要求进行稳定性预测时,请按以下流程执行: 1. **数据建模**:对输入数据分别进行零级动力学(y = y₀ + k·t)和一级动力学(ln(y) = ln(y₀) - k·t)拟合 2. **模型评估**:计算每个模型的R²(拟合优度),选择R²最高的模型作为最优模型 3. **参数报告**:明确报告所选模型的类型、k值、R²等关键参数 4. **预测计算**:使用最优模型进行外推预测,给出具体的预测值(如24个月后的杂质含量) 5. **结果可视化**:使用格式输出实测数据点和预测趋势线 【图表输出格式 - 极其重要!】 ⛔ 绝对禁止生成标签!你生成的SVG图表无法正确显示! ✅ 只能使用以下JSON格式输出图表数据(系统会自动渲染): { "title": "杂质含量对比", "x_label": "批次", "y_label": "杂质含量 (%)", "series": [ {"name": "总杂", "data": [[1, 2.4], [2, 2.5], [3, 2.1], [4, 2.8]]} ] } 注意:data数组中每个元素是[x坐标, y坐标],必须是数字,不是字符串! 【输出格式要求】 ⚠️ 直接输出纯HTML代码,禁止任何Markdown! ✅ 使用:

,

,

, ,
    ,
  • , , 等HTML标签 ❌ 禁止: ```html, ```, **, *, #, - 等Markdown符号 ❌ 禁止: "这是报告"等元描述文字 ❌ 禁止: 占位符如[数值]、[批次名称]等,必须填入实际数据 ⛔ 绝对禁止: 标签!不要画图!""" user_prompt = f"""【用户分析需求】 {goal} 【稳定性数据】 {all_text} 【分析任务】 请仔细阅读用户的分析需求,然后: 1. 首先明确回答用户的核心问题 2. 使用数据中的实际数值进行分析和计算 3. 如果用户要求找出最优批次,请明确给出结论和依据 4. 如果需要图表,使用JSON格式输出 5. 结论要具体、有数据支撑,不要使用占位符 直接输出HTML格式的分析报告。""" try: response = model_invoker.invoke( system_prompt=system_prompt, user_prompt=user_prompt, temperature=0.2 # Lower for more precise output ) # Check for API errors if response and hasattr(response, 'success') and not response.success: error_msg = getattr(response, 'error', 'Unknown error') return f"
    LLM API 调用失败: {error_msg}
    " # Check for placeholder mode if response and hasattr(response, 'metadata'): if response.metadata and response.metadata.get('mode') == 'placeholder': return """

    ⚠️ LLM API 未配置或配置无效

    请检查以下内容:

    • 确保已在侧边栏输入正确的 API Key
    • 确保选择了正确的 LLM 提供商
    • 如果使用 Kimi,API Key 格式应为 sk-...
    """ if response.metadata and response.metadata.get('fallback'): error_msg = getattr(response, 'error', 'API调用失败') return f"""

    ❌ LLM 调用出错

    错误信息:{error_msg}

    请检查 API Key 是否正确,或尝试其他 LLM 提供商。

    """ # Handle response if response and hasattr(response, 'content'): content = response.content elif isinstance(response, str): content = response else: content = str(response) # Post-processing: Clean up any Markdown artifacts import re # Remove markdown code blocks content = re.sub(r'```html\s*', '', content) content = re.sub(r'```svg\s*', '', content) content = re.sub(r'```\s*', '', content) # Remove markdown bold/italic that slipped through content = re.sub(r'\*\*([^*]+)\*\*', r'\1', content) content = re.sub(r'\*([^*]+)\*', r'\1', content) # Remove markdown headers content = re.sub(r'^#{1,6}\s+(.+)$', r'

    \1

    ', content, flags=re.MULTILINE) # Remove markdown list items content = re.sub(r'^-\s+(.+)$', r'
  • \1
  • ', content, flags=re.MULTILINE) # Wrap in proper HTML container if not already if not content.strip().startswith(' {content} """ # Parse and render chart data with Plotly (accurate coordinates) # First remove any LLM-generated SVG (they render incorrectly) content = remove_llm_svg(content) content = parse_and_render_charts(content) return content except Exception as e: import traceback traceback.print_exc() return f"
    LLM 分析出错: {str(e)}
    " def run_general_analysis( question: str, api_key: str, provider: str ) -> str: """ Handle general pharmaceutical questions using LLM. """ model_invoker = get_model_invoker() if api_key and provider: model_invoker.set_provider(provider, api_key) system_prompt = """你是药物稳定性和制剂研发领域的专家顾问。 请用专业但易懂的语言回答用户的问题。 如需要,可以引用ICH指南、FDA/EMA法规等权威来源。""" try: response = model_invoker.invoke( system_prompt=system_prompt, user_prompt=question, temperature=0.5 ) if response and hasattr(response, 'content'): content = response.content elif isinstance(response, str): content = response else: content = str(response) return f"""
    {content}
    """ except Exception as e: return f"
    回答生成失败: {str(e)}
    " # ============================================================================= # Main Application # ============================================================================= def main(): # Initialize session state if 'analysis_result' not in st.session_state: st.session_state.analysis_result = None if 'analysis_type' not in st.session_state: st.session_state.analysis_type = None if 'user' not in st.session_state: st.session_state.user = None if 'show_admin_panel' not in st.session_state: st.session_state.show_admin_panel = False # Sidebar - Authentication & Settings with st.sidebar: # ========== Authentication Section ========== if AUTH_AVAILABLE: if st.session_state.user: # Logged in user info user = st.session_state.user st.success(f"👤 {user['email']}") if is_admin(user): if st.button("⚙️ 管理员面板", use_container_width=True): st.session_state.show_admin_panel = not st.session_state.show_admin_panel if st.button("🚪 退出登录", use_container_width=True): st.session_state.user = None st.session_state.show_admin_panel = False st.rerun() else: # Login/Register tabs auth_tab = st.radio("", ["登录", "注册"], horizontal=True, label_visibility="collapsed") if auth_tab == "登录": email = st.text_input("邮箱/用户名", key="login_email") password = st.text_input("密码", type="password", key="login_password") if st.button("登录", use_container_width=True, type="primary"): success, msg, user_info = login_user(email, password) if success: st.session_state.user = user_info st.rerun() else: st.error(msg) else: email = st.text_input("邮箱", key="reg_email") password = st.text_input("密码", type="password", key="reg_password") password2 = st.text_input("确认密码", type="password", key="reg_password2") if st.button("注册", use_container_width=True, type="primary"): if password != password2: st.error("两次密码不一致") else: success, msg = register_user(email, password) if success: st.success(msg) else: st.error(msg) st.divider() # ========== LLM Configuration ========== # Default state: collapsed if API key exists, expanded if not # Get default config default_config = {'provider': 'kimi', 'api_key': ''} if AUTH_AVAILABLE: default_config = get_default_llm_config() current_key = default_config.get('api_key', '') expander_open = not bool(current_key) with st.expander("🔑 API 配置 & 模型选择", expanded=expander_open): provider_options = { "Moonshot Kimi": "kimi", "Google Gemini": "gemini", "OpenAI": "openai", "Deepseek": "deepseek", "智谱清言 (GLM)": "zhipu" } # Find default provider name default_provider_name = "Moonshot Kimi" for name, val in provider_options.items(): if val == default_config.get('provider', 'kimi'): default_provider_name = name break provider_name = st.selectbox( "选择提供商", list(provider_options.keys()), index=list(provider_options.keys()).index(default_provider_name) ) provider = provider_options[provider_name] # API Key input user_api_key = st.text_input("API Key", type="password", placeholder="留空则使用默认配置") if user_api_key: st.caption("✅ 已输入自定义 Key") # Determine final API key if user_api_key: api_key = user_api_key else: api_key = default_config.get('api_key', '') # Status Indicator in Sidebar (Outside expander) if api_key: st.success(f"🟢 已连接: {provider_options[provider_name]}") else: st.error("🔴 未配置 API Key") st.divider() st.header("ℹ️ 使用说明") st.info(""" **📌 相容性分析**: SMILES + 辅料名称 **📌 稳定性分析**: 上传文件 + 分析目标 **📌 通用问答**: 直接提问 """) # ========== Admin Panel (Modal-like) ========== if st.session_state.show_admin_panel and st.session_state.user and is_admin(st.session_state.user): st.markdown("---") st.header("⚙️ 管理员面板") admin_tab1, admin_tab2 = st.tabs(["👥 用户管理", "🔧 LLM 配置"]) with admin_tab1: st.subheader("注册用户列表") users = get_all_users() if users: for user in users: col1, col2, col3 = st.columns([3, 2, 2]) with col1: st.write(f"📧 {user['email']}") with col2: st.write(f"👤 {user['role']}") with col3: st.write(f"📅 {user['created_at'][:10] if user['created_at'] else 'N/A'}") else: st.info("暂无注册用户") with admin_tab2: st.subheader("默认 LLM 配置") st.caption("用户未输入 API Key 时将使用此配置") current_config = get_default_llm_config() admin_provider = st.selectbox( "默认提供商", list(provider_options.keys()), index=list(provider_options.values()).index(current_config.get('provider', 'kimi')), key="admin_provider" ) admin_api_key = st.text_input( "默认 API Key", value=current_config.get('api_key', ''), type="password", key="admin_api_key" ) if st.button("💾 保存配置", type="primary"): set_default_llm_config(provider_options[admin_provider], admin_api_key) st.success("✅ 配置已保存") st.markdown("---") # ========== Main Header (Navbar Style) ========== st.markdown("""
    Pharma K 专家系统
    Next-Gen Drug Stability Analysis
    """, unsafe_allow_html=True) # Main content - Split Pane Layout col_input, col_output = st.columns([4.5, 5.5], gap="medium") with col_input: st.markdown('
    ', unsafe_allow_html=True) # (Note: CSS targeting [data-testid="column"] handles the card look, but we keep this comment for clarity) st.subheader("📝 任务控制台") # Analysis mode tabs tab_compat, tab_stability, tab_general = st.tabs([ "相容性", "稳定性", "问答" ]) with tab_compat: st.caption("预测药物分子与辅料的相容性风险") smiles = st.text_area( "API SMILES", placeholder="可使用问答功能获取 API SMILES(如:帮我查询对乙酰氨基酚的SMILES)", height=100, key="smiles_input" ) excipient = st.text_input( "辅料名称", placeholder="例如: 乳糖, 微晶纤维素", key="excipient_input" ) # Show molecule structure preview if smiles: mol_renderer = get_molecule_renderer() if mol_renderer and mol_renderer.is_available: svg = mol_renderer.render_2d_svg(smiles, 300, 150) if svg: st.markdown(svg, unsafe_allow_html=True) st.markdown("---") compat_button = st.button("🚀 开始分析", use_container_width=True, type="primary", key="compat_btn") with tab_stability: st.caption("基于已有数据文件进行趋势与货架期分析") uploaded_files = st.file_uploader( "上传数据文件", type=["xlsx", "xls", "docx", "doc", "pdf", "csv"], accept_multiple_files=True, key="stability_files", label_visibility="collapsed" ) if not uploaded_files: st.info("👆 请先上传稳定性数据文件") # Quick Action Chips st.markdown('
    快捷指令
    ', unsafe_allow_html=True) chip_col1, chip_col2, chip_col3 = st.columns(3) def set_goal(text): st.session_state.stability_goal_input = text if "stability_goal_input" not in st.session_state: st.session_state.stability_goal_input = "" with chip_col1: st.button("📈 趋势", use_container_width=True, type="secondary", on_click=set_goal, args=("请分析各批次的杂质增长趋势,并判断是否符合限度要求。",)) with chip_col2: st.button("🔮 货架期", use_container_width=True, type="secondary", on_click=set_goal, args=("基于现有数据,请预测24个月时的含量数据。",)) with chip_col3: st.button("🏆 筛选", use_container_width=True, type="secondary", on_click=set_goal, args=("对比不同处方批次,找出最稳定的处方。",)) stability_goal = st.text_area( "详细分析目标", value=st.session_state.stability_goal_input, placeholder="或在此手动描述...", height=100, key="stability_goal_input_area", on_change=lambda: st.session_state.update({"stability_goal_input": st.session_state.stability_goal_input_area}) ) st.markdown("---") stability_button = st.button("🚀 开始分析", use_container_width=True, type="primary", key="stability_btn") with tab_general: st.caption("制剂研发领域的专业问答助手") question = st.text_area( "问题描述", placeholder="例如: ICH指南对稳定性试验有什么要求?", height=150, key="general_question", label_visibility="collapsed" ) st.markdown("---") general_button = st.button("💡 咨询AI", use_container_width=True, type="primary", key="general_btn") with col_output: st.subheader("📊 分析结果") # Determine if we have any active result is_active = False if compat_button or stability_button or general_button: is_active = True if st.session_state.get('analysis_type') == 'stability' and st.session_state.get('analysis_result'): is_active = True # Empty State if not is_active: st.markdown("""
    🧬

    准备就绪

    请在左侧选择分析模式并输入信息
    AI 专家系统将为您生成专业报告

    相容性预测
    稳定性分析
    专家问答
    """, unsafe_allow_html=True) result_container = st.container() # Handle button clicks with result_container: # Compatibility Analysis if compat_button: if not smiles or not excipient: st.warning("请输入 SMILES 结构式和辅料名称") elif not api_key: st.warning("请在侧边栏配置 API Key") else: with st.spinner("正在解析分子结构并进行相容性推演..."): html_report, _ = run_compatibility_analysis( smiles=smiles, excipient=excipient, api_key=api_key, provider=provider ) branded_report = wrap_report_with_branding(html_report, "相容性分析") # Toolbar (Download Buttons) c1, c2, c3 = st.columns([2, 1, 1]) with c2: st.download_button("📥 HTML", branded_report, f"Pharma_K_Compat_{smiles[:5]}.html", "text/html") with c3: pdf_data = generate_pdf_from_html(branded_report) if pdf_data: st.download_button("📄 PDF", pdf_data, f"Pharma_K_Compat_{smiles[:5]}.pdf", "application/pdf") st.components.v1.html(branded_report, height=800, scrolling=True) # Stability Analysis Logic elif stability_button: if not uploaded_files or not st.session_state.stability_goal_input: st.warning("请上传数据文件并描述分析目标") elif not api_key: st.warning("请在侧边栏配置 API Key") else: with st.spinner("正在进行多维度稳定性数据分析..."): html_report = run_stability_analysis( goal=st.session_state.stability_goal_input, files=uploaded_files, api_key=api_key, provider=provider ) if html_report and len(html_report.strip()) > 100: branded_report = wrap_report_with_branding(html_report, "稳定性分析") branded_report = parse_and_render_charts(branded_report) st.session_state.analysis_result = branded_report st.session_state.analysis_type = "stability" st.rerun() else: st.error("分析未返回有效结果") # Display stability analysis result from session state if st.session_state.get('analysis_type') == "stability" and st.session_state.get('analysis_result'): branded_report = st.session_state.analysis_result # Toolbar c1, c2, c3 = st.columns([2, 1, 1]) with c1: st.success("✅ 分析完成") with c2: st.download_button("📥 HTML", branded_report, "Stability_Report.html", "text/html") with c3: pdf_data = generate_pdf_from_html(branded_report) if pdf_data: st.download_button("📄 PDF", pdf_data, "Stability_Report.pdf", "application/pdf") st.components.v1.html(branded_report, height=800, scrolling=True) # General Q&A elif general_button: if not question: st.warning("请输入问题") elif not api_key: st.warning("请配置 API Key") else: with st.spinner("专家系统正在思考..."): html_response = run_general_analysis(question, api_key, provider) branded_response = wrap_report_with_branding(html_response, "专家问答") # Toolbar c1, c2, c3 = st.columns([2, 1, 1]) with c2: st.download_button("📥 HTML", branded_response, "QA_Response.html", "text/html") with c3: pdf_data = generate_pdf_from_html(branded_response) if pdf_data: st.download_button("📄 PDF", pdf_data, "QA_Response.pdf", "application/pdf") st.components.v1.html(branded_response, height=600, scrolling=True) # ============================================================================= # Entry Point # ============================================================================= if __name__ == "__main__": main()