File size: 7,216 Bytes
54bef2f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | # app/api/endpoints.py
import asyncio
import os
import re
from concurrent.futures import ThreadPoolExecutor

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

from app.models.schemas import AskRequest
from app.storage.vector_store import load_vectorstore_for_video, create_vectorstore_for_video
from app.services.qa_chain import create_qa_chain
from app.api.deps import llm
from app.storage.cache import load_transcript
from app.services.transcripts import get_transcript
router = APIRouter()


@router.get('/check/{video_id}')
def check_transcript_status(video_id: str):
    """Report whether a transcript can be served for ``video_id``.

    Sources are tried in order: the transcript cache (``load_transcript``),
    an existing FAISS index directory at ``./data/faiss/<video_id>/``, and
    finally a live ``get_transcript`` fetch.

    Returns:
        ``{"status": "available"}`` as soon as one source yields something,
        otherwise ``{"status": "unavailable"}`` -- including when the live
        fetch raises.
    """
    if load_transcript(video_id):
        return {"status": "available"}
    if os.path.exists(f"./data/faiss/{video_id}/"):
        return {"status": "available"}
    try:
        transcript = get_transcript(video_id)
    except Exception:
        # A failed fetch only means "unavailable", but record why. Catching
        # Exception (not a bare ``except:``) lets KeyboardInterrupt and
        # SystemExit propagate.
        logger.warning(
            "Transcript fetch failed for video_id=%s", video_id, exc_info=True
        )
        return {"status": "unavailable"}
    return {"status": "available" if transcript else "unavailable"}
import uuid
import logging

logger = logging.getLogger(__name__)

# Compiled once at import time; applied to every generated answer.
# "word word" separated only by whitespace (the backreference honours IGNORECASE).
_REPEATED_WORD_RE = re.compile(r'\b(\w+)\s+\1\b', re.IGNORECASE)
# "word, word," -- same word with the same optional trailing punctuation mark.
_REPEATED_PUNCT_WORD_RE = re.compile(r'\b(\w+)([.,;:!?]?)\s+\1\2\b', re.IGNORECASE)
# Everything that is not a word character; stripped to get a comparison key.
_NON_WORD_RE = re.compile(r'[^\w]')
# Token that ends a sentence: . ! or ? optionally followed by closing quotes/brackets.
_SENTENCE_END_RE = re.compile(r"[.!?][\"')\]]*$")


def remove_consecutive_duplicates(text: str) -> str:
    """Collapse consecutively repeated words ("stutter") in ``text``.

    Examples:
        "AWS AWS caused"           -> "AWS caused"
        "economy, economy,"        -> "economy,"
        "economy. economy."        -> "economy."
        "It was good. Good things" -> unchanged (a sentence boundary
                                      separates the two words)

    Words are compared case-insensitively with punctuation ignored, except
    that a token ending a sentence (``.``, ``!`` or ``?``) only suppresses an
    exact (case-insensitive) repeat of itself. Punctuation-only tokens are
    always kept. Every run of whitespace, including newlines, is collapsed
    to a single space.
    """
    # Pass 1: "word word" separated only by whitespace.
    text = _REPEATED_WORD_RE.sub(r'\1', text)
    # Pass 2: "word, word," with identical trailing punctuation; also removes
    # the pair pass 1 leaves behind from a triple such as "a a a".
    text = _REPEATED_PUNCT_WORD_RE.sub(r'\1\2', text)

    # Pass 3: token scan for whatever the regexes could not reach,
    # e.g. "economy, economy," at the end of the text.
    kept = []
    prev_key = None    # punctuation-free, lower-cased previous token
    prev_token = None  # lower-cased previous token, punctuation included
    for token in text.split():
        key = _NON_WORD_RE.sub('', token).lower()
        lowered = token.lower()
        is_repeat = key != '' and (key == prev_key or lowered == prev_token)
        if not is_repeat:
            kept.append(token)
        prev_token = lowered
        # After a sentence end, only an exact repeat counts as a duplicate,
        # so "good. Good" keeps both words.
        prev_key = None if _SENTENCE_END_RE.search(token) else key
    return ' '.join(kept)
# Blocking work (transcript and vector-store helpers, the QA chain call) runs
# on this executor instead of directly on the event loop. A single worker
# keeps those calls serialized, as they effectively were when they ran inline
# on the loop: nothing in this module shows that ``llm`` or the vector-store
# helpers are safe to call from several threads at once.
_BLOCKING_EXECUTOR = ThreadPoolExecutor(max_workers=1, thread_name_prefix="qa-blocking")

_SSE_MEDIA_TYPE = "text/event-stream"


def _sse(message) -> str:
    """Format ``message`` as one Server-Sent Events ``data`` event.

    Each line of a multi-line message (e.g. an exception text) gets its own
    ``data: `` prefix. A bare newline would otherwise end the ``data`` field,
    and the client would parse the remainder as a bogus field name.
    """
    lines = str(message).splitlines() or [""]
    return "".join(f"data: {line}\n" for line in lines) + "\n"


async def _error_stream(message: str):
    """Yield one error event followed by the ``[END]`` terminator."""
    yield _sse(message)
    yield _sse("[END]")


async def _run_blocking(func, *args):
    """Await the synchronous call ``func(*args)`` without stalling the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_BLOCKING_EXECUTOR, func, *args)


def _extract_answer(result) -> str:
    """Return the stripped answer text from a QA-chain result.

    Dict results use the ``result`` key, falling back to ``answer`` and then
    to ``str`` of the whole dict. Any non-dict result is stringified as-is.
    """
    if isinstance(result, dict):
        answer = result.get('result', result.get('answer', str(result)))
    else:
        answer = result
    return str(answer).strip()


async def _stream_answer(vectorstore, question: str, request_id, error_prefix: str):
    """Answer ``question`` against ``vectorstore`` and stream it word by word.

    The answer is cleaned with ``remove_consecutive_duplicates`` and sent as
    one SSE event per whitespace-separated word, 40 ms apart. Any failure is
    logged with its traceback and sent as ``error_prefix + str(exc)``. The
    stream always ends with ``data: [END]``.
    """
    try:
        qa_chain = create_qa_chain(llm, vectorstore)
        result = await _run_blocking(qa_chain.invoke, {"query": question})
        answer = remove_consecutive_duplicates(_extract_answer(result))
        logger.info(
            "REQ %s: cleaned answer (first 200 chars): %s", request_id, answer[:200]
        )
        # ``answer`` is already de-duplicated, and split() yields tokens with
        # no surrounding whitespace, so each word is sent unchanged.
        for word in answer.split():
            yield _sse(word)
            await asyncio.sleep(0.04)
    except Exception as exc:
        logger.exception("REQ %s: error generating answer", request_id)
        yield _sse(f"{error_prefix}{exc}")
    yield _sse("[END]")


async def _build_index_then_answer(video_id: str, question: str, request_id):
    """Build the vector store for ``video_id`` with progress events, then answer.

    Uses ``load_transcript`` first and falls back to ``get_transcript``. An
    empty transcript counts as unavailable, matching ``check_transcript_status``.
    Each failure is streamed as a ``❌`` event followed by ``[END]``.
    """
    yield _sse("🔄 Processing video...")
    await asyncio.sleep(0.2)

    try:
        transcript = await _run_blocking(load_transcript, video_id)
        if not transcript:
            transcript = await _run_blocking(get_transcript, video_id)
    except Exception as exc:
        logger.warning(
            "REQ %s: transcript fetch failed for video_id=%s",
            request_id, video_id, exc_info=True,
        )
        yield _sse(f"❌ Could not fetch transcript: {exc}")
        yield _sse("[END]")
        return
    if not transcript:
        yield _sse("❌ Could not fetch transcript: no transcript available")
        yield _sse("[END]")
        return

    yield _sse("🧠 Creating embeddings...")
    await asyncio.sleep(0.2)
    try:
        await _run_blocking(create_vectorstore_for_video, video_id, transcript)
        vectorstore = await _run_blocking(load_vectorstore_for_video, video_id)
    except Exception as exc:
        logger.exception(
            "REQ %s: failed to build vector store for video_id=%s", request_id, video_id
        )
        yield _sse(f"❌ Error creating embeddings: {exc}")
        yield _sse("[END]")
        return

    yield _sse("✅ Ready!")
    await asyncio.sleep(0.2)
    async for event in _stream_answer(
        vectorstore, question, request_id, "❌ Error generating answer: "
    ):
        yield event


@router.post('/ask/stream')
async def ask_question_stream(body: AskRequest):
    """Stream an answer to ``body.question`` about ``body.video_id`` as SSE.

    If no vector store exists yet for the video (the loader raises
    ``FileNotFoundError``), it is built first and progress events are sent
    before the answer. Errors are reported in-band as ``❌ ...`` events
    rather than HTTP error codes. Every stream ends with ``data: [END]``.
    """
    request_id = uuid.uuid4()
    video_id = body.video_id
    question = body.question
    logger.info(
        "REQ %s: incoming QA request: video_id=%s, question_len=%s",
        request_id, video_id, len(question or ""),
    )

    if not video_id or not question:
        return StreamingResponse(
            _error_stream("❌ Missing video ID or question"), media_type=_SSE_MEDIA_TYPE
        )
    question = str(question).strip()
    if not question:
        return StreamingResponse(
            _error_stream("❌ Question cannot be empty"), media_type=_SSE_MEDIA_TYPE
        )

    try:
        vectorstore = await _run_blocking(load_vectorstore_for_video, video_id)
    except FileNotFoundError:
        events = _build_index_then_answer(video_id, question, request_id)
    else:
        events = _stream_answer(vectorstore, question, request_id, "❌ Error: ")
    return StreamingResponse(events, media_type=_SSE_MEDIA_TYPE)
|