|
|
|
|
|
|
|
|
import requests |
|
|
import pickle |
|
|
import base64 |
|
|
import json, os |
|
|
|
|
|
def serialize_feature(feature):
    """Encode a Python object as ASCII text suitable for a JSON payload.

    The object is pickled, the pickle bytes are base64-encoded with
    ``base64.encodebytes`` (which inserts newlines into the output), and the
    result is decoded to an ASCII ``str``.

    Args:
        feature: Any picklable object, or ``None``.

    Returns:
        The base64 text of the pickled object, or ``None`` when ``feature``
        is ``None`` (``None`` is passed through and never pickled).
    """
    if feature is not None:
        pickled = pickle.dumps(feature)
        encoded = base64.encodebytes(pickled)
        return encoded.decode('ascii')
    return None
|
|
|
|
|
def deserialize_feature(feature):
    """Rebuild a Python object from base64-encoded pickle text.

    The text is encoded to ASCII bytes, base64-decoded and unpickled.

    WARNING: ``pickle.loads`` can execute arbitrary code while loading, so
    this must only be called on text that comes from a trusted source.

    Args:
        feature: ASCII ``str`` holding a base64-encoded pickle, or ``None``.

    Returns:
        The unpickled object, or ``None`` when ``feature`` is ``None``.
    """
    if feature is not None:
        raw = base64.decodebytes(feature.encode('ascii'))
        return pickle.loads(raw)
    return None
|
|
|
|
|
class RemoteDatabase:
    """HTTP client for a remote feature/image database service.

    Every method issues a POST request to an endpoint located directly below
    ``self.url``. Except for :meth:`download_file`, each method decodes the
    response body as UTF-8 and returns the parsed JSON value unchanged.

    The JSON endpoints do not check the HTTP status code, so a reply whose
    body is not valid JSON surfaces as the error raised by ``json.loads``.
    """

    # Class-level fallback so instances created without running __init__
    # (or subclasses that only assign ``url``) still have a timeout setting.
    timeout = None

    def __init__(self, url='http://110.40.175.218:6007/', timeout=None):
        """Create a client.

        Args:
            url: Base URL of the service. Endpoint names are appended to it
                by plain string concatenation, so a trailing slash is added
                when it is missing.
            timeout: Value passed as ``timeout`` to every ``requests`` call.
                ``None`` (the default) applies no timeout.
        """
        self.url = url if url.endswith('/') else url + '/'
        self.timeout = timeout

    def _post_json(self, endpoint, payload):
        """POST ``payload`` as a JSON body to ``endpoint`` and parse the JSON reply."""
        response = requests.post(
            self.url + endpoint,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'},
            timeout=self.timeout,
        )
        # Force UTF-8 instead of relying on the charset requests guesses.
        response.encoding = 'utf-8'
        return json.loads(response.text)

    def top_k_search(self, query_feature, attribute='clip_feature', top_k=15):
        """Call the ``top_k_search`` endpoint.

        Args:
            query_feature: Picklable object (or ``None``); it is serialized
                with ``serialize_feature`` before being sent.
            attribute: Sent as the ``attribute`` field of the request.
            top_k: Sent as the ``top_k`` field of the request.

        Returns:
            The parsed JSON response.
        """
        return self._post_json(
            'top_k_search',
            {
                "feature": serialize_feature(query_feature),
                "attribute": attribute,
                "top_k": top_k,
            },
        )

    def search_by_en_keyword(self, keyword):
        """Call the ``search_by_en_keyword`` endpoint with ``keyword``.

        Returns:
            The parsed JSON response.
        """
        return self._post_json('search_by_en_keyword', {"keyword": keyword})

    def random_sample(self, n):
        """Call the ``random_sample`` endpoint and download each returned image.

        For every entry of the response, the base name of its ``image_name``
        value is downloaded with :meth:`download_file` into the
        ``temp_images`` directory (relative to the current working directory).

        Args:
            n: Sent as the ``number`` field of the request.

        Returns:
            The parsed JSON response (the entries that were iterated).
        """
        result = self._post_json('random_sample', {"number": n})
        # The target directory may not exist yet; without it every download
        # would fail when opening the output file.
        os.makedirs('temp_images', exist_ok=True)
        for entry in result:
            image_name = os.path.basename(entry['image_name'])
            self.download_file(image_name, os.path.join('temp_images', image_name))
        return result

    def get_top_founders(self, word="", top_k=20):
        """Call the ``get_top_founders`` endpoint.

        Args:
            word: Sent as the ``word`` field of the request.
            top_k: Sent as the ``top_k`` field of the request.

        Returns:
            The parsed JSON response.
        """
        return self._post_json('get_top_founders', {'top_k': top_k, 'word': word})

    def set_founders(self, word, founder, enforce=False):
        """Call the ``set_founder`` endpoint (note the singular endpoint name).

        Args:
            word: Sent as the ``word`` field of the request.
            founder: Sent as the ``founder`` field of the request.
            enforce: Accepted for interface compatibility but currently not
                transmitted to the server.
                NOTE(review): confirm whether the server expects this flag.

        Returns:
            The parsed JSON response.
        """
        return self._post_json('set_founder', {'founder': founder, 'word': word})

    def add_data(self, img_data, text_data, img_feature, text_feature):
        """Call the ``add_data`` endpoint.

        ``img_feature`` and ``text_feature`` are serialized with
        ``serialize_feature``; ``img_data`` and ``text_data`` are sent as
        given and therefore must be JSON-serializable.

        Returns:
            The parsed JSON response.
        """
        return self._post_json(
            'add_data',
            {
                "img_data": img_data,
                "text_data": text_data,
                "img_feature": serialize_feature(img_feature),
                "text_feature": serialize_feature(text_feature),
            },
        )

    def add_file(self, file_path):
        """Upload the file at ``file_path`` to the ``add_file`` endpoint.

        The file is sent as the multipart field ``file``.

        Returns:
            The parsed JSON response.

        Raises:
            OSError: If ``file_path`` cannot be opened for reading.
        """
        # The context manager guarantees the handle is closed even when the
        # request raises; the upload has finished once ``post`` returns.
        with open(file_path, 'rb') as upload:
            response = requests.post(
                self.url + 'add_file',
                data={},
                files={'file': upload},
                stream=True,
                timeout=self.timeout,
            )
        response.encoding = 'utf-8'
        return json.loads(response.text)

    def download_file(self, file_name, save_path):
        """Download ``file_name`` from the ``download_file`` endpoint.

        The response body is streamed to ``save_path`` in 1024-byte chunks.
        Nothing is written unless the server answers with status 200.

        Args:
            file_name: Sent as the form field ``file_name``.
            save_path: Local path the content is written to (binary mode).

        Returns:
            ``True`` when the file was written, ``False`` when the server
            replied with a status code other than 200.
        """
        response = requests.post(
            self.url + 'download_file',
            data={'file_name': file_name},
            stream=True,
            timeout=self.timeout,
        )
        try:
            if response.status_code != 200:
                return False
            with open(save_path, 'wb') as out:
                for chunk in response.iter_content(1024):
                    if chunk:
                        out.write(chunk)
            return True
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: fetch one known file from the default server and
    # store it next to the script.
    db = RemoteDatabase()
    db.download_file(file_name='xyxmi1lr55pm.jpg', save_path='./test.jpg')
|
|
|