Preformu / streamlit_app.py
Kevinshh's picture
Upload streamlit_app.py
607ffe5 verified
"""
Drug Stability & Compatibility Analysis Platform
=================================================
Simplified LLM-Driven Architecture
Features:
- Smart routing: Auto-detect analysis type from input
- Compatibility Analysis: SMILES + Excipient → ProfessionalAnalyzer
- Stability Analysis: File + Goal → LLM-driven analysis
- Universal Analysis: Natural language questions → LLM response
- User Authentication: Email registration, admin dashboard
Design Principles:
- Low barrier: Users describe what they want, system figures out how
- LLM-driven: LLM understands intent, chooses methods, generates reports
- Preserve expertise: Professional pharmaceutical knowledge in prompts
"""
import os
import sys
import tempfile
from pathlib import Path
from datetime import datetime
from typing import Optional, Tuple, List, Dict, Any
import streamlit as st
# Add project root to path
PROJECT_ROOT = Path(__file__).parent
sys.path.insert(0, str(PROJECT_ROOT))
# Import auth module
try:
from utils.auth import (
register_user, login_user, is_admin,
get_all_users, get_default_llm_config, set_default_llm_config
)
AUTH_AVAILABLE = True
except ImportError:
AUTH_AVAILABLE = False
# =============================================================================
# Page Config
# =============================================================================
# Configure the browser tab (title and icon), use the wide layout and start with
# the sidebar expanded.
# NOTE(review): Streamlit has documented set_page_config as a call that should run
# before other Streamlit page commands, so keep it ahead of load_css() — confirm
# against the installed Streamlit version.
st.set_page_config(
    page_title="Pharma K 药物制剂相容性与稳定性分析专家系统",
    page_icon="🧪",
    layout="wide",
    initial_sidebar_state="expanded"
)
# =============================================================================
# Custom CSS (Nordic Minimalism)
# =============================================================================
def load_css():
    """Inject ``assets/style.css`` (located next to this file) into the page.

    Does nothing when the stylesheet file does not exist.
    """
    stylesheet = Path(__file__).parent / "assets" / "style.css"
    if not stylesheet.exists():
        return
    with open(stylesheet, "r", encoding="utf-8") as handle:
        css_text = handle.read()
    # Raw HTML is required so the <style> tag is emitted instead of being escaped.
    st.markdown(f"<style>{css_text}</style>", unsafe_allow_html=True)
load_css()
# =============================================================================
# Initialize Components (Lazy Loading)
# =============================================================================
@st.cache_resource
def get_model_invoker():
    """Build the ModelInvoker used for LLM calls.

    The import is deferred until the first call; the function is wrapped in
    ``st.cache_resource`` so the created instance can be reused.
    """
    from layers.model_invoker import ModelInvoker
    invoker = ModelInvoker()
    return invoker
@st.cache_resource
def get_professional_analyzer():
    """Build the ProfessionalAnalyzer used for compatibility analysis.

    Returns:
        A ProfessionalAnalyzer instance, or None (after showing a Streamlit
        warning) when importing or constructing it raises ImportError.
    """
    analyzer = None
    try:
        from layers.professional_analyzer import ProfessionalAnalyzer
        analyzer = ProfessionalAnalyzer()
    except ImportError as import_error:
        st.warning(f"ProfessionalAnalyzer not available: {import_error}")
    return analyzer
@st.cache_resource
def get_molecule_renderer():
    """Build the MoleculeRenderer.

    Returns:
        A MoleculeRenderer instance, or None when importing or constructing it
        raises ImportError (no warning is shown).
    """
    renderer = None
    try:
        from utils.molecule_renderer import MoleculeRenderer
        renderer = MoleculeRenderer()
    except ImportError:
        pass
    return renderer
# =============================================================================
# Smart Router
# =============================================================================
def detect_analysis_type(
    smiles: str,
    excipient: str,
    goal: str,
    files: List
) -> str:
    """
    Pick the analysis mode implied by which inputs the user filled in.

    Rules, checked in this order:
      1. SMILES and excipient both non-blank        -> "compatibility"
      2. at least one file and a non-blank goal     -> "stability"
      3. a non-blank goal, no files and no SMILES   -> "general"
      4. anything else                              -> "none"

    Returns:
        "compatibility" | "stability" | "general" | "none"
    """
    def _filled(text) -> bool:
        # None, "" and whitespace-only strings all count as "not provided".
        return bool(text and text.strip())

    smiles_given = _filled(smiles)
    excipient_given = _filled(excipient)
    goal_given = _filled(goal)
    files_given = bool(files)

    if smiles_given and excipient_given:
        mode = "compatibility"
    elif files_given and goal_given:
        mode = "stability"
    elif goal_given and not (files_given or smiles_given):
        mode = "general"
    else:
        mode = "none"
    return mode
# =============================================================================
# Report Branding Wrapper
# =============================================================================
def strip_outer_containers(content: str) -> str:
    """
    Remove outer HTML document wrappers from LLM output to avoid nested boxes.

    Strips the DOCTYPE, the ``<html>``/``<body>`` open and close tags, the whole
    ``<head>...</head>`` section (with or without attributes on the tag), and
    short gradient-background "header" divs. Everything else is kept.

    Args:
        content: HTML fragment or full document produced by the LLM.

    Returns:
        The content without those wrappers, with surrounding whitespace trimmed.
    """
    import re

    # Gradient divs at or above this total length (tags included) are treated
    # as real content and kept.
    max_header_div_length = 300

    # Remove <!DOCTYPE> and <html> wrappers if present
    content = re.sub(r'<!DOCTYPE[^>]*>', '', content, flags=re.IGNORECASE)
    content = re.sub(r'<html[^>]*>', '', content, flags=re.IGNORECASE)
    content = re.sub(r'</html>', '', content, flags=re.IGNORECASE)
    # Accept attributes on <head ...>, but require whitespace before them so
    # that <header> elements are not mistaken for the document head.
    content = re.sub(
        r'<head(?:\s[^>]*)?>.*?</head\s*>', '', content,
        flags=re.IGNORECASE | re.DOTALL
    )
    content = re.sub(r'<body[^>]*>', '', content, flags=re.IGNORECASE)
    content = re.sub(r'</body>', '', content, flags=re.IGNORECASE)

    def remove_short_header_divs(match):
        """Drop the matched gradient div only when it is short enough to be a header."""
        full_match = match.group(0)
        if len(full_match) < max_header_div_length:
            return ''
        return full_match  # Keep longer content divs

    # The non-greedy match ends at the first </div>, so a gradient div that
    # contains nested divs is only matched up to its first inner close tag.
    content = re.sub(
        r'<div[^>]*(?:background:\s*linear-gradient)[^>]*>.*?</div>',
        remove_short_header_divs, content, flags=re.IGNORECASE | re.DOTALL
    )
    return content.strip()
def wrap_report_with_branding(content: str, report_type: str = "分析") -> str:
    """
    Wrap report content in a complete, standalone Pharma K branded HTML document.

    The content is first passed through strip_outer_containers() so that any
    document-level wrappers produced by the LLM are not nested inside this page.
    The template links the Noto Sans SC web font from Google Fonts (intended for
    Chinese text in PDF output) and embeds all report CSS inline.

    Args:
        content: Report body HTML. It is inserted into the page as-is (not escaped).
        report_type: Label placed before "报告" in the page title and the header;
            also inserted without escaping.

    Returns:
        The full HTML document as a string. The header shows today's date and the
        footer shows the current year, both taken from datetime.now().
    """
    from datetime import datetime
    # Strip nested containers
    clean_content = strip_outer_containers(content)
    # NOTE: this is an f-string, so literal CSS braces are doubled ({{ }}); the only
    # interpolated values are report_type, clean_content and the datetime.now() calls.
    branded_html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Pharma K - {report_type}报告</title>
<!-- Google Fonts for Chinese PDF support -->
<link href="https://fonts.googleapis.com/css2?family=Noto+Sans+SC:wght@300;400;500;700&display=swap" rel="stylesheet">
<style>
@import url('https://fonts.googleapis.com/css2?family=Noto+Sans+SC:wght@300;400;500;700&display=swap');
:root {{
--primary: #008080;
--primary-color: #003366;
--secondary: #2A9D8F;
--secondary-color: #0066cc;
--text-main: #2C3E50;
--text-light: #7F8C8D;
--bg-light: #F8F9FA;
--border-color: #EAEAEA;
--risk-none: #28a745;
--risk-low: #17a2b8;
--risk-medium: #ffc107;
--risk-high: #dc3545;
}}
@media print {{
.no-print {{ display: none; }}
.page-break {{ page-break-before: always; }}
body {{ background: white !important; -webkit-print-color-adjust: exact; }}
}}
* {{ box-sizing: border-box; }}
html, body {{
margin: 0;
padding: 0;
background: #EDF2F2; /* Screen background */
min-height: 100%;
}}
body {{
font-family: 'Noto Sans SC', 'Microsoft YaHei', 'PingFang SC', sans-serif;
line-height: 1.6;
color: var(--text-main);
font-size: 14px;
}}
.pharma-k-container {{
max-width: 850px;
margin: 40px auto;
background: white;
box-shadow: 0 4px 25px rgba(0,0,0,0.05); /* Soft shadow for depth */
border-radius: 8px; /* Slight rounding */
}}
/* Clean Header */
.pharma-k-header {{
background: white;
padding: 30px 40px;
border-bottom: 2px solid var(--primary);
display: flex;
justify-content: space-between;
align-items: center;
}}
.logo-box {{
color: var(--primary);
}}
.pharma-k-logo {{
font-size: 24px;
font-weight: 700;
letter-spacing: 0.5px;
color: var(--primary);
}}
.pharma-k-subtitle {{
font-size: 12px;
color: var(--text-light);
margin-top: 4px;
text-transform: uppercase;
letter-spacing: 1px;
}}
/* Content Area */
.report-content {{
padding: 40px;
}}
/* Typography */
.report-content h1 {{
font-size: 22px;
font-weight: 700;
color: var(--primary);
margin-bottom: 25px;
padding-bottom: 10px;
border-bottom: 1px solid var(--border-color);
}}
.report-content h2 {{
font-size: 18px;
font-weight: 600;
color: var(--text-main);
margin-top: 30px;
margin-bottom: 15px;
border-left: 4px solid var(--secondary);
padding-left: 12px;
}}
.report-content h3 {{
font-size: 15px;
font-weight: 600;
color: var(--text-main);
margin-top: 20px;
margin-bottom: 10px;
}}
.report-content p {{
margin: 10px 0;
color: #444;
text-align: justify;
}}
/* Tables - Minimalism */
.report-content table {{
width: 100%;
border-collapse: collapse;
margin: 20px 0;
font-size: 13px;
}}
.report-content th {{
border-bottom: 2px solid var(--primary);
color: var(--primary);
padding: 10px;
text-align: left;
font-weight: 600;
}}
.report-content td {{
border-bottom: 1px solid var(--border-color);
padding: 10px;
color: #555;
}}
.report-content tr:last-child td {{
border-bottom: none;
}}
.report-content td {{
padding: 6px 10px;
border-bottom: 1px solid #e0e0e0;
}}
.report-content tr:nth-child(even) {{
background: #f9f9f9;
}}
.report-content svg {{
display: block;
margin: 15px auto;
max-width: 100%;
}}
.pharma-k-footer {{
background: linear-gradient(135deg, #1a5f7a 0%, #2d8bb8 100%);
color: white;
padding: 12px 30px;
text-align: center;
font-size: 10px;
}}
.pharma-k-footer .brand {{
font-size: 12px;
font-weight: 700;
margin-bottom: 4px;
}}
/* QbD Report Sections */
.section {{ margin-bottom: 25px; page-break-inside: avoid; }}
.section-title {{
font-size: 16px;
font-weight: bold;
color: var(--primary-color);
margin-bottom: 15px;
padding-bottom: 5px;
border-bottom: 2px solid var(--border-color);
display: flex;
align-items: center;
}}
.section-number {{
background: var(--primary-color);
color: white;
width: 24px;
height: 24px;
border-radius: 50%;
display: inline-flex;
align-items: center;
justify-content: center;
margin-right: 10px;
font-size: 12px;
}}
.section-content {{ word-wrap: break-word; overflow-wrap: break-word; }}
/* QbD Grid Layout */
.qbd-grid {{ display: grid; grid-template-columns: 2fr 1fr; gap: 20px; }}
.structure-card {{
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 15px;
text-align: center;
background: white;
}}
.structure-image {{ max-width: 100%; max-height: 180px; object-fit: contain; }}
.risk-matrix-container {{
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 15px;
background: white;
}}
/* Report Meta Grid */
.report-meta-grid {{
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 15px;
margin-top: 15px;
background: var(--bg-light);
padding: 10px;
border-radius: 4px;
font-size: 12px;
}}
.meta-label {{ font-weight: bold; color: var(--secondary-color); display: block; }}
/* Functional Group Cards */
.group-card {{
background: #f8f9fa;
border: 1px solid var(--border-color);
border-radius: 6px;
padding: 12px;
margin-bottom: 10px;
}}
.group-header {{ display: flex; justify-content: space-between; align-items: center; margin-bottom: 5px; }}
.group-name {{ font-weight: 600; color: var(--primary-color); font-size: 13px; }}
.group-property {{ font-size: 10px; padding: 2px 6px; border-radius: 3px; font-weight: 500; }}
.property-acidic {{ background: #ffe0e0; color: #c62828; }}
.property-basic {{ background: #e0f0ff; color: #1565c0; }}
.property-neutral {{ background: #e8e8e8; color: #666; }}
.group-reactions {{ display: flex; flex-wrap: wrap; gap: 4px; margin-top: 8px; }}
.reaction-tag {{
display: inline-block;
background: var(--secondary-color);
color: white;
padding: 2px 8px;
border-radius: 3px;
font-size: 10px;
white-space: nowrap;
}}
/* Properties Table */
.properties-table {{ width: 100%; border-collapse: collapse; font-size: 12px; }}
.properties-table th {{ text-align: left; color: #666; width: 40%; padding: 6px; }}
.properties-table td {{ padding: 6px; font-weight: 500; }}
/* Action Checklist */
.action-list {{ list-style: none; padding: 0; margin: 0; }}
.action-item {{
margin-bottom: 8px;
padding: 8px 12px;
border-left: 3px solid transparent;
background: #fcfcfc;
border: 1px solid #eee;
border-radius: 4px;
}}
.action-must {{ border-left-color: var(--risk-high) !important; background: #fff5f5; }}
.action-suggest {{ border-left-color: var(--risk-medium) !important; background: #fffbf0; }}
.action-check {{ border-left-color: var(--risk-low) !important; background: #f0f8ff; }}
.highlight-tag {{
font-size: 10px;
text-transform: uppercase;
padding: 2px 4px;
border-radius: 2px;
margin-right: 8px;
color: white;
font-weight: bold;
}}
.bg-high {{ background-color: var(--risk-high); }}
.bg-medium {{ background-color: var(--risk-medium); color: #333; }}
.bg-low {{ background-color: var(--risk-low); }}
.bg-none {{ background-color: var(--risk-none); }}
</style>
</head>
<body>
<div class="pharma-k-container">
<!-- Clean Nordic Header -->
<div class="pharma-k-header">
<div class="logo-box">
<div class="pharma-k-logo">Pharma K</div>
<div class="pharma-k-subtitle">药物制剂相容性与稳定性分析专家系统</div>
</div>
<div style="text-align: right; color: var(--primary);">
<div style="font-size: 16px; font-weight: 700;">{report_type}报告</div>
<div style="font-size: 12px; color: var(--text-light);">{datetime.now().strftime('%Y-%m-%d')}</div>
</div>
</div>
<div class="report-content">
{clean_content}
</div>
<!-- Minimal Footer -->
<div style="text-align: center; padding: 30px; border-top: 1px solid var(--border-color); color: var(--text-light); font-size: 11px;">
<div style="font-weight: 500; margin-bottom: 5px;">Pharma K Analysis System</div>
<div>AI 辅助生成 · 仅供科研参考</div>
<div style="margin-top: 5px;">© {datetime.now().year} All Rights Reserved</div>
</div>
</div>
</body>
</html>"""
    return branded_html
def generate_pdf_from_html(html_content: str) -> Optional[bytes]:
    """
    Generate a PDF from HTML content on a best-effort basis.

    The HTML is pre-processed so that static chart images are shown instead of
    the Plotly JavaScript charts. Rendering is then attempted with pdfkit
    (requires wkhtmltopdf) and, if that fails, with weasyprint.

    Args:
        html_content: Complete HTML document to convert.

    Returns:
        The PDF bytes, or None when neither backend is installed or both fail.
    """
    import re
    # Pre-process: For PDF, hide Plotly charts and show static images
    # Replace display:block with display:none for Plotly charts
    pdf_html = re.sub(r'class="chart-plotly"\s*style="display:block;"',
                      'class="chart-plotly" style="display:none;"', html_content)
    # Replace display:none with display:block for static charts
    pdf_html = re.sub(r'class="chart-static"\s*style="display:none;"',
                      'class="chart-static" style="display:block;"', pdf_html)
    # Remove Plotly JavaScript to reduce PDF size
    pdf_html = re.sub(r'<script[^>]*plotly[^>]*>.*?</script>', '', pdf_html, flags=re.DOTALL | re.IGNORECASE)

    # Each backend is optional: a missing package or a rendering failure falls
    # through to the next one. Only Exception is caught so that KeyboardInterrupt
    # and SystemExit still propagate.
    try:
        # Try using pdfkit (requires wkhtmltopdf installed)
        import pdfkit
        pdf_bytes = pdfkit.from_string(pdf_html, False, options={
            'encoding': 'UTF-8',
            'page-size': 'A4',
            'margin-top': '10mm',
            'margin-bottom': '10mm',
            'margin-left': '10mm',
            'margin-right': '10mm',
            'enable-local-file-access': None,
        })
        return pdf_bytes
    except Exception:
        pass

    try:
        # Try using weasyprint
        from weasyprint import HTML
        import io
        pdf_buffer = io.BytesIO()
        HTML(string=pdf_html).write_pdf(pdf_buffer)
        return pdf_buffer.getvalue()
    except Exception:
        pass

    # No PDF backend available (or both failed)
    return None
# =============================================================================
# SVG Removal & Chart Data Parser
# =============================================================================
def remove_llm_svg(html_content: str) -> str:
    """
    Strip every ``<svg>...</svg>`` block from the HTML.

    LLM-drawn SVG charts often render incorrectly; charts are expected to come
    from CHART_DATA JSON instead, which is rendered separately.
    """
    import re
    # Case-insensitive, and DOTALL so an <svg> spanning several lines is matched.
    svg_block = re.compile(r'<svg[^>]*>.*?</svg>', re.DOTALL | re.IGNORECASE)
    return svg_block.sub('', html_content)
# Chart Data Parser & Renderer
# =============================================================================
def parse_and_render_charts(html_content: str) -> str:
    """
    Replace chart data embedded in LLM output with rendered charts.

    Two passes are made over the HTML:
      1. ``<CHART_DATA>...</CHART_DATA>`` blocks are parsed as JSON and replaced
         by a chart, or by a warning box when the JSON cannot be parsed.
      2. Bare JSON objects that look like chart definitions (a ``"title"``
         string followed by a ``"series"`` array) are replaced the same way;
         unparseable candidates are left untouched.

    Charts are rendered with Plotly plus a hidden static matplotlib image for
    print/PDF; when Plotly is not installed only the matplotlib image is used.

    Args:
        html_content: HTML produced by the LLM, possibly containing chart data.

    Returns:
        The HTML with chart data replaced by chart (or error) markup.
    """
    import re
    import json

    def valid_points(data) -> list:
        """Return the (x, y) pairs from *data* whose coordinates are both numbers.

        Entries that are not 2-element lists/tuples are skipped instead of
        raising, so one malformed point does not discard the whole chart.
        """
        points = []
        for point in data:
            if not isinstance(point, (list, tuple)) or len(point) != 2:
                continue
            x, y = point
            if isinstance(x, (int, float)) and isinstance(y, (int, float)):
                points.append((x, y))
        return points

    def render_chart_plotly(chart_data: dict) -> str:
        """
        Render a chart using Plotly for screen display AND matplotlib for PDF.
        Uses CSS media queries to show appropriate version.
        """
        try:
            import plotly.graph_objects as go
            fig = go.Figure()
            colors = ['#2d8bb8', '#57c5b6', '#1a5f7a', '#f39c12', '#e74c3c']
            for i, series in enumerate(chart_data.get('series', [])):
                data = series.get('data', [])
                if not data:
                    continue
                # Filter out invalid data points
                valid_data = valid_points(data)
                if not valid_data:
                    continue
                x_vals = [p[0] for p in valid_data]
                y_vals = [p[1] for p in valid_data]
                fig.add_trace(go.Scatter(
                    x=x_vals,
                    y=y_vals,
                    mode='lines+markers+text',
                    name=series.get('name', f'系列{i+1}'),
                    line=dict(color=colors[i % len(colors)], width=2),
                    marker=dict(size=8),
                    text=[f'{y}%' for y in y_vals],
                    textposition='top center',
                    textfont=dict(size=10, color=colors[i % len(colors)])
                ))
            fig.update_layout(
                title=dict(
                    text=chart_data.get('title', '数据趋势图'),
                    font=dict(size=16, color='#1a5f7a'),
                    x=0.5
                ),
                xaxis_title=chart_data.get('x_label', '时间'),
                yaxis_title=chart_data.get('y_label', '含量 (%)'),
                font=dict(family='Noto Sans SC, Microsoft YaHei, SimHei, sans-serif'),
                plot_bgcolor='white',
                paper_bgcolor='white',
                xaxis=dict(gridcolor='#e0e0e0', gridwidth=1),
                yaxis=dict(gridcolor='#e0e0e0', gridwidth=1),
                showlegend=len(chart_data.get('series', [])) > 1,
                margin=dict(l=60, r=40, t=60, b=60),
                height=400
            )
            # Generate Plotly HTML for screen display
            plotly_html = fig.to_html(full_html=False, include_plotlyjs='cdn')
            # Also generate static image for PDF using matplotlib
            static_img_html = render_chart_matplotlib(chart_data)
            # Combine both with CSS media queries
            # Screen: show Plotly interactive, hide static
            # Print/PDF: hide Plotly, show static image
            combined_html = f'''
<div style="margin:20px 0;">
<div class="chart-plotly" style="display:block;">{plotly_html}</div>
<div class="chart-static" style="display:none;">{static_img_html}</div>
<style>
@media print {{
.chart-plotly {{ display: none !important; }}
.chart-static {{ display: block !important; }}
}}
</style>
</div>
'''
            return combined_html
        except ImportError:
            # Plotly not available, try matplotlib
            return render_chart_matplotlib(chart_data)
        except Exception as e:
            return f'<div style="color:red;padding:10px;">图表生成错误: {str(e)}</div>'

    def render_chart_matplotlib(chart_data: dict) -> str:
        """
        Render the chart as a base64-embedded PNG using matplotlib.

        Uses the font file fonts/NotoSansSC-Regular.otf next to this module for
        Chinese text when it exists; otherwise labels fall back to English.
        """
        try:
            import matplotlib
            matplotlib.use('Agg')
            import matplotlib.pyplot as plt
            import matplotlib.font_manager as fm
            import io
            import base64
            from pathlib import Path
            # Try to load embedded Chinese font
            font_path = Path(__file__).parent / 'fonts' / 'NotoSansSC-Regular.otf'
            chinese_font = None
            if font_path.exists():
                try:
                    chinese_font = fm.FontProperties(fname=str(font_path))
                except Exception:
                    # An unreadable font just means English labels are used below.
                    pass
            fig, ax = plt.subplots(figsize=(8, 5), facecolor='white')
            try:
                colors = ['#2d8bb8', '#57c5b6', '#1a5f7a', '#f39c12', '#e74c3c']
                for i, series in enumerate(chart_data.get('series', [])):
                    data = series.get('data', [])
                    if not data:
                        continue
                    valid_data = valid_points(data)
                    if not valid_data:
                        continue
                    x_vals = [p[0] for p in valid_data]
                    y_vals = [p[1] for p in valid_data]
                    color = colors[i % len(colors)]
                    # Use Chinese name if font available, otherwise translate
                    series_name = series.get('name', f'Series {i+1}')
                    if not chinese_font:
                        # Comprehensive Chinese to English translation
                        # (longer phrases are replaced before their sub-phrases)
                        series_name = (series_name
                                       .replace('长期条件', 'Long-term')
                                       .replace('加速条件', 'Accelerated')
                                       .replace('处方', 'Formula ')
                                       .replace('批次', 'Batch ')
                                       .replace('实测数据', 'Measured')
                                       .replace('预测趋势', 'Predicted')
                                       .replace('预测', 'Predicted')
                                       .replace('趋势', 'Trend')
                                       .replace('实测', 'Measured')
                                       .replace('杂质', 'Impurity')
                                       .replace('含量', 'Content')
                                       .replace('总杂', 'Total Impurity')
                                       .replace('线', ''))
                        # Remove any remaining Chinese characters using regex
                        series_name = re.sub(r'[\u4e00-\u9fff]+', '', series_name).strip()
                    ax.plot(x_vals, y_vals, 'o-', color=color, linewidth=2,
                            markersize=8, label=series_name)
                # Set labels - use Chinese if font available
                if chinese_font:
                    xlabel = chart_data.get('x_label', '时间 (月)')
                    ylabel = chart_data.get('y_label', '含量 (%)')
                    title = chart_data.get('title', '数据趋势图')
                    ax.set_xlabel(xlabel, fontsize=11, fontproperties=chinese_font)
                    ax.set_ylabel(ylabel, fontsize=11, fontproperties=chinese_font)
                    ax.set_title(title, fontsize=14, fontweight='bold', color='#1a5f7a', fontproperties=chinese_font)
                    if len(chart_data.get('series', [])) > 1:
                        ax.legend(loc='best', framealpha=0.9, prop=chinese_font)
                else:
                    ax.set_xlabel('Time (Month)', fontsize=11)
                    ax.set_ylabel('Content (%)', fontsize=11)
                    ax.set_title('Trend Chart', fontsize=14, fontweight='bold', color='#1a5f7a')
                    if len(chart_data.get('series', [])) > 1:
                        ax.legend(loc='best', framealpha=0.9)
                ax.grid(True, linestyle='--', alpha=0.5)
                plt.tight_layout()
                buf = io.BytesIO()
                fig.savefig(buf, format='png', dpi=150, bbox_inches='tight', facecolor='white')
            finally:
                # Always release the figure, even when plotting or saving fails,
                # so failed renders do not accumulate open figures.
                plt.close(fig)
            img_base64 = base64.b64encode(buf.getvalue()).decode('utf-8')
            return f'''<div style="text-align:center;margin:20px 0;">
<img src="data:image/png;base64,{img_base64}"
style="max-width:100%;border:1px solid #e0e0e0;border-radius:8px;"
alt="Chart"/>
</div>'''
        except Exception as e:
            return f'<div style="color:red;padding:10px;">图表生成错误: {str(e)}</div>'

    # Both patterns match a complete JSON string literal first, so that "//" or
    # ",}" occurring inside a string value is left alone.
    string_or_comment = re.compile(r'"(?:\\.|[^"\\])*"|//[^\n]*')
    string_or_trailing_comma = re.compile(r'"(?:\\.|[^"\\])*"|,\s*(?=[}\]])')

    def keep_strings(match) -> str:
        """Return string literals unchanged and drop any other match."""
        token = match.group(0)
        return token if token.startswith('"') else ''

    def clean_json_string(json_str: str) -> str:
        """Clean JSON text: drop // comments, trailing commas and trailing junk."""
        json_str = string_or_comment.sub(keep_strings, json_str)
        json_str = string_or_trailing_comma.sub(keep_strings, json_str)
        # Cut anything after the last closing brace/bracket.
        last_brace = max(json_str.rfind('}'), json_str.rfind(']'))
        if last_brace > 0:
            json_str = json_str[:last_brace + 1]
        return json_str.strip()

    # Strategy 1: Find <CHART_DATA> tags
    pattern1 = r'<CHART_DATA>\s*(.*?)\s*</CHART_DATA>'
    for match in re.finditer(pattern1, html_content, re.DOTALL | re.IGNORECASE):
        json_str = clean_json_string(match.group(1))
        try:
            chart_data = json.loads(json_str)
            chart_html = render_chart_plotly(chart_data)
            html_content = html_content.replace(match.group(0), chart_html)
        except json.JSONDecodeError:
            error_html = '<div style="color:#856404;background:#fff3cd;padding:15px;border-radius:8px;margin:15px 0;"><strong>图表数据格式错误</strong></div>'
            html_content = html_content.replace(match.group(0), error_html)

    # Strategy 2: Find raw JSON with chart structure
    pattern2 = r'\{\s*"title"\s*:\s*"[^"]*"[^}]*"series"\s*:\s*\[.*?\]\s*\}'
    for match in re.finditer(pattern2, html_content, re.DOTALL):
        json_str = clean_json_string(match.group(0))
        try:
            chart_data = json.loads(json_str)
            if 'series' in chart_data and 'title' in chart_data:
                chart_html = render_chart_plotly(chart_data)
                html_content = html_content.replace(match.group(0), chart_html)
        except json.JSONDecodeError:
            pass
    return html_content
# =============================================================================
# Analysis Functions
# =============================================================================
def run_compatibility_analysis(
    smiles: str,
    excipient: str,
    api_key: str,
    provider: str,
    progress_callback=None
) -> Tuple[str, Optional[str]]:
    """
    Run the drug-excipient compatibility analysis through ProfessionalAnalyzer.

    Args:
        smiles: SMILES string of the API; a truncated copy doubles as its display name.
        excipient: Excipient name.
        api_key: LLM API key; applied only when provider is also given.
        provider: LLM provider name.
        progress_callback: Optional callback passed through to the analyzer.

    Returns:
        A ``(html, None)`` tuple. ``html`` is the formatted report, or a warning
        box when the analyzer is unavailable, reports failure, or raises.
    """
    analyzer = get_professional_analyzer()
    if not analyzer:
        unavailable_html = "<div class='warning-box'>ProfessionalAnalyzer 未加载,请检查 professional_analyzer.py</div>"
        return unavailable_html, None

    # Point the analyzer's LLM client at the user-selected provider.
    if api_key and provider:
        analyzer.model_invoker.set_provider(provider, api_key)

    try:
        clean_smiles = smiles.strip()
        clean_excipient = excipient.strip()
        # No drug name is collected, so a shortened SMILES serves as the label.
        if len(smiles) > 20:
            short_label = smiles[:20] + "..."
        else:
            short_label = smiles
        outcome = analyzer.analyze(
            smiles=clean_smiles,
            excipient_name=clean_excipient,
            api_name=short_label,
            progress_callback=progress_callback
        )
        if outcome.get("success"):
            report_html = analyzer.format_html_report(
                analysis_result=outcome,
                api_name=smiles[:30],
                excipient_name=clean_excipient
            )
            return report_html, None
        return f"<div class='warning-box'>分析失败: {outcome.get('error', 'Unknown error')}</div>", None
    except Exception as exc:
        import traceback
        traceback.print_exc()
        return f"<div class='warning-box'>分析出错: {exc!s}</div>", None
def run_stability_analysis(
    goal: str,
    files: List,
    api_key: str,
    provider: str
) -> str:
    """
    Run stability analysis using the LLM-driven approach.

    Each uploaded file is written to a temporary file, parsed to text with
    ``utils.file_parsers.parse_file`` and the temporary file is then deleted.
    The combined text (truncated to 15000 characters) and the user's goal are
    sent to the LLM, and the reply is post-processed into HTML with charts.

    Args:
        goal: The user's analysis request in natural language.
        files: Uploaded file objects exposing ``.name`` and ``.getvalue()``.
        api_key: LLM API key; applied only when provider is also given.
        provider: LLM provider name.

    Returns:
        An HTML fragment: the analysis report, or an error/warning box when no
        data could be extracted or the LLM call failed.
    """
    model_invoker = get_model_invoker()
    # Set LLM provider
    if api_key and provider:
        model_invoker.set_provider(provider, api_key)
    # Parse files to text
    from utils.file_parsers import parse_file
    sections = []
    for uploaded_file in files:
        tmp_path = None
        try:
            suffix = Path(uploaded_file.name).suffix
            # delete=False so the parser can reopen the file by path after the
            # handle is closed; it is removed explicitly in the finally below.
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                tmp_path = tmp.name
                tmp.write(uploaded_file.getvalue())
            content = parse_file(tmp_path)
            if content:
                sections.append(f"\n=== 文件: {uploaded_file.name} ===\n{content}\n")
        except Exception as e:
            st.warning(f"文件解析警告: {uploaded_file.name} - {str(e)}")
        finally:
            # Never leave the uploaded data behind in the temp directory.
            if tmp_path is not None:
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
    all_text = "".join(sections)
    if not all_text.strip():
        return "<div class='warning-box'>无法从上传文件中提取数据</div>"
    # Truncate if too long
    max_chars = 15000
    if len(all_text) > max_chars:
        all_text = all_text[:max_chars] + "\n... [内容已截断]"
    # LLM-driven analysis prompt - Dynamic response to user needs
    system_prompt = """你是资深的药物稳定性分析专家(Ph.D.级别)。你的核心任务是:
1. 准确理解用户的分析需求
2. 深入分析提供的稳定性数据
3. 针对用户的具体问题给出专业、详细的回答
【专业分析能力】
你具备以下分析能力,根据用户需求选择性使用:
- 动力学分析:零级/一级动力学拟合,计算k值、R²、货架期预测
- 批次比较:对比不同批次的稳定性表现,识别最优/最差批次
- 趋势判断:分析杂质变化趋势,识别异常数据点
- 风险评估:基于ICH Q1E原则评估稳定性风险
- 配方筛选:根据稳定性数据推荐最佳配方
【稳定性预测流程 - 当用户需要预测时必须遵循】
当用户要求进行稳定性预测时,请按以下流程执行:
1. **数据建模**:对输入数据分别进行零级动力学(y = y₀ + k·t)和一级动力学(ln(y) = ln(y₀) - k·t)拟合
2. **模型评估**:计算每个模型的R²(拟合优度),选择R²最高的模型作为最优模型
3. **参数报告**:明确报告所选模型的类型、k值、R²等关键参数
4. **预测计算**:使用最优模型进行外推预测,给出具体的预测值(如24个月后的杂质含量)
5. **结果可视化**:使用<CHART_DATA>格式输出实测数据点和预测趋势线
【图表输出格式 - 极其重要!】
⛔ 绝对禁止生成<svg>标签!你生成的SVG图表无法正确显示!
✅ 只能使用以下JSON格式输出图表数据(系统会自动渲染):
<CHART_DATA>
{
"title": "杂质含量对比",
"x_label": "批次",
"y_label": "杂质含量 (%)",
"series": [
{"name": "总杂", "data": [[1, 2.4], [2, 2.5], [3, 2.1], [4, 2.8]]}
]
}
</CHART_DATA>
注意:data数组中每个元素是[x坐标, y坐标],必须是数字,不是字符串!
【输出格式要求】
⚠️ 直接输出纯HTML代码,禁止任何Markdown!
✅ 使用: <h2>, <h3>, <p>, <table>, <ul>, <li>, <strong>, <em> 等HTML标签
❌ 禁止: ```html, ```, **, *, #, - 等Markdown符号
❌ 禁止: "这是报告"等元描述文字
❌ 禁止: 占位符如[数值]、[批次名称]等,必须填入实际数据
⛔ 绝对禁止: <svg>标签!不要画图!"""
    user_prompt = f"""【用户分析需求】
{goal}
【稳定性数据】
{all_text}
【分析任务】
请仔细阅读用户的分析需求,然后:
1. 首先明确回答用户的核心问题
2. 使用数据中的实际数值进行分析和计算
3. 如果用户要求找出最优批次,请明确给出结论和依据
4. 如果需要图表,使用<CHART_DATA>JSON格式输出
5. 结论要具体、有数据支撑,不要使用占位符
直接输出HTML格式的分析报告。"""
    try:
        response = model_invoker.invoke(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            temperature=0.2  # Lower for more precise output
        )
        # Check for API errors
        if response and hasattr(response, 'success') and not response.success:
            error_msg = getattr(response, 'error', 'Unknown error')
            return f"<div style='color:red;padding:20px;'>LLM API 调用失败: {error_msg}</div>"
        # Check for placeholder mode
        if response and hasattr(response, 'metadata'):
            if response.metadata and response.metadata.get('mode') == 'placeholder':
                return """<div style='padding:20px;background:#fff3cd;border-left:4px solid #ffc107;'>
<h3>⚠️ LLM API 未配置或配置无效</h3>
<p>请检查以下内容:</p>
<ul>
<li>确保已在侧边栏输入正确的 API Key</li>
<li>确保选择了正确的 LLM 提供商</li>
<li>如果使用 Kimi,API Key 格式应为 <code>sk-...</code></li>
</ul>
</div>"""
            if response.metadata and response.metadata.get('fallback'):
                error_msg = getattr(response, 'error', 'API调用失败')
                return f"""<div style='padding:20px;background:#f8d7da;border-left:4px solid #dc3545;'>
<h3>❌ LLM 调用出错</h3>
<p><strong>错误信息:</strong>{error_msg}</p>
<p>请检查 API Key 是否正确,或尝试其他 LLM 提供商。</p>
</div>"""
        # Handle response
        if response and hasattr(response, 'content'):
            content = response.content
        elif isinstance(response, str):
            content = response
        else:
            content = str(response)
        # Post-processing: Clean up any Markdown artifacts
        import re
        # Remove markdown code blocks
        content = re.sub(r'```html\s*', '', content)
        content = re.sub(r'```svg\s*', '', content)
        content = re.sub(r'```\s*', '', content)
        # Convert markdown bold/italic that slipped through
        content = re.sub(r'\*\*([^*]+)\*\*', r'<strong>\1</strong>', content)
        content = re.sub(r'\*([^*]+)\*', r'<em>\1</em>', content)
        # Convert markdown headers
        content = re.sub(r'^#{1,6}\s+(.+)$', r'<h3>\1</h3>', content, flags=re.MULTILINE)
        # Convert markdown list items
        content = re.sub(r'^-\s+(.+)$', r'<li>\1</li>', content, flags=re.MULTILINE)
        # Wrap in proper HTML container if not already
        if not content.strip().startswith('<!DOCTYPE') and not content.strip().startswith('<html') and not content.strip().startswith('<div'):
            content = f"""
<div style="font-family: 'Microsoft YaHei', sans-serif; line-height: 1.8; padding: 20px; max-width: 900px; margin: 0 auto;">
{content}
</div>
"""
        # Parse and render chart data with Plotly (accurate coordinates)
        # First remove any LLM-generated SVG (they render incorrectly)
        content = remove_llm_svg(content)
        content = parse_and_render_charts(content)
        return content
    except Exception as e:
        import traceback
        traceback.print_exc()
        return f"<div style='color:red;padding:20px;'>LLM 分析出错: {str(e)}</div>"
def run_general_analysis(
    question: str,
    api_key: str,
    provider: str
) -> str:
    """
    Answer a free-form pharmaceutical question with the LLM.

    Args:
        question: The user's question, sent unchanged as the user prompt.
        api_key: LLM API key; applied only when provider is also given.
        provider: LLM provider name.

    Returns:
        The answer wrapped in a styled ``<div>``, or a warning box when the
        LLM call raises.
    """
    invoker = get_model_invoker()
    if api_key and provider:
        invoker.set_provider(provider, api_key)

    expert_prompt = """你是药物稳定性和制剂研发领域的专家顾问。
请用专业但易懂的语言回答用户的问题。
如需要,可以引用ICH指南、FDA/EMA法规等权威来源。"""

    try:
        reply = invoker.invoke(
            system_prompt=expert_prompt,
            user_prompt=question,
            temperature=0.5
        )
        # The invoker may hand back an object carrying .content or a plain string.
        if reply and hasattr(reply, 'content'):
            answer = reply.content
        else:
            answer = reply if isinstance(reply, str) else str(reply)
        return f"""
<div style="font-family: 'Microsoft YaHei', sans-serif; line-height: 1.8; padding: 20px; background: #f8fafc; border-radius: 10px;">
{answer}
</div>
"""
    except Exception as exc:
        return f"<div class='warning-box'>回答生成失败: {exc!s}</div>"
# =============================================================================
# Main Application
# =============================================================================
def main():
    """
    Render the Streamlit page and dispatch the three analysis modes.

    The function builds, in order: session-state defaults, the sidebar
    (login/registration when ``AUTH_AVAILABLE``, LLM provider and API key
    selection, usage notes), the admin panel (only for a logged-in admin who
    toggled it on), the page header, and a two-column layout with the task
    console on the left and the results on the right.

    Result handling:
        * Compatibility and Q&A reports are rendered directly in the run in
          which their button was pressed.
        * A stability report is stored in ``st.session_state`` and the script
          is rerun; the stored report is then shown on every run in which no
          compatibility or Q&A request was just made.
    """
    # Initialize session state
    session_defaults = {
        'analysis_result': None,
        'analysis_type': None,
        'user': None,
        'show_admin_panel': False,
    }
    for state_key, state_default in session_defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = state_default

    # Sidebar - Authentication & Settings
    with st.sidebar:
        # ========== Authentication Section ==========
        if AUTH_AVAILABLE:
            if st.session_state.user:
                # Logged in user info
                user = st.session_state.user
                st.success(f"👤 {user['email']}")
                if is_admin(user):
                    if st.button("⚙️ 管理员面板", use_container_width=True):
                        st.session_state.show_admin_panel = not st.session_state.show_admin_panel
                if st.button("🚪 退出登录", use_container_width=True):
                    st.session_state.user = None
                    st.session_state.show_admin_panel = False
                    st.rerun()
            else:
                # Login/Register tabs
                auth_tab = st.radio("", ["登录", "注册"], horizontal=True, label_visibility="collapsed")
                if auth_tab == "登录":
                    email = st.text_input("邮箱/用户名", key="login_email")
                    password = st.text_input("密码", type="password", key="login_password")
                    if st.button("登录", use_container_width=True, type="primary"):
                        success, msg, user_info = login_user(email, password)
                        if success:
                            st.session_state.user = user_info
                            st.rerun()
                        else:
                            st.error(msg)
                else:
                    email = st.text_input("邮箱", key="reg_email")
                    password = st.text_input("密码", type="password", key="reg_password")
                    password2 = st.text_input("确认密码", type="password", key="reg_password2")
                    if st.button("注册", use_container_width=True, type="primary"):
                        if password != password2:
                            st.error("两次密码不一致")
                        else:
                            success, msg = register_user(email, password)
                            if success:
                                st.success(msg)
                            else:
                                st.error(msg)
            st.divider()

        # ========== LLM Configuration ==========
        # Default state: collapsed if API key exists, expanded if not
        default_config = {'provider': 'kimi', 'api_key': ''}
        if AUTH_AVAILABLE:
            default_config = get_default_llm_config()
        current_key = default_config.get('api_key', '')
        expander_open = not bool(current_key)

        with st.expander("🔑 API 配置 & 模型选择", expanded=expander_open):
            provider_options = {
                "Moonshot Kimi": "kimi",
                "Google Gemini": "gemini",
                "OpenAI": "openai",
                "Deepseek": "deepseek",
                "智谱清言 (GLM)": "zhipu"
            }
            provider_names = list(provider_options.keys())

            # Find default provider name (falls back to Kimi when the stored
            # provider id is not one of the known options)
            default_provider_name = "Moonshot Kimi"
            for name, val in provider_options.items():
                if val == default_config.get('provider', 'kimi'):
                    default_provider_name = name
                    break

            provider_name = st.selectbox(
                "选择提供商",
                provider_names,
                index=provider_names.index(default_provider_name)
            )
            provider = provider_options[provider_name]

            # API Key input
            user_api_key = st.text_input("API Key", type="password", placeholder="留空则使用默认配置")
            if user_api_key:
                st.caption("✅ 已输入自定义 Key")

        # Determine final API key: a user-supplied key wins over the default
        if user_api_key:
            api_key = user_api_key
        else:
            api_key = default_config.get('api_key', '')

        # Status Indicator in Sidebar (Outside expander)
        if api_key:
            st.success(f"🟢 已连接: {provider_options[provider_name]}")
        else:
            st.error("🔴 未配置 API Key")

        st.divider()
        st.header("ℹ️ 使用说明")
        st.info(
            "\n"
            "**📌 相容性分析**: SMILES + 辅料名称\n"
            "**📌 稳定性分析**: 上传文件 + 分析目标\n"
            "**📌 通用问答**: 直接提问\n"
        )

    # ========== Admin Panel (Modal-like) ==========
    if st.session_state.show_admin_panel and st.session_state.user and is_admin(st.session_state.user):
        st.markdown("---")
        st.header("⚙️ 管理员面板")
        admin_tab1, admin_tab2 = st.tabs(["👥 用户管理", "🔧 LLM 配置"])

        with admin_tab1:
            st.subheader("注册用户列表")
            users = get_all_users()
            if users:
                for registered in users:
                    col1, col2, col3 = st.columns([3, 2, 2])
                    with col1:
                        st.write(f"📧 {registered['email']}")
                    with col2:
                        st.write(f"👤 {registered['role']}")
                    with col3:
                        st.write(f"📅 {registered['created_at'][:10] if registered['created_at'] else 'N/A'}")
            else:
                st.info("暂无注册用户")

        with admin_tab2:
            st.subheader("默认 LLM 配置")
            st.caption("用户未输入 API Key 时将使用此配置")
            current_config = get_default_llm_config()

            # list.index() raises ValueError for an unknown value; fall back
            # to the first option, the same default the sidebar uses, so a
            # stale or unexpected stored provider cannot crash the panel.
            provider_ids = list(provider_options.values())
            stored_provider = current_config.get('provider', 'kimi')
            admin_index = provider_ids.index(stored_provider) if stored_provider in provider_ids else 0

            admin_provider = st.selectbox(
                "默认提供商",
                list(provider_options.keys()),
                index=admin_index,
                key="admin_provider"
            )
            admin_api_key = st.text_input(
                "默认 API Key",
                value=current_config.get('api_key', ''),
                type="password",
                key="admin_api_key"
            )
            if st.button("💾 保存配置", type="primary"):
                set_default_llm_config(provider_options[admin_provider], admin_api_key)
                st.success("✅ 配置已保存")

        st.markdown("---")

    # ========== Main Header (Navbar Style) ==========
    st.markdown(
        '\n'
        '<div class="main-header" style="text-align: left !important; padding: 1rem 0 !important; margin-bottom: 2rem !important; border-bottom: 1px solid #eee;">\n'
        '<div style="display: flex; align-items: center; justify-content: space-between;">\n'
        '<div style="font-size: 1.5rem; font-weight: 700; color: #008080;">\n'
        'Pharma K <span style="font-size: 0.9rem; font-weight: 400; color: #7F8C8D; margin-left: 10px;">专家系统</span>\n'
        '</div>\n'
        '<div style="font-size: 0.9rem; color: #7F8C8D;">\n'
        'Next-Gen Drug Stability Analysis\n'
        '</div>\n'
        '</div>\n'
        '</div>\n',
        unsafe_allow_html=True
    )

    # Main content - Split Pane Layout
    col_input, col_output = st.columns([4.5, 5.5], gap="medium")

    with col_input:
        st.markdown('<div class="nordic-card">', unsafe_allow_html=True)
        # (Note: CSS targeting [data-testid="column"] handles the card look, but we keep this comment for clarity)
        st.subheader("📝 任务控制台")

        # Analysis mode tabs
        tab_compat, tab_stability, tab_general = st.tabs([
            "相容性", "稳定性", "问答"
        ])

        with tab_compat:
            st.caption("预测药物分子与辅料的相容性风险")
            smiles = st.text_area(
                "API SMILES",
                placeholder="可使用问答功能获取 API SMILES(如:帮我查询对乙酰氨基酚的SMILES)",
                height=100,
                key="smiles_input"
            )
            excipient = st.text_input(
                "辅料名称",
                placeholder="例如: 乳糖, 微晶纤维素",
                key="excipient_input"
            )

            # Show molecule structure preview
            if smiles:
                mol_renderer = get_molecule_renderer()
                if mol_renderer and mol_renderer.is_available:
                    svg = mol_renderer.render_2d_svg(smiles, 300, 150)
                    if svg:
                        st.markdown(svg, unsafe_allow_html=True)

            st.markdown("---")
            compat_button = st.button("🚀 开始分析", use_container_width=True, type="primary", key="compat_btn")

        with tab_stability:
            st.caption("基于已有数据文件进行趋势与货架期分析")
            uploaded_files = st.file_uploader(
                "上传数据文件",
                type=["xlsx", "xls", "docx", "doc", "pdf", "csv"],
                accept_multiple_files=True,
                key="stability_files",
                label_visibility="collapsed"
            )
            if not uploaded_files:
                st.info("👆 请先上传稳定性数据文件")

            # Quick Action Chips
            st.markdown('<div style="margin: 15px 0 8px; font-size: 13px; font-weight: 500; color: #666;">快捷指令</div>', unsafe_allow_html=True)
            chip_col1, chip_col2, chip_col3 = st.columns(3)

            def set_goal(text):
                # Button callback: preset the analysis goal
                st.session_state.stability_goal_input = text

            def sync_goal_from_text_area():
                # Text-area callback: copy manual edits into the goal slot
                # that the analysis step reads
                st.session_state.update({"stability_goal_input": st.session_state.stability_goal_input_area})

            if "stability_goal_input" not in st.session_state:
                st.session_state.stability_goal_input = ""

            with chip_col1:
                st.button("📈 趋势", use_container_width=True, type="secondary", on_click=set_goal, args=("请分析各批次的杂质增长趋势,并判断是否符合限度要求。",))
            with chip_col2:
                st.button("🔮 货架期", use_container_width=True, type="secondary", on_click=set_goal, args=("基于现有数据,请预测24个月时的含量数据。",))
            with chip_col3:
                st.button("🏆 筛选", use_container_width=True, type="secondary", on_click=set_goal, args=("对比不同处方批次,找出最稳定的处方。",))

            # The return value is not needed: the goal is always read back
            # from st.session_state.stability_goal_input.
            st.text_area(
                "详细分析目标",
                value=st.session_state.stability_goal_input,
                placeholder="或在此手动描述...",
                height=100,
                key="stability_goal_input_area",
                on_change=sync_goal_from_text_area
            )

            st.markdown("---")
            stability_button = st.button("🚀 开始分析", use_container_width=True, type="primary", key="stability_btn")

        with tab_general:
            st.caption("制剂研发领域的专业问答助手")
            question = st.text_area(
                "问题描述",
                placeholder="例如: ICH指南对稳定性试验有什么要求?",
                height=150,
                key="general_question",
                label_visibility="collapsed"
            )
            st.markdown("---")
            general_button = st.button("💡 咨询AI", use_container_width=True, type="primary", key="general_btn")

    with col_output:
        st.subheader("📊 分析结果")

        # A stability report persisted by a previous run, if any
        has_stored_report = bool(
            st.session_state.get('analysis_type') == 'stability'
            and st.session_state.get('analysis_result')
        )

        # Determine if we have any active result
        is_active = bool(compat_button or stability_button or general_button or has_stored_report)

        # Empty State
        if not is_active:
            st.markdown(
                '\n'
                '<div style="text-align: center; padding: 60px 20px; color: #999;">\n'
                '<div style="font-size: 60px; margin-bottom: 20px;">🧬</div>\n'
                '<h3 style="color: #666; margin-bottom: 10px;">准备就绪</h3>\n'
                '<p>请在左侧选择分析模式并输入信息<br>AI 专家系统将为您生成专业报告</p>\n'
                '<div style="margin-top: 30px; display: flex; justify-content: center; gap: 20px;">\n'
                '<div style="background: #f0f2f6; padding: 10px 20px; border-radius: 20px; font-size: 12px;">相容性预测</div>\n'
                '<div style="background: #f0f2f6; padding: 10px 20px; border-radius: 20px; font-size: 12px;">稳定性分析</div>\n'
                '<div style="background: #f0f2f6; padding: 10px 20px; border-radius: 20px; font-size: 12px;">专家问答</div>\n'
                '</div>\n'
                '</div>\n',
                unsafe_allow_html=True
            )

        result_container = st.container()

        # Handle button clicks. The three buttons form ONE if/elif chain so
        # that a stored stability report can never swallow the Q&A branch.
        with result_container:
            # Compatibility Analysis
            if compat_button:
                if not smiles or not excipient:
                    st.warning("请输入 SMILES 结构式和辅料名称")
                elif not api_key:
                    st.warning("请在侧边栏配置 API Key")
                else:
                    with st.spinner("正在解析分子结构并进行相容性推演..."):
                        html_report, _ = run_compatibility_analysis(
                            smiles=smiles,
                            excipient=excipient,
                            api_key=api_key,
                            provider=provider
                        )
                    branded_report = wrap_report_with_branding(html_report, "相容性分析")

                    # Toolbar (Download Buttons)
                    c1, c2, c3 = st.columns([2, 1, 1])
                    with c2:
                        st.download_button("📥 HTML", branded_report, f"Pharma_K_Compat_{smiles[:5]}.html", "text/html")
                    with c3:
                        pdf_data = generate_pdf_from_html(branded_report)
                        if pdf_data:
                            st.download_button("📄 PDF", pdf_data, f"Pharma_K_Compat_{smiles[:5]}.pdf", "application/pdf")
                    st.components.v1.html(branded_report, height=800, scrolling=True)

            # Stability Analysis Logic
            elif stability_button:
                if not uploaded_files or not st.session_state.stability_goal_input:
                    st.warning("请上传数据文件并描述分析目标")
                elif not api_key:
                    st.warning("请在侧边栏配置 API Key")
                else:
                    with st.spinner("正在进行多维度稳定性数据分析..."):
                        html_report = run_stability_analysis(
                            goal=st.session_state.stability_goal_input,
                            files=uploaded_files,
                            api_key=api_key,
                            provider=provider
                        )
                    if html_report and len(html_report.strip()) > 100:
                        branded_report = wrap_report_with_branding(html_report, "稳定性分析")
                        branded_report = parse_and_render_charts(branded_report)
                        # Persist the report and rerun so it is rendered from
                        # session state on this and later runs.
                        st.session_state.analysis_result = branded_report
                        st.session_state.analysis_type = "stability"
                        st.rerun()
                    else:
                        st.error("分析未返回有效结果")

            # General Q&A
            elif general_button:
                if not question:
                    st.warning("请输入问题")
                elif not api_key:
                    st.warning("请配置 API Key")
                else:
                    with st.spinner("专家系统正在思考..."):
                        html_response = run_general_analysis(question, api_key, provider)
                    branded_response = wrap_report_with_branding(html_response, "专家问答")

                    # Toolbar
                    c1, c2, c3 = st.columns([2, 1, 1])
                    with c2:
                        st.download_button("📥 HTML", branded_response, "QA_Response.html", "text/html")
                    with c3:
                        pdf_data = generate_pdf_from_html(branded_response)
                        if pdf_data:
                            st.download_button("📄 PDF", pdf_data, "QA_Response.pdf", "application/pdf")
                    st.components.v1.html(branded_response, height=600, scrolling=True)

            # Display stability analysis result from session state, unless a
            # compatibility or Q&A request was just made in this run (its own
            # output is shown instead of the older stability report).
            if has_stored_report and not (compat_button or general_button):
                branded_report = st.session_state.analysis_result

                # Toolbar
                c1, c2, c3 = st.columns([2, 1, 1])
                with c1:
                    st.success("✅ 分析完成")
                with c2:
                    st.download_button("📥 HTML", branded_report, "Stability_Report.html", "text/html")
                with c3:
                    pdf_data = generate_pdf_from_html(branded_report)
                    if pdf_data:
                        st.download_button("📄 PDF", pdf_data, "Stability_Report.pdf", "application/pdf")
                st.components.v1.html(branded_report, height=800, scrolling=True)
# =============================================================================
# Entry Point
# =============================================================================
# Build the UI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()