{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Approach 1: Local Llama2 via Ollama\n", "\n", "questions = [\n", " \"How do coral proteins help make eco-friendly fabrics without dyes?\",\n", " \"What environmental problems do coral-inspired textiles solve?\",\n", " \"What is industrial symbiosis and how does the Kalundborg example work?\",\n", " \"How do Metavision sensors work like human eyes to save energy?\",\n", " \"How does TISSIUM copy skin proteins for medical adhesives?\",\n", " \"How does DNA-level design create better fibers inspired by nature?\",\n", " \"Why is industrial symbiosis hard to implement despite benefits?\",\n", " \"How can biological systems inspire sustainable manufacturing?\",\n", " \"What other industries can use protein-based materials like Werewool?\",\n", " \"How could event-based cameras improve security systems?\",\n", " \"Design a factory network that works like coral reef partnerships - what features would it need?\"\n", "]\n", "\n", "\n", "import json\n", "import pandas as pd\n", "from langchain_ollama import OllamaLLM, OllamaEmbeddings\n", "from langchain_community.vectorstores import FAISS\n", "from langchain_core.prompts import PromptTemplate\n", "from langchain_core.output_parsers import StrOutputParser\n", "from operator import itemgetter\n", "import gradio as gr\n", "\n", "# Load and process data\n", "with open('mini_data.json', 'r', encoding='utf-8') as f:\n", " data = json.load(f)\n", "documents = [f\"Source: {item['Source']}\\nApplication: {item['Application']}\\nFunction1: {item['Function1']}\\nStrategy: {item['Strategy']}\" for item in data]\n", "\n", "# Local Llama2 setup\n", "local_model = OllamaLLM(model=\"llama2\")\n", "local_embeddings = OllamaEmbeddings(model=\"llama2\")\n", "vectorstore = FAISS.from_texts(documents, local_embeddings)\n", "retriever = vectorstore.as_retriever()\n", "\n", "# RAG pipeline\n", "template = \"\"\"Answer the question based on 
the context below. If unsure, reply \"I don't know\".\n", "Context: {context}\n", "Question: {question}\"\"\"\n", "prompt = PromptTemplate.from_template(template)\n", "local_chain = ({\"context\": itemgetter(\"question\") | retriever, \"question\": itemgetter(\"question\")} \n", " | prompt | local_model | StrOutputParser())\n", "\n", "# Chat interface\n", "def local_rag(question, history):\n", " response = local_chain.invoke({\"question\": question})\n", " history.append((question, response))\n", " return \"\", history\n", "\n", "with gr.Blocks() as local_demo:\n", " gr.Markdown(\"# Local Llama2 RAG Chatbot\")\n", " chatbot = gr.Chatbot()\n", " question = gr.Textbox(label=\"Ask about biomimicry:\")\n", " question.submit(local_rag, [question, chatbot], [question, chatbot])\n", " \n", "local_demo.launch()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Approach 2: Llama3.3 via API\n", "import json\n", "import os\n", "import gradio as gr\n", "from openai import OpenAI\n", "from operator import itemgetter\n", "\n", "# API configuration\n", "api_key = os.getenv(\"OPENAI_API_KEY\")  # never hardcode credentials; set OPENAI_API_KEY in the environment\n", "base_url = \"https://chat-ai.academiccloud.de/v1\"\n", "model = \"llama-3.3-70b-instruct\"\n", "\n", "# Initialize OpenAI client\n", "client = OpenAI(api_key=api_key, base_url=base_url)\n", "\n", "# Load and process data\n", "with open('mini_data.json', 'r', encoding='utf-8') as f:\n", " data = json.load(f)\n", "documents = [f\"Source: {item['Source']}\\nApplication: {item['Application']}\\nFunction1: {item['Function1']}\\nStrategy: {item['Strategy']}\" for item in data]\n", "\n", "def retrieve_context(question):\n", " \"\"\"Simple keyword-based retrieval since embeddings aren't available\"\"\"\n", " keywords = set(question.lower().split())\n", " relevant = []\n", " for doc in documents:\n", " if any(keyword in doc.lower() for keyword in keywords):\n", " relevant.append(doc)\n", " return \"\\n\\n\".join(relevant[:3]) # Return top 3 matches\n", 
"\n", "def generate_response(question):\n", " context = retrieve_context(question)\n", " response = client.chat.completions.create(\n", " messages=[\n", " {\"role\": \"system\", \"content\": f\"Answer based on context. If unsure, say 'I don't know'.\\nContext: {context}\"},\n", " {\"role\": \"user\", \"content\": question}\n", " ],\n", " model=model\n", " )\n", " return response.choices[0].message.content\n", "\n", "# Chat interface\n", "def cloud_rag(question, history):\n", " response = generate_response(question)\n", " history.append((question, response))\n", " return \"\", history\n", "\n", "with gr.Blocks() as demo:\n", " gr.Markdown(\"# AskNature RAG-based Chatbot\")\n", " chatbot = gr.Chatbot()\n", " question = gr.Textbox(label=\"Ask about biomimicry:\")\n", " question.submit(cloud_rag, [question, chatbot], [question, chatbot])\n", " \n", "demo.launch()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Enhanced Metadata Generation with Rate Control and Incremental Processing\n", "import json\n", "import time\n", "import random\n", "from typing import Dict, List\n", "from openai import OpenAI\n", "from tenacity import retry, stop_after_attempt, wait_random_exponential\n", "import os\n", "\n", "# Initialize OpenAI client\n", "client = OpenAI(\n", " api_key=os.getenv(\"OPENAI_API_KEY\"),  # never hardcode credentials; set OPENAI_API_KEY in the environment\n", " base_url=\"https://chat-ai.academiccloud.de/v1\"\n", ")\n", "\n", "@retry(stop=stop_after_attempt(5), wait=wait_random_exponential(min=2, max=60))\n", "def generate_metadata_tags(strategy_text: str) -> Dict:\n", " \"\"\"Generate structured metadata with enhanced error handling\"\"\"\n", " system_prompt = \"\"\"Analyze the technical text and generate structured metadata:\n", "1. **Technical Concepts** (array, max 5 items): Specific technical terms/methods\n", "2. 
**Biological Mechanisms** (array, max 3): Biological processes observed in nature\n", "3. **Industry Applications** (array, max 3): Practical commercial uses\n", "4. **Sustainability Impacts** (array, max 2): Environmental benefits\n", "\n", "Example Response:\n", "{\n", " \"technical_concepts\": [\"protein-based pigmentation\", \"DNA-level fiber design\"],\n", " \"biological_mechanisms\": [\"coral-algae symbiosis\"],\n", " \"industry_applications\": [\"textile manufacturing\"],\n", " \"sustainability_impacts\": [\"reduces chemical waste\"]\n", "}\"\"\"\n", "\n", " response = client.chat.completions.create(\n", " messages=[\n", " {\"role\": \"system\", \"content\": system_prompt},\n", " {\"role\": \"user\", \"content\": strategy_text}\n", " ],\n", " model=\"llama-3.3-70b-instruct\",\n", " temperature=0.1,\n", " response_format={\"type\": \"json_object\"}\n", " )\n", " \n", " return validate_metadata(json.loads(response.choices[0].message.content))\n", "\n", "def validate_metadata(metadata: Dict) -> Dict:\n", " \"\"\"Ensure metadata structure quality\"\"\"\n", " required_keys = {\n", " \"technical_concepts\": list,\n", " \"biological_mechanisms\": list,\n", " \"industry_applications\": list,\n", " \"sustainability_impacts\": list\n", " }\n", " \n", " for key, type_ in required_keys.items():\n", " if key not in metadata or not isinstance(metadata[key], type_):\n", " raise ValueError(f\"Invalid metadata format for {key}\")\n", " \n", " return metadata\n", "\n", "def enhance_dataset(input_file: str, output_file: str):\n", " \"\"\"Robust incremental metadata enhancement with rate control\"\"\"\n", " # Load existing enhanced data\n", " existing_data = []\n", " existing_hyperlinks = set()\n", " \n", " if os.path.exists(output_file):\n", " with open(output_file, 'r') as f:\n", " existing_data = json.load(f)\n", " existing_hyperlinks = {item[\"Hyperlink\"] for item in existing_data if \"Hyperlink\" in item}\n", " \n", " # Load input data and filter unprocessed items\n", " 
with open(input_file, 'r') as f:\n", " input_data = json.load(f)\n", " \n", " new_items = [item for item in input_data if item.get(\"Hyperlink\") not in existing_hyperlinks]\n", " \n", " if not new_items:\n", " print(\"All items already processed in the enhanced file.\")\n", " return\n", " else:\n", " output_length = len(existing_data)\n", " input_length = len(input_data)\n", " print(f\"Processing {len(new_items)} new items... out of {input_length} total\")\n", " \n", " results = existing_data.copy()\n", " error_count = 0\n", " total_items = len(new_items)\n", " \n", " for idx, item in enumerate(new_items):\n", " try:\n", " # Enhanced rate control with progressive backoff\n", " if idx > 0:\n", " base_delay = min(5 + (idx // 10), 30) # Progressive delay up to 30s\n", " delay = random.uniform(base_delay, base_delay + 5)\n", " time.sleep(delay)\n", " \n", " # Process item\n", " metadata = generate_metadata_tags(item[\"Strategy\"])\n", " enhanced_item = {**item, **metadata}\n", " results.append(enhanced_item)\n", " \n", " # Checkpoint saving\n", " if (idx + 1) % 5 == 0 or (idx + 1) == total_items:\n", " with open(output_file, 'w') as f:\n", " json.dump(results, f, indent=2)\n", " print(f\"Progress: {idx+1+output_length}/{input_length} items processed\")\n", " \n", " except Exception as e:\n", " error_count += 1\n", " print(f\"Error processing {item.get('Source', 'Unknown')}: {str(e)}\")\n", " # results.append(item) # Preserve original data\n", " \n", " print(f\"Processing complete. 
Success rate: {total_items-error_count}/{total_items}\")\n", "\n", "# Execute enhancement\n", "enhance_dataset(\"AskNatureNet_data.json\", \"AskNatureNet_data_enhanced.json\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optimized RAG System with E5-Mistral Embeddings and Llama3-70B Generation\n", " \n", "import json\n", "import logging\n", "import re\n", "import os\n", "import pickle\n", "from typing import List, Tuple, Optional\n", "import gradio as gr\n", "from openai import OpenAI\n", "from functools import lru_cache\n", "from tenacity import retry, stop_after_attempt, wait_exponential\n", "from langchain_community.retrievers import BM25Retriever\n", "from langchain_community.vectorstores import FAISS\n", "from langchain_core.embeddings import Embeddings\n", "from langchain_core.documents import Document\n", "from collections import defaultdict\n", "import hashlib\n", "from tqdm import tqdm # For progress tracking\n", "from dotenv import load_dotenv\n", "load_dotenv()\n", "\n", "# --- Configuration ---\n", "FAISS_INDEX_PATH = \"faiss_index\"\n", "BM25_INDEX_PATH = \"bm25_index.pkl\"\n", "CACHE_VERSION = \"v1\" # Increment when data format changes\n", "embedding_model = \"e5-mistral-7b-instruct\"\n", "generation_model = \"meta-llama-3-70b-instruct\"\n", "data_file_name = \"AskNatureNet_data_enhanced.json\"\n", "API_CONFIG = {\n", " \"api_key\": os.getenv(\"OPENAI_API_KEY\"),\n", " \"base_url\": \"https://chat-ai.academiccloud.de/v1\"\n", "}\n", "CHUNK_SIZE = 800\n", "OVERLAP = 200\n", "EMBEDDING_BATCH_SIZE = 32 # Batch size for embedding API calls\n", "\n", "# Initialize clients\n", "client = OpenAI(**API_CONFIG)\n", "logging.basicConfig(level=logging.INFO)\n", "logger = logging.getLogger(__name__)\n", "\n", "# --- Helper Functions ---\n", "def get_data_hash(file_path: str) -> str:\n", " \"\"\"Generate hash of data file for cache validation\"\"\"\n", " with open(file_path, \"rb\") as f:\n", " return 
hashlib.md5(f.read()).hexdigest()\n", "\n", "# --- Custom Embedding Handler with Progress Tracking ---\n", "class MistralEmbeddings(Embeddings):\n", " \"\"\"E5-Mistral-7B embedding adapter with error handling and progress tracking\"\"\"\n", " def embed_documents(self, texts: List[str]) -> List[List[float]]:\n", " embeddings = []\n", " try:\n", " # Process in batches with progress tracking\n", " for i in tqdm(range(0, len(texts), EMBEDDING_BATCH_SIZE), desc=\"Embedding Progress\"):\n", " batch = texts[i:i + EMBEDDING_BATCH_SIZE]\n", " response = client.embeddings.create(\n", " input=batch,\n", " model=embedding_model,\n", " encoding_format=\"float\"\n", " )\n", " embeddings.extend([e.embedding for e in response.data])\n", " return embeddings\n", " except Exception as e:\n", " logger.error(f\"Embedding Error: {str(e)}\")\n", " return [[] for _ in texts]\n", "\n", " def embed_query(self, text: str) -> List[float]:\n", " return self.embed_documents([text])[0]\n", "\n", "# --- Data Processing with Cache Validation ---\n", "def load_and_chunk_data(file_path: str) -> List[Document]:\n", " \"\"\"Enhanced chunking with metadata preservation\"\"\"\n", " current_hash = get_data_hash(file_path)\n", " cache_file = f\"documents_{CACHE_VERSION}_{current_hash}.pkl\"\n", " \n", " if os.path.exists(cache_file):\n", " logger.info(\"Loading cached documents\")\n", " with open(cache_file, \"rb\") as f:\n", " return pickle.load(f)\n", " \n", " with open(file_path, 'r', encoding='utf-8') as f:\n", " data = json.load(f)\n", " \n", " documents = []\n", " for item in tqdm(data, desc=\"Chunking Progress\"):\n", " base_content = f\"\"\"Source: {item['Source']}\n", "Application: {item['Application']}\n", "Functions: {', '.join(filter(None, [item.get('Function1'), item.get('Function2')]))}\n", "Technical Concepts: {', '.join(item['technical_concepts'])}\n", "Biological Mechanisms: {', '.join(item['biological_mechanisms'])}\"\"\"\n", " \n", " strategy = item['Strategy']\n", " for i in range(0, 
len(strategy), CHUNK_SIZE - OVERLAP):\n", " chunk = strategy[i:i + CHUNK_SIZE]\n", " documents.append(Document(\n", " page_content=f\"{base_content}\\nStrategy Excerpt:\\n{chunk}\",\n", " metadata={\n", " \"source\": item[\"Source\"],\n", " \"application\": item[\"Application\"],\n", " \"technical_concepts\": item[\"technical_concepts\"],\n", " \"sustainability_impacts\": item[\"sustainability_impacts\"],\n", " \"hyperlink\": item[\"Hyperlink\"],\n", " \"chunk_id\": f\"{item['Source']}-{len(documents)+1}\"\n", " }\n", " ))\n", " \n", " with open(cache_file, \"wb\") as f:\n", " pickle.dump(documents, f)\n", " return documents\n", "\n", "# --- Optimized Retrieval System ---\n", "class EnhancedRetriever:\n", " \"\"\"Hybrid retriever with persistent caching\"\"\"\n", " def __init__(self, documents: List[Document]):\n", " self.documents = documents\n", " self.bm25 = self._init_bm25()\n", " self.vector_store = self._init_faiss()\n", " self.vector_retriever = self.vector_store.as_retriever(search_kwargs={\"k\": 3})\n", "\n", " def _init_bm25(self) -> BM25Retriever:\n", " cache_key = f\"{BM25_INDEX_PATH}_{get_data_hash(data_file_name)}\"\n", " if os.path.exists(cache_key):\n", " logger.info(\"Loading cached BM25 index\")\n", " with open(cache_key, \"rb\") as f:\n", " return pickle.load(f)\n", " \n", " logger.info(\"Building new BM25 index\")\n", " retriever = BM25Retriever.from_documents(self.documents)\n", " retriever.k = 5\n", " with open(cache_key, \"wb\") as f:\n", " pickle.dump(retriever, f)\n", " return retriever\n", "\n", " def _init_faiss(self) -> FAISS:\n", " cache_key = f\"{FAISS_INDEX_PATH}_{get_data_hash(data_file_name)}\"\n", " if os.path.exists(cache_key):\n", " logger.info(\"Loading cached FAISS index\")\n", " return FAISS.load_local(\n", " cache_key,\n", " MistralEmbeddings(),\n", " allow_dangerous_deserialization=True\n", " )\n", " \n", " logger.info(\"Building new FAISS index\")\n", " vector_store = FAISS.from_documents(self.documents, 
MistralEmbeddings())\n", " vector_store.save_local(cache_key)\n", " return vector_store\n", "\n", " @lru_cache(maxsize=500)\n", " def retrieve(self, query: str) -> str:\n", " try:\n", " processed_query = self._preprocess_query(query)\n", " expanded_query = self._hyde_expansion(processed_query)\n", " \n", " bm25_results = self.bm25.invoke(processed_query)\n", " vector_results = self.vector_retriever.invoke(processed_query)\n", " expanded_results = self.bm25.invoke(expanded_query)\n", " \n", " fused_results = self._fuse_results([bm25_results, vector_results, expanded_results])\n", " return self._format_context(fused_results[:5])\n", " except Exception as e:\n", " logger.error(f\"Retrieval Error: {str(e)}\")\n", " return \"\"\n", "\n", " def _preprocess_query(self, query: str) -> str:\n", " return query.lower().strip()\n", "\n", " @lru_cache(maxsize=500)\n", " def _hyde_expansion(self, query: str) -> str:\n", " try:\n", " response = client.chat.completions.create(\n", " model=generation_model,\n", " messages=[{\n", " \"role\": \"user\",\n", " \"content\": f\"Generate a technical draft about biomimicry for: {query}\\nInclude domain-specific terms.\"\n", " }],\n", " temperature=0.5,\n", " max_tokens=200\n", " )\n", " return response.choices[0].message.content\n", " except Exception as e:\n", " logger.error(f\"HyDE Error: {str(e)}\")\n", " return query\n", "\n", " def _fuse_results(self, result_sets: List[List[Document]]) -> List[Document]:\n", " fused_scores = defaultdict(float)\n", " for docs in result_sets:\n", " for rank, doc in enumerate(docs, 1):\n", " fused_scores[doc.metadata[\"chunk_id\"]] += 1 / (rank + 60)\n", " \n", " seen = set()\n", " return [\n", " doc for doc in sorted(\n", " (doc for docs in result_sets for doc in docs),\n", " key=lambda x: fused_scores[x.metadata[\"chunk_id\"]],\n", " reverse=True\n", " ) if not (doc.metadata[\"chunk_id\"] in seen or seen.add(doc.metadata[\"chunk_id\"]))\n", " ]\n", "\n", " def _format_context(self, docs: 
List[Document]) -> str:\n", " context = []\n", " for doc in docs:\n", " context_str = f\"\"\"**Source**: [{doc.metadata['source']}]({doc.metadata['hyperlink']})\n", " **Application**: {doc.metadata['application']}\n", " **Key Concepts**: {', '.join(doc.metadata['technical_concepts'])}\n", " **Strategy Excerpt**:\\n{doc.page_content.split('Strategy Excerpt:')[-1].strip()}\"\"\"\n", " context.append(context_str)\n", " return \"\\n\\n---\\n\\n\".join(context)\n", "\n", "# --- Generation System ---\n", "SYSTEM_PROMPT = \"\"\"**Biomimicry Expert Guidelines**\n", "1. Base answers strictly on context\n", "2. Cite sources as [Source]\n", "3. **Bold** technical terms\n", "4. Include reference links\n", "\n", "Context: {context}\"\"\"\n", "\n", "@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=20))\n", "def get_ai_response(query: str, context: str) -> str:\n", " try:\n", " response = client.chat.completions.create(\n", " model=generation_model,\n", " messages=[\n", " {\"role\": \"system\", \"content\": SYSTEM_PROMPT.format(context=context)},\n", " {\"role\": \"user\", \"content\": f\"Question: {query}\\nProvide a detailed technical answer:\"}\n", " ],\n", " temperature=0.4,\n", " max_tokens=2000 # Increased max_tokens\n", " )\n", " logger.info(f\"Raw Response: {response.choices[0].message.content}\") # Log raw response\n", " return _postprocess_response(response.choices[0].message.content)\n", " except Exception as e:\n", " logger.error(f\"Generation Error: {str(e)}\")\n", " return \"I'm unable to generate a response right now. 
Please try again later.\"\n", "\n", "def _postprocess_response(response: str) -> str:\n", " response = re.sub(r\"\\[(.*?)\\]\", r\"[\\1](#)\", response)\n", " response = re.sub(r\"\\*\\*([\\w-]+)\\*\\*\", r\"**\\1**\", response)\n", " return response\n", "\n", "# --- Optimized Pipeline ---\n", "documents = load_and_chunk_data(data_file_name)\n", "retriever = EnhancedRetriever(documents)\n", "\n", "def generate_response(question: str) -> str:\n", " try:\n", " context = retriever.retrieve(question)\n", " return get_ai_response(question, context) if context else \"No relevant information found.\"\n", " except Exception as e:\n", " logger.error(f\"Pipeline Error: {str(e)}\")\n", " return \"An error occurred processing your request.\"\n", "\n", "# --- Gradio Interface ---\n", "def chat_interface(question: str, history: List[Tuple[str, str]]):\n", " response = generate_response(question)\n", " return \"\", history + [(question, response)]\n", "\n", "with gr.Blocks(title=\"AskNature BioRAG Expert\", theme=gr.themes.Soft()) as demo:\n", " gr.Markdown(\"# 🌿 AskNature RAG-based Chatbot \")\n", " with gr.Row():\n", " chatbot = gr.Chatbot(label=\"Dialogue History\", height=500)\n", " with gr.Row():\n", " question = gr.Textbox(placeholder=\"Ask about biomimicry (e.g. 'How does Werewool use coral proteins to make fibers?')\",\n", " label=\"Inquiry\", scale=4)\n", " clear_btn = gr.Button(\"Clear History\", variant=\"secondary\")\n", " \n", " gr.Markdown(\"\"\"\n", "