Spaces:
Sleeping
Sleeping
| from concurrent.futures import ThreadPoolExecutor | |
| from functools import wraps | |
| import json | |
| import os | |
| from threading import Thread | |
| import time | |
| from typing import List, Dict, Any, Tuple | |
| from urllib.parse import urlparse | |
| from flask import Blueprint, request, Response | |
| from langchain.schema.document import Document | |
| from server.constant.constants import (RECALL_TOP_K, RERANK_RECALL_TOP_K, | |
| MAX_QUERY_LENGTH, SESSION_EXPIRE_TIME, | |
| MAX_HISTORY_SESSION_LENGTH) | |
| from server.app.utils.decorators import token_required | |
| from server.app.utils.sqlite_client import get_db_connection | |
| from server.app.utils.diskcache_client import diskcache_client | |
| from server.app.utils.diskcache_lock import diskcache_lock | |
| from server.logger.logger_config import my_logger as logger | |
| from server.rag.generation.llm import llm_generator | |
| from server.rag.pre_retrieval.query_transformation.rewrite import detect_query_lang | |
| from server.rag.post_retrieval.rerank.flash_ranker import RerankRequest, reranker | |
| from server.rag.retrieval.vector_search import vector_search | |
| LLM_NAME = os.getenv('LLM_NAME') | |
| MIN_RELEVANCE_SCORE = float(os.getenv('MIN_RELEVANCE_SCORE', '0.3')) | |
| BOT_TOPIC = os.getenv('BOT_TOPIC') | |
| USE_PREPROCESS_QUERY = int(os.getenv('USE_PREPROCESS_QUERY')) | |
| USE_RERANKING = int(os.getenv('USE_RERANKING')) | |
| USE_DEBUG = int(os.getenv('USE_DEBUG')) | |
| queries_bp = Blueprint('queries', __name__, url_prefix='/open_kf_api/queries') | |
| def get_user_query_history(user_id: str, is_streaming: bool) -> List[Any]: | |
| if is_streaming: | |
| history_key = f"open_kf:query_history:{user_id}:stream" | |
| else: | |
| history_key = f"open_kf:query_history:{user_id}" | |
| history_items = diskcache_client.get_list(history_key)[::-1] | |
| history = [json.loads(item) for item in history_items] | |
| return history | |
| def save_user_query_history(user_id: str, query: str, answer: str, | |
| is_streaming: bool) -> None: | |
| try: | |
| # After generating the response from LLM | |
| # Store user query and LLM response in Cache | |
| if is_streaming: | |
| history_key = f"open_kf:query_history:{user_id}:stream" | |
| history_data = {'query': query, 'answer': answer} | |
| else: | |
| history_key = f"open_kf:query_history:{user_id}" | |
| answer_json = json.loads(answer) | |
| history_data = {'query': query, 'answer': answer_json} | |
| diskcache_client.append_to_list(history_key, | |
| json.dumps(history_data), | |
| ttl=SESSION_EXPIRE_TIME, | |
| max_length=MAX_HISTORY_SESSION_LENGTH) | |
| except Exception as e: | |
| logger.error( | |
| f"For the query: '{query}' and user_id: '{user_id}', is processed failed with Cache, the exception is {e}" | |
| ) | |
| timestamp = int(time.time()) | |
| conn = None | |
| try: | |
| # Store user query and LLM resposne in DB | |
| conn = get_db_connection() | |
| try: | |
| with diskcache_lock.lock(): | |
| if is_streaming: | |
| conn.execute( | |
| 'INSERT INTO t_user_qa_record_tab (user_id, query, answer, source, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?)', | |
| (user_id, query, answer, '[]', timestamp, timestamp)) | |
| else: | |
| conn.execute( | |
| 'INSERT INTO t_user_qa_record_tab (user_id, query, answer, source, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?)', | |
| (user_id, query, answer_json["answer"], | |
| json.dumps( | |
| answer_json["source"]), timestamp, timestamp)) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"process discache_lock exception:{e}") | |
| return { | |
| 'retcode': -30000, | |
| 'message': f'An error occurred: {e}', | |
| 'data': {} | |
| } | |
| except Exception as e: | |
| logger.error( | |
| f"For the query: '{query}' and user_id: '{user_id}', is processed failed with Database, the exception is {e}" | |
| ) | |
| finally: | |
| if conn: | |
| conn.close() | |
| def refine_query(query: str, history_context: str, lang: str) -> str: | |
| prompt = f"""Given a conversation (between Human and Assistant) and a follow up message from Human, using the prior knowledge relationships, rewrite the message to be a standalone and detailed question that captures all relevant context from the conversation. Ensure the rewritten question: | |
| 1. Preserves the original intent of the follow-up message. | |
| 2. If the true intent of the follow-up message cannot be determined, make no modifications to avoid generating an incorrect question. | |
| 3. The length of the rewritten question should not increase significantly compared to the follow-up message, to avoid altering the original intent. | |
| 4. Do not directly use the content of the Assistant's responses to form the rewritten question. Prioritize referring to the information of the Human's historical question. | |
| 5. Maintains the same language as the follow-up message (e.g., reply in Chinese if the question was asked in Chinese and in English if it was asked in English). | |
| Chat History (Sorted by request time from most recent to oldest): | |
| {history_context} | |
| Follow Up Input: {query} | |
| **NOTE:** The detected language of the Input is '{lang}'. Please respond in '{lang}'. | |
| Refined Standalone Question:""" | |
| beg_time = time.time() | |
| response = llm_generator.generate(prompt, False, False) | |
| timecost = time.time() - beg_time | |
| adjust_query = response.choices[0].message.content | |
| logger.warning( | |
| f"For the query: '{query}', the refined query is '{adjust_query}'. The timecost is {timecost}" | |
| ) | |
| if hasattr(response, 'usage'): | |
| logger.warning( | |
| f"[Track token consumption] for refine_query: '{query}', usage={response.usage}" | |
| ) | |
| return adjust_query | |
| def search_documents(query: str, k: int) -> List[Tuple[Document, float]]: | |
| beg_time = time.time() | |
| results = vector_search.similarity_search_with_relevance_scores(query, k) | |
| timecost = time.time() - beg_time | |
| logger.warning( | |
| f"search_documents, query: '{query}', k: {k}, the timecost is {timecost}" | |
| ) | |
| return results | |
| def rerank_documents( | |
| query: str, results: List[Tuple[Document, | |
| float]]) -> List[Dict[str, Any]]: | |
| passages: List[Dict[str, Any]] = [] | |
| index = 1 | |
| for doc, chroma_score in results: | |
| item = { | |
| "id": index, | |
| "text": doc.page_content, | |
| "metadata": doc.metadata, | |
| "chroma_score": chroma_score | |
| } | |
| index += 1 | |
| passages.append(item) | |
| rerank_results = [] | |
| if passages: | |
| beg_time = time.time() | |
| rerankrequest = RerankRequest(query=query, passages=passages) | |
| rerank_results = reranker.rerank(rerankrequest) | |
| timecost = time.time() - beg_time | |
| logger.warning( | |
| f"For the query: '{query}', rerank_documents, the timecost is {timecost}" | |
| ) | |
| if USE_DEBUG: | |
| rerank_info = "\n--------------------\n".join([ | |
| f"ID: {item['id']}\nTEXT: {item['text']}\nMETADATA: {item['metadata']}\nCHROME_SCORE: {item['chroma_score']}\nSCORE: {item['score']}" | |
| for item in rerank_results | |
| ]) | |
| logger.info( | |
| f"==========\nFor the query: '{query}', the rerank results is:\n{rerank_info}\n==========" | |
| ) | |
| return rerank_results | |
| def filter_documents( | |
| results: List[Tuple[Document, float]], | |
| min_relevance_score: float) -> List[Tuple[Document, float]]: | |
| filter_results = [] | |
| for doc, score in results: | |
| if score >= min_relevance_score: | |
| filter_results.append((doc, score)) | |
| return filter_results | |
| def get_recall_documents( | |
| current_query, refined_query, k, user_id, | |
| min_relevance_score: float) -> List[Tuple[Document, float]]: | |
| if current_query == refined_query: | |
| ret = search_documents(current_query, k) | |
| results = filter_documents(ret, min_relevance_score) | |
| if USE_DEBUG: | |
| results_info = "\n********************\n".join([ | |
| f"URL: {doc.metadata['source']}\nscore: {score}\npage_content: {doc.page_content}" | |
| for doc, score in results | |
| ]) | |
| logger.info( | |
| f"==========\nFor the current_query: '{current_query}', '{user_id}', the recall results is\n{results_info}\n==========" | |
| ) | |
| return results | |
| with ThreadPoolExecutor() as executor: | |
| future_ret1 = executor.submit(search_documents, current_query, k) | |
| future_ret2 = executor.submit(search_documents, refined_query, k) | |
| ret1 = filter_documents(future_ret1.result(), min_relevance_score) | |
| ret2 = filter_documents(future_ret2.result(), min_relevance_score) | |
| if USE_DEBUG: | |
| results_info1 = "\n********************\n".join([ | |
| f"URL: {doc.metadata['source']}\nscore: {score}\npage_content: {doc.page_content}" | |
| for doc, score in ret1 | |
| ]) | |
| logger.info( | |
| f"==========\nFor the current_query: '{current_query}', '{user_id}', the recall results is\n{results_info1}\n==========" | |
| ) | |
| results_info2 = "\n********************\n".join([ | |
| f"URL: {doc.metadata['source']}\nscore: {score}\npage_content: {doc.page_content}" | |
| for doc, score in ret2 | |
| ]) | |
| logger.info( | |
| f"==========\nFor the refined_query: '{refined_query}', '{user_id}', the recall results is\n{results_info2}\n==========" | |
| ) | |
| ret = ret1 + ret2 | |
| results = [] | |
| source_id_set = set() | |
| for doc, chroma_score in ret: | |
| source_id = doc.metadata["id"] | |
| if source_id not in source_id_set: | |
| source_id_set.add(source_id) | |
| results.append((doc, chroma_score)) | |
| else: | |
| logger.warning(f"source_id: '{source_id}' is already existed!") | |
| return results | |
| def generate_answer(query: str, user_id: str, is_streaming: bool = False): | |
| bot_topic = BOT_TOPIC | |
| # Detect the language of the query | |
| lang = detect_query_lang(query) | |
| logger.warning(f"For query: '{query}', detect the language is '{lang}'!") | |
| history_context = f"""Human: Hello | |
| Assistant: I'm here to assist you with information related to `{bot_topic}`. If you have any specific questions about our services or need help, feel free to ask, and I'll do my best to provide you with accurate and relevant answers.""" | |
| # Get the history session from the cache | |
| history_session = get_user_query_history(user_id, is_streaming) | |
| if history_session: | |
| # Build the history context, showing user's historical queries and answers | |
| history_context = "\n--------------------\n".join([ | |
| f"**Human:** {item['query']}\n**Assistant:** {item['answer']}" | |
| for item in history_session | |
| ]) | |
| if USE_PREPROCESS_QUERY and history_context: | |
| adjust_query = refine_query(query, history_context, lang) | |
| else: | |
| adjust_query = query | |
| if USE_RERANKING: | |
| top_k = RERANK_RECALL_TOP_K | |
| else: | |
| top_k = RECALL_TOP_K | |
| results = get_recall_documents(query, adjust_query, top_k, user_id, | |
| MIN_RELEVANCE_SCORE) | |
| filter_context = '' | |
| # Build the context with filtered documents, showing relevant documents | |
| if USE_RERANKING and results: | |
| # Rerank the documents | |
| rerank_results = rerank_documents(query, results) | |
| if rerank_results: | |
| filter_context = "\n--------------------\n".join([ | |
| f"Citation URL: {doc['metadata']['source']}\nDocument Content: {doc['text']}" | |
| for doc in rerank_results[:RECALL_TOP_K] | |
| ]) | |
| else: | |
| if len(results) > 1: | |
| results.sort(key=lambda x: x[1], reverse=True) | |
| if results: | |
| filter_context = "\n--------------------\n".join([ | |
| f"Citation URL: {doc.metadata['source']}\nDocument Content: {doc.page_content}" | |
| for doc, score in results[:RECALL_TOP_K] | |
| ]) | |
| if filter_context: | |
| context = f"""Chat History (Sorted by request time from most recent to oldest): | |
| {history_context} | |
| Documents Information: | |
| {filter_context} | |
| """ | |
| else: | |
| # When no directly related documents are found, provide standard friendly response and guidance | |
| fallback_answer = f"""No documents found directly related to the current question! | |
| Please provide the response in the following format and ensure that the 'answer' part is translated into the same language as the user's question: | |
| "I'm sorry, I cannot find a specific answer about '{query}' from the information provided. I'm here to assist you with information related to `{bot_topic}`. If you have any specific questions about our services or need help, feel free to ask, and I'll do my best to provide you with accurate and relevant answers." | |
| Please ensure: | |
| - If the user's question is a straightforward greeting, the assistant will offer a friendly standard response, guiding users to seek information or services related to `{bot_topic}`. Don't start with "I'm sorry, I cannot find a specific answer about '{query}' from the information provided.". | |
| - Maintain the context and meaning of the original message. | |
| - Respond in the language of the original question; for instance, reply in Chinese if the question was asked in Chinese and in English if it was asked in English!""" | |
| context = f"""Chat History (Sorted by request time from most recent to oldest): | |
| {history_context} | |
| Documents Information: | |
| {fallback_answer} | |
| """ | |
| if not is_streaming: | |
| answer_format_prompt = '''**Expected Response Format:** | |
| The response should be a JSON object, with 'answer' and 'source' fields. | |
| - "answer": "A detailed and specific answer, crafted in the question's language and fully formatted using **Markdown** syntax. **Don't repeat the question**". Only cite the most relevant Documents that answer the question accurately. | |
| - "source": ["List only unique `Citation URL` from the context that are directly related to the answer. Ensure that each URL is listed only once. If no documents are referenced, or the documents are not relevant, use an empty list []. The number of `Citation URL` should not exceed {RECALL_TOP_K}. The generated answer must have indeed used content from the document corresponding to the `Citation URL` before including that URL in the `source`; otherwise, the URL should not be included in the `source`."]''' | |
| else: | |
| answer_format_prompt = '''**Expected Response Format:** | |
| The response should be fully formatted using **Mardown** syntax (Note: Don't start with 'Answer:' or 'answer:'). First output the answer. Then output the Sources. | |
| - A detailed and specific answer, crafted in the question's language. Don't repeat the question. Only cite the most relevant Documents that answer the question accurately. | |
| - Sources: "List only unique `Citation URL` from the context that are directly related to the answer. Ensure that each URL is listed only once. If no documents are referenced, or the documents are not relevant, return ''. The number of `Citation URL` should not exceed {RECALL_TOP_K}. The generated answer must have indeed used content from the document corresponding to the `Citation URL` before including that URL in the `Sources`; otherwise, the URL should not be included in the `Sources`."''' | |
| prompt = f""" | |
| You are a smart customer service assistant and problem-solver, tasked to answer any question about `{bot_topic}`. Using the provided context, answer the user's question to the best of your ability using the resources provided. | |
| If the user's question is a straightforward greeting, the assistant will offer a friendly standard response, guiding users to seek information or services related to `{bot_topic}`. | |
| Base on the Chat History and the provided context. First, analyze the provided context information without assuming prior knowledge. Identify all relevant aspects of knowledge contained within. Then, from various perspectives and angles, answer questions as thoroughly and comprehensively as possible to better address and resolve the user's question. If the question is not related to the provided context, the user is informed that no relevant answer can be provided. | |
| **Question:** {query} | |
| **Context for Answering the Question:** | |
| {context} | |
| **Response Requirements:** | |
| - Don't repeat the question at the beginning. | |
| - If unsure about the answer, proactively seek clarification. | |
| - Ensure that answers are strictly based on the provided context. | |
| - Inform users that questions unrelated to the provided context cannot be answered. | |
| - Format the answer using Markdown syntax for clarity and readability. | |
| - Respond in the language of the original question; for instance, reply in Chinese if the question was asked in Chinese and in English if it was asked in English! | |
| **REMEMBER:** Please do not fabricate any knowledge. If you cannot get knowledge from the provided context, please directly state that you do not know, rather than constructing nonexistent and potentially fake information!!! | |
| **NOTE:** The detected language of the question is '{lang}'. Please respond in '{lang}'. | |
| {answer_format_prompt} | |
| Please format answer as follows: | |
| The answer must be fully formatted using Markdown syntax. This includes: | |
| - **Bold** (`**bold**`) and *italic* (`*italic*`) text for emphasis. | |
| - Unordered lists (`- item`) for itemization and ordered lists (`1. item`) for sequencing. | |
| - `Inline code` (`` `Inline code` ``) for brief code snippets and (` ``` `) for longer examples, specifying the programming language for syntax highlighting when possible. | |
| - [Hyperlinks](URL) (`[Hyperlinks](URL)`) to reference external sources. | |
| - Headings (`# Heading 1`, `## Heading 2`, ...) to structure the answer effectively. | |
| """ | |
| if USE_DEBUG: | |
| logger.info(f"$$$$$$$$$$\nPrompt is:\n{prompt}\n$$$$$$$$$$") | |
| if is_streaming: | |
| is_json = False | |
| else: | |
| is_json = True | |
| response = llm_generator.generate(prompt, is_streaming, is_json) | |
| return response | |
| def check_smart_query(f): | |
| def decorated_function(*args, **kwargs): | |
| data = request.json | |
| user_id = data.get('user_id') | |
| query = data.get('query') | |
| if not user_id or not query: | |
| logger.error(f"user_id and query are required") | |
| return { | |
| 'retcode': -20000, | |
| 'message': 'user_id and query are required', | |
| 'data': {} | |
| }, 400 | |
| request.user_id = user_id | |
| request.query = query | |
| request.intervene_data = None | |
| try: | |
| # Check if the query is in Cache | |
| key = f"open_kf:intervene:{query}" | |
| intervene_data = diskcache_client.get(key) | |
| if intervene_data: | |
| logger.info( | |
| f"For the query: '{query}' and user_id: '{user_id}', is hit in Cache, the intervene_data is {intervene_data}" | |
| ) | |
| request.intervene_data = intervene_data | |
| except Exception as e: | |
| logger.error( | |
| f"Cache exception {e} for user_id: '{user_id}' and query: '{query}'" | |
| ) | |
| return f(*args, **kwargs) | |
| return decorated_function | |
| def smart_query(): | |
| try: | |
| user_id = request.user_id | |
| query = request.query | |
| intervene_data = request.intervene_data | |
| if intervene_data: | |
| # Start a new thread to execute saving history records asynchronously | |
| Thread(target=save_user_query_history, | |
| args=(user_id, query, intervene_data, False)).start() | |
| intervene_data_json = json.loads(intervene_data) | |
| return { | |
| "retcode": 0, | |
| "message": "success", | |
| "data": intervene_data_json | |
| } | |
| if len(query) > MAX_QUERY_LENGTH: | |
| query = query[:MAX_QUERY_LENGTH] | |
| beg_time = time.time() | |
| response = generate_answer(query, user_id, False) | |
| if hasattr(response, 'usage'): | |
| logger.warning( | |
| f"[Track token consumption] for smart_query: '{query}', usage={response.usage}" | |
| ) | |
| answer = response.choices[0].message.content | |
| #logger.warning(f"The answer is:\n{answer}") | |
| if LLM_NAME == 'ZhipuAI': | |
| #logger.warning(f"The answer is:\n{answer}") | |
| # Solve the result format problem of ZhipuAI | |
| if answer.startswith("```json"): | |
| answer = answer[7:] | |
| if answer.endswith("```"): | |
| answer = answer[:-3] | |
| timecost = time.time() - beg_time | |
| answer_json = json.loads(answer) | |
| answer_json["source"] = list(dict.fromkeys(answer_json["source"])) | |
| logger.success( | |
| f"For smart_query, query: '{query}' and user_id: '{user_id}', is processed successfully, the answer is:\n{answer}\nthe total timecost is {timecost}\n" | |
| ) | |
| # Start another new thread to execute saving history records asynchronously | |
| Thread(target=save_user_query_history, | |
| args=(user_id, query, answer, False)).start() | |
| return {"retcode": 0, "message": "success", "data": answer_json} | |
| except Exception as e: | |
| logger.error( | |
| f"For the query: '{query}' and user_id: '{user_id}', is processed failed, the exception is {e}" | |
| ) | |
| return {'retcode': -20001, 'message': str(e), 'data': {}} | |
| def smart_query_stream(): | |
| headers = { | |
| 'Content-Type': 'text/event-stream', | |
| 'Cache-Control': 'no-cache', | |
| 'X-Accel-Buffering': 'no' | |
| } | |
| try: | |
| user_id = request.user_id | |
| query = request.query | |
| intervene_data = request.intervene_data | |
| if intervene_data: | |
| save_user_query_history(user_id, query, intervene_data, True) | |
| def generate_intervene(): | |
| yield intervene_data | |
| return Response(generate_intervene(), | |
| mimetype="text/event-stream", | |
| headers=headers) | |
| if len(query) > MAX_QUERY_LENGTH: | |
| query = query[:MAX_QUERY_LENGTH] | |
| def generate_llm(): | |
| beg_time = time.time() | |
| answer_chunks = [] | |
| response = generate_answer(query, user_id, True) | |
| for chunk in response: | |
| #logger.info(f"chunk is: {chunk}") | |
| content = chunk.choices[0].delta.content | |
| if content: | |
| answer_chunks.append(content) | |
| # Send each answer segment | |
| yield content | |
| if hasattr(chunk, 'usage'): | |
| if chunk.usage: | |
| logger.warning( | |
| f"[Track token consumption of streaming] for smart_query_stream: '{query}', usage={chunk.usage}" | |
| ) | |
| # After the streaming response is complete, save to Cache and SQLite | |
| answer = ''.join(answer_chunks) | |
| timecost = time.time() - beg_time | |
| logger.success( | |
| f"query: '{query}' and user_id: '{user_id}' is processed successfully, the answer is:\n{answer}\nthe total timecost is {timecost}\n" | |
| ) | |
| save_user_query_history(user_id, query, answer, True) | |
| return Response(generate_llm(), | |
| mimetype="text/event-stream", | |
| headers=headers) | |
| except Exception as e: | |
| logger.error( | |
| f"query: '{query}' and user_id: '{user_id}' is processed failed, the exception is {e}" | |
| ) | |
| return {'retcode': -30000, 'message': str(e), 'data': {}} | |
| def get_user_conversation_list(): | |
| """Retrieve a list of user conversations within a specified time range, with pagination and total count.""" | |
| data = request.json | |
| start_timestamp = data.get('start_timestamp') | |
| end_timestamp = data.get('end_timestamp') | |
| page = data.get('page') | |
| page_size = data.get('page_size') | |
| if None in ([start_timestamp, end_timestamp, page, page_size]): | |
| return {'retcode': -20000, 'message': 'Missing required parameters'} | |
| if not isinstance(start_timestamp, int) or not isinstance( | |
| end_timestamp, int): | |
| return { | |
| 'retcode': -20001, | |
| 'message': 'Invalid start_timestamp or end_timestamp parameters', | |
| 'data': {} | |
| } | |
| if not isinstance(page, int) or not isinstance( | |
| page_size, int) or page < 1 or page_size < 1: | |
| return { | |
| 'retcode': -20001, | |
| 'message': 'Invalid page or page_size parameters', | |
| 'data': {} | |
| } | |
| conn = get_db_connection() | |
| try: | |
| cur = conn.cursor() | |
| offset = (page - 1) * page_size | |
| # First, get the total count of distinct user_ids within the time range for pagination | |
| cur.execute( | |
| """ | |
| SELECT COUNT(DISTINCT user_id) AS total_count FROM t_user_qa_record_tab | |
| WHERE ctime BETWEEN ? AND ? | |
| """, (start_timestamp, end_timestamp)) | |
| total_count = cur.fetchone()['total_count'] | |
| # Then, fetch the most recent conversation record for each distinct user within the time range | |
| cur.execute( | |
| ''' | |
| SELECT t.* FROM t_user_qa_record_tab t | |
| INNER JOIN ( | |
| SELECT user_id, MAX(ctime) AS max_ctime | |
| FROM t_user_qa_record_tab | |
| WHERE ctime BETWEEN ? AND ? | |
| GROUP BY user_id | |
| ) tm ON t.user_id = tm.user_id AND t.ctime = tm.max_ctime | |
| ORDER BY t.ctime DESC LIMIT ? OFFSET ? | |
| ''', (start_timestamp, end_timestamp, page_size, offset)) | |
| conversation_list = [{ | |
| "user_id": row["user_id"], | |
| "latest_query": { | |
| "id": row["id"], | |
| "query": row["query"], | |
| "answer": row["answer"], | |
| "source": json.loads(row["source"]), | |
| "ctime": row["ctime"], | |
| "mtime": row["mtime"] | |
| } | |
| } for row in cur.fetchall()] | |
| return { | |
| 'retcode': 0, | |
| 'message': 'Success', | |
| 'data': { | |
| 'total_count': total_count, | |
| 'conversation_list': conversation_list | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to retrieve user conversation list: {e}") | |
| return {'retcode': -30000, 'message': 'Internal server error'} | |
| finally: | |
| if conn: | |
| conn.close() | |
| def get_user_query_history_list(): | |
| data = request.json | |
| page = data.get('page') | |
| page_size = data.get('page_size') | |
| user_id = data.get('user_id') | |
| # Check for mandatory parameters | |
| if None in (page, page_size, user_id): | |
| logger.error("page, page_size and user_id are required") | |
| return { | |
| 'retcode': -20000, | |
| 'message': 'page, page_size and user_id are required', | |
| 'data': {} | |
| } | |
| try: | |
| # Convert timestamps and pagination parameters to integers | |
| page = int(page) | |
| page_size = int(page_size) | |
| except ValueError as e: | |
| logger.error(f"Parameter conversion error: {e}") | |
| return {'retcode': -20001, 'message': 'Invalid parameters', 'data': {}} | |
| # Build query conditions | |
| query_conditions = "WHERE user_id = ?" | |
| params = [user_id] | |
| conn = None | |
| try: | |
| conn = get_db_connection() | |
| cur = conn.cursor() | |
| # First, query the total count of records under the given conditions | |
| cur.execute( | |
| f'SELECT COUNT(*) FROM t_user_qa_record_tab {query_conditions}', | |
| params) | |
| total_count = cur.fetchone()[0] | |
| # Then, query the paginated records | |
| cur.execute( | |
| f'SELECT * FROM t_user_qa_record_tab {query_conditions} ORDER BY id LIMIT ? OFFSET ?', | |
| params + [page_size, (page - 1) * page_size]) | |
| rows = cur.fetchall() | |
| # Convert rows to dictionaries | |
| record_list = [dict(row) for row in rows] | |
| # Apply json.loads on the 'source' field of each record | |
| for record in record_list: | |
| if 'source' in record: # Ensure the 'source' key exists | |
| try: | |
| # Convert JSON string to Python list | |
| record['source'] = json.loads(record['source']) | |
| except json.JSONDecodeError: | |
| # If decoding fails, set to an empty list or other default value | |
| record['source'] = [] | |
| return { | |
| "retcode": 0, | |
| "message": "success", | |
| "data": { | |
| "total_count": total_count, | |
| "query_list": record_list | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Database exception: {e}") | |
| return {'retcode': -30000, 'message': 'Database exception', 'data': {}} | |
| finally: | |
| if conn: | |
| conn.close() | |