| """ | |
| app.py - All-Disciplines Knowledge Assistant (Gradio) | |
| - Default local transformers model: bigscience/bloomz-1b1 | |
| - On startup, if transformers is available, attempt to download/load the model and print status steps. | |
| """ | |
import os
import io
import time
import json
import requests
import importlib.util  # import the submodule explicitly; `import importlib` alone does not guarantee importlib.util is available
import threading
from typing import List, Tuple

import gradio as gr

# ----------------- Configuration -----------------
DEFAULT_LOCAL_MODEL = "bigscience/bloomz-1b1"  # default stronger open-source model
DEFAULT_HF_MODEL = "gpt2"
DEFAULT_OPENAI_MODEL = "gpt-4"
SYSTEM_PROMPT = (
    "You are an encyclopedic, English-only scientific knowledge assistant. "
    "Reply in clear, accurate English and adapt depth to the user's audience level (High School, Undergraduate, Graduate, Expert). "
    "Do NOT generate quizzes, exam questions, or practice problems. If asked, refuse politely and offer explanatory material. "
    "When appropriate, include suggested further reading (textbooks, review articles, or authoritative websites)."
)

# ----------------- Capability detection (lazy) -----------------
def has_module(name: str) -> bool:
    return importlib.util.find_spec(name) is not None

_HAS_TRANSFORMERS = has_module("transformers")
_HAS_SYMPY = has_module("sympy")
_HAS_PYPDF2 = has_module("PyPDF2")
_HAS_OPENAI = has_module("openai")

if _HAS_SYMPY:
    import sympy as sp  # type: ignore

# Global model/pipeline holder and status messages
_LOCAL_PIPELINE = None
_LOAD_STATUS = "Not started"  # updated during startup
_LOAD_ERROR = None

# ----------------- Utilities: model loading with status -----------------
def set_status(msg: str):
    global _LOAD_STATUS
    _LOAD_STATUS = msg
    print(f"[MODEL-STATUS] {msg}", flush=True)

def load_local_transformers_model(local_model: str = DEFAULT_LOCAL_MODEL):
    """
    Synchronously attempt to load a local transformers model.
    This prints & updates stage messages so the container logs clearly show progress.
    """
    global _LOCAL_PIPELINE, _LOAD_STATUS, _LOAD_ERROR
    if not _HAS_TRANSFORMERS:
        _LOAD_ERROR = "transformers package not installed; local model unavailable."
        set_status(_LOAD_ERROR)
        return None
    try:
        set_status(f"Checking availability of model '{local_model}' in cache or HF hub...")
        # lazy import to avoid import-time crash
        from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM  # type: ignore
        set_status("Downloading / loading tokenizer (this may take a while)...")
        tokenizer = AutoTokenizer.from_pretrained(local_model, use_fast=True)
        set_status("Downloading / loading model weights (this may take a while and use significant disk/memory)...")
        # Try to reduce peak memory use; let transformers choose device
        try:
            model = AutoModelForCausalLM.from_pretrained(local_model, low_cpu_mem_usage=True)
        except TypeError:
            # older transformers may not have low_cpu_mem_usage
            model = AutoModelForCausalLM.from_pretrained(local_model)
        set_status("Initializing text-generation pipeline...")
        # create a text-generation pipeline; do_sample=False for deterministic output by default
        _LOCAL_PIPELINE = pipeline("text-generation", model=model, tokenizer=tokenizer)
        set_status(f"Model '{local_model}' is ready and loaded into pipeline.")
        return _LOCAL_PIPELINE
    except Exception as e:
        _LOAD_ERROR = f"Failed to load local model '{local_model}': {e}"
        set_status(_LOAD_ERROR)
        return None
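
# Optional sketch (an assumption, not part of the original startup flow): the `threading`
# import above could be used to start the download in the background so the Gradio UI
# becomes responsive before the model finishes loading.
def load_local_model_async(local_model: str = DEFAULT_LOCAL_MODEL) -> threading.Thread:
    """Run load_local_transformers_model in a daemon thread and return the thread."""
    t = threading.Thread(target=load_local_transformers_model, args=(local_model,), daemon=True)
    t.start()
    return t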

# ----------------- Generators -----------------
def gen_with_local_transformers(prompt: str, local_model: str = DEFAULT_LOCAL_MODEL, max_new_tokens: int = 256, temperature: float = 0.2) -> str:
    global _LOCAL_PIPELINE
    if _LOCAL_PIPELINE is None:
        # Try to load on demand (synchronous)
        load_local_transformers_model(local_model)
    if _LOCAL_PIPELINE is None:
        return "[Local transformers unavailable] Model pipeline not ready."
    try:
        # do_sample=False keeps output deterministic; the temperature argument is accepted for interface parity but unused here
        out = _LOCAL_PIPELINE(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        text = out[0].get("generated_text", "")
        # the pipeline echoes the prompt, so strip it and return only the completion
        if text.startswith(prompt):
            text = text[len(prompt):].strip()
        return text
    except Exception as e:
        return f"[Local transformers generation error] {e}"

def gen_with_hf_inference(prompt: str, hf_token: str, model: str = DEFAULT_HF_MODEL, max_new_tokens: int = 256, temperature: float = 0.2) -> str:
    if not hf_token:
        return "[HuggingFace error] No HF token provided."
    headers = {"Authorization": f"Bearer {hf_token}", "Content-Type": "application/json"}
    url = f"https://api-inference.huggingface.co/models/{model}"
    payload = {"inputs": prompt, "parameters": {"max_new_tokens": max_new_tokens, "temperature": temperature}}
    try:
        r = requests.post(url, headers=headers, json=payload, timeout=120)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, dict):
            if "generated_text" in data:
                return data["generated_text"].strip()
            if "error" in data:
                return f"[HuggingFace error] {data['error']}"
            return json.dumps(data)
        if isinstance(data, list) and len(data) > 0:
            first = data[0]
            if isinstance(first, dict) and "generated_text" in first:
                return first["generated_text"].strip()
            return str(first)
        return str(data)
    except Exception as e:
        return f"[HuggingFace HTTP error] {e}"

def gen_with_openai(prompt: str, openai_key: str, model: str = DEFAULT_OPENAI_MODEL, temperature: float = 0.2, max_tokens: int = 600) -> str:
    if not _HAS_OPENAI:
        return "[OpenAI error] openai package not installed."
    if not openai_key:
        return "[OpenAI error] No OpenAI API key provided."
    try:
        import openai  # type: ignore
        openai.api_key = openai_key
        messages = [{"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt}]
        # Uses the legacy openai<1.0 ChatCompletion interface; on newer SDKs this raises and the error string is returned below.
        resp = openai.ChatCompletion.create(model=model, messages=messages, temperature=temperature, max_tokens=max_tokens)
        return resp["choices"][0]["message"]["content"].strip()
    except Exception as e:
        return f"[OpenAI error] {e}"

def offline_answer(prompt: str) -> str:
    simple_kb = {
        "what is gravity": "Gravity pulls masses toward each other. See Newton's law and Einstein's general relativity.",
        "what is dna": "DNA encodes genetic information; see molecular biology textbooks and NCBI resources.",
    }
    q = prompt.lower()
    for k, v in simple_kb.items():
        if k in q:
            return v + " (Offline mode; configure an LLM backend for richer answers.)"
    return (
        "Offline mode: limited knowledge. To get detailed, up-to-date answers, configure a backend (OpenAI, Hugging Face Inference, or local transformers). "
        "Example: ask 'What is gravity?' or 'Explain DNA structure.'"
    )

def generate_answer(prompt: str, backend: str, openai_key: str, hf_token: str, hf_model: str, local_model: str, temperature: float = 0.2) -> str:
    backend = backend or "offline"
    if backend == "transformers_local":
        return gen_with_local_transformers(prompt, local_model=local_model, temperature=temperature)
    if backend == "huggingface_inference":
        return gen_with_hf_inference(prompt, hf_token=hf_token, model=hf_model, temperature=temperature)
    if backend == "openai":
        return gen_with_openai(prompt, openai_key=openai_key, model=DEFAULT_OPENAI_MODEL, temperature=temperature)
    return offline_answer(prompt)
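
# Example call into the backend router (the offline backend needs no keys):
#   generate_answer("What is gravity?", "offline", openai_key="", hf_token="",
#                   hf_model=DEFAULT_HF_MODEL, local_model=DEFAULT_LOCAL_MODEL)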

# ----------------- file/text extraction -----------------
def extract_text_from_file_obj(file_obj) -> str:
    if file_obj is None:
        return ""
    try:
        # Depending on the Gradio version, gr.File may hand over a filepath string or a
        # temp-file object; prefer reading from the path on disk when one is available.
        name = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", "")
        if name and os.path.exists(name):
            with open(name, "rb") as fh:
                raw = fh.read()
        else:
            raw = file_obj.read()
        if isinstance(raw, str):
            raw = raw.encode("utf-8", errors="ignore")
        if name.lower().endswith(".pdf") and _HAS_PYPDF2:
            try:
                import PyPDF2  # type: ignore
                reader = PyPDF2.PdfReader(io.BytesIO(raw))
                pages = [p.extract_text() or "" for p in reader.pages]
                return "\n".join(pages)
            except Exception:
                pass
        try:
            return raw.decode("utf-8", errors="ignore")
        except Exception:
            return raw.decode("latin-1", errors="ignore")
    except Exception:
        return ""

# ----------------- math helper -----------------
def math_solve_or_explain(expr: str, prefer_steps: bool = True, backend: str = "transformers_local", openai_key: str = "", hf_token: str = "", hf_model: str = DEFAULT_HF_MODEL, local_model: str = DEFAULT_LOCAL_MODEL) -> str:
    if not expr:
        return "Error: empty expression."
    if _HAS_SYMPY:
        try:
            if "=" in expr:
                lhs, rhs = expr.split("=", 1)
                eq = sp.Eq(sp.sympify(lhs), sp.sympify(rhs))
                sol = sp.solve(eq)
                base = f"Analytic solution: {sol}\n"
            else:
                val = sp.simplify(sp.sympify(expr))
                base = f"Simplified/symbolic result:\n{sp.pretty(val)}\n"
            if prefer_steps:
                prompt = f"Provide a clear step-by-step derivation for: {expr}\nInclude explanations for each step."
                return base + "\nStep-by-step:\n" + generate_answer(prompt, backend, openai_key, hf_token, hf_model, local_model)
            return base
        except Exception as e:
            # fallback to LLM
            return f"SymPy parse error: {e}\nFallback to LLM...\n" + generate_answer(f"Derive/solve: {expr}", backend, openai_key, hf_token, hf_model, local_model)
    # no sympy
    return generate_answer(f"Derive/solve: {expr}", backend, openai_key, hf_token, hf_model, local_model)
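
# Example (pure SymPy path, no LLM call):
#   math_solve_or_explain("x**2 - 4 = 0", prefer_steps=False)
#   # expected to return roughly "Analytic solution: [-2, 2]\n"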

# ----------------- prompt builder -----------------
def build_science_prompt(question: str, discipline: str, audience: str, depth: str) -> str:
    prompt = (
        f"Discipline: {discipline}\nAudience: {audience}\nDepth: {depth}\n\n"
        f"Question: {question}\n\n"
        "Please reply in clear English and include:\n"
        "1) A short direct answer (2-4 sentences).\n"
        "2) Underlying principles and reasoning (use LaTeX for equations if needed).\n"
        "3) Experimental/observational evidence if applicable.\n"
        "4) Real-world applications if applicable.\n"
        "5) Current consensus and open questions.\n"
        "6) Three suggested further reading items (textbooks, review articles, or authoritative websites).\n\n"
        "IMPORTANT: DO NOT generate quizzes, exam questions, or practice problems. If requested, refuse and provide explanatory content instead."
    )
    return prompt

# ----------------- disciplines (expanded) -----------------
SCIENCE_DISCIPLINES = [
    "Physics", "Condensed Matter Physics", "Particle Physics", "Quantum Physics", "Astrophysics",
    "Chemistry", "Physical Chemistry", "Organic Chemistry", "Inorganic Chemistry", "Analytical Chemistry",
    "Biology", "Molecular Biology", "Cell Biology", "Genetics", "Evolutionary Biology",
    "Mathematics", "Applied Mathematics", "Statistics", "Probability", "Numerical Analysis",
    "Earth Science", "Geology", "Geophysics", "Oceanography", "Atmospheric Science",
    "Materials Science", "Nanoscience", "Biomaterials",
    "Engineering", "Mechanical Engineering", "Electrical Engineering", "Civil Engineering", "Aerospace Engineering", "Chemical Engineering", "Biomedical Engineering", "Robotics",
    "Computer Science", "AI/ML", "Theoretical CS", "Systems & Networking", "Human-Computer Interaction",
    "Neuroscience", "Cognitive Science", "Psychology", "Behavioral Neuroscience",
    "Ecology", "Environmental Science", "Climate Science", "Paleontology", "Planetary Science",
    "Biophysics", "Systems Biology", "Biomedical Research", "Philosophy of Science", "History of Science",
    "Interdisciplinary",
]

# ----------------- Gradio functions -----------------
def chat_handler(user_message: str, history: List[Tuple[str, str]], discipline: str, audience: str, depth: str,
                 backend: str, openai_key: str, hf_token: str, hf_model: str, local_model: str, temperature: float):
    if user_message is None:
        return history, history
    banned_terms = ["quiz", "exam", "test", "exercise", "practice problem", "problem set"]
    if any(t in user_message.lower() for t in banned_terms):
        reply = "I do not generate quizzes, exam questions, or practice problems. I can provide detailed explanations, derivations, and suggested readings."
        history = history + [(user_message, reply)]
        return history, history
    chinese_tokens = ["请", "出题", "练习题", "测验", "题目", "考试"]
    if any(t in user_message for t in chinese_tokens):
        reply = "Please ask your question in English. This assistant operates in English only."
        history = history + [(user_message, reply)]
        return history, history
    prompt = build_science_prompt(user_message, discipline, audience, depth)
    resp = generate_answer(prompt, backend, openai_key, hf_token, hf_model, local_model, temperature=temperature)
    history = history + [(user_message, resp)]
    return history, history

def document_summarizer(file_obj, backend: str, openai_key: str, hf_token: str, hf_model: str, local_model: str, audience: str):
    text = extract_text_from_file_obj(file_obj)
    if not text:
        return "Could not read the file or it appears empty."
    excerpt = text[:20000]
    prompt = (
        f"You are a scholarly reader. Audience: {audience}. Based on the text below, provide:\n"
        "1) A concise abstract (150-300 words).\n"
        "2) Key methods and data sources.\n"
        "3) Main conclusions and an assessment of confidence.\n"
        "4) Limitations and suggestions for future work.\n"
        "5) Suggested references or types of literature to check.\n\n"
        f"Text begins:\n{excerpt}"
    )
    return generate_answer(prompt, backend, openai_key, hf_token, hf_model, local_model)

def math_ui_handler(expr: str, prefer_steps: bool, backend: str, openai_key: str, hf_token: str, hf_model: str, local_model: str):
    return math_solve_or_explain(expr, prefer_steps, backend=backend, openai_key=openai_key, hf_token=hf_token, hf_model=hf_model, local_model=local_model)

# ----------------- Build Gradio UI -----------------
def build_ui():
    with gr.Blocks(title="All-Disciplines Knowledge Assistant (English)") as demo:
        gr.Markdown("# 🌐 All-Disciplines Knowledge Assistant — English Only")
        # show the model load status in the UI
        gr.Markdown(f"**Local model (default):** `{DEFAULT_LOCAL_MODEL}`")
        gr.Markdown(f"**Current load status:** `{_LOAD_STATUS}`")
        gr.Markdown("This assistant refuses to create quizzes/exams. Provide API keys below to enable OpenAI or Hugging Face Inference. "
                    "If you want to use a local model, ensure `transformers` and `torch` are installed and provide the local model name (default above).")
        with gr.Row():
            with gr.Column(scale=3):
                discipline = gr.Dropdown(label="Discipline", choices=SCIENCE_DISCIPLINES, value="Interdisciplinary")
                audience = gr.Dropdown(label="Audience level", choices=["High School", "Undergraduate", "Graduate", "Expert"], value="Undergraduate")
                depth = gr.Radio(label="Depth", choices=["overview", "detailed", "technical"], value="detailed")
                gr.Markdown("---\n**API keys / tokens (optional)**")
                openai_key = gr.Textbox(label="OpenAI API Key (paste here)", type="password")
                hf_token = gr.Textbox(label="Hugging Face API Token (paste here)", type="password")
                gr.Markdown("---\n**Backend selection**")
                # default to the local pipeline when transformers is installed, otherwise the HF Inference API
                backend = gr.Dropdown(label="Preferred backend", choices=["transformers_local", "huggingface_inference", "openai", "offline"], value=("transformers_local" if _HAS_TRANSFORMERS else "huggingface_inference"))
| gr.Markdown("Model settings (for HF / local transformers)") | |
| hf_model = gr.Textbox(label="Hugging Face Inference model name (e.g. gpt2 or bigscience/bloom)", value=DEFAULT_HF_MODEL) | |
| local_model = gr.Textbox(label="Local transformers model name (for transformers_local)", value=DEFAULT_LOCAL_MODEL) | |
| temperature = gr.Slider(label="temperature", minimum=0.0, maximum=1.0, value=0.2, step=0.05) | |
| gr.Markdown("---\n**Conversation**") | |
| chatbot = gr.Chatbot(label="Conversation") | |
| state = gr.State([]) | |
| user_input = gr.Textbox(label="Enter your scientific question in English", lines=3) | |
| send = gr.Button("Ask") | |
| send.click(fn=chat_handler, | |
| inputs=[user_input, state, discipline, audience, depth, backend, openai_key, hf_token, hf_model, local_model, temperature], | |
| outputs=[chatbot, state]) | |
| with gr.Column(scale=2): | |
| gr.Markdown("### Upload paper / report (PDF or TXT)") | |
| file_in = gr.File(label="Upload PDF or TXT") | |
| file_audience = gr.Dropdown(label="Summary audience", choices=["Undergraduate", "Graduate", "Expert"], value="Graduate") | |
| summarize_btn = gr.Button("Extract key points & evaluate methods") | |
| summary_out = gr.Textbox(label="Summary & Evaluation", lines=15) | |
| summarize_btn.click(fn=document_summarizer, | |
| inputs=[file_in, backend, openai_key, hf_token, hf_model, local_model, file_audience], | |
| outputs=[summary_out]) | |
| gr.Markdown("---") | |
| gr.Markdown("### Symbolic math / derivations") | |
| expr = gr.Textbox(label="Enter expression or equation (LaTeX or plain)", lines=2) | |
| prefer_steps = gr.Checkbox(label="Provide detailed derivation steps if possible", value=True) | |
| math_run = gr.Button("Derive / Solve") | |
| math_out = gr.Textbox(label="Derivation result", lines=12) | |
| math_run.click(fn=math_ui_handler, | |
| inputs=[expr, prefer_steps, backend, openai_key, hf_token, hf_model, local_model], | |
| outputs=[math_out]) | |
| gr.Markdown("---\n**Disclaimer**: This assistant may produce incorrect or outdated information. For critical decisions, consult primary literature and domain experts.") | |
| return demo | |
# ----------------- Main: load local model synchronously at startup (status shown) -----------------
if __name__ == "__main__":
    print("Starting All-Disciplines Knowledge Assistant...")
    print("Optional packages detected: transformers=", _HAS_TRANSFORMERS, "sympy=", _HAS_SYMPY, "PyPDF2=", _HAS_PYPDF2)
    # Attempt to load the default local model synchronously to show startup progress in logs
    if _HAS_TRANSFORMERS:
        print(f"Attempting to load default local model '{DEFAULT_LOCAL_MODEL}'. This may take time and download files. Check logs for progress.")
        set_status("Startup: beginning local model load...")
        load_local_transformers_model(DEFAULT_LOCAL_MODEL)
    else:
        set_status("transformers package not installed; local model unavailable (use HF Inference or OpenAI backends).")
    # Start Gradio app
    app = build_ui()
    app.launch(server_name="0.0.0.0", share=False)