Spaces:
Running
Running
| from fastapi import FastAPI, UploadFile, File, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import nomic | |
| import fitz # PyMuPDF | |
| from nomic import embed | |
| from pinecone import Pinecone, Vector | |
| import os | |
| from dotenv import load_dotenv | |
| import uuid | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import base64 | |
| import io | |
| from urllib.parse import urljoin | |
| from PIL import Image | |
| from urllib.parse import urlparse, parse_qs | |
| import time | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript | |
| from groq import Groq | |
| import uuid | |
# Load variables from a local .env file so the os.getenv() lookups below can see them.
load_dotenv()

# -------- CLIENTS --------
# Groq client used for chat completions (generate_answer / generate_url_answer).
groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))
from google import genai
# Initialize the Gemini client (used by generate_image_answer).
client_gemini = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
NOMIC_API_KEY = os.getenv("NOMIC_API_KEY")
# Log in to Nomic only when a key is configured; otherwise just warn.
if NOMIC_API_KEY:
    nomic.login(NOMIC_API_KEY)
else:
    print("Warning: NOMIC_API_KEY not found!")
# Pinecone client and the single index shared by all features below.
pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index("context-hub")

app = FastAPI()
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# -------- NAMESPACES --------
# NOTE(review): PDF_NAMESPACE and YOUTUBE_NAMESPACE are not referenced by the
# active (non-commented) code visible in this file.
PDF_NAMESPACE = "pdf"
# Namespace queried by query_pdf; upload_pdf replaces it with a fresh value per upload.
current_pdf_namespace = "pdf_default"
IMAGE_NAMESPACE = "image"
YOUTUBE_NAMESPACE = "youtube"
| # -------- HELPER FUNCTIONS -------- | |
def chunk_text(text: str, size: int = 500):
    """Split *text* into consecutive, non-overlapping chunks.

    Args:
        text: The string to split. An empty string yields an empty list.
        size: Maximum number of characters per chunk; must be positive.

    Returns:
        A list of substrings that concatenate back to *text*. Every chunk has
        exactly *size* characters except possibly the last one.

    Raises:
        ValueError: If *size* is not positive.
    """
    if size <= 0:
        # A negative step makes range() empty, which would silently drop the
        # whole text; fail loudly instead.
        raise ValueError(f"size must be a positive integer, got {size!r}")
    return [text[start:start + size] for start in range(0, len(text), size)]
def create_embeddings(chunks: list):
    """Embed text chunks with the Nomic text model for document search.

    Args:
        chunks: Text chunks to embed. A falsy value short-circuits to ``[]``.

    Returns:
        The ``embeddings`` entry of the Nomic response, or ``[]`` when there
        is nothing to embed, the response carries no embeddings, or the call
        raises (the error is printed, not propagated).
    """
    if not chunks:
        return []

    try:
        payload = dict(
            embed.text(
                texts=chunks,
                model="nomic-embed-text-v1",
                task_type="search_document",
            )
        )
        if 'embeddings' not in payload or len(payload['embeddings']) == 0:
            print("DEBUG: Nomic returned no embeddings!")
            return []
        return payload['embeddings']
    except Exception as e:
        print(f"Embedding error: {e}")
        return []
def store_embeddings(chunks: list, embeddings: list, namespace: str = "", batch_size: int = 100):
    """Upsert text chunks and their embeddings into the Pinecone index.

    Each chunk is stored under a random UUID with the chunk text kept in the
    ``text`` metadata field.

    Args:
        chunks: Text chunks, positionally aligned with *embeddings*.
        embeddings: One embedding vector per chunk.
        namespace: Pinecone namespace to write into.
        batch_size: Maximum number of vectors sent per upsert request; must be
            positive. Batching keeps a large document from being sent as one
            oversized request.

    Returns:
        None. Prints a debug line and returns early when there is nothing to
        store.

    Raises:
        ValueError: If *batch_size* is not positive.
    """
    if not chunks or not embeddings:
        print("DEBUG: Nothing to store.")
        return
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # zip() pairs each chunk with its vector and stops at the shorter input,
    # so a length mismatch cannot raise IndexError.
    vectors = [
        Vector(
            id=str(uuid.uuid4()),
            values=values,
            metadata={"text": chunk},
        )
        for chunk, values in zip(chunks, embeddings)
    ]

    for start in range(0, len(vectors), batch_size):
        index.upsert(vectors=vectors[start:start + batch_size], namespace=namespace)
def search(query: str, namespace: str = ""):
    """Return the stored text of the top 3 index matches for *query*.

    The query is embedded with the Nomic text model (``search_query`` task)
    and looked up in the given Pinecone namespace.

    Args:
        query: Natural-language query.
        namespace: Pinecone namespace to search.

    Returns:
        A list of the ``text`` metadata values of the matches that have one.
        Returns ``[]`` if anything raises (the error is printed).
    """
    try:
        embedded = dict(
            embed.text(
                texts=[query],
                model="nomic-embed-text-v1",
                task_type="search_query",
            )
        )
        hits = index.query(
            vector=embedded['embeddings'][0],
            top_k=3,
            include_metadata=True,
            namespace=namespace,
        )
        # Keep only matches that actually carry stored text.
        return [
            hit['metadata']['text']
            for hit in hits['matches']
            if hit.get('metadata') and 'text' in hit['metadata']
        ]
    except Exception as e:
        print(f"Search error: {e}")
        return []
def generate_answer(query: str, context: str):
    """Ask the Groq chat model to answer *query* using only *context*.

    Args:
        query: The user's question.
        context: Retrieved text the model is instructed to rely on.

    Returns:
        The model's reply text, or a ``"Groq Error: ..."`` string if the call
        raises (the same message is also printed).
    """
    system_prompt = (
        "You are a helpful assistant. Answer ONLY using the provided context. "
        "If the answer is not in the context, say you don't know. "
        "When you find an answer, elaborate and provide more information."
    )
    user_prompt = f"Context: {context}\n\nQuestion: {query}"

    try:
        completion = groq_client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
        return completion.choices[0].message.content
    except Exception as e:
        failure = f"Groq Error: {str(e)}"
        print(failure)
        return failure
async def upload_pdf(file: UploadFile = File(...)):
    """Extract text from an uploaded PDF, embed it and store it in Pinecone.

    Every upload is written to a fresh ``pdf_<hex>`` namespace. The module
    level ``current_pdf_namespace`` is switched to it only after the vectors
    were stored, so a failed upload leaves the previously uploaded document
    queryable.

    Args:
        file: The uploaded PDF.

    Returns:
        ``{"message": ...}`` on success, otherwise ``{"error": ...}``.
    """
    global current_pdf_namespace
    try:
        # Generate a fresh unique namespace for every new upload.
        new_namespace = f"pdf_{uuid.uuid4().hex}"
        print(f"DEBUG: Using new namespace: {new_namespace}")

        content = await file.read()
        # The context manager closes the document even if text extraction fails.
        with fitz.open(stream=content, filetype="pdf") as pdf:
            text = "".join(page.get_text() for page in pdf)

        if not text.strip():
            return {"error": "PDF is empty or could not be read."}

        chunks = chunk_text(text)
        embeddings = create_embeddings(chunks)
        if len(embeddings) == 0:
            # create_embeddings() reports failures as an empty list; do not
            # claim success when nothing was stored.
            return {"error": "Failed to create embeddings for the PDF."}

        store_embeddings(chunks, embeddings, namespace=new_namespace)
        current_pdf_namespace = new_namespace
        return {"message": "PDF processed successfully."}
    except Exception as e:
        print(f"Upload Error: {e}")
        return {"error": f"Internal Server Error: {str(e)}"}
async def query_pdf(query: str = Query(...)):
    """Answer *query* from the most recently uploaded PDF.

    Searches the namespace held in ``current_pdf_namespace`` and passes the
    space-joined matches to ``generate_answer`` as context.

    Returns:
        ``{"answer": ...}``, with a fixed fallback sentence when the search
        finds nothing, or ``{"error": ...}`` if anything raises.
    """
    try:
        matches = search(query, namespace=current_pdf_namespace)
        if matches:
            joined_context = " ".join(matches)
            return {"answer": generate_answer(query, joined_context)}
        return {"answer": "I couldn't find any relevant information in the document."}
    except Exception as e:
        return {"error": f"Search failed: {str(e)}"}
| # -------- IMAGE HELPERS ------------------------------------------------------------------------------------------------------------------- | |
def create_image_embedding(image):
    """Embed a single image with the Nomic vision model.

    Args:
        image: The image to embed, passed through to ``embed.image``.

    Returns:
        The first (and only) vector in the response's ``embeddings`` entry.
    """
    response = embed.image(images=[image], model="nomic-embed-vision-v1.5")
    return dict(response)["embeddings"][0]
def store_image_embedding(image_vector, filename: str):
    """Upsert one image embedding into the image namespace.

    Args:
        image_vector: Embedding values for the image.
        filename: Stored in the vector's metadata next to ``type: image``.
    """
    record = Vector(
        id=str(uuid.uuid4()),
        values=image_vector,
        metadata={"type": "image", "filename": filename},
    )
    index.upsert(vectors=[record], namespace=IMAGE_NAMESPACE)
def generate_image_answer(query, image):
    """Ask Gemini a question about an image.

    The image is serialized to PNG, base64-encoded and sent together with the
    query text as one user message.

    Args:
        query: The question to ask about the image.
        image: An object exposing ``save(fp, format=...)``.
            # presumably a PIL image -- confirm against the caller

    Returns:
        The response text, ``"No answer generated."`` when the response text
        is empty, or an ``"Error: ..."`` string if anything raises.
    """
    try:
        # Serialize the image to PNG bytes in memory.
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")

        # Base64-encode the PNG bytes for the inline_data payload.
        encoded_png = base64.b64encode(buffer.getvalue()).decode("utf-8")

        image_part = {"inline_data": {"mime_type": "image/png", "data": encoded_png}}
        user_message = {"role": "user", "parts": [{"text": query}, image_part]}

        response = client_gemini.models.generate_content(
            model="gemini-3-flash-preview",
            contents=[user_message],
        )
        if response.text:
            return response.text
        return "No answer generated."
    except Exception as e:
        return f"Error: {str(e)}"
| # -------- URL HELPERS -------------------------------------------------------------------------------------------------------------- | |
def normalize_url(url: str):
    """Trim *url* and prepend ``https://`` when it has no http(s) scheme.

    The scheme check is case-insensitive because URL schemes are, so
    ``HTTP://example.com`` is left untouched instead of being turned into
    ``https://HTTP://example.com``.

    Args:
        url: A user-supplied URL, with or without a scheme.

    Returns:
        The stripped URL, guaranteed to start with an http or https scheme.
    """
    url = url.strip()
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url
    return url
def extract_text_from_url(url: str):
    """Download a web page and return up to 5000 characters of its visible text.

    ``<script>`` and ``<style>`` elements are removed before text extraction.
    Runs of whitespace are collapsed to single spaces before truncating, so
    the 5000-character budget is spent on content rather than on the blank
    lines and indentation left over from the HTML layout.

    Args:
        url: Page URL; normalized with ``normalize_url`` first.

    Returns:
        The extracted text, or ``None`` when the response status is not 200
        or anything raises (the error is printed).
    """
    try:
        url = normalize_url(url)
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            return None

        soup = BeautifulSoup(response.text, "html.parser")
        # Remove tags whose contents are not visible page text.
        for tag in soup(["script", "style"]):
            tag.decompose()

        text = " ".join(soup.get_text(separator=" ").split())
        return text[:5000]
    except Exception as e:
        print(f"URL Error: {e}")
        return None
def extract_about_contact(base_url: str):
    """Fetch the text of a site's About and Contact pages (best effort).

    Scans the links on *base_url* for the first href containing ``about`` and
    the first containing ``contact`` (case-insensitive), resolves them against
    *base_url* and concatenates the text returned by ``extract_text_from_url``.

    Args:
        base_url: Page whose links are scanned; normalized first.

    Returns:
        Up to 5000 characters of combined text, or ``""`` when nothing was
        found or anything raises (the error is printed).
    """
    try:
        base_url = normalize_url(base_url)
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(base_url, headers=headers, timeout=10)
        soup = BeautifulSoup(response.text, "html.parser")

        about_url = None
        contact_url = None
        for anchor in soup.find_all("a", href=True):
            link = anchor.get("href")
            lowered = link.lower()
            if about_url is None and "about" in lowered:
                about_url = urljoin(base_url, link)
            if contact_url is None and "contact" in lowered:
                contact_url = urljoin(base_url, link)
            if about_url and contact_url:
                # Both pages located; no need to scan the remaining links.
                break

        content = ""
        if about_url:
            content += extract_text_from_url(about_url) or ""
        if contact_url:
            content += extract_text_from_url(contact_url) or ""
        return content[:5000]
    except Exception as e:
        # Optional enrichment: log and fall back to no extra content, but do
        # not swallow KeyboardInterrupt/SystemExit as a bare except would.
        print(f"About/Contact Error: {e}")
        return ""
def generate_url_answer(text: str, url: str, query: str = None):
    """Ask the Groq chat model to explain a web page or answer a question on it.

    Args:
        text: Extracted page text; only the first 5000 characters are sent.
        url: The page URL, included in the prompt.
        query: Optional user question appended to the prompt.

    Returns:
        The model's reply text, or an ``"Error: ..."`` string if anything
        raises.
    """
    try:
        prompt_parts = [
            f"URL: {url}\n\n",
            f"Content:\n{text[:5000]}\n\n",
            f"Task: Explain what this page is about in simple words.",
        ]
        if query:
            prompt_parts.append(f"\n\nUser Question: {query}")
        user_msg = "".join(prompt_parts)

        system_message = {
            "role": "system",
            "content": "You are a helpful assistant that answers questions about webpage content.",
        }
        completion = groq_client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[system_message, {"role": "user", "content": user_msg}],
        )
        return completion.choices[0].message.content
    except Exception as e:
        return f"Error: {str(e)}"
# Module-level state for the URL feature: the most recently processed URL and
# its extracted text. Written by process_url and read by query_url.
current_url = None
current_url_text = None
async def process_url(url: str = Query(...)):
    """Fetch a URL's text (plus About/Contact pages) and keep it for querying.

    The main page text and the About/Contact text are joined with a blank
    line and stored in the module-level ``current_url`` and
    ``current_url_text``.

    Returns:
        ``{"message": ...}`` on success, or ``{"error": ...}`` when no text
        could be extracted.
    """
    global current_url, current_url_text

    url = normalize_url(url)
    page_text = extract_text_from_url(url) or ""
    about_contact_text = extract_about_contact(url) or ""
    combined_text = page_text + "\n\n" + about_contact_text

    if combined_text.strip():
        # Store everything once so later queries need no re-fetch.
        current_url = url
        current_url_text = combined_text
        return {"message": "URL processed successfully"}
    return {"error": "Failed to extract content"}
async def query_url(query: str = Query(...)):
    """Answer *query* about the most recently processed URL.

    Returns:
        ``{"answer": ...}`` on success, ``{"error": "No URL processed"}`` when
        no page text is stored, or ``{"error": ...}`` if anything raises.
    """
    if not current_url_text:
        return {"error": "No URL processed"}
    try:
        return {"answer": generate_url_answer(current_url_text, current_url, query)}
    except Exception as e:
        return {"error": str(e)}
| # ######################################## YOUTUBE ############################################ | |
| import yt_dlp | |
| import whisper | |
def get_video_id(url: str):
    """Extract the video id from a YouTube URL.

    Supported forms:
      * ``youtube.com/watch?v=<id>`` (any subdomain, e.g. ``www.``, ``m.``)
      * ``youtube.com/shorts/<id>``, ``/embed/<id>``, ``/live/<id>``, ``/v/<id>``
      * ``youtu.be/<id>``

    A URL without a scheme is accepted. The host is matched on the parsed
    hostname rather than by substring, so a URL that merely mentions
    ``youtube.com`` in its path or query is not mistaken for a YouTube link.

    Args:
        url: The URL to inspect.

    Returns:
        The video id string, or ``None`` when none can be determined.
    """
    url = url.strip()
    if "://" not in url:
        # Without a scheme urlparse() puts the host into the path.
        url = "https://" + url

    parsed_url = urlparse(url)
    host = (parsed_url.hostname or "").lower()
    segments = [part for part in parsed_url.path.split("/") if part]

    if host == "youtu.be":
        return segments[0] if segments else None

    if host == "youtube.com" or host.endswith(".youtube.com"):
        video_id = parse_qs(parsed_url.query).get("v", [None])[0]
        if video_id:
            return video_id
        if len(segments) >= 2 and segments[0] in ("shorts", "embed", "live", "v"):
            return segments[1]

    return None
def get_transcript_whisper(url):
    """Download a video's audio track and transcribe it with Whisper.

    The audio is downloaded into a private temporary directory and the path
    actually written by yt-dlp is used for transcription. This avoids three
    problems of a fixed ``audio.webm`` path: the best audio stream is not
    always WebM, concurrent requests would overwrite each other's file, and a
    stale file from an earlier video could be transcribed by mistake. The
    directory is removed when transcription finishes.

    Args:
        url: URL of the video to transcribe.

    Returns:
        The transcript text, or a string starting with ``"ERROR: "`` if
        anything raises (callers test for that prefix).
    """
    import tempfile  # local import: not among the file's top-level imports

    try:
        with tempfile.TemporaryDirectory() as workdir:
            # Download audio only, into the per-call working directory.
            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': os.path.join(workdir, 'audio.%(ext)s'),
                'quiet': True,
                # A watch URL carrying a list= parameter should fetch one video.
                'noplaylist': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                audio_path = ydl.prepare_filename(info)

            # Load Whisper model ("small" or "medium" give better accuracy).
            model = whisper.load_model("base")
            # Transcribe while the temporary file still exists.
            result = model.transcribe(audio_path)
        return result["text"]
    except Exception as e:
        return f"ERROR: {str(e)}"
| from pydantic import BaseModel | |
# Request body for process_youtube.
class YouTubeRequest(BaseModel):
    # URL of the video; passed to get_video_id and get_transcript_whisper.
    url: str
async def process_youtube(request: YouTubeRequest):
    """Transcribe a YouTube video and index the transcript under its video id.

    The URL is validated before the download and transcription start, so an
    unrecognized URL is rejected instead of being indexed under a ``None``
    namespace. The transcript is split into 1000-character chunks, embedded
    and stored through ``store_embeddings`` in a namespace named after the
    video id.

    Args:
        request: Body carrying the video ``url``.

    Returns:
        ``{"status": "success", "video_id": ...}`` on success, otherwise
        ``{"status": "error", "error": ...}``.
    """
    try:
        url = request.url
        video_id = get_video_id(url)
        if not video_id:
            return {"status": "error", "error": "Invalid YouTube URL"}

        text = get_transcript_whisper(url)
        # get_transcript_whisper reports failures as an "ERROR: ..." string.
        if text.startswith("ERROR"):
            print("TRANSCRIPT ERROR:", text)
            return {"status": "error", "error": text}
        if not text.strip():
            return {"status": "error", "error": "Transcript is empty"}

        chunks = chunk_text(text, size=1000)
        embeddings = create_embeddings(chunks)
        if len(embeddings) == 0:
            # create_embeddings() signals failure with an empty list.
            return {"status": "error", "error": "Failed to create embeddings"}

        store_embeddings(chunks, embeddings, namespace=video_id)
        # The id is returned because query_youtube needs it as the namespace.
        return {"status": "success", "video_id": video_id}
    except Exception as e:
        return {"status": "error", "error": str(e)}
# Request body for query_youtube.
class QueryRequest(BaseModel):
    # The user's question.
    query: str
    # Video id; query_youtube uses it as the Pinecone namespace to search.
    video_id: str
async def query_youtube(request: QueryRequest):
    """Answer a question about a previously indexed YouTube video.

    Embeds the query, fetches the top 5 matches from the namespace named by
    ``request.video_id`` and passes their text, joined by blank lines, to
    ``generate_answer``.

    Returns:
        ``{"answer": ...}``, with a fixed fallback when no context text is
        found, or ``{"error": ...}`` if anything raises.
    """
    try:
        question = request.query
        namespace = request.video_id

        embedded = embed.text(
            texts=[question],
            model="nomic-embed-text-v1",
            task_type="search_query",
        )
        question_vector = dict(embedded)["embeddings"][0]

        hits = index.query(
            vector=question_vector,
            top_k=5,
            include_metadata=True,
            namespace=namespace,
        )

        passages = [hit["metadata"]["text"] for hit in hits["matches"]]
        context = "\n\n".join(passages)
        if context.strip():
            return {"answer": generate_answer(question, context)}
        return {"answer": "No relevant info found."}
    except Exception as e:
        return {"error": str(e)}
| # ------------------------------- | |
| # Get transcript (SAFE) | |
| # ------------------------------- | |
| # def fetch_transcript(video_id: str): | |
| # # ✅ CACHE | |
| # if video_id in transcript_cache: | |
| # return transcript_cache[video_id] | |
| # # ✅ NEW API: use instance method | |
| # ytt_api = YouTubeTranscriptApi() | |
| # fetched = ytt_api.fetch(video_id, languages=['en']) | |
| # processed = [ | |
| # { | |
| # "text": snippet.text, | |
| # "start": snippet.start | |
| # } | |
| # for snippet in fetched | |
| # ] | |
| # transcript_cache[video_id] = processed | |
| # return processed | |
| # @app.post("/process-youtube") | |
| # async def process_youtube(url: str = Query(...)): | |
| # try: | |
| # video_id = extract_video_id(url) | |
| # if not video_id: | |
| # return {"error": "Invalid YouTube URL"} | |
| # # ✅ RATE LIMIT | |
| # if not is_allowed(video_id): | |
| # return {"error": "Too many requests. Please wait a few seconds."} | |
| # # ✅ PREVENT DUPLICATE PROCESSING | |
| # if video_id in processing_videos: | |
| # return {"message": "Already processing this video"} | |
| # processing_videos.add(video_id) | |
| # try: | |
| # transcript_data = fetch_transcript(video_id) | |
| # except TranscriptsDisabled: | |
| # return {"error": "Captions disabled for this video"} | |
| # except NoTranscriptFound: | |
| # return {"error": "No transcript available"} | |
| # except Exception as e: | |
| # return {"error": f"Transcript error: {str(e)}"} | |
| # # Combine text | |
| # full_text = " ".join([item["text"] for item in transcript_data]) | |
| # chunks = chunk_text(full_text, size=1000) | |
| # embeddings = create_embeddings(chunks) | |
| # if not embeddings: | |
| # return {"error": "Failed to create embeddings"} | |
| # vectors = [] | |
| # for i, chunk in enumerate(chunks): | |
| # vectors.append( | |
| # Vector( | |
| # id=f"{video_id}_{i}", | |
| # values=embeddings[i], | |
| # metadata={ | |
| # "text": chunk, | |
| # "source": "youtube", | |
| # "chunk_id": i, | |
| # "video_id": video_id | |
| # } | |
| # ) | |
| # ) | |
| # index.upsert(vectors=vectors, namespace=YOUTUBE_NAMESPACE) | |
| # return {"message": f"Processed successfully ({len(chunks)} chunks)"} | |
| # except Exception as e: | |
| # return {"error": str(e)} | |
| # finally: | |
| # if 'video_id' in locals(): | |
| # processing_videos.discard(video_id) | |
| # @app.post("/query-youtube") | |
| # async def query_youtube(query: str = Query(...)): | |
| # try: | |
| # query_response = embed.text( | |
| # texts=[query], | |
| # model="nomic-embed-text-v1", | |
| # task_type="search_query" | |
| # ) | |
| # query_emb = dict(query_response)["embeddings"][0] | |
| # results = index.query( | |
| # vector=query_emb, | |
| # top_k=5, | |
| # include_metadata=True, | |
| # namespace=YOUTUBE_NAMESPACE | |
| # ) | |
| # matches = results.get("matches", []) | |
| # if not matches: | |
| # return {"answer": "No relevant content found in this video."} | |
| # context = "\n\n".join([m["metadata"]["text"] for m in matches]) | |
| # answer = generate_answer(query, context) | |
| # return { | |
| # "answer": answer, | |
| # "sources": [ | |
| # {"chunk": m["metadata"]["chunk_id"]} | |
| # for m in matches | |
| # ] | |
| # } | |
| # except Exception as e: | |
| # return {"error": str(e)} |