| from entity import Docs, Cluster, Preprocess, SummaryInput
|
| from fastapi import FastAPI
|
| import time
|
| import hashlib
|
| import json
|
| from fastapi.middleware.cors import CORSMiddleware
|
|
|
| from function import topic_clustering_v2 as tc
|
| from iclibs.ic_rabbit import ICRabbitMQ
|
| from get_config import config_params
|
|
|
| app = FastAPI()
|
| app.add_middleware(
|
| CORSMiddleware,
|
| allow_origins=["*"],
|
| allow_credentials=True,
|
| allow_methods=["*"],
|
| allow_headers=["*"],
|
| )
|
|
|
|
|
| def get_hash_id(item: Docs):
|
| str_hash = ""
|
| for it in item.response["docs"]:
|
| str_hash += it["url"]
|
| str_hash += str(item.top_cluster)
|
| str_hash += str(item.top_sentence)
|
| str_hash += str(item.topn_summary)
|
| str_hash += str(item.top_doc)
|
| str_hash += str(item.threshold)
|
| if item.sorted_field.strip():
|
| str_hash += str(item.sorted_field)
|
| if item.delete_message:
|
| str_hash += str(item.delete_message)
|
| return hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
|
|
|
|
|
| try:
|
| with open("log_run/log.txt") as f:
|
| data_dict = json.load(f)
|
| except Exception as ve:
|
| print(ve)
|
| data_dict = {}
|
|
|
|
|
| @app.post("/newsanalysis/topic_clustering")
|
| async def topic_clustering(item: Docs):
|
| docs = item.response["docs"]
|
|
|
| print("start ")
|
| print("len doc: ", len(docs))
|
| st_time = time.time()
|
| top_cluster = item.top_cluster
|
| top_sentence = item.top_sentence
|
| topn_summary = item.topn_summary
|
| sorted_field = item.sorted_field
|
| max_doc_per_cluster = item.max_doc_per_cluster
|
| hash_str = get_hash_id(item)
|
|
|
|
|
|
|
|
|
|
|
|
|
| print(hash_str)
|
| if len(docs) > 200:
|
|
|
| try:
|
| if hash_str in data_dict:
|
| path_res = data_dict[hash_str]["response_path"]
|
| with open(path_res) as ff:
|
| results = json.load(ff)
|
| print("time analysis (cache): ", time.time() - st_time)
|
| return results
|
| except Exception as vee:
|
| print(vee)
|
|
|
| results = tc.topic_clustering(docs, item.threshold, top_cluster=top_cluster, top_sentence=top_sentence,
|
| topn_summary=topn_summary, sorted_field=sorted_field, max_doc_per_cluster=max_doc_per_cluster, delete_message=item.delete_message)
|
| path_res = "log/result_{0}.txt".format(hash_str)
|
| with open(path_res, "w+") as ff:
|
| ff.write(json.dumps(results))
|
| data_dict[hash_str] = {"time": st_time, "response_path": path_res}
|
|
|
| lst_rm = []
|
| for dt in data_dict:
|
| if time.time() - data_dict[dt]["time"] > 30 * 24 * 3600:
|
| lst_rm.append(dt)
|
| for dt in lst_rm:
|
| del data_dict[dt]
|
| with open("log_run/log.txt", "w+") as ff:
|
| ff.write(json.dumps(data_dict))
|
| print("time analysis: ", time.time() - st_time)
|
| return results
|
|
|
| def init_rabbit_queue(usr, passw, host, vir_host, queue_name, durable, max_priority, exchange=""):
|
| connection = ICRabbitMQ(host, vir_host, usr, passw)
|
| connection.init_connection()
|
| channel = connection.init_queue(
|
| queue_name, exchange=exchange, durable=durable, max_priority=max_priority)
|
| return channel, connection, queue_name
|
|
|
|
|
| @app.post("/newsanalysis/topic_clustering_2")
|
| async def topic_clustering_v2(item: Docs):
|
| docs = item.response["docs"]
|
| meta = item.response.get('meta', {})
|
|
|
| print("start ")
|
| print("len doc: ", len(docs))
|
| st_time = time.time()
|
| top_cluster = item.top_cluster
|
| top_sentence = item.top_sentence
|
| topn_summary = item.topn_summary
|
| hash_str = get_hash_id(item)
|
|
|
|
|
|
|
|
|
|
|
|
|
| data_push = {
|
| "docs": docs,
|
| "threshold": item.threshold,
|
| "top_cluster": top_cluster,
|
| "top_sentence": top_sentence,
|
| "topn_summary": topn_summary,
|
| "hash_str": hash_str,
|
| "st_time": st_time,
|
| "meta": meta
|
| }
|
| params = config_params['queue_topic_clustering']
|
| usr_name = params["usr_name"]
|
| password = str(params["password"])
|
| host = params["host"]
|
| virtual_host = params["virtual_host"]
|
| queue_name = params["queue_name"]
|
|
|
| channel_consumer, rb_consumer, queue_consumer = init_rabbit_queue(usr_name, password, host, virtual_host, queue_name, True, 10)
|
|
|
| ICRabbitMQ.publish_message(channel_consumer, queue_consumer, data_push, priority= 1,delivery_mode=2, exchange='')
|
| return {"message":"success"}
|
|
|
|
|
| @app.post("/newsanalysis/topic_clustering_with_preprocess")
|
| async def topic_clustering_with_preprocess(item: Preprocess):
|
| with open('preprocess.json','w') as f:
|
| json.dump(item.__dict__,f,ensure_ascii=False)
|
|
|
| data_push = {
|
| "type": item.type,
|
| "threshold": item.threshold,
|
| "top_cluster": item.top_cluster,
|
| "benchmark_id": item.benchmark_id,
|
| "benchmark_children_id": item.benchmark_children_id,
|
| "source_tagids": item.source_tagids,
|
| "preprocess": item.preprocess,
|
| "meta": item.meta
|
|
|
| }
|
| params = config_params['queue_merge_clustering']
|
| usr_name = params["usr_name"]
|
| password = str(params["password"])
|
| host = params["host"]
|
| virtual_host = params["virtual_host"]
|
| queue_name = params["queue_name"]
|
|
|
| channel_consumer, rb_consumer, queue_consumer = init_rabbit_queue(usr_name, password, host, virtual_host, queue_name, True, 10)
|
|
|
| ICRabbitMQ.publish_message(channel_consumer, queue_consumer, data_push, priority= 1,delivery_mode=2, exchange='')
|
| return {"message":"success"}
|
|
|
|
|
| @app.post("/newsanalysis/summary")
|
| async def summary(item: SummaryInput):
|
| try:
|
| summary_txt = tc.get_summary_bert(item.text, lang=None, topn=item.topn, ratio = 0.2)
|
| except Exception as ex:
|
|
|
| summary_txt = ''
|
| return summary_txt
|
|
|