| |
|
|
| import asyncio |
| import os |
| import re |
| from fastapi import APIRouter |
| from fastapi.responses import StreamingResponse |
| from app.models.schemas import AskRequest |
| from app.storage.vector_store import load_vectorstore_for_video, create_vectorstore_for_video |
| from app.services.qa_chain import create_qa_chain |
| from app.api.deps import llm |
| from app.storage.cache import load_transcript |
| from app.services.transcripts import get_transcript |
|
|
# Router collecting the transcript-QA endpoints; mounted by the app elsewhere.
router = APIRouter()
|
|
@router.get('/check/{video_id}')
def check_transcript_status(video_id: str):
    """
    Report whether a transcript can be served for *video_id*.

    Checks, in order of increasing cost:
      1. the local transcript cache,
      2. an already-built FAISS index on disk (implies prior processing),
      3. a live transcript fetch.

    Returns:
        ``{"status": "available"}`` or ``{"status": "unavailable"}``.
        Never raises: a failed live fetch is treated as unavailable.
    """
    transcript = load_transcript(video_id)
    if transcript:
        return {"status": "available"}

    # An existing FAISS index means the transcript was processed earlier,
    # even if the raw transcript cache entry is gone.
    vectorstore_path = f"./data/faiss/{video_id}/"
    if os.path.exists(vectorstore_path):
        return {"status": "available"}

    try:
        transcript = get_transcript(video_id)
        if transcript:
            return {"status": "available"}
    except Exception:
        # Fetch can fail for private/removed/region-blocked videos; that is an
        # expected "unavailable" outcome, but log it instead of hiding it
        # entirely (the original bare `except: pass` also ate SystemExit etc.).
        logger.debug("Transcript fetch failed for %s", video_id, exc_info=True)

    return {"status": "unavailable"}
|
|
# NOTE(review): these imports sit mid-file; they belong in the top import
# block — left in place here to keep this change self-contained.
import uuid
import logging


# Module-level logger; named after the module so app logging config applies.
logger = logging.getLogger(__name__)
|
|
def remove_consecutive_duplicates(text: str) -> str:
    """
    Collapse runs of consecutive duplicate words, keeping the first occurrence.

    Words are compared case-insensitively with surrounding punctuation
    stripped, so the kept token retains its original casing/punctuation.
    Output whitespace is normalized to single spaces.

    Example: "AWS AWS caused" -> "AWS caused"
    Example: "economy, economy," -> "economy,"

    The original implementation ran two regex substitution passes before the
    word loop; both passes are fully subsumed by the loop's normalized
    comparison (and the final join normalizes whitespace regardless), so this
    does the same work in a single pass.
    """
    cleaned: list[str] = []
    prev_normalized = None

    for word in text.split():
        # Normalize for comparison only; `word` itself is what gets emitted.
        normalized = re.sub(r'[^\w]', '', word).lower()
        # Punctuation-only tokens normalize to '' and are always kept.
        if normalized != prev_normalized or normalized == '':
            cleaned.append(word)
            prev_normalized = normalized

    return ' '.join(cleaned)
|
|
def _sse_error(message: str) -> StreamingResponse:
    """Return a one-shot SSE response: a single error event, then [END]."""
    async def stream():
        yield f"data: {message}\n\n"
        yield "data: [END]\n\n"
    return StreamingResponse(stream(), media_type="text/event-stream")


async def _answer_events(question: str, vectorstore):
    """
    Yield the QA answer for *question* word-by-word as SSE ``data:`` events.

    Builds a QA chain over *vectorstore*, cleans the answer, and streams one
    word per event with a short delay (typewriter effect). Does NOT emit the
    [END] marker and does not catch exceptions — callers wrap this generator
    with their own error handling and termination.
    """
    qa_chain = create_qa_chain(llm, vectorstore)
    # invoke() is synchronous; run it off the event loop so other requests
    # are not starved while the LLM generates.
    result = await asyncio.to_thread(qa_chain.invoke, {"query": question})
    answer = result.get('result', result.get('answer', str(result)))
    answer = remove_consecutive_duplicates(str(answer).strip())

    logger.info("Cleaned answer (first 200 chars): %s", answer[:200])

    # Second de-duplication pass as a safety net for LLM stutter that
    # survives remove_consecutive_duplicates.
    prev_word = None
    for word in answer.split():
        word_clean = word.strip()
        word_normalized = re.sub(r'[^\w]', '', word_clean).lower()
        if word_normalized != prev_word or word_normalized == '':
            yield f"data: {word_clean}\n\n"
            await asyncio.sleep(0.04)  # pacing between words
            prev_word = word_normalized


async def _process_and_answer(video_id: str, question: str):
    """
    First-time path: fetch the transcript, build the FAISS index, then stream
    the answer. Progress and errors are reported as SSE events so the client
    sees activity instead of a long silent request. Always ends with [END].
    """
    yield "data: π Processing video...\n\n"
    await asyncio.sleep(0.2)

    transcript = load_transcript(video_id)
    if not transcript:
        try:
            transcript = get_transcript(video_id)
        except Exception as e:
            yield f"data: β Could not fetch transcript: {str(e)}\n\n"
            yield "data: [END]\n\n"
            return

    yield "data: π§ Creating embeddings...\n\n"
    await asyncio.sleep(0.2)

    try:
        create_vectorstore_for_video(video_id, transcript)
        vectorstore = load_vectorstore_for_video(video_id)
    except Exception as e:
        yield f"data: β Error creating embeddings: {str(e)}\n\n"
        yield "data: [END]\n\n"
        return

    yield "data: β Ready!\n\n"
    await asyncio.sleep(0.2)

    try:
        async for event in _answer_events(question, vectorstore):
            yield event
    except Exception as e:
        logger.error("Error generating answer: %s", e)
        yield f"data: β Error generating answer: {str(e)}\n\n"

    yield "data: [END]\n\n"


@router.post('/ask/stream')
async def ask_question_stream(body: AskRequest):
    """
    Answer a question about a video as a Server-Sent-Events stream.

    Fast path: an existing FAISS index is loaded and the answer streamed
    word-by-word. Slow path (no index yet): the transcript is fetched and
    embedded first, with progress events streamed to the client. Every
    stream terminates with a ``data: [END]`` event; validation failures and
    internal errors are also reported in-stream rather than as HTTP errors.
    """
    video_id = body.video_id
    question = body.question

    # Guard len() — question may be None here (validation happens below),
    # and the request id must be logged regardless.
    logger.info(
        "REQ %s: incoming QA request: video_id=%s, question_len=%d",
        uuid.uuid4(), video_id, len(question or ""),
    )

    if not video_id or not question:
        return _sse_error("β Missing video ID or question")

    question = str(question).strip()
    if not question:
        return _sse_error("β Question cannot be empty")

    try:
        vectorstore = load_vectorstore_for_video(video_id)
    except FileNotFoundError:
        # No index yet — build it (and answer) inside the stream.
        return StreamingResponse(
            _process_and_answer(video_id, question),
            media_type="text/event-stream",
        )

    async def event_stream():
        try:
            async for event in _answer_events(question, vectorstore):
                yield event
        except Exception as e:
            logger.error("Error: %s", e)
            yield f"data: β Error: {str(e)}\n\n"
        yield "data: [END]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
|
|