"""Chainlit app: search arXiv, select a paper, then chat about it via a small RAG chain."""

import os
from dataclasses import dataclass
from typing import List

import chainlit as cl
import feedparser
import requests
from dotenv import load_dotenv

# LangChain bits (unchanged)
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

load_dotenv()

ARXIV_API = "https://export.arxiv.org/api/query"


# ---------- Simple paper container (drop-in replacement for arxiv.Result we used) ----------
@dataclass
class Paper:
    """Minimal record of one arXiv entry, holding only the fields this app reads."""
    title: str
    summary: str
    comment: str
    entry_id: str
    authors: List[str]


# ---------- Direct arXiv API fetch (HTTPS + custom UA) ----------
def fetch_arxiv_papers(query: str, max_results: int = 5) -> List[Paper]:
    """Query the arXiv Atom API and parse the feed into ``Paper`` records.

    Args:
        query: arXiv search expression (sent as ``search_query``).
        max_results: maximum number of entries to request.

    Returns:
        Parsed papers, sorted by relevance (descending), possibly empty.

    Raises:
        requests.HTTPError: on any non-2xx response from arXiv.
    """
    params = {
        "search_query": query,
        "id_list": "",
        "sortBy": "relevance",
        "sortOrder": "descending",
        "start": 0,
        "max_results": max_results,
    }
    headers = {
        # arXiv asks API clients to identify themselves; contact is configurable via env.
        "User-Agent": f"arxiv-chainlit-app/1.0 (mailto:{os.getenv('CONTACT_EMAIL','noreply@example.com')})",
        "Accept": "application/atom+xml",
    }
    resp = requests.get(ARXIV_API, params=params, headers=headers, timeout=20)
    # Raise on non-200 so the caller can show a friendly error.
    resp.raise_for_status()

    feed = feedparser.parse(resp.text)
    papers: List[Paper] = []
    for entry in feed.entries:
        # Every field is fetched defensively: feedparser entries may omit any of them.
        papers.append(
            Paper(
                title=getattr(entry, "title", "").strip(),
                summary=getattr(entry, "summary", "").strip(),
                comment=getattr(entry, "arxiv_comment", ""),
                entry_id=getattr(entry, "id", getattr(entry, "link", "")),
                authors=[a.get("name", "").strip() for a in getattr(entry, "authors", [])]
                or ["Unknown"],
            )
        )
    return papers


# ---------- Assistant: a three-state conversation machine (SEARCH -> SELECT -> QA) ----------
class ArxivResearchAssistant:
    """Drives the chat flow: search arXiv, let the user pick a paper, then answer questions."""

    def __init__(self):
        self.selected_paper: Paper | None = None
        self.qa_chain = None          # built lazily in select_paper()
        self.papers: List[Paper] = []
        self.state = "SEARCH"         # one of: SEARCH, SELECT, QA

    async def search_papers(self, query: str):
        """Fetch candidate papers and present a numbered list; advances state to SELECT."""
        try:
            self.papers = fetch_arxiv_papers(query, max_results=5)
        except requests.HTTPError as e:
            # Shows the real HTTP status & message (e.g., if UA missing or rate-limited).
            await cl.Message(
                content=(
                    f"Error talking to arXiv (HTTP {e.response.status_code}): "
                    f"{e.response.text[:200]}"
                )
            ).send()
            return None
        except Exception as e:
            # Network failures, parse errors, etc. — report and stay in SEARCH state.
            await cl.Message(content=f"Error talking to arXiv: {e}").send()
            return None

        if not self.papers:
            await cl.Message(content="No papers found. Please try another search query.").send()
            return None

        paper_list = "\n".join(
            f"{i+1}. {p.title} - {p.authors[0]}\nLink: {p.entry_id}"
            for i, p in enumerate(self.papers)
        )
        await cl.Message(
            content=(
                f"Please select a paper by entering its number:\n\n{paper_list}\n\n"
                f"Enter the number of the paper you want to select:"
            )
        ).send()
        self.state = "SELECT"
        return self.papers

    async def select_paper(self, selection: str):
        """Validate the user's numeric choice, build the RAG chain, advance state to QA."""
        try:
            idx = int(selection) - 1
        except ValueError:
            await cl.Message(content="Invalid input. Please enter a number.").send()
            return None
        if not 0 <= idx < len(self.papers):
            await cl.Message(content="Invalid selection. Please try again.").send()
            return None
        self.selected_paper = self.papers[idx]

        # Compose the text from the feed fields (title + abstract + optional comment).
        paper_text = (
            f"{self.selected_paper.title}\n\n"
            f"{self.selected_paper.summary}\n\n"
            f"{self.selected_paper.comment or ''}"
        )

        # Split, embed, index (unchanged logic).
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        chunks = text_splitter.split_text(paper_text)
        embeddings = OpenAIEmbeddings()
        vectorstore = FAISS.from_texts(
            chunks,
            embeddings,
            metadatas=[
                {
                    "title": self.selected_paper.title,
                    "link": self.selected_paper.entry_id,
                    "chunk": f"Chunk {i+1}/{len(chunks)}",
                }
                for i in range(len(chunks))
            ],
        )
        # output_key="answer" is required because the chain also returns source_documents.
        memory = ConversationBufferMemory(
            memory_key="chat_history", return_messages=True, output_key="answer"
        )
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            ChatOpenAI(temperature=0, model="gpt-4o-mini"),
            vectorstore.as_retriever(),
            memory=memory,
            return_source_documents=True,
        )

        await cl.Message(
            content=(
                f"Selected paper: {self.selected_paper.title}\n"
                f"Link: {self.selected_paper.entry_id}\n\n"
                f"You can now ask questions about this paper. "
                f"Type 'new search' when you want to search for a different paper."
            )
        ).send()
        self.state = "QA"
        return self.selected_paper

    async def process_question(self, message: str):
        """Answer one question against the selected paper; 'new search' resets the flow."""
        if message.lower() == "new search":
            self.reset()
            await cl.Message(
                content="Sure! Please enter a new search query for arXiv papers."
            ).send()
            return None

        response = self.qa_chain({"question": message})
        answer = response["answer"]
        sources = "\n".join(
            f"- {doc.metadata.get('title','Unknown title')} "
            f"({doc.metadata.get('link','No link')}) - {doc.metadata.get('chunk','No chunk info')}"
            for doc in response.get("source_documents", [])
        )
        if sources:
            answer += f"\n\nSources:\n{sources}"
        return answer

    def reset(self):
        """Return to the initial SEARCH state, dropping any selected paper and chain."""
        self.selected_paper = None
        self.qa_chain = None
        self.papers = []
        self.state = "SEARCH"


# Global assistant instance.
# NOTE(review): Chainlit serves every websocket session from one process, so this
# global is shared across concurrent users; per-session state should live in
# cl.user_session — confirm before deploying multi-user.
assistant = ArxivResearchAssistant()


@cl.on_chat_start
async def start():
    await cl.Message(
        content=(
            "Welcome! This tool helps you search for papers on arXiv, pick one, "
            "and ask questions about its content.\n\n"
            "Please enter a topic to search for on arXiv papers."
        )
    ).send()


@cl.on_message
async def main(message: cl.Message):
    # Dispatch on the assistant's current state machine phase.
    if assistant.state == "SEARCH":
        await assistant.search_papers(message.content)
    elif assistant.state == "SELECT":
        await assistant.select_paper(message.content)
    elif assistant.state == "QA":
        answer = await assistant.process_question(message.content)
        if answer:
            await cl.Message(content=answer).send()


if __name__ == "__main__":
    # Bug fix: chainlit exposes no `cl.run()` entry point (the original call raised
    # AttributeError). Chainlit apps are launched with the CLI instead.
    raise SystemExit("Run this app with: chainlit run <this_file>.py")