Spaces:
Sleeping
Sleeping
# main.py
import io
import os
import uuid
from typing import List

import openai
import pinecone
import PyPDF2
from dotenv import load_dotenv
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles

# Load environment variables from .env file
load_dotenv()

# Initialize FastAPI app and serve static assets (e.g. chatbot.js).
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")

# Configure OpenAI from the environment.
openai.api_key = os.getenv("OPENAI_API_KEY")

# Pinecone configuration.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENV = os.getenv("PINECONE_ENV")
INDEX_NAME = "main"
# 3072 is the output dimension of 'text-embedding-3-large' (the model used
# below); the old 'text-embedding-ada-002' produces 1536-dim vectors.
VECTOR_DIM = 3072

# Initialize Pinecone and make sure the index exists.
pc = pinecone.Pinecone(api_key=PINECONE_API_KEY)
if INDEX_NAME not in pc.list_indexes().names():
    # NOTE(review): recent Pinecone SDKs require a `spec=` (serverless/pod)
    # argument for create_index — confirm against the installed version.
    pc.create_index(
        name=INDEX_NAME,
        dimension=VECTOR_DIM,
        metric='cosine'
    )
index = pc.Index(INDEX_NAME)

# In-memory store for bot metadata (for demonstration; not persistent).
bots = {}
def generate_gpt4o_mini_response(context: str, query: str) -> str:
    """
    Answer *query* with OpenAI chat completions, grounded in *context*.

    Builds a two-message prompt (system instruction plus a user message that
    embeds the context and the question) and returns the stripped text of
    the first completion choice.
    """
    client = openai.OpenAI()
    system_msg = {
        "role": "system",
        "content": "You are a helpful assistant that answers questions based on the given context.",
    }
    user_msg = {
        "role": "user",
        "content": f"Context: {context}\n\nQuestion: {query}",
    }
    completion = client.chat.completions.create(
        model="gpt-4o-mini",  # You can also use "gpt-4" if you have access
        messages=[system_msg, user_msg],
        temperature=0.7,
    )
    return completion.choices[0].message.content.strip()
async def upload_documents(files: List[UploadFile] = File(...)):
    """
    Ingest uploaded documents and create a new bot.

    Extracts text from each file (PyPDF2 for PDFs; utf-8 decode with
    undecodable bytes dropped for everything else), embeds the full text
    with OpenAI, upserts each vector into Pinecone with the text kept in
    metadata, and returns a fresh ``botid``.

    NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible in
    this chunk — presumably lost in extraction; confirm route wiring.
    """
    client = openai.OpenAI()
    botid = str(uuid.uuid4())
    bots[botid] = {"vectors": []}

    for upload in files:
        raw = await upload.read()

        if upload.filename.lower().endswith('.pdf'):
            # PDF: concatenate every page's text, newline after each page.
            reader = PyPDF2.PdfReader(io.BytesIO(raw))
            text = "".join(page.extract_text() + "\n" for page in reader.pages)
        else:
            # Treat anything else as plain text.
            text = raw.decode('utf-8', errors='ignore')

        # Embed the whole document in a single request.
        embedding = client.embeddings.create(
            input=text,
            model="text-embedding-3-large"
        ).data[0].embedding

        # Vector id is namespaced by bot and made unique per upload.
        vector_id = f"{botid}_{upload.filename}_{uuid.uuid4()}"
        metadata = {
            "botid": botid,
            "filename": upload.filename,
            "text": text
        }
        index.upsert(vectors=[(vector_id, embedding, metadata)])
        bots[botid]["vectors"].append(vector_id)

    return {"botid": botid}
async def query_endpoint(botid: str = Form(...), query: str = Form(...)):
    """
    Answer *query* from the documents previously ingested for *botid*.

    Embeds the query with the same model used at ingest time, fetches the
    top-5 matches for this bot from Pinecone, builds a context string from
    the match metadata, and returns a generated answer plus per-match
    filename/score info. Responds 404 when no vectors exist for the bot.

    NOTE(review): no route decorator is visible in this chunk — confirm.
    """
    client = openai.OpenAI()

    # Embed the user's question.
    query_vector = client.embeddings.create(
        input=query,
        model="text-embedding-3-large"
    ).data[0].embedding

    # Similarity search restricted to vectors belonging to this bot.
    pinecone_response = index.query(
        vector=query_vector,
        top_k=5,
        filter={"botid": {"$eq": botid}},
        include_metadata=True
    )
    matches = getattr(pinecone_response, 'matches', [])

    # No matches: unknown bot, or a bot with no stored content.
    if not matches:
        return JSONResponse(status_code=404, content={"error": "No content found for this bot"})

    results = []
    relevant_texts = []
    for match in matches:
        metadata = getattr(match, 'metadata', None)
        if metadata:
            results.append({
                "filename": metadata.get('filename', 'Unknown file'),
                "score": getattr(match, 'score', 0.0)
            })
            text = metadata.get('text', '')
            if text:
                relevant_texts.append(text)

    # Assemble the LLM context; when no document text was found the whole
    # context collapses to a fixed "no content" marker (matches prior logic).
    context = ""
    if results:
        context += "Relevant files: " + ", ".join(r["filename"] for r in results) + "\n\n"
    if relevant_texts:
        context += "Content from relevant documents:\n" + "\n---\n".join(relevant_texts)
    else:
        context = "No relevant content found"

    answer = generate_gpt4o_mini_response(context, query)
    return {"response": answer, "matches": results}
async def root():
    """
    Serve the landing page (index.html).

    NOTE(review): no route decorator is visible in this chunk — presumably
    ``@app.get("/")``; confirm.
    """
    # Wrap in HTMLResponse so the browser renders the page: returning a bare
    # str makes FastAPI JSON-encode it. HTMLResponse is already imported
    # at the top of the file but was unused.
    with open("index.html", encoding="utf-8") as f:
        return HTMLResponse(content=f.read())
async def bot_page():
    """
    Serve the bot chat page (bot.html).

    NOTE(review): no route decorator is visible in this chunk — confirm the
    route path.
    """
    # Wrap in HTMLResponse so the browser renders the page: returning a bare
    # str makes FastAPI JSON-encode it. HTMLResponse is already imported
    # at the top of the file but was unused.
    with open("bot.html", encoding="utf-8") as f:
        return HTMLResponse(content=f.read())
async def generate_embed_code(botid: str):
    """
    Build the copy-paste HTML embed snippet for bot *botid*.

    The snippet loads ``static/chatbot.js`` from the hosted app with the
    botid passed as a query parameter; it is returned in a dict under the
    ``embed_code`` key.
    """
    base_url = "https://poemsforaphrodite-bot.hf.space"
    embed_snippet = (
        "\n"
        "<!-- SmartlyQ Chatbot Embed Code -->\n"
        '<div id="smartlyq-chatbot-container"></div>\n'
        f'<script src="{base_url}/static/chatbot.js?botid={botid}"></script>\n'
    )
    return {"embed_code": embed_snippet}