import base64
import io
import os
import time
import uuid
from urllib.parse import urljoin
from urllib.parse import urlparse, parse_qs

import fitz  # PyMuPDF
import nomic
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, UploadFile, File, Query
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from google import genai
from groq import Groq
from nomic import embed
from PIL import Image
from pinecone import Pinecone, Vector
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript

load_dotenv()

# -------- CLIENTS --------
groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))

# Initialize the Gemini client
client_gemini = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
NOMIC_API_KEY = os.getenv("NOMIC_API_KEY")

if NOMIC_API_KEY:
    nomic.login(NOMIC_API_KEY)
else:
    print("Warning: NOMIC_API_KEY not found!")

pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index("context-hub")

app = FastAPI()

# NOTE(review): a wildcard origin together with allow_credentials=True lets any
# website make credentialed cross-origin calls; restrict allow_origins to the
# real frontend origin(s) before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# -------- NAMESPACES --------
PDF_NAMESPACE = "pdf"
# Namespace of the most recently *successfully* indexed PDF. This is process-wide
# state shared by every client of the server.
current_pdf_namespace = "pdf_default"
IMAGE_NAMESPACE = "image"
YOUTUBE_NAMESPACE = "youtube"

# Pinecone limits the size of a single upsert request, so vectors are written in
# slices. 100 is a conservative choice — TODO: tune against the index dimension.
UPSERT_BATCH_SIZE = 100


# -------- HELPER FUNCTIONS --------
def chunk_text(text: str, size: int = 500):
    """Split *text* into consecutive, non-overlapping slices of at most *size* characters.

    Args:
        text: The text to split. An empty string yields an empty list.
        size: Maximum characters per chunk.

    Returns:
        A list of string chunks that concatenate back to *text*.

    Raises:
        ValueError: if *size* is not positive. Such a size would otherwise raise an
            opaque ``range()`` error (0) or silently return no chunks (negative).
    """
    if size <= 0:
        raise ValueError("chunk size must be positive")
    return [text[i:i + size] for i in range(0, len(text), size)]


def create_embeddings(chunks: list):
    """Embed document chunks with Nomic (``search_document`` task type).

    Returns:
        One embedding per chunk. Returns ``[]`` when *chunks* is empty, when Nomic
        returns nothing, or when the call fails; errors are printed, not raised.
    """
    if not chunks:
        return []
    try:
        response = embed.text(
            texts=chunks,
            model="nomic-embed-text-v1",
            task_type="search_document",
        )
        embeddings = dict(response).get("embeddings")
        if embeddings:
            return embeddings
        print("DEBUG: Nomic returned no embeddings!")
        return []
    except Exception as e:
        print(f"Embedding error: {e}")
        return []


def _upsert_in_batches(vectors: list, namespace: str, batch_size: int = UPSERT_BATCH_SIZE):
    """Upsert *vectors* into *namespace*, sending at most *batch_size* vectors per request."""
    for start in range(0, len(vectors), batch_size):
        index.upsert(vectors=vectors[start:start + batch_size], namespace=namespace)


def store_embeddings(chunks: list, embeddings: list, namespace: str = ""):
    """Store (chunk, embedding) pairs in Pinecone under *namespace*.

    Each vector gets a random UUID id and carries its chunk as ``metadata["text"]``.
    Only matched pairs are stored. A length mismatch is reported instead of
    raising IndexError.
    """
    if not chunks or not embeddings:
        print("DEBUG: Nothing to store.")
        return
    if len(chunks) != len(embeddings):
        print(
            f"DEBUG: {len(chunks)} chunks but {len(embeddings)} embeddings; "
            "storing matched pairs only."
        )
    vectors = [
        Vector(id=str(uuid.uuid4()), values=values, metadata={"text": chunk})
        for chunk, values in zip(chunks, embeddings)
    ]
    if vectors:
        _upsert_in_batches(vectors, namespace)


def search(query: str, namespace: str = "", top_k: int = 3):
    """Return the stored chunk texts most similar to *query* within *namespace*.

    Args:
        query: Natural-language question; embedded with the ``search_query`` task type.
        namespace: Pinecone namespace to search.
        top_k: Maximum number of matches to retrieve (default 3, as before).

    Returns:
        Chunk texts of the matches that carry ``metadata["text"]``. Returns ``[]``
        on any error; the error is printed.
    """
    try:
        query_response = embed.text(
            texts=[query],
            model="nomic-embed-text-v1",
            task_type="search_query",
        )
        query_emb = dict(query_response)["embeddings"][0]
        results = index.query(
            vector=query_emb,
            top_k=top_k,
            include_metadata=True,
            namespace=namespace,
        )
        relevant_chunks = []
        for match in results["matches"]:
            metadata = match.get("metadata")
            if metadata and "text" in metadata:
                relevant_chunks.append(metadata["text"])
        return relevant_chunks
    except Exception as e:
        print(f"Search error: {e}")
        return []


def generate_answer(query: str, context: str):
    """Ask the Groq LLM to answer *query* using only *context*.

    Returns:
        The model's reply. On failure, returns a ``"Groq Error: ..."`` string
        instead of raising.
    """
    try:
        response = groq_client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a helpful assistant. Answer ONLY using the provided context. "
                        "If the answer is not in the context, say you don't know. "
                        "When you find an answer, elaborate and provide more information."
                    ),
                },
                {
                    "role": "user",
                    "content": f"Context: {context}\n\nQuestion: {query}",
                },
            ],
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Groq Error: {str(e)}")
        return f"Groq Error: {str(e)}"


def _extract_pdf_text(content: bytes) -> str:
    """Return the text of every page of the PDF in *content*, one page per line group."""
    # The context manager closes the document even if get_text() raises.
    with fitz.open(stream=content, filetype="pdf") as pdf:
        # Newline between pages so the last word of one page doesn't fuse with
        # the first word of the next.
        return "\n".join(page.get_text() for page in pdf)


@app.post("/upload")
async def upload_pdf(file: UploadFile = File(...)):
    """Index an uploaded PDF and make it the document searched by ``/query``.

    The active namespace switches only after the vectors are stored. A failed or
    empty upload therefore leaves the previously uploaded PDF queryable.
    """
    global current_pdf_namespace
    try:
        content = await file.read()
        # PDF parsing, embedding and upserting are blocking; keep them off the event loop.
        text = await run_in_threadpool(_extract_pdf_text, content)
        if not text.strip():
            return {"error": "PDF is empty or could not be read."}

        chunks = chunk_text(text)
        embeddings = await run_in_threadpool(create_embeddings, chunks)
        if not embeddings:
            return {"error": "Failed to create embeddings for the PDF."}

        # A fresh namespace per upload keeps chunks of different PDFs apart.
        # NOTE(review): earlier namespaces are never deleted from Pinecone.
        new_namespace = f"pdf_{uuid.uuid4().hex}"
        print(f"DEBUG: Using new namespace: {new_namespace}")
        await run_in_threadpool(store_embeddings, chunks, embeddings, new_namespace)

        current_pdf_namespace = new_namespace
        return {"message": "PDF processed successfully."}
    except Exception as e:
        print(f"Upload Error: {e}")
        return {"error": f"Internal Server Error: {str(e)}"}


@app.post("/query")
async def query_pdf(query: str = Query(...)):
    """Answer *query* from the chunks of the most recently indexed PDF."""
    try:
        results = await run_in_threadpool(search, query, current_pdf_namespace)
        if not results:
            return {"answer": "I couldn't find any relevant information in the document."}
        context = " ".join(results)
        answer = await run_in_threadpool(generate_answer, query, context)
        return {"answer": answer}
    except Exception as e:
        return {"error": f"Search failed: {str(e)}"}


# -------- IMAGE HELPERS -------------------------------------------------------------------------------------------------------------------
def create_image_embedding(image):
    """Return the Nomic vision embedding of a single *image*. Nomic errors propagate."""
    output = embed.image(
        images=[image],
        model="nomic-embed-vision-v1.5",
    )
    return dict(output)["embeddings"][0]


def store_image_embedding(image_vector, filename: str):
    """Upsert one image embedding into IMAGE_NAMESPACE, tagged with its *filename*."""
    index.upsert(
        vectors=[
            Vector(
                id=str(uuid.uuid4()),
                values=image_vector,
                metadata={"type": "image", "filename": filename},
            )
        ],
        namespace=IMAGE_NAMESPACE,
    )


def generate_image_answer(query, image):
    """Ask Gemini *query* about *image* (a PIL image, given the ``.save`` call).

    Returns:
        The answer text, ``"No answer generated."`` when Gemini returns no text,
        or an ``"Error: ..."`` string on failure.
    """
    try:
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        img_bytes = buffer.getvalue()

        response = client_gemini.models.generate_content(
            model="gemini-3-flash-preview",
            contents=[
                {
                    "role": "user",
                    "parts": [
                        {"text": query},
                        {
                            # google-genai types Blob.data as raw bytes and handles the
                            # transport encoding itself. Pass the PNG bytes, not a
                            # base64 string.
                            "inline_data": {
                                "mime_type": "image/png",
                                "data": img_bytes,
                            }
                        },
                    ],
                }
            ],
        )
        return response.text if response.text else "No answer generated."
    except Exception as e:
        return f"Error: {str(e)}"


# -------- URL HELPERS --------------------------------------------------------------------------------------------------------------
def normalize_url(url: str):
    """Strip whitespace and default to ``https://`` when no http(s) scheme is given."""
    url = url.strip()
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    return url


def extract_text_from_url(url: str):
    """Fetch *url* and return up to 5000 characters of its visible text.

    Returns ``None`` on a non-200 response or any error; errors are printed.
    """
    # NOTE(review): this fetches an arbitrary user-supplied URL from the server
    # (SSRF risk). Consider blocking private/loopback addresses.
    try:
        url = normalize_url(url)
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            return None

        # Hand BeautifulSoup the raw bytes so it can detect the charset from the
        # document. response.text falls back to ISO-8859-1 for text/* responses
        # whose Content-Type header has no charset.
        soup = BeautifulSoup(response.content, "html.parser")

        # Remove unwanted tags
        for tag in soup(["script", "style"]):
            tag.decompose()

        # Collapse whitespace runs so the 5000-character budget holds real text.
        text = " ".join(soup.get_text(separator=" ").split())
        return text[:5000]
    except Exception as e:
        print(f"URL Error: {e}")
        return None


def extract_about_contact(base_url: str):
    """Best effort: return the text of the About and Contact pages linked from *base_url*.

    The first http(s) link whose href contains "about" (resp. "contact") is
    followed. Returns up to 5000 characters of their combined text, or ``""`` on
    any failure.
    """
    try:
        base_url = normalize_url(base_url)
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(base_url, headers=headers, timeout=10)
        soup = BeautifulSoup(response.content, "html.parser")

        about_url = None
        contact_url = None
        for anchor in soup.find_all("a", href=True):
            link = anchor["href"]
            full_link = urljoin(base_url, link)
            # Skip mailto:, tel:, javascript: etc. — requests cannot fetch them.
            if urlparse(full_link).scheme not in ("http", "https"):
                continue
            lowered = link.lower()
            if about_url is None and "about" in lowered:
                about_url = full_link
            if contact_url is None and "contact" in lowered:
                contact_url = full_link
            if about_url and contact_url:
                break

        parts = []
        if about_url:
            parts.append(extract_text_from_url(about_url) or "")
        # One page can match both words; don't fetch and include it twice.
        if contact_url and contact_url != about_url:
            parts.append(extract_text_from_url(contact_url) or "")
        return "\n\n".join(part for part in parts if part)[:5000]
    except Exception as e:
        print(f"About/Contact Error: {e}")
        return ""
import functools
import tempfile
import threading
from typing import Optional

from fastapi.concurrency import run_in_threadpool


def generate_url_answer(text: str, url: str, query: Optional[str] = None):
    """Ask the Groq LLM to explain a webpage and, optionally, answer *query* about it.

    Args:
        text: Extracted page text; only the first 5000 characters are sent.
        url: The page URL, included in the prompt.
        query: Optional user question appended to the prompt.

    Returns:
        The model's reply, or an ``"Error: ..."`` string on failure.
    """
    try:
        user_msg = (
            f"URL: {url}\n\n"
            f"Content:\n{text[:5000]}\n\n"
            f"Task: Explain what this page is about in simple words."
        )
        if query:
            user_msg += f"\n\nUser Question: {query}"

        response = groq_client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[
                {
                    "role": "system",
                    "content": "You are a helpful assistant that answers questions about webpage content.",
                },
                {
                    "role": "user",
                    "content": user_msg,
                },
            ],
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error: {str(e)}"


# Last processed URL and its extracted text. These are process-wide globals
# shared by every client of the server.
current_url = None
current_url_text = None


@app.post("/process-url")
async def process_url(url: str = Query(...)):
    """Fetch a page (plus its About/Contact pages) and keep the text for ``/query-url``."""
    global current_url, current_url_text

    url = normalize_url(url)
    # Both helpers make blocking requests.get calls; run them off the event loop.
    main_text = await run_in_threadpool(extract_text_from_url, url)
    extra_text = await run_in_threadpool(extract_about_contact, url)

    combined_text = (main_text or "") + "\n\n" + (extra_text or "")
    if not combined_text.strip():
        return {"error": "Failed to extract content"}

    current_url = url
    current_url_text = combined_text  # store everything once
    return {"message": "URL processed successfully"}


@app.post("/query-url")
async def query_url(query: str = Query(...)):
    """Answer *query* about the page stored by the last ``/process-url`` call."""
    if not current_url_text:
        return {"error": "No URL processed"}
    try:
        answer = await run_in_threadpool(generate_url_answer, current_url_text, current_url, query)
        return {"answer": answer}
    except Exception as e:
        return {"error": str(e)}


# ######################################## YOUTUBE ############################################
import yt_dlp
import whisper
from pydantic import BaseModel

WHISPER_MODEL_NAME = "base"  # use "small" or "medium" for better accuracy
YOUTUBE_CHUNK_SIZE = 1000
YOUTUBE_UPSERT_BATCH_SIZE = 100  # keep each Pinecone upsert request small

# openai-whisper installs forward hooks (its kv-cache) on the model while
# transcribe() runs. A single cached model instance must therefore not
# transcribe two files at once.
_whisper_lock = threading.Lock()

# Path prefixes whose next segment is the video id, e.g. /shorts/<id>.
_YOUTUBE_ID_PATH_PREFIXES = ("shorts", "embed", "live", "v")


def get_video_id(url: str):
    """Extract the YouTube video id from *url*, or return ``None`` if it isn't a YouTube video URL.

    Handles ``youtu.be/<id>``, ``youtube.com/watch?v=<id>`` (any subdomain, e.g.
    ``m.`` or ``www.``), ``/shorts/<id>``, ``/embed/<id>``, ``/live/<id>`` and
    ``/v/<id>``. URLs without a scheme are accepted.
    """
    parsed = urlparse(normalize_url(url))
    host = (parsed.hostname or "").lower()
    path_parts = [part for part in parsed.path.split("/") if part]

    if host == "youtu.be" or host.endswith(".youtu.be"):
        return path_parts[0] if path_parts else None

    # Exact-domain check: a substring test would also accept e.g. notyoutube.com.
    if host == "youtube.com" or host.endswith(".youtube.com"):
        video_id = parse_qs(parsed.query).get("v", [None])[0]
        if video_id:
            return video_id
        if len(path_parts) >= 2 and path_parts[0] in _YOUTUBE_ID_PATH_PREFIXES:
            return path_parts[1]
    return None


@functools.lru_cache(maxsize=1)
def _load_whisper_model(name: str = WHISPER_MODEL_NAME):
    """Load the Whisper model once and reuse it across requests (it is expensive to load)."""
    return whisper.load_model(name)


def get_transcript_whisper(url):
    """Download the audio of *url* with yt-dlp and transcribe it with Whisper.

    The audio goes to a private temporary directory that is removed afterwards.
    Concurrent requests therefore never share, overwrite or re-read each other's
    (or a stale) audio file.

    Returns:
        The transcript text, or a string starting with ``"ERROR: "`` on failure.
        Callers check that prefix.
    """
    try:
        with tempfile.TemporaryDirectory() as tmp_dir:
            ydl_opts = {
                "format": "bestaudio/best",
                "outtmpl": os.path.join(tmp_dir, "audio.%(ext)s"),
                "quiet": True,
                # A watch URL with &list=... must download only this video.
                "noplaylist": True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                # The extension depends on the chosen format (webm, m4a, ...), so
                # ask yt-dlp for the real path instead of assuming "audio.webm".
                audio_path = ydl.prepare_filename(info)

            with _whisper_lock:
                result = _load_whisper_model().transcribe(audio_path)
            return result["text"]
    except Exception as e:
        return f"ERROR: {str(e)}"


class YouTubeRequest(BaseModel):
    url: str


def _index_transcript(video_id: str, text: str) -> int:
    """Chunk, embed and upsert *text* into the Pinecone namespace *video_id*.

    Vector ids are deterministic (``<video_id>_<chunk number>``). Re-processing a
    video overwrites the chunks it writes again instead of adding duplicates. If
    a re-run produces fewer chunks, higher-numbered chunks from the earlier run
    remain.

    Returns:
        The number of vectors stored; 0 when embedding produced nothing.
    """
    chunks = chunk_text(text, size=YOUTUBE_CHUNK_SIZE)
    embeddings = create_embeddings(chunks)
    vectors = [
        Vector(id=f"{video_id}_{i}", values=values, metadata={"text": chunk})
        for i, (chunk, values) in enumerate(zip(chunks, embeddings))
    ]
    for start in range(0, len(vectors), YOUTUBE_UPSERT_BATCH_SIZE):
        index.upsert(
            vectors=vectors[start:start + YOUTUBE_UPSERT_BATCH_SIZE],
            namespace=video_id,
        )
    return len(vectors)


@app.post("/process-youtube")
async def process_youtube(request: YouTubeRequest):
    """Transcribe a YouTube video and index it under a namespace named after its video id.

    The response includes ``video_id`` on success, for use with ``/query-youtube``.
    """
    try:
        url = normalize_url(request.url)
        video_id = get_video_id(url)
        # Validate before the expensive download. Without an id the vectors would
        # not be stored under a per-video namespace.
        if not video_id:
            return {"status": "error", "error": "Invalid YouTube URL"}

        # Download + transcription can take minutes; keep it off the event loop.
        text = await run_in_threadpool(get_transcript_whisper, url)
        if text.startswith("ERROR"):
            print("TRANSCRIPT ERROR:", text)
            return {"status": "error", "error": text}
        if not text.strip():
            return {"status": "error", "error": "Transcript is empty"}

        stored = await run_in_threadpool(_index_transcript, video_id, text)
        if not stored:
            return {"status": "error", "error": "Failed to create embeddings"}

        return {"status": "success", "video_id": video_id}
    except Exception as e:
        return {"status": "error", "error": str(e)}


class QueryRequest(BaseModel):
    query: str
    video_id: str


def _retrieve_youtube_context(query: str, video_id: str, top_k: int = 5) -> str:
    """Return the texts of the *top_k* chunks of *video_id* closest to *query*, blank-line separated.

    Matches without ``metadata["text"]`` are skipped rather than raising KeyError.
    """
    query_response = embed.text(
        texts=[query],
        model="nomic-embed-text-v1",
        task_type="search_query",
    )
    query_emb = dict(query_response)["embeddings"][0]
    results = index.query(
        vector=query_emb,
        top_k=top_k,
        include_metadata=True,
        namespace=video_id,
    )
    texts = []
    for match in results["matches"]:
        metadata = match.get("metadata") or {}
        if metadata.get("text"):
            texts.append(metadata["text"])
    return "\n\n".join(texts)


@app.post("/query-youtube")
async def query_youtube(request: QueryRequest):
    """Answer a question about a video previously indexed by ``/process-youtube``."""
    try:
        query = request.query
        video_id = request.video_id
        if not video_id:
            return {"error": "video_id is required"}

        context = await run_in_threadpool(_retrieve_youtube_context, query, video_id)
        if not context.strip():
            return {"answer": "No relevant info found."}

        answer = await run_in_threadpool(generate_answer, query, context)
        return {"answer": answer}
    except Exception as e:
        return {"error": str(e)}


# -------------------------------
# Legacy caption-based YouTube pipeline (superseded by the Whisper version above).
# NOTE(review): it references names not defined anywhere in this file
# (transcript_cache, extract_video_id, is_allowed, processing_videos). Restore
# them before re-enabling this code, or delete it.
# -------------------------------
# Get transcript (SAFE)
# -------------------------------
# def fetch_transcript(video_id: str):
#     # ✅ CACHE
#     if video_id in transcript_cache:
#         return transcript_cache[video_id]
#
#     # ✅ NEW API: use instance method
#     ytt_api = YouTubeTranscriptApi()
#     fetched = ytt_api.fetch(video_id, languages=['en'])
#
#     processed = [
#         {
#             "text": snippet.text,
#             "start": snippet.start
#         }
#         for snippet in fetched
#     ]
#
#     transcript_cache[video_id] = processed
#     return processed
#
#
# @app.post("/process-youtube")
# async def process_youtube(url: str = Query(...)):
#     try:
#         video_id = extract_video_id(url)
#         if not video_id:
#             return {"error": "Invalid YouTube URL"}
#
#         # ✅ RATE LIMIT
#         if not is_allowed(video_id):
#             return {"error": "Too many requests. Please wait a few seconds."}
#
#         # ✅ PREVENT DUPLICATE PROCESSING
#         if video_id in processing_videos:
#             return {"message": "Already processing this video"}
#
#         processing_videos.add(video_id)
#
#         try:
#             transcript_data = fetch_transcript(video_id)
#         except TranscriptsDisabled:
#             return {"error": "Captions disabled for this video"}
#         except NoTranscriptFound:
#             return {"error": "No transcript available"}
#         except Exception as e:
#             return {"error": f"Transcript error: {str(e)}"}
#
#         # Combine text
#         full_text = " ".join([item["text"] for item in transcript_data])
#
#         chunks = chunk_text(full_text, size=1000)
#         embeddings = create_embeddings(chunks)
#
#         if not embeddings:
#             return {"error": "Failed to create embeddings"}
#
#         vectors = []
#         for i, chunk in enumerate(chunks):
#             vectors.append(
#                 Vector(
#                     id=f"{video_id}_{i}",
#                     values=embeddings[i],
#                     metadata={
#                         "text": chunk,
#                         "source": "youtube",
#                         "chunk_id": i,
#                         "video_id": video_id
#                     }
#                 )
#             )
#
#         index.upsert(vectors=vectors, namespace=YOUTUBE_NAMESPACE)
#
#         return {"message": f"Processed successfully ({len(chunks)} chunks)"}
#
#     except Exception as e:
#         return {"error": str(e)}
#
#     finally:
#         if 'video_id' in locals():
#             processing_videos.discard(video_id)
#
#
# @app.post("/query-youtube")
# async def query_youtube(query: str = Query(...)):
#     try:
#         query_response = embed.text(
#             texts=[query],
#             model="nomic-embed-text-v1",
#             task_type="search_query"
#         )
#
#         query_emb = dict(query_response)["embeddings"][0]
#
#         results = index.query(
#             vector=query_emb,
#             top_k=5,
#             include_metadata=True,
#             namespace=YOUTUBE_NAMESPACE
#         )
#
#         matches = results.get("matches", [])
#
#         if not matches:
#             return {"answer": "No relevant content found in this video."}
#
#         context = "\n\n".join([m["metadata"]["text"] for m in matches])
#
#         answer = generate_answer(query, context)
#
#         return {
#             "answer": answer,
#             "sources": [
#                 {"chunk": m["metadata"]["chunk_id"]}
#                 for m in matches
#             ]
#         }
#
#     except Exception as e:
#         return {"error": str(e)}