minh-4T commited on
Commit
2005b70
·
verified ·
1 Parent(s): bd6d18b

Update core/qa_pipeline.py

Browse files
Files changed (1) hide show
  1. core/qa_pipeline.py +108 -103
core/qa_pipeline.py CHANGED
@@ -1,21 +1,55 @@
1
  from typing import List, Generator
2
- import os,re , hashlib
3
  import logging
 
 
 
4
  from .models import llm
5
  from .config import TOP_K_RESULTS, FINAL_TOP_K
6
  from .rerank import advanced_rerank
7
  from .prompting import create_advanced_prompt
8
  from .retriever import HybridRetriever
9
  from .analyze_and_expand import analyze_and_expand_query
10
- from .llm_utils import safe_invoke , safe_stream
 
11
  logger = logging.getLogger(__name__)
12
 
 
13
  MAX_CONTEXT_CHARS = 12000
14
  MAX_DOC_CHARS = 1800
15
  MAX_OUT_CHARS = 3000
16
 
17
- # Làm sạch dữ liệu trước khi đưa vào prompt : Lọc bỏ prompt injection PII ( Personal Idenfiable Information)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  def sanitize_for_prompt(text: str) -> str:
 
19
  text = re.sub(r"(?i)(ignore previous instructions|system prompt|developer message|jailbreak)", "[FILTERED_INJECTION]", text)
20
  text = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[EMAIL]", text)
21
  text = re.sub(r"\b(0\d{9}|\+84\d{9,10})\b", "[PHONE]", text)
@@ -23,7 +57,7 @@ def sanitize_for_prompt(text: str) -> str:
23
  return text.strip()
24
 
25
  def generate_standalone_query(message: str, history: List) -> str:
26
- """Tái tạo câu hỏi từ lịch sử để giữ nguyên chủ đề tránh nhầm lẫn cụm từ tương đồng."""
27
  if not history:
28
  return message
29
 
@@ -59,95 +93,34 @@ def generate_standalone_query(message: str, history: List) -> str:
59
  Câu hỏi hiện tại: {message}
60
  Câu hỏi độc lập:"""
61
 
62
- try:
63
- response = safe_invoke(llm, prompt, timeout=15, retries=1)
64
- standalone_q = response.content.strip() if hasattr(response, 'content') else str(response)
65
- logger.info(f" Câu hỏi đã tái tạo: {standalone_q}")
66
- return standalone_q
67
- except Exception as e:
68
- logger.exception(f" Lỗi tái tạo câu hỏi: {e}")
69
- return message
 
 
 
 
 
 
 
70
 
71
  def ask_ai_improved(message: str, history: List, hybrid_retriever) -> Generator[str, None, None]:
72
- if not message.strip():
73
- yield " Bạn chưa nhập câu hỏi."
74
- return
75
-
76
- if message.strip().lower() in {"hello", "hi", "xin chào", "chào"}:
77
- yield "Chào bạn 👋 Mình hỗ trợ tra cứu quy chế đào tạo. Bạn cần hỏi điều gì?"
78
- return
79
-
80
- logger.info(f" CÂU HỎI GỐC: {message}")
81
- question = generate_standalone_query(message, history)
82
- processed_data = analyze_and_expand_query(question)
83
-
84
- if processed_data.get("question_type") == "normal":
85
- ans = processed_data.get("answer") or "Chào bạn 👋 Mình hỗ trợ tra cứu quy chế đào tạo."
86
- yield ans
87
- return
88
-
89
- question_type = processed_data['question_type']
90
- queries = processed_data['expanded_queries']
91
- logger.info(f"Các truy vấn tìm kiếm: {queries}")
92
-
93
- all_docs: List = []
94
- seen = set()
95
- for query in queries:
96
- current_alpha = 0.4 if "CNTT" in query.upper() else 0.5
97
- docs = hybrid_retriever.search(query, k=TOP_K_RESULTS, alpha=current_alpha)
98
- for doc in docs:
99
- content_hash = hashlib.sha256(doc.page_content.encode("utf-8")).hexdigest()
100
- if content_hash not in seen:
101
- all_docs.append(doc)
102
- seen.add(content_hash)
103
-
104
- logger.info(f"Tìm thấy tổng {len(all_docs)} documents.")
105
- if not all_docs:
106
- yield "Không tìm thấy thông tin liên quan trong tài liệu."
107
- return
108
-
109
- final_docs = advanced_rerank(question, all_docs, top_k=FINAL_TOP_K)
110
-
111
- context_parts = []
112
- total_chars = 0
113
- for doc in final_docs:
114
- page = doc.metadata.get('page_number', 'N/A')
115
- file_name = doc.metadata.get('source_file') or doc.metadata.get('source')
116
- source = f"[{os.path.basename(file_name)} | Trang {page}]" if file_name else f"[Trang {page}]"
117
- block = f"{source}\n{doc.page_content}"
118
- if total_chars + len(block) > MAX_CONTEXT_CHARS:
119
- break
120
- total_chars += len(block)
121
- context_parts.append(block)
122
- context = "\n\n---\n\n".join(context_parts)
123
- topic_hint = processed_data.get('topic') or processed_data.get('root_question') or question
124
- prompt = create_advanced_prompt(question, context, question_type, topic_hint)
125
-
126
- logger.info("Đang tạo câu trả lời cuối cùng...")
127
- try:
128
- partial = ""
129
- emitted = False
130
- for chunk in safe_stream(llm, prompt):
131
- partial += chunk
132
- emitted = True
133
- if len(partial) > MAX_OUT_CHARS:
134
- partial = partial[:MAX_OUT_CHARS] + "\n\n[Đã cắt bớt nội dung dài]"
135
- yield partial
136
- return
137
- yield partial
138
- if not emitted:
139
- yield " Không nhận được phản hồi từ mô hình."
140
- except Exception:
141
- logger.exception(" Lỗi sinh câu trả lời")
142
- yield "Đã xảy ra lỗi hệ thống."
143
- return
144
-
145
 
146
  def ask_ai_stream_delta(message: str, history: List, hybrid_retriever) -> Generator[str, None, None]:
147
- """
148
- Tương tự ask_ai_improved nhưng yield delta chunks (các token mới) thay vì cumulative.
149
- Được dùng cho streaming SSE trên web frontend.
150
- """
151
  if not message.strip():
152
  yield " Bạn chưa nhập câu hỏi."
153
  return
@@ -172,6 +145,7 @@ def ask_ai_stream_delta(message: str, history: List, hybrid_retriever) -> Genera
172
  all_docs: List = []
173
  seen = set()
174
  for query in queries:
 
175
  current_alpha = 0.4 if "CNTT" in query.upper() else 0.5
176
  docs = hybrid_retriever.search(query, k=TOP_K_RESULTS, alpha=current_alpha)
177
  for doc in docs:
@@ -198,21 +172,52 @@ def ask_ai_stream_delta(message: str, history: List, hybrid_retriever) -> Genera
198
  break
199
  total_chars += len(block)
200
  context_parts.append(block)
 
201
  context = "\n\n---\n\n".join(context_parts)
202
  topic_hint = processed_data.get('topic') or processed_data.get('root_question') or question
203
  prompt = create_advanced_prompt(question, context, question_type, topic_hint)
204
 
205
- logger.info("Đang tạo câu trả lời cuối cùng (delta stream)...")
206
- try:
207
- emitted = False
208
- for chunk in safe_stream(llm, prompt):
209
- # yield delta (từng chunk mới) thay vì partial
210
- if chunk:
211
- emitted = True
212
- yield chunk
213
- if not emitted:
214
- yield " Không nhận được phản hồi từ mô hình."
215
- except Exception:
216
- logger.exception(" Lỗi sinh câu trả lời (stream delta)")
217
- yield "Đã xảy ra lỗi hệ thống."
218
- return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from typing import List, Generator
2
+ import os, re, hashlib
3
  import logging
4
+ import groq
5
+ import google.generativeai as genai
6
+ # Giữ nguyên các import của Minh
7
  from .models import llm
8
  from .config import TOP_K_RESULTS, FINAL_TOP_K
9
  from .rerank import advanced_rerank
10
  from .prompting import create_advanced_prompt
11
  from .retriever import HybridRetriever
12
  from .analyze_and_expand import analyze_and_expand_query
13
+ from .llm_utils import safe_invoke, safe_stream
14
+
15
  logger = logging.getLogger(__name__)
16
 
17
+ # Giữ nguyên các hằng số
18
  MAX_CONTEXT_CHARS = 12000
19
  MAX_DOC_CHARS = 1800
20
  MAX_OUT_CHARS = 3000
21
 
22
+ # Quản API Keys cho Groq Gemini với xoay tua tự động khi gặp lỗi hoặc hết hạn
23
+ class AIProviderManager:
24
+ def __init__(self):
25
+ # Lấy danh sách keys
26
+ self.groq_keys = [k.strip() for k in os.getenv("GROQ_API_KEYS", "").split(",") if k.strip()]
27
+ self.gemini_keys = [k.strip() for k in os.getenv("GEMINI_API_KEYS", "").split(",") if k.strip()]
28
+ self.groq_idx = 0
29
+ self.gemini_idx = 0
30
+
31
+ def get_groq_client(self):
32
+ if not self.groq_keys: return None
33
+ return groq.Groq(api_key=self.groq_keys[self.groq_idx])
34
+
35
+ def rotate_groq(self):
36
+ if len(self.groq_keys) > 1:
37
+ self.groq_idx = (self.groq_idx + 1) % len(self.groq_keys)
38
+ logger.info(f"🔄 Đã xoay sang Groq Key thứ {self.groq_idx + 1}")
39
+
40
+ def get_gemini_key(self):
41
+ if not self.gemini_keys: return None
42
+ return self.gemini_keys[self.gemini_idx]
43
+
44
+ def rotate_gemini(self):
45
+ if len(self.gemini_keys) > 1:
46
+ self.gemini_idx = (self.gemini_idx + 1) % len(self.gemini_keys)
47
+ logger.info(f"🔄 Đã xoay sang Gemini Key dự phòng")
48
+
49
+ api_manager = AIProviderManager()
50
+
51
  def sanitize_for_prompt(text: str) -> str:
52
+ """Lọc bỏ prompt injection và PII - Giữ nguyên của Minh"""
53
  text = re.sub(r"(?i)(ignore previous instructions|system prompt|developer message|jailbreak)", "[FILTERED_INJECTION]", text)
54
  text = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[EMAIL]", text)
55
  text = re.sub(r"\b(0\d{9}|\+84\d{9,10})\b", "[PHONE]", text)
 
57
  return text.strip()
58
 
59
  def generate_standalone_query(message: str, history: List) -> str:
60
+ """Tái tạo câu hỏi từ lịch sử - Giữ nguyên logic xử history phức tạp của Minh"""
61
  if not history:
62
  return message
63
 
 
93
  Câu hỏi hiện tại: {message}
94
  Câu hỏi độc lập:"""
95
 
96
+ # Sử dụng xoay tua cho bước tái tạo câu hỏi
97
+ for _ in range(max(1, len(api_manager.groq_keys))):
98
+ try:
99
+ client = api_manager.get_groq_client()
100
+ response = client.chat.completions.create(
101
+ model="llama-3.1-8b-instant", # Dùng bản 8B cho nhanh và tiết kiệm
102
+ messages=[{"role": "user", "content": prompt}]
103
+ )
104
+ standalone_q = response.choices[0].message.content.strip()
105
+ logger.info(f" Câu hỏi đã tái tạo: {standalone_q}")
106
+ return standalone_q
107
+ except Exception:
108
+ api_manager.rotate_groq()
109
+ continue
110
+ return message
111
 
112
  def ask_ai_improved(message: str, history: List, hybrid_retriever) -> Generator[str, None, None]:
113
+ """Giữ nguyên hàm cumulative stream của Minh"""
114
+ full_response = ""
115
+ for delta in ask_ai_stream_delta(message, history, hybrid_retriever):
116
+ full_response += delta
117
+ if len(full_response) > MAX_OUT_CHARS:
118
+ yield full_response[:MAX_OUT_CHARS] + "\n\n[Đã cắt bớt nội dung dài]"
119
+ return
120
+ yield full_response
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
  def ask_ai_stream_delta(message: str, history: List, hybrid_retriever) -> Generator[str, None, None]:
123
+ """Hàm chính xử lý RAG - Giữ nguyên 100% flow của Minh"""
 
 
 
124
  if not message.strip():
125
  yield " Bạn chưa nhập câu hỏi."
126
  return
 
145
  all_docs: List = []
146
  seen = set()
147
  for query in queries:
148
+ # GIỮ NGUYÊN logic alpha ngành CNTT của Minh
149
  current_alpha = 0.4 if "CNTT" in query.upper() else 0.5
150
  docs = hybrid_retriever.search(query, k=TOP_K_RESULTS, alpha=current_alpha)
151
  for doc in docs:
 
172
  break
173
  total_chars += len(block)
174
  context_parts.append(block)
175
+
176
  context = "\n\n---\n\n".join(context_parts)
177
  topic_hint = processed_data.get('topic') or processed_data.get('root_question') or question
178
  prompt = create_advanced_prompt(question, context, question_type, topic_hint)
179
 
180
+ logger.info("Đang tạo câu trả lời cuối cùng ...")
181
+
182
+ success = False
183
+ # Thử với Groq
184
+ for _ in range(len(api_manager.groq_keys)):
185
+ try:
186
+ client = api_manager.get_groq_client()
187
+ stream = client.chat.completions.create(
188
+ model="llama-3.1-70b-versatile",
189
+ messages=[{"role": "user", "content": prompt}],
190
+ stream=True
191
+ )
192
+ for chunk in stream:
193
+ token = chunk.choices[0].delta.content
194
+ if token:
195
+ yield token
196
+ success = True
197
+ break
198
+ except Exception as e:
199
+ if "429" in str(e): # Lỗi Rate Limit
200
+ api_manager.rotate_groq()
201
+ continue
202
+ logger.error(f"Lỗi Groq: {e}")
203
+ break
204
+
205
+ # Dự phòng sang Gemini (nếu Groq lỗi hoặc hết key)
206
+ if not success:
207
+ logger.warning("Chuyển sang Gemini ...")
208
+ for _ in range(max(1, len(api_manager.gemini_keys))):
209
+ try:
210
+ genai.configure(api_key=api_manager.get_current_gemini_key())
211
+ model = genai.GenerativeModel('gemini-2.5-flash')
212
+ response = model.generate_content(prompt, stream=True)
213
+ for chunk in response:
214
+ if chunk.text:
215
+ yield chunk.text
216
+ success = True
217
+ break
218
+ except Exception as e:
219
+ api_manager.rotate_gemini()
220
+ logger.error(f"Lỗi Gemini: {e}")
221
+
222
+ if not success:
223
+ yield "Đã xảy ra lỗi hệ thống hoặc quá tải. Vui lòng thử lại sau giây lát!"