File size: 7,216 Bytes
54bef2f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# app/api/endpoints.py

import asyncio
import os
import re
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from app.models.schemas import AskRequest
from app.storage.vector_store import load_vectorstore_for_video, create_vectorstore_for_video
from app.services.qa_chain import create_qa_chain
from app.api.deps import llm
from app.storage.cache import load_transcript
from app.services.transcripts import get_transcript

router = APIRouter()

@router.get('/check/{video_id}')
def check_transcript_status(video_id: str):
    transcript = load_transcript(video_id)
    if transcript:
        return {"status": "available"}
    
    vectorstore_path = f"./data/faiss/{video_id}/"
    if os.path.exists(vectorstore_path):
        return {"status": "available"}
    
    try:
        transcript = get_transcript(video_id)
        if transcript:
            return {"status": "available"}
    except:
        pass
    
    return {"status": "unavailable"}

import uuid
import logging

logger = logging.getLogger(__name__)

def remove_consecutive_duplicates(text: str) -> str:
    """
    Remove consecutive duplicate words from text.
    Example: "AWS AWS caused" -> "AWS caused"
    Example: "economy, economy," -> "economy,"
    """
    # Pattern 1: Remove word-level duplicates (with punctuation handling)
    # Matches: word followed by space(s) and the same word
    text = re.sub(r'\b(\w+)\s+\1\b', r'\1', text, flags=re.IGNORECASE)
    
    # Pattern 2: Remove duplicates with punctuation
    # Matches: word with punctuation followed by space and same word with punctuation
    text = re.sub(r'\b(\w+)([.,;:!?]?)\s+\1\2\b', r'\1\2', text, flags=re.IGNORECASE)
    
    # Pattern 3: Clean up any remaining multiple consecutive duplicates
    words = text.split()
    cleaned = []
    prev_word = None
    
    for word in words:
        # Normalize for comparison (remove punctuation)
        word_normalized = re.sub(r'[^\w]', '', word).lower()
        if word_normalized != prev_word or word_normalized == '':
            cleaned.append(word)
            prev_word = word_normalized
    
    return ' '.join(cleaned)

@router.post('/ask/stream')
async def ask_question_stream(body: AskRequest):
    video_id = body.video_id
    question = body.question
    
    logger.info(f"REQ {uuid.uuid4()}: incoming QA request: video_id={video_id}, question_len={len(question)}")
    
    # CRITICAL: Validate inputs
    if not video_id or not question:
        async def error_stream():
            yield "data: ❌ Missing video ID or question\n\n"
            yield "data: [END]\n\n"
        return StreamingResponse(error_stream(), media_type="text/event-stream")
    
    # CRITICAL: Ensure question is a clean string
    question = str(question).strip()
    if not question:
        async def error_stream():
            yield "data: ❌ Question cannot be empty\n\n"
            yield "data: [END]\n\n"
        return StreamingResponse(error_stream(), media_type="text/event-stream")
    
    try:
        vectorstore = load_vectorstore_for_video(video_id)
    except FileNotFoundError:
        async def processing_stream():
            yield "data: πŸ”„ Processing video...\n\n"
            await asyncio.sleep(0.2)
            
            transcript = load_transcript(video_id)
            if not transcript:
                try:
                    transcript = get_transcript(video_id)
                except Exception as e:
                    yield f"data: ❌ Could not fetch transcript: {str(e)}\n\n"
                    yield "data: [END]\n\n"
                    return
            
            yield "data: 🧠 Creating embeddings...\n\n"
            await asyncio.sleep(0.2)
            
            try:
                create_vectorstore_for_video(video_id, transcript)
                vectorstore = load_vectorstore_for_video(video_id)
            except Exception as e:
                yield f"data: ❌ Error creating embeddings: {str(e)}\n\n"
                yield "data: [END]\n\n"
                return
            
            yield "data: βœ… Ready!\n\n\n"
            await asyncio.sleep(0.2)
            
            try:
                qa_chain = create_qa_chain(llm, vectorstore)
                result = qa_chain.invoke({"query": question})
                answer = result.get('result', result.get('answer', str(result)))
                
                # Ensure answer is string and clean
                answer = str(answer).strip()
                
                # CRITICAL: Apply aggressive deduplication before streaming
                answer = remove_consecutive_duplicates(answer)
                
                # Log cleaned answer
                logger.info(f"Cleaned answer (first 200 chars): {answer[:200]}")
                
                # Stream word by word with deduplication check
                words = answer.split()
                prev_word = None
                for word in words:
                    word_clean = word.strip()
                    # Additional check: don't send if same as previous
                    word_normalized = re.sub(r'[^\w]', '', word_clean).lower()
                    if word_normalized != prev_word or word_normalized == '':
                        yield f"data: {word_clean}\n\n"
                        await asyncio.sleep(0.04)
                        prev_word = word_normalized
                    
            except Exception as e:
                logger.error(f"Error generating answer: {str(e)}")
                yield f"data: ❌ Error generating answer: {str(e)}\n\n"
            
            yield "data: [END]\n\n"
        
        return StreamingResponse(processing_stream(), media_type="text/event-stream")
    
    # Vectorstore exists
    qa_chain = create_qa_chain(llm, vectorstore)
    
    async def event_stream():
        try:
            result = qa_chain.invoke({"query": question})
            answer = result.get('result', result.get('answer', str(result)))
            
            # Ensure answer is string and clean
            answer = str(answer).strip()
            
            # CRITICAL: Apply aggressive deduplication before streaming
            answer = remove_consecutive_duplicates(answer)
            
            # Log cleaned answer
            logger.info(f"Cleaned answer (first 200 chars): {answer[:200]}")
            
            # Stream word by word with deduplication check
            words = answer.split()
            prev_word = None
            for word in words:
                word_clean = word.strip()
                # Additional check: don't send if same as previous
                word_normalized = re.sub(r'[^\w]', '', word_clean).lower()
                if word_normalized != prev_word or word_normalized == '':
                    yield f"data: {word_clean}\n\n"
                    await asyncio.sleep(0.04)
                    prev_word = word_normalized
                
        except Exception as e:
            logger.error(f"Error: {str(e)}")
            yield f"data: ❌ Error: {str(e)}\n\n"
        
        yield "data: [END]\n\n"
    
    return StreamingResponse(event_stream(), media_type="text/event-stream")