kyobody-chatbot-api / server /app /intervention.py
dahyedahye's picture
.
1161dd2
import json
import time
from flask import Blueprint, request
from server.app.utils.decorators import token_required
from server.app.utils.sqlite_client import get_db_connection
from server.app.utils.diskcache_client import diskcache_client
from server.app.utils.diskcache_lock import diskcache_lock
from server.logger.logger_config import my_logger as logger
intervention_bp = Blueprint('intervention',
__name__,
url_prefix='/open_kf_api/intervention')
@intervention_bp.route('/add_intervene_record', methods=['POST'])
@token_required
def add_intervene_record():
data = request.json
query = data.get('query')
intervene_answer = data.get('intervene_answer')
source = data.get('source', [])
if None in (query, intervene_answer, source):
return {
'retcode': -20000,
'message': 'Missing mandatory parameters',
'data': {}
}
conn = None
try:
# Check if query already exists in the database
conn = get_db_connection()
cur = conn.cursor()
cur.execute(
'SELECT COUNT(*) FROM t_user_qa_intervene_tab WHERE query = ?',
(query, ))
result = cur.fetchone()
if result and result[0] > 0:
logger.error(
f"intervene query:'{query}' is already exists in the database")
return {
'retcode': -30000,
'message': 'Query already exists in the database',
'data': {}
}
# Insert the intervene record into DB
timestamp = int(time.time())
source_str = json.dumps(source)
try:
with diskcache_lock.lock():
cur.execute(
'INSERT INTO t_user_qa_intervene_tab (query, intervene_answer, source, ctime, mtime) VALUES (?, ?, ?, ?, ?)',
(query, intervene_answer, source_str, timestamp,
timestamp))
conn.commit()
except Exception as e:
logger.error(f"process discache_lock exception:{e}")
return {
'retcode': -30000,
'message': f'An error occurred: {e}',
'data': {}
}
# Update Cache using simple string with the query as the key (prefixed)
key = f"open_kf:intervene:{query}"
value = json.dumps({"answer": intervene_answer, "source": source})
diskcache_client.set(key, value)
return {"retcode": 0, "message": "success", 'data': {}}
except Exception as e:
return {
'retcode': -30000,
'message': 'Database or Cache error',
'data': {}
}
finally:
if conn:
conn.close()
@intervention_bp.route('/delete_intervene_record', methods=['POST'])
@token_required
def delete_intervene_record():
data = request.json
record_id = data.get('id')
if not record_id:
return {'retcode': -20000, 'message': 'id is required', 'data': {}}
conn = None
try:
conn = get_db_connection()
cur = conn.cursor()
# First, find the query string for the given id to delete it from Cache
cur.execute('SELECT query FROM t_user_qa_intervene_tab WHERE id = ?',
(record_id, ))
row = cur.fetchone()
if row:
query = row['query']
# Delete the record from DB
try:
with diskcache_lock.lock():
cur.execute(
'DELETE FROM t_user_qa_intervene_tab WHERE id = ?',
(record_id, ))
conn.commit()
except Exception as e:
logger.error(f"process g_discache_lock exception:{e}")
return {
'retcode': -30000,
'message': f'An error occurred: {e}',
'data': {}
}
# Now, delete the corresponding record from Cache
key = f"open_kf:intervene:{query}"
diskcache_client.delete(key)
return {"retcode": 0, "message": "success", 'data': {}}
else:
return {
'retcode': -20001,
'message': 'Record not found',
'data': {}
}
except Exception as e:
return {'retcode': -30000, 'message': 'Database error', 'data': {}}
finally:
if conn:
conn.close()
@intervention_bp.route('/batch_delete_intervene_record', methods=['POST'])
@token_required
def batch_delete_intervene_record():
data = request.json
id_list = data.get('id_list')
if not id_list or not isinstance(id_list, list) or len(id_list) == 0:
return {
'retcode': -20000,
'message': 'Missing or invalid mandatory parameter: id_list',
'data': {}
}
conn = None
try:
conn = get_db_connection()
cur = conn.cursor()
# Retrieve the queries to delete their corresponding Cache entries
cur.execute(
f'SELECT query FROM t_user_qa_intervene_tab WHERE id IN ({",".join(["?"]*len(id_list))})',
id_list)
rows = cur.fetchall()
for row in rows:
query = row['query']
key = f"open_kf:intervene:{query}"
diskcache_client.delete(key)
# Then, batch delete from DB
try:
with diskcache_lock.lock():
cur.execute(
f'DELETE FROM t_user_qa_intervene_tab WHERE id IN ({",".join(["?"]*len(id_list))})',
id_list)
conn.commit()
except Exception as e:
logger.error(f"process discache_lock exception:{e}")
return {
'retcode': -30000,
'message': f'An error occurred: {e}',
'data': {}
}
return {"retcode": 0, "message": "success", 'data': {}}
except Exception as e:
return {'retcode': -30000, 'message': 'Database error', 'data': {}}
finally:
if conn:
conn.close()
@intervention_bp.route('/update_intervene_record', methods=['POST'])
@token_required
def update_intervene_record():
data = request.json
record_id = data.get('id')
intervene_answer = data.get('intervene_answer')
source = data.get('source', [])
if None in (record_id, intervene_answer, source):
return {
'retcode': -20000,
'message': 'Missing or invalid mandatory parameters',
'data': {}
}
conn = None
try:
conn = get_db_connection()
cur = conn.cursor()
# Convert the source list to a JSON string for storing in DB
source_json = json.dumps(source)
timestamp = int(time.time())
# Update the DB record
try:
with diskcache_lock.lock():
cur.execute(
'UPDATE t_user_qa_intervene_tab SET intervene_answer = ?, source = ?, mtime = ? WHERE id = ?',
(intervene_answer, source_json, timestamp, record_id))
conn.commit()
except Exception as e:
logger.error(f"process discache_lock exception:{e}")
return {
'retcode': -30000,
'message': f'An error occurred: {e}',
'data': {}
}
# Retrieve the query text to update the corresponding Cache entry
cur.execute('SELECT query FROM t_user_qa_intervene_tab WHERE id = ?',
(record_id, ))
row = cur.fetchone()
if row:
query = row['query']
key = f"open_kf:intervene:{query}"
value = json.dumps({"answer": intervene_answer, "source": source})
diskcache_client.set(key, value)
else:
return {
'retcode': -20001,
'message': 'Record not found',
'data': {}
}
return {"retcode": 0, "message": "success", 'data': {}}
except Exception as e:
return {'retcode': -30000, 'message': 'Database error', 'data': {}}
finally:
if conn:
conn.close()
@intervention_bp.route('/get_intervene_query_list', methods=['POST'])
@token_required
def get_intervene_query_list():
data = request.json
start_timestamp = data.get('start_timestamp')
end_timestamp = data.get('end_timestamp')
page = data.get('page')
page_size = data.get('page_size')
# Validate mandatory parameters
if None in (start_timestamp, end_timestamp, page, page_size):
return {
'retcode': -20000,
'message': 'Missing mandatory parameters',
'data': {}
}
if not isinstance(start_timestamp, int) or not isinstance(
end_timestamp, int):
return {
'retcode': -20001,
'message': 'Invalid start_timestamp or end_timestamp parameters',
'data': {}
}
if not isinstance(page, int) or not isinstance(
page_size, int) or page < 1 or page_size < 1:
return {
'retcode': -20001,
'message': 'Invalid page or page_size parameters',
'data': {}
}
conn = None
try:
conn = get_db_connection()
cur = conn.cursor()
# Calculate total count
cur.execute(
'SELECT COUNT(*) FROM t_user_qa_intervene_tab WHERE ctime BETWEEN ? AND ?',
(start_timestamp, end_timestamp))
total_count = cur.fetchone()[0]
# Calculate the starting point for the query
start = (page - 1) * page_size
# Retrieve the specified page of records
cur.execute(
'''
SELECT id, query, intervene_answer, source, ctime, mtime
FROM t_user_qa_intervene_tab
WHERE ctime BETWEEN ? AND ?
ORDER BY ctime DESC
LIMIT ? OFFSET ?''',
(start_timestamp, end_timestamp, page_size, start))
rows = cur.fetchall()
# Convert rows to dictionaries
record_list = [dict(row) for row in rows]
# Apply json.loads on the 'source' field of each record
for record in record_list:
if 'source' in record: # Ensure the 'source' key exists
try:
# Convert JSON string to Python list
record['source'] = json.loads(record['source'])
except json.JSONDecodeError:
# If decoding fails, set to an empty list or other default value
record['source'] = []
return {
"retcode": 0,
"message": "success",
"data": {
"total_count": total_count,
"intervene_list": record_list
}
}
except Exception as e:
return {'retcode': -30000, 'message': 'Database error', 'data': {}}
finally:
if conn:
conn.close()