Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException, UploadFile, File,Request,Depends,status,BackgroundTasks | |
| from fastapi.security import OAuth2PasswordBearer | |
| from pydantic import BaseModel, Json,EmailStr | |
| from typing import Optional | |
| from pinecone import Pinecone, ServerlessSpec | |
| from uuid import uuid4 | |
| import os | |
| from dotenv import load_dotenv | |
| from rag import * | |
| from fastapi.responses import StreamingResponse | |
| import json | |
| from prompt import * | |
| from typing import Literal | |
| import time | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import requests | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| load_dotenv() | |
| ## setup pinecone index | |
| pinecone_api_key = os.environ.get("PINECONE_API_KEY") | |
| pc = Pinecone(api_key=pinecone_api_key) | |
| index_name = os.environ.get("INDEX_NAME") # change if desired | |
| existing_indexes = [index_info["name"] for index_info in pc.list_indexes()] | |
| if index_name not in existing_indexes: | |
| pc.create_index( | |
| name=index_name, | |
| dimension=1536, | |
| metric="cosine", | |
| spec=ServerlessSpec(cloud="aws", region="us-east-1"), | |
| ) | |
| while not pc.describe_index(index_name).status["ready"]: | |
| time.sleep(1) | |
| index = pc.Index(index_name) | |
| vector_store = PineconeVectorStore(index=index, embedding=embedding) | |
| ## setup authorization | |
| api_keys = [os.environ.get("FASTAPI_API_KEY")] | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # use token authentication | |
| def api_key_auth(api_key: str = Depends(oauth2_scheme)): | |
| if api_key not in api_keys: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Forbidden" | |
| ) | |
| dev_mode = os.environ.get("DEV") | |
| if dev_mode == "True": | |
| app = FastAPI() | |
| else: | |
| app = FastAPI(dependencies=[Depends(api_key_auth)]) | |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) | |
| # Pydantic model for the form data | |
| class ContactForm(BaseModel): | |
| name: str | |
| email: EmailStr | |
| message: str | |
| def send_simple_message(to,subject,text): | |
| api_key = os.getenv("MAILGUN_API_KEY") | |
| return requests.post( | |
| "https://api.mailgun.net/v3/sandboxafc6970ffdab40ee9566a4e180b117fd.mailgun.org/messages", | |
| auth=("api", api_key), | |
| data={"from": "Excited User <mailgun@sandboxafc6970ffdab40ee9566a4e180b117fd.mailgun.org>", | |
| "to": [to], | |
| "subject": subject, | |
| "text": text}) | |
| # Function to send email | |
| def send_email(form_data: ContactForm): | |
| # sender_email = os.getenv("SENDER_EMAIL") | |
| # sender_password = os.getenv("SENDER_PASSWORD") | |
| receiver_email = os.getenv("RECEIVER_EMAIL") # Your email | |
| # Setup the message content | |
| text = f"Name: {form_data.name}\nEmail: {form_data.email}\nMessage: {form_data.message}" | |
| title = "New message from your website!" | |
| # Send the email | |
| try: | |
| send_simple_message(receiver_email,title,text) | |
| except Exception as e: | |
| print(e) | |
| return {"message": "Failed to send email."} | |
| # Endpoint to handle form submission | |
| async def send_contact_form(form_data: ContactForm, background_tasks: BackgroundTasks): | |
| background_tasks.add_task(send_email, form_data) | |
| return {"message": "Email sent successfully!"} | |
| class UserInput(BaseModel): | |
| query: str | |
| stream: Optional[bool] = False | |
| messages: Optional[list[dict]] = [] | |
| class ChunkToDB(BaseModel): | |
| message: str | |
| title: str | |
| async def add_chunk_to_db(chunk: ChunkToDB): | |
| try: | |
| title = chunk.title | |
| message = chunk.message | |
| return get_vectorstore(text_chunk=message,index=index,title=title) | |
| except Exception as e: | |
| return {"message": str(e)} | |
| async def list_vectors(): | |
| try: | |
| return index.list() | |
| except Exception as e: | |
| return {"message": str(e)} | |
| async def generate(user_input: UserInput): | |
| try: | |
| print(user_input.stream,user_input.query) | |
| if user_input.stream: | |
| return StreamingResponse(generate_stream(user_input.query,user_input.messages,index_name=index,stream=True,vector_store=vector_store),media_type="application/json") | |
| else: | |
| return generate_stream(user_input.query,user_input.messages,index_name=index,stream=False,vector_store=vector_store) | |
| except Exception as e: | |
| return {"message": str(e)} | |
| async def retreive_context_response(query: str): | |
| try: | |
| return retreive_context(index=index,query=query) | |
| except Exception as e: | |
| return {"message": str(e)} | |
| async def delete_vector(filename_id: str): | |
| try: | |
| return index.delete(ids=[filename_id]) | |
| except Exception as e: | |
| return {"message": str(e)} | |
| async def check_server(): | |
| return {"message":"Server is running"} | |
| async def read_root(): | |
| return {"message":"Welcome to the AI API"} | |