Spaces:
Paused
Paused
import datetime
import json
import os
import random
import re
import uuid
from collections import Counter
from typing import Any, Dict, Generator, List, Optional, Tuple

import gradio as gr
import openai
import requests
from bs4 import BeautifulSoup
from huggingface_hub import HfApi
from pypdf import PdfReader

from agent import (
    PREFIX,
    COMPRESS_DATA_PROMPT_SMALL,
    COMPRESS_DATA_PROMPT,
    LOG_PROMPT,
    LOG_RESPONSE,
)
| # Configuration | |
| OPENAI_API_BASE = "https://openrouter.ai/api/v1" | |
| OPENAI_API_KEY = os.environ.get("OR_KEY", "") | |
| REPO_NAME = "LPX55/ArxivPapers" | |
| SAVE_DATA_URL = f"https://huggingface.co/datasets/{REPO_NAME}/raw/main/" | |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") | |
| api = HfApi(token=HF_TOKEN) | |
| # Initialize OpenAI client | |
| openai.api_base = OPENAI_API_BASE | |
| openai.api_key = OPENAI_API_KEY | |
| VERBOSE = True # Set to False to disable debug logging | |
| # Indexing Constants | |
| INDEX_PROMPT = """Compile this data into a structured JSON format with these keys: | |
| - "keywords": List of important keywords | |
| - "title": Descriptive title | |
| - "description": Brief summary | |
| - "content": Main content | |
| - "url": Source URL if available | |
| """ | |
def extract_paper_metadata(content: str) -> Dict:
    """Extract structured metadata from a paper's raw text.

    Args:
        content: Raw text of a paper or web page.

    Returns:
        Dict with keys "keywords" (up to 10 words ranked by frequency),
        "title", "description", "content" (first 1000 chars), and "url".
    """
    metadata = {
        "keywords": [],
        "title": "Untitled",
        "description": "No description",
        "content": content[:1000],  # cap stored content to keep memory entries small
        "url": ""
    }
    # Extract URL: first http(s) run found anywhere in the text.
    url_match = re.search(r'https?://[^\s]+', content)
    if url_match:
        metadata['url'] = url_match.group(0)
    # Extract title: first reasonably long, capitalized, sentence-like line.
    lines = content.split('\n')
    for line in lines:
        if len(line) > 20 and line[0].isupper() and line[-1] in ('.', '?', '!'):
            metadata['title'] = line
            break
    # Extract description: first paragraph with enough substance.
    paragraphs = [p for p in content.split('\n\n') if len(p) > 50]
    if paragraphs:
        metadata['description'] = paragraphs[0]
    # Extract keywords from title and description.
    # BUG FIX: the original sorted alphabetically and sliced, which yields the
    # alphabetically-first 10 words, not the "top" 10 — rank by frequency instead.
    text_for_keywords = f"{metadata['title']} {metadata['description']}"
    words = [w.lower() for w in re.findall(r'\w+', text_for_keywords) if len(w) > 3]
    metadata['keywords'] = [w for w, _ in Counter(words).most_common(10)]
    return metadata
def save_paper_to_memory(content: str) -> Dict:
    """Build a memory record for *content*, enriching arXiv papers with extra tags."""
    record = extract_paper_metadata(content)
    # Tag academic papers hosted on arXiv so they are easier to retrieve later.
    if 'arxiv' in record['url'].lower():
        record['keywords'] += ['arxiv', 'paper', 'research']
        record['description'] = f"Academic paper: {record['description']}"
    return record
def create_index() -> None:
    """Create or update the keyword -> file-name search index from memory files.

    Downloads the current index and the main memory listing from the dataset
    repo, merges in any new keyword/file-name associations, writes the merged
    index to a temp file, and uploads it back to the repo.
    """
    uid = uuid.uuid4()

    def _load_json(url: str, default):
        # Best-effort fetch: network errors, non-200 status, or malformed
        # JSON all fall back to *default* instead of crashing the app.
        try:
            resp = requests.get(url, timeout=30)  # timeout: never hang on a dead host
            if resp.status_code == 200:
                return json.loads(resp.text)
        except (requests.RequestException, ValueError) as e:
            print(f"Fetch error for {url}: {e}")
        return default

    # index_data[0] maps keyword -> list of file names containing it.
    index_data = _load_json(f"{SAVE_DATA_URL}mem-test2/index.json", [{}])
    main_data = _load_json(f"{SAVE_DATA_URL}mem-test2/main.json", [])

    # Update index
    for entry in main_data:
        try:
            for keyword in entry.get('keywords', []):
                files = index_data[0].setdefault(keyword, [])
                if entry['file_name'] not in files:
                    files.append(entry['file_name'])
        except Exception as e:
            # Keep indexing the remaining entries even if one is malformed.
            print(f"Indexing error: {e}")

    # Save updated index, always cleaning up the temp file afterwards.
    index_path = f"tmp-index-{uid}.json"
    with open(index_path, "w") as f:
        json.dump(index_data, f)
    try:
        api.upload_file(
            path_or_fileobj=index_path,
            # BUG FIX: path_in_repo must be repo-relative (no leading slash).
            path_in_repo="mem-test2/index.json",
            repo_id=REPO_NAME,
            repo_type="dataset",
        )
    finally:
        # BUG FIX: the original leaked one temp file per call.
        if os.path.exists(index_path):
            os.remove(index_path)
def fetch_url_content(url: str) -> Tuple[bool, str]:
    """Fetch a web page and return a (success, content-or-error) pair.

    Args:
        url: The page URL; an empty value is rejected up front.

    Returns:
        (True, parsed HTML as text) on HTTP 200,
        (False, human-readable error message) otherwise.
    """
    # Guard moved out of the try: it cannot raise, and keeps the try minimal.
    if not url:
        return False, "Enter valid URL"
    try:
        # BUG FIX: added timeout so an unresponsive host can't hang the app.
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, "lxml")
            return True, str(soup)
        return False, f"Status: {response.status_code}"
    except Exception as e:
        return False, f"Error: {e}"
def read_file_content(file_path: str) -> str:
    """Read text content from a .pdf or .txt file.

    Args:
        file_path: Path to the file on disk.

    Returns:
        Extracted text, or an empty string for unrecognized extensions.
    """
    lower_path = file_path.lower()  # BUG FIX: also accept .PDF / .TXT etc.
    if lower_path.endswith(".pdf"):
        reader = PdfReader(file_path)
        # extract_text() can return None for image-only pages; treat as empty.
        return "\n".join(page.extract_text() or "" for page in reader.pages)
    if lower_path.endswith(".txt"):
        # Explicit encoding so behavior doesn't depend on the platform default.
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    return ""
def generate_response(prompt: str, model: str = "meta-llama/llama-4-maverick:free") -> str:
    """Generate a single-turn chat completion for *prompt* via the OpenRouter API.

    NOTE(review): this uses the legacy `openai.ChatCompletion.create` interface,
    which was removed in openai>=1.0 — it only works with openai<1.0; confirm
    the pinned dependency version before upgrading.

    Args:
        prompt: User message, sent as the sole message in the conversation.
        model: OpenRouter model identifier.

    Returns:
        The model's reply text, or an "Error: ..." string on any failure
        (errors are swallowed and reported in-band rather than raised).
    """
    try:
        response = openai.ChatCompletion.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error: {str(e)}"
def process_pdf_url(pdf_url: str) -> str:
    """Download a PDF from *pdf_url* and return its extracted text.

    Args:
        pdf_url: Direct URL to a PDF document.

    Returns:
        The PDF's text, or an "Error: ..." string on network failure or
        non-200 status (errors are reported in-band, never raised).
    """
    try:
        # BUG FIX: added timeout so a stalled download can't hang the app.
        response = requests.get(pdf_url, stream=True, timeout=60)
        if response.status_code == 200:
            temp_path = f"temp_{uuid.uuid4()}.pdf"
            with open(temp_path, "wb") as f:
                f.write(response.content)
            try:
                return read_file_content(temp_path)
            finally:
                # BUG FIX: the original leaked one temp PDF per call.
                os.remove(temp_path)
        return f"Error: Status {response.status_code}"
    except Exception as e:
        return f"Error: {e}"
def save_memory(purpose: str, content: str) -> List[Dict]:
    """Wrap extracted metadata for *content* in a single-entry memory list.

    *purpose* is accepted for interface compatibility but is not used here.
    """
    return [extract_paper_metadata(content)]
def summarize(
    inp: str,
    history: List[Tuple[str, str]],
    report_check: bool,
    sum_check: bool,
    mem_check: bool,
    data: str = "",
    file: Optional[str] = None,
    url: str = "",
    pdf_url: str = "",
    model: str = "meta-llama/llama-4-maverick:free"
) -> Generator[Tuple[str, List[Tuple[str, str]], str, Dict], None, None]:
    """Summarize the provided inputs, optionally persisting them to memory.

    Yields (prompt_box, chat_history, error_text, memory_json) tuples: first
    a "Processing..." placeholder, then the final summary.
    """
    history = [(inp, "Processing...")]
    yield "", history, "", {}

    # Describe each input source that was actually supplied.
    sources = []
    if pdf_url.startswith("http"):
        sources.append(f"PDF URL: {pdf_url}\n")
    if url.startswith("http"):
        sources.append(f"URL: {url}\n")
    if file:
        sources.append(f"File: {file}\n")
    if data:
        sources.append(f"Data: {data[:1000]}\n")
    processed_data = "".join(sources)

    summary = f"Summary for: {inp[:100]}\n{processed_data[:500]}"

    # Persist to memory only when requested; report the outcome in the summary.
    saved = save_memory(inp, processed_data) if mem_check else []
    if mem_check:
        summary += "\n\nSaved to memory" if saved else "\n\nMemory save failed"

    yield summary, history, "", saved[0] if saved else {}
def create_app():
    """Build and return the Gradio Blocks UI.

    Widget layout is determined by the order statements execute inside each
    context manager, so the statement order here must not be changed.
    """
    with gr.Blocks() as app:
        # NOTE(review): heading says "Mixtral 8x7B" but generate_response
        # defaults to a llama-4 model — confirm which name is intended.
        gr.Markdown("## Mixtral 8x7B Summarizer")
        with gr.Row():
            with gr.Column(scale=3):
                prompt = gr.Textbox(label="Instruction")
            with gr.Column(scale=1):
                report_check = gr.Checkbox(label="Return report", value=True)
                sum_check = gr.Checkbox(label="Summarize", value=True)
                mem_check = gr.Checkbox(label="Memory", value=True)
                submit_btn = gr.Button("Submit")
        # One tab per supported input source.
        with gr.Row():
            with gr.Tab("Text"):
                data = gr.Textbox(label="Input text")
            with gr.Tab("File"):
                file = gr.File(label="Upload file")
            with gr.Tab("URL"):
                url = gr.Textbox(label="Website URL")
            with gr.Tab("PDF"):
                pdf_url = gr.Textbox(label="PDF URL")
        chatbot = gr.Chatbot()
        error_box = gr.Textbox()
        json_output = gr.JSON()
        # Wire the button to the summarize generator; input order must match
        # summarize's signature, outputs map to (prompt, history, error, json).
        submit_btn.click(
            summarize,
            [prompt, chatbot, report_check, sum_check, mem_check, data, file, url, pdf_url],
            [prompt, chatbot, error_box, json_output]
        )
    return app
if __name__ == "__main__":
    # Launch the Gradio app only when run as a script, not on import.
    app = create_app()
    app.launch()