Spaces:
Sleeping
Sleeping
| import json | |
| import time | |
| from flask import Blueprint, request | |
| from server.app.utils.decorators import token_required | |
| from server.app.utils.sqlite_client import get_db_connection | |
| from server.app.utils.diskcache_client import diskcache_client | |
| from server.app.utils.diskcache_lock import diskcache_lock | |
| from server.logger.logger_config import my_logger as logger | |
| intervention_bp = Blueprint('intervention', | |
| __name__, | |
| url_prefix='/open_kf_api/intervention') | |
| def add_intervene_record(): | |
| data = request.json | |
| query = data.get('query') | |
| intervene_answer = data.get('intervene_answer') | |
| source = data.get('source', []) | |
| if None in (query, intervene_answer, source): | |
| return { | |
| 'retcode': -20000, | |
| 'message': 'Missing mandatory parameters', | |
| 'data': {} | |
| } | |
| conn = None | |
| try: | |
| # Check if query already exists in the database | |
| conn = get_db_connection() | |
| cur = conn.cursor() | |
| cur.execute( | |
| 'SELECT COUNT(*) FROM t_user_qa_intervene_tab WHERE query = ?', | |
| (query, )) | |
| result = cur.fetchone() | |
| if result and result[0] > 0: | |
| logger.error( | |
| f"intervene query:'{query}' is already exists in the database") | |
| return { | |
| 'retcode': -30000, | |
| 'message': 'Query already exists in the database', | |
| 'data': {} | |
| } | |
| # Insert the intervene record into DB | |
| timestamp = int(time.time()) | |
| source_str = json.dumps(source) | |
| try: | |
| with diskcache_lock.lock(): | |
| cur.execute( | |
| 'INSERT INTO t_user_qa_intervene_tab (query, intervene_answer, source, ctime, mtime) VALUES (?, ?, ?, ?, ?)', | |
| (query, intervene_answer, source_str, timestamp, | |
| timestamp)) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"process discache_lock exception:{e}") | |
| return { | |
| 'retcode': -30000, | |
| 'message': f'An error occurred: {e}', | |
| 'data': {} | |
| } | |
| # Update Cache using simple string with the query as the key (prefixed) | |
| key = f"open_kf:intervene:{query}" | |
| value = json.dumps({"answer": intervene_answer, "source": source}) | |
| diskcache_client.set(key, value) | |
| return {"retcode": 0, "message": "success", 'data': {}} | |
| except Exception as e: | |
| return { | |
| 'retcode': -30000, | |
| 'message': 'Database or Cache error', | |
| 'data': {} | |
| } | |
| finally: | |
| if conn: | |
| conn.close() | |
| def delete_intervene_record(): | |
| data = request.json | |
| record_id = data.get('id') | |
| if not record_id: | |
| return {'retcode': -20000, 'message': 'id is required', 'data': {}} | |
| conn = None | |
| try: | |
| conn = get_db_connection() | |
| cur = conn.cursor() | |
| # First, find the query string for the given id to delete it from Cache | |
| cur.execute('SELECT query FROM t_user_qa_intervene_tab WHERE id = ?', | |
| (record_id, )) | |
| row = cur.fetchone() | |
| if row: | |
| query = row['query'] | |
| # Delete the record from DB | |
| try: | |
| with diskcache_lock.lock(): | |
| cur.execute( | |
| 'DELETE FROM t_user_qa_intervene_tab WHERE id = ?', | |
| (record_id, )) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"process g_discache_lock exception:{e}") | |
| return { | |
| 'retcode': -30000, | |
| 'message': f'An error occurred: {e}', | |
| 'data': {} | |
| } | |
| # Now, delete the corresponding record from Cache | |
| key = f"open_kf:intervene:{query}" | |
| diskcache_client.delete(key) | |
| return {"retcode": 0, "message": "success", 'data': {}} | |
| else: | |
| return { | |
| 'retcode': -20001, | |
| 'message': 'Record not found', | |
| 'data': {} | |
| } | |
| except Exception as e: | |
| return {'retcode': -30000, 'message': 'Database error', 'data': {}} | |
| finally: | |
| if conn: | |
| conn.close() | |
| def batch_delete_intervene_record(): | |
| data = request.json | |
| id_list = data.get('id_list') | |
| if not id_list or not isinstance(id_list, list) or len(id_list) == 0: | |
| return { | |
| 'retcode': -20000, | |
| 'message': 'Missing or invalid mandatory parameter: id_list', | |
| 'data': {} | |
| } | |
| conn = None | |
| try: | |
| conn = get_db_connection() | |
| cur = conn.cursor() | |
| # Retrieve the queries to delete their corresponding Cache entries | |
| cur.execute( | |
| f'SELECT query FROM t_user_qa_intervene_tab WHERE id IN ({",".join(["?"]*len(id_list))})', | |
| id_list) | |
| rows = cur.fetchall() | |
| for row in rows: | |
| query = row['query'] | |
| key = f"open_kf:intervene:{query}" | |
| diskcache_client.delete(key) | |
| # Then, batch delete from DB | |
| try: | |
| with diskcache_lock.lock(): | |
| cur.execute( | |
| f'DELETE FROM t_user_qa_intervene_tab WHERE id IN ({",".join(["?"]*len(id_list))})', | |
| id_list) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"process discache_lock exception:{e}") | |
| return { | |
| 'retcode': -30000, | |
| 'message': f'An error occurred: {e}', | |
| 'data': {} | |
| } | |
| return {"retcode": 0, "message": "success", 'data': {}} | |
| except Exception as e: | |
| return {'retcode': -30000, 'message': 'Database error', 'data': {}} | |
| finally: | |
| if conn: | |
| conn.close() | |
| def update_intervene_record(): | |
| data = request.json | |
| record_id = data.get('id') | |
| intervene_answer = data.get('intervene_answer') | |
| source = data.get('source', []) | |
| if None in (record_id, intervene_answer, source): | |
| return { | |
| 'retcode': -20000, | |
| 'message': 'Missing or invalid mandatory parameters', | |
| 'data': {} | |
| } | |
| conn = None | |
| try: | |
| conn = get_db_connection() | |
| cur = conn.cursor() | |
| # Convert the source list to a JSON string for storing in DB | |
| source_json = json.dumps(source) | |
| timestamp = int(time.time()) | |
| # Update the DB record | |
| try: | |
| with diskcache_lock.lock(): | |
| cur.execute( | |
| 'UPDATE t_user_qa_intervene_tab SET intervene_answer = ?, source = ?, mtime = ? WHERE id = ?', | |
| (intervene_answer, source_json, timestamp, record_id)) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"process discache_lock exception:{e}") | |
| return { | |
| 'retcode': -30000, | |
| 'message': f'An error occurred: {e}', | |
| 'data': {} | |
| } | |
| # Retrieve the query text to update the corresponding Cache entry | |
| cur.execute('SELECT query FROM t_user_qa_intervene_tab WHERE id = ?', | |
| (record_id, )) | |
| row = cur.fetchone() | |
| if row: | |
| query = row['query'] | |
| key = f"open_kf:intervene:{query}" | |
| value = json.dumps({"answer": intervene_answer, "source": source}) | |
| diskcache_client.set(key, value) | |
| else: | |
| return { | |
| 'retcode': -20001, | |
| 'message': 'Record not found', | |
| 'data': {} | |
| } | |
| return {"retcode": 0, "message": "success", 'data': {}} | |
| except Exception as e: | |
| return {'retcode': -30000, 'message': 'Database error', 'data': {}} | |
| finally: | |
| if conn: | |
| conn.close() | |
| def get_intervene_query_list(): | |
| data = request.json | |
| start_timestamp = data.get('start_timestamp') | |
| end_timestamp = data.get('end_timestamp') | |
| page = data.get('page') | |
| page_size = data.get('page_size') | |
| # Validate mandatory parameters | |
| if None in (start_timestamp, end_timestamp, page, page_size): | |
| return { | |
| 'retcode': -20000, | |
| 'message': 'Missing mandatory parameters', | |
| 'data': {} | |
| } | |
| if not isinstance(start_timestamp, int) or not isinstance( | |
| end_timestamp, int): | |
| return { | |
| 'retcode': -20001, | |
| 'message': 'Invalid start_timestamp or end_timestamp parameters', | |
| 'data': {} | |
| } | |
| if not isinstance(page, int) or not isinstance( | |
| page_size, int) or page < 1 or page_size < 1: | |
| return { | |
| 'retcode': -20001, | |
| 'message': 'Invalid page or page_size parameters', | |
| 'data': {} | |
| } | |
| conn = None | |
| try: | |
| conn = get_db_connection() | |
| cur = conn.cursor() | |
| # Calculate total count | |
| cur.execute( | |
| 'SELECT COUNT(*) FROM t_user_qa_intervene_tab WHERE ctime BETWEEN ? AND ?', | |
| (start_timestamp, end_timestamp)) | |
| total_count = cur.fetchone()[0] | |
| # Calculate the starting point for the query | |
| start = (page - 1) * page_size | |
| # Retrieve the specified page of records | |
| cur.execute( | |
| ''' | |
| SELECT id, query, intervene_answer, source, ctime, mtime | |
| FROM t_user_qa_intervene_tab | |
| WHERE ctime BETWEEN ? AND ? | |
| ORDER BY ctime DESC | |
| LIMIT ? OFFSET ?''', | |
| (start_timestamp, end_timestamp, page_size, start)) | |
| rows = cur.fetchall() | |
| # Convert rows to dictionaries | |
| record_list = [dict(row) for row in rows] | |
| # Apply json.loads on the 'source' field of each record | |
| for record in record_list: | |
| if 'source' in record: # Ensure the 'source' key exists | |
| try: | |
| # Convert JSON string to Python list | |
| record['source'] = json.loads(record['source']) | |
| except json.JSONDecodeError: | |
| # If decoding fails, set to an empty list or other default value | |
| record['source'] = [] | |
| return { | |
| "retcode": 0, | |
| "message": "success", | |
| "data": { | |
| "total_count": total_count, | |
| "intervene_list": record_list | |
| } | |
| } | |
| except Exception as e: | |
| return {'retcode': -30000, 'message': 'Database error', 'data': {}} | |
| finally: | |
| if conn: | |
| conn.close() | |