# Uploaded by: WHG2023
# Commit message: "Upload 2 files"
# Commit: 3ab4541 (verified)
import gradio as gr
import os
import requests
from bs4 import BeautifulSoup
import arxiv
import json
import re
from openai import AsyncOpenAI
from datetime import datetime
import logging
from typing import Dict, Any, List
# --- Configuration & Setup ---
# Logging is configured BEFORE the optional imports below. The module-level
# logging.warning() helper implicitly calls basicConfig() with defaults when
# the root logger has no handlers, and basicConfig() does nothing once a
# handler exists. Configuring afterwards would therefore silently drop the
# INFO level and the custom format whenever one of the imports fails.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Import with fallback for deployment compatibility.
# Each flag records whether the corresponding scouting backend can be used.
try:
    from duckduckgo_search import DDGS
    DDGS_AVAILABLE = True
except ImportError:
    DDGS_AVAILABLE = False
    logging.warning("DuckDuckGo search not available. Market/news scouting will be limited.")

try:
    import semanticscholar as sch
    SEMANTIC_SCHOLAR_AVAILABLE = True
except ImportError:
    SEMANTIC_SCHOLAR_AVAILABLE = False
    logging.warning("Semantic Scholar not available. Paper scouting will be limited.")
# --- Backend Configuration (Pragmatic Hybrid) ---
# 1. Local LLM Client (for fast, simple tasks)
# Both settings can be overridden through environment variables; the values
# below are the defaults used when the variables are unset.
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434/v1")
LOCAL_MODEL_ID = os.getenv("OLLAMA_MODEL", "gemma:2b")
# The API key is a fixed placeholder string, not a secret read from the environment.
local_client = AsyncOpenAI(
    api_key="ollama",
    base_url=OLLAMA_BASE_URL,
)
logging.info(f"Local client configured for model '{LOCAL_MODEL_ID}' at {OLLAMA_BASE_URL}")
# 2. Local HF Transformers (free alternative)
# Use a smaller model that works well on HF Spaces CPU.
# Bound outside the try-block so the name exists even when `transformers`
# cannot be imported: the UI footer interpolates it unconditionally.
LOCAL_MODEL_NAME = "microsoft/DialoGPT-small"

# Pre-bind every name that later code reads, so a failed import or a failed
# model load always leaves well-defined (None / False) values behind.
tokenizer = None
model = None
generator = None
HF_TRANSFORMERS_AVAILABLE = False

try:
    from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
    import torch
except ImportError:
    logging.warning("Transformers not available. Using rule-based fallbacks.")
else:
    logging.info("Loading local HuggingFace model for free inference...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(LOCAL_MODEL_NAME)
        model = AutoModelForCausalLM.from_pretrained(LOCAL_MODEL_NAME)
        generator = pipeline("text-generation", model=model, tokenizer=tokenizer, device=-1)  # CPU
        # Only advertise availability once the pipeline really exists.
        HF_TRANSFORMERS_AVAILABLE = True
        logging.info(f"Local model '{LOCAL_MODEL_NAME}' loaded successfully")
    except Exception as e:
        # Best-effort load: any failure (download, memory, ...) disables the
        # local path instead of aborting module import.
        logging.warning(f"Failed to load local model: {e}")
        tokenizer = None
        model = None
        generator = None
        HF_TRANSFORMERS_AVAILABLE = False
# Fallback to API if needed (but we'll avoid this to stay free)
# The remote client only exists when the `HFSecret` environment variable
# holds a token; otherwise it stays None and remote calls are disabled.
HF_TOKEN = os.getenv("HFSecret")
REMOTE_MODEL_ID = "meta-llama/Llama-2-7b-chat-hf"
if not HF_TOKEN:
    remote_client = None
    logging.warning("HFSecret not set. Remote client is disabled.")
else:
    remote_client = AsyncOpenAI(
        api_key=HF_TOKEN,
        base_url="https://api-inference.huggingface.co/v1",
    )
    logging.info(f"Remote client configured as fallback for model '{REMOTE_MODEL_ID}'")

# Default sampling temperature and completion-token cap for API calls.
MODEL_TEMP = 0.4
MAX_TOKENS = 4096
# --- Expert Personas & Prompts ---
# Registry of the experts the pipeline can consult, keyed by the identifier
# the router is asked to return. Every entry carries:
#   "name"    - display label used in progress and error messages,
#   "persona" - system prompt for the LLM; it names the JSON key(s) the
#               expert is expected to output,
#   "backend" - which client the call should be routed to ("local" or "remote").
EXPERT_PERSONAS = {
    "distillation_analyst": {
        "name": "RAG Distillation Analyst",
        "persona": "As a research assistant, read the provided raw text and distill it into a structured JSON summary with keys: `key_patents`, `relevant_papers`, `market_signals`.",
        "backend": "remote" # Use remote for HF Spaces compatibility
    },
    "prior_art_analyst": {
        "name": "Prior Art & Novelty Analyst",
        "persona": "As a patent attorney, analyze the distilled briefing to define the 'novelty gap'—the specific, defensible difference a new invention could exploit. Output JSON with one key: `novelty_gap`.",
        "backend": "remote" # Use remote for HF Spaces compatibility
    },
    "technical_synthesist": {
        "name": "Cross-Domain Technical Synthesist",
        "persona": "As a world-class inventor, invent a novel, concrete technical solution to a problem, explicitly targeting a known 'novelty gap'. Propose tangible components and mechanisms. Output JSON with one key: `design_blueprint`.",
        "backend": "remote" # Creative, power-intensive task
    },
    "ip_claim_drafter": {
        "name": "IP Claim Drafter",
        "persona": "As a registered patent agent, draft precise, defensible provisional claims for an invention based on its design blueprint. Output JSON with one key: `provisional_claims`.",
        "backend": "remote" # Creative, power-intensive task
    }
}
# Prompt for the routing step. Filled via str.format with `problem_statement`
# and `expert_list`; asks for a JSON object holding "selected_experts".
ROUTER_PROMPT_TEMPLATE = """As an expert project manager, analyze the problem statement and select the most logical sequence of 2-3 experts to consult from the available list.
Problem: "{problem_statement}"
Experts: {expert_list}
Output a JSON object with a key "selected_experts", a list of expert keys (e.g., ["prior_art_analyst", "technical_synthesist"]).
"""
# Prompt for the distillation step. Filled via str.format with `raw_data`
# (the concatenated scouting text).
DISTILLATION_PROMPT_TEMPLATE = """Distill the following raw data into a structured JSON summary. Raw Data: --- {raw_data} ---
Output a JSON with three keys: `key_patents` (a list of strings summarizing patent titles/snippets), `relevant_papers` (a list of strings summarizing paper titles/abstracts), and `market_signals` (a list of strings summarizing news/market context)."""
# NOTE(review): this persona is not referenced anywhere in the visible part of
# this file; the final report is produced by plain template substitution.
REPORT_WRITER_PERSONA = "You are a chief editor for a tech journal. Synthesize the findings from an invention pipeline into a single, clean Markdown report. Use clear headings and do not add new information."
# Markdown skeleton of the final report. Filled via str.format with the keys
# problem_statement, key_patents, relevant_papers, market_signals, novelty_gap,
# design_blueprint_approach, design_blueprint_specs and claims_markdown.
REPORT_WRITER_TEMPLATE = """
### Invention Blueprint: {problem_statement}
#### 1. Distilled Intelligence Briefing
Based on a broad search of patents, papers, and market signals, the key findings are:
- **Patents:** {key_patents}
- **Research:** {relevant_papers}
- **Market Context:** {market_signals}
#### 2. Novelty Gap Analysis
{novelty_gap}
#### 3. Proposed Technical Solution
{design_blueprint_approach}
{design_blueprint_specs}
#### 4. Draft Provisional IP Claims
{claims_markdown}
---
"""
# --- Core Logic & Scouting ---
def local_generate(prompt: str, max_length: int = 200) -> str:
    """Generate text with the locally loaded HuggingFace pipeline.

    Args:
        prompt: Text handed to the text-generation pipeline.
        max_length: Forwarded to the pipeline as its ``max_length`` setting.

    Returns:
        The ``generated_text`` of the first returned sequence. When the local
        pipeline is unavailable or the call raises, a short explanatory
        string is returned instead; this function never raises.
    """
    local_ready = HF_TRANSFORMERS_AVAILABLE and generator
    if not local_ready:
        return "Local generation not available"

    try:
        # Sampling settings are fixed here; only the length is configurable.
        sampling_options = {
            "max_length": max_length,
            "num_return_sequences": 1,
            "do_sample": True,
            "temperature": 0.7,
            "pad_token_id": tokenizer.eos_token_id,
        }
        sequences = generator(prompt, **sampling_options)
        first_sequence = sequences[0]
        return first_sequence['generated_text']
    except Exception as e:
        return f"Local generation failed: {e}"
def _local_json_reply(prompt: str, persona: str, response: str) -> str:
    """Build the JSON reply used when the local model serves a JSON request.

    The requested output key is taken from the persona first (personas name
    their output key in backticks). The prompt is only a fallback, because
    expert prompts embed the whole pipeline context and therefore contain
    keys such as ``key_patents`` regardless of which expert is being asked.
    """
    persona_lc = persona.lower()
    prompt_lc = prompt.lower()

    # Most specific key first: later-stage personas may mention earlier concepts.
    persona_keys = ("provisional_claims", "design_blueprint", "novelty_gap", "key_patents")
    requested = next((key for key in persona_keys if f"`{key}`" in persona_lc), None)
    if requested is None:
        # No key named in the persona (e.g. the router): sniff the prompt.
        prompt_keys = ("selected_experts", "key_patents", "novelty_gap", "design_blueprint", "provisional_claims")
        requested = next((key for key in prompt_keys if key in prompt_lc), None)

    if requested == "selected_experts":
        return json.dumps({"selected_experts": ["distillation_analyst", "prior_art_analyst", "technical_synthesist", "ip_claim_drafter"]})
    if requested == "key_patents":
        return json.dumps({
            "key_patents": ["Patent analysis pending"],
            "relevant_papers": ["Research scan pending"],
            "market_signals": ["Market analysis pending"]
        })
    if requested == "novelty_gap":
        return json.dumps({"novelty_gap": "Analysis shows opportunity for innovation in this domain"})
    if requested == "design_blueprint":
        return json.dumps({"design_blueprint": response})
    if requested == "provisional_claims":
        return json.dumps({"provisional_claims": [response]})
    # Unknown request shape: hand back the raw model text unchanged.
    return response


async def llm_call(prompt: str, persona: str, backend: str, temperature: float = MODEL_TEMP, is_json: bool = True) -> str:
    """Intelligent switchboard with free local generation priority.

    Args:
        prompt: User message / task payload.
        persona: System prompt describing the role and the expected output.
        backend: ``"local"`` or ``"remote"``; only consulted when the local
            HuggingFace pipeline is not available.
        temperature: Sampling temperature for API calls.
        is_json: Whether the caller expects a JSON document back.

    Returns:
        The model reply as a string (never None). On failure a fallback
        string is returned rather than raising; for ``is_json`` it is a JSON
        object, which carries an ``"error"`` key when an API call failed.
    """
    # PRIORITY 1: Use free local generation if available
    if HF_TRANSFORMERS_AVAILABLE and generator:
        logging.info("Using free local HuggingFace model...")
        full_prompt = f"{persona}\n\nUser: {prompt}\nAssistant:"
        response = local_generate(full_prompt, max_length=500)
        # Extract just the assistant response
        if "Assistant:" in response:
            response = response.split("Assistant:")[-1].strip()
        if is_json:
            return _local_json_reply(prompt, persona, response)
        return response

    # PRIORITY 2: Use remote API only if local fails and credits available
    if backend == "local":
        client_to_use = local_client
        model_id = LOCAL_MODEL_ID
    elif backend == "remote" and remote_client:
        client_to_use = remote_client
        model_id = REMOTE_MODEL_ID
    else:
        error_msg = f"Backend '{backend}' is not configured or available. Using free local generation instead."
        logging.warning(error_msg)
        # Return a reasonable fallback instead of error
        if is_json:
            return json.dumps({"result": "Generated using free local model", "note": "Limited functionality without API credits"})
        return "Generated using free local model (limited functionality without API credits)"

    logging.info(f"Attempting API call to '{backend}' backend, model: {model_id}...")
    messages = [{"role": "system", "content": persona}, {"role": "user", "content": prompt}]
    try:
        response_format = {"type": "json_object"} if is_json else {"type": "text"}
        chat_completion = await client_to_use.chat.completions.create(
            model=model_id, messages=messages, max_tokens=MAX_TOKENS, temperature=temperature, response_format=response_format,
        )
        # The content may be None; callers feed the result to json.loads, which
        # would raise an uncaught TypeError on None. An empty string fails as
        # ordinary invalid JSON instead.
        return chat_completion.choices[0].message.content or ""
    except Exception as e:
        error_str = f"API call to {backend} ({model_id}) failed: {e}. Falling back to free local generation."
        logging.warning(error_str)
        # The local generator cannot be available here (that case returned
        # above), so go straight to the structured last-resort reply.
        if is_json:
            return json.dumps({"error": "API unavailable", "fallback": "Using rule-based generation"})
        return "API unavailable - using rule-based generation"
def scout_sources(query: str, num_results: int = 3, timeout: float = 15.0) -> str:
    """Scout patents, papers, and market signals from free sources.

    Each source is queried best-effort: a failure is logged as a warning and
    the remaining sources are still tried.

    Args:
        query: Free-text search query.
        num_results: Maximum number of hits taken from each source.
        timeout: Seconds to wait for the Google Patents HTTP request.

    Returns:
        One string with a ``---PATENTS---``, ``---PAPERS---`` and
        ``---MARKET/NEWS---`` section for every source that produced output
        (or a notice when an optional library is not installed).
    """
    from itertools import islice
    from urllib.parse import urlencode

    logging.info(f"Scouting all sources for query: {query}")
    raw_text = ""

    # Google Patents
    try:
        # The endpoint takes an inner query string ("q=...") as the value of
        # its `url` parameter. Encode the inner string here and let requests
        # encode the outer parameter, so spaces, '&', '#' etc. in the query
        # can no longer corrupt the request URL.
        inner_query = urlencode({"q": query})
        headers = {'User-Agent': 'Mozilla/5.0'}
        patents_response = requests.get(
            "https://patents.google.com/xhr/query",
            params={"url": inner_query},
            headers=headers,
            timeout=timeout,  # without a timeout a stalled server hangs the pipeline
        )
        patents_response.raise_for_status()
        # NOTE(review): assumes each result item exposes 'title'/'snippet' at
        # the top level - confirm against the live response schema.
        patents_data = patents_response.json()['results']['cluster'][0]['result']
        raw_text += "\n\n---PATENTS---\n" + "\n".join(
            f"Title: {res.get('title', '')}\nSnippet: {res.get('snippet', '')}"
            for res in patents_data[:num_results]
        )
    except Exception as e:
        logging.warning(f"Patent scouting failed: {e}")

    # Semantic Scholar
    if SEMANTIC_SCHOLAR_AVAILABLE:
        try:
            # NOTE(review): confirm that the installed semanticscholar version
            # exposes a module-level search_paper(); a failure is only logged.
            papers = sch.search_paper(query, limit=num_results)
            # Cap the iteration explicitly in case the result object is lazy
            # and would otherwise keep fetching beyond `num_results`.
            raw_text += "\n\n---PAPERS---\n" + "\n".join(
                f"Title: {p.title}\nTLDR: {p.tldr.get('text') if p.tldr else 'N/A'}"
                for p in islice(papers, num_results)
            )
        except Exception as e:
            logging.warning(f"Semantic Scholar scouting failed: {e}")
    else:
        raw_text += "\n\n---PAPERS---\nSemantic Scholar unavailable - using alternative sources"

    # DuckDuckGo
    if DDGS_AVAILABLE:
        try:
            with DDGS() as ddgs:
                results = list(ddgs.text(query, max_results=num_results))
            raw_text += "\n\n---MARKET/NEWS---\n" + "\n".join(
                f"Title: {r['title']}\nSnippet: {r['body']}" for r in results
            )
        except Exception as e:
            logging.warning(f"DuckDuckGo scouting failed: {e}")
    else:
        raw_text += "\n\n---MARKET/NEWS---\nDuckDuckGo search unavailable - using basic market context"

    return raw_text
async def run_expert(expert_key: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Consult one expert and return its parsed JSON output.

    Args:
        expert_key: Key into ``EXPERT_PERSONAS``.
        context: Pipeline context. It is serialised to JSON and used as the
            prompt; if the persona defines ``input_keys`` only those keys are
            sent.

    Returns:
        The expert's JSON object as a dict. If the reply is not valid JSON,
        is not a JSON object, or contains an ``"error"`` key, a dict with a
        single ``"error"`` message is returned instead.

    Raises:
        KeyError: If ``expert_key`` is not a known expert.
    """
    expert = EXPERT_PERSONAS[expert_key]
    allowed_keys = expert.get("input_keys")
    if allowed_keys is None:
        payload = dict(context)
    else:
        payload = {k: v for k, v in context.items() if k in allowed_keys}
    prompt = json.dumps(payload)

    response_str = await llm_call(prompt, expert["persona"], expert["backend"])
    failure = {"error": f"Expert '{expert['name']}' failed to produce valid output. Response: {response_str}"}

    try:
        output = json.loads(response_str)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a None reply, which json.loads rejects outright.
        return failure

    # Callers merge the result with dict.update(), so anything other than a
    # JSON object (list, string, number, ...) counts as invalid output.
    if not isinstance(output, dict) or "error" in output:
        return failure
    return output
def format_claims_for_report(claims: List[str]) -> str:
    """Render drafted claims as numbered Markdown lines.

    Returns one ``**Claim N:** text`` line per claim, numbered from 1 and
    joined with newlines. Anything that is empty or not a list yields the
    placeholder sentence instead.
    """
    if claims and isinstance(claims, list):
        numbered_lines = []
        for position, claim_text in enumerate(claims, start=1):
            numbered_lines.append(f"**Claim {position}:** {claim_text}")
        return "\n".join(numbered_lines)
    return "No claims were drafted."
def _as_text_list(value: Any, default: List[str]) -> List[str]:
    """Coerce an LLM-supplied field into a list of strings that is safe to join."""
    if value is None:
        return list(default)
    if isinstance(value, str):
        # A bare string must not be iterated character by character.
        return [value]
    if isinstance(value, (list, tuple)):
        return [item if isinstance(item, str) else json.dumps(item) for item in value]
    return [str(value)]


def _blueprint_sections(blueprint: Any) -> tuple:
    """Split a design blueprint into (approach, specifications) report text."""
    if isinstance(blueprint, dict):
        approach = "\n".join(_as_text_list(blueprint.get("technical_approach"), ["Not specified."]))
        specs = blueprint.get("technical_specifications", "Not specified.")
        if not isinstance(specs, str):
            specs = json.dumps(specs, indent=2)
        return approach, specs
    if isinstance(blueprint, str) and blueprint.strip():
        # Free-text blueprint (e.g. from the local model): show it as the approach.
        return blueprint, "Not specified."
    return "Not specified.", "Not specified."


async def run_moe_pipeline(problem_statement: str, progress=gr.Progress(track_tqdm=True)):
    """The main Pragmatic Hybrid Pipeline.

    Async generator wired to the Gradio button. It yields
    ``(markdown_report, raw_json_context)`` tuples: either a single error
    tuple followed by termination, or the final report.

    Stages: routing -> scouting & distillation -> expert consultation ->
    report assembly from ``REPORT_WRITER_TEMPLATE``.
    """
    if not problem_statement or not problem_statement.strip():
        yield "**Pipeline Error**\n\nPlease enter a problem statement.", ""
        return

    fallback_route = ["distillation_analyst", "prior_art_analyst", "technical_synthesist", "ip_claim_drafter"]

    # STAGE 1: ROUTING (Remote with Fallback)
    progress(0.1, desc="Assembling expert team...")
    router_prompt = ROUTER_PROMPT_TEMPLATE.format(problem_statement=problem_statement, expert_list=list(EXPERT_PERSONAS.keys()))
    # Try remote routing first
    routing_response = await llm_call(router_prompt, "You are a master project manager.", "remote")
    routed_experts_keys = []
    try:
        parsed_response = json.loads(routing_response)
        if not isinstance(parsed_response, dict):
            raise ValueError("Router response is not a JSON object.")
        if "error" in parsed_response:
            raise ValueError(f"API Error: {parsed_response['error']}")
        selected = parsed_response.get("selected_experts", [])
        if not isinstance(selected, list):
            raise ValueError("Router returned a non-list selection.")
        # Keep only known experts, once each, in the order given: an
        # unknown key would otherwise raise KeyError at the persona lookup.
        for key in selected:
            if isinstance(key, str) and key in EXPERT_PERSONAS and key not in routed_experts_keys:
                routed_experts_keys.append(key)
        if "technical_synthesist" in routed_experts_keys and "ip_claim_drafter" not in routed_experts_keys:
            routed_experts_keys.append("ip_claim_drafter")
        if not routed_experts_keys:
            raise ValueError("Router returned empty list.")
    except (json.JSONDecodeError, ValueError, TypeError) as e:
        # Fallback to predefined expert sequence
        logging.warning(f"Routing failed: {e}. Using fallback routing.")
        routed_experts_keys = list(fallback_route)

    # STAGE 2: SCOUTING & DISTILLATION (Remote)
    progress(0.2, desc="Scouting sources...")
    raw_data = scout_sources(problem_statement)
    progress(0.4, desc="Distilling briefing (remote)...")
    distillation_expert = EXPERT_PERSONAS["distillation_analyst"]
    distillation_prompt = DISTILLATION_PROMPT_TEMPLATE.format(raw_data=raw_data)
    distilled_briefing_str = await llm_call(distillation_prompt, distillation_expert['persona'], "remote")
    try:
        distilled_briefing = json.loads(distilled_briefing_str)
        if not isinstance(distilled_briefing, dict):
            raise ValueError("Distilled briefing is not a JSON object.")
    except (json.JSONDecodeError, ValueError, TypeError):
        yield "**Pipeline Error**\n\nFailed to distill raw data.", distilled_briefing_str
        return

    # STAGE 3: EXPERT GAUNTLET (Hybrid)
    pipeline_context = {"problem_statement": problem_statement, "distilled_briefing": distilled_briefing}
    for i, expert_key in enumerate(routed_experts_keys):
        expert_name = EXPERT_PERSONAS[expert_key]['name']
        backend = EXPERT_PERSONAS[expert_key]['backend']
        progress(0.6 + (i * 0.1), desc=f"Consulting: {expert_name} ({backend})...")
        expert_output = await run_expert(expert_key, pipeline_context)
        # Each expert's output is merged into the shared context so later
        # experts can build on it.
        pipeline_context.update(expert_output)
        if "error" in expert_output:
            yield f"**Pipeline Error**\n\n{expert_output['error']}", json.dumps(pipeline_context, indent=2)
            return

    # STAGE 4: FINAL REPORT (Remote)
    progress(0.9, desc="Compiling final report (remote)...")
    approach_text, specs_text = _blueprint_sections(pipeline_context.get("design_blueprint"))
    report_data = {
        "problem_statement": pipeline_context.get("problem_statement", ""),
        "key_patents": "\n- ".join(_as_text_list(distilled_briefing.get('key_patents'), ["Not found."])),
        "relevant_papers": "\n- ".join(_as_text_list(distilled_briefing.get('relevant_papers'), ["Not found."])),
        "market_signals": "\n- ".join(_as_text_list(distilled_briefing.get('market_signals'), ["Not found."])),
        "novelty_gap": pipeline_context.get("novelty_gap", "Not assessed."),
        "design_blueprint_approach": approach_text,
        "design_blueprint_specs": specs_text,
        "claims_markdown": format_claims_for_report(pipeline_context.get("provisional_claims"))
    }
    final_report_str = REPORT_WRITER_TEMPLATE.format(**report_data)
    progress(1.0, desc="Pipeline Complete!")
    yield final_report_str, json.dumps(pipeline_context, indent=2)
# --- Gradio UI ---
def create_ui():
    """Assemble the Gradio Blocks interface and return it (not launched).

    Layout: a header, a row holding the problem-statement input with its run
    button next to the report pane and a collapsible raw-JSON view, then a
    footer naming the configured models. The button is bound to
    ``run_moe_pipeline``.
    """
    glass_theme = gr.themes.Glass(primary_hue="indigo", secondary_hue="purple")
    header_md = """
# 💡 MoE Innovation Engine (Orchestrator v0.5: Free Edition)
Uses local HuggingFace models for completely free innovation generation.
"""

    with gr.Blocks(theme=glass_theme) as demo:
        gr.Markdown(header_md)

        with gr.Row():
            # Left column: user input and trigger.
            with gr.Column(scale=1):
                statement_box = gr.Textbox(
                    label="Core Problem Statement",
                    placeholder="e.g., A low-cost, non-invasive method for early sepsis detection",
                    lines=4,
                )
                forge_button = gr.Button("🚀 Forge Invention")
            # Right column: rendered report plus the raw pipeline context.
            with gr.Column(scale=2):
                gr.Markdown("### Final Invention Blueprint")
                report_pane = gr.Markdown("Awaiting problem statement...")
                with gr.Accordion("Show Raw JSON Output", open=False):
                    context_pane = gr.Code(language="json", label="Raw Pipeline Context")

        forge_button.click(
            fn=run_moe_pipeline,
            inputs=[statement_box],
            outputs=[report_pane, context_pane],
        )

        gr.Markdown("---")
        footer_md = f"""
**Setup Note:** This free edition runs completely local models on HuggingFace Spaces.
- **Primary Model:** Local `{LOCAL_MODEL_NAME}` (free, no credits needed)
- **Fallback:** `{REMOTE_MODEL_ID}` (only if you have API credits)
- **Cost:** Completely free for unlimited usage!
"""
        gr.Markdown(footer_md)

    return demo
if __name__ == "__main__":
    # Suppress noisy logs from scout libraries: only errors get through.
    for noisy_logger in ("arxiv", "semanticscholar"):
        logging.getLogger(noisy_logger).setLevel(logging.ERROR)

    # Build the interface, then start serving it.
    app = create_ui()
    app.launch()