File size: 4,126 Bytes
0117cec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# -*- coding: utf-8 -*-
import requests
import pickle
import base64
import json, os
def serialize_feature(feature):
    """Encode *feature* as an ASCII string suitable for a JSON payload.

    The object is pickled, the pickle bytes are base64-encoded with
    ``base64.encodebytes`` (which inserts line breaks into the output), and
    the result is decoded to ``str``.

    Args:
        feature: Any picklable object, or ``None``.

    Returns:
        The base64 text of the pickled object, or ``None`` when *feature*
        is ``None``.
    """
    if feature is not None:
        pickled = pickle.dumps(feature)
        encoded = base64.encodebytes(pickled)
        return encoded.decode('ascii')
    return None
def deserialize_feature(feature):
    """Rebuild an object from the text produced by ``serialize_feature``.

    Args:
        feature: ASCII base64 text wrapping a pickle, or ``None``.

    Returns:
        The unpickled object, or ``None`` when *feature* is ``None``.

    Warning:
        ``pickle.loads`` can execute arbitrary code while loading. Only pass
        strings that come from a trusted source.
    """
    if feature is not None:
        raw = base64.decodebytes(feature.encode('ascii'))
        return pickle.loads(raw)
    return None
class RemoteDatabase:
    """Thin HTTP client for the remote feature/image database service.

    Every public method issues one POST request to an endpoint below
    ``self.url``. The JSON endpoints return the response body decoded with
    ``json.loads``; no HTTP status check is performed on them, so a non-JSON
    error body surfaces as a ``json.JSONDecodeError``.
    """

    # Server used when no URL is passed to the constructor.
    DEFAULT_URL = 'http://110.40.175.218:6007/'

    def __init__(self, url=None, timeout=None):
        """Create a client.

        Args:
            url: Base URL of the service. Defaults to ``DEFAULT_URL``; pass
                e.g. ``'http://127.0.0.1:6007/'`` to talk to a local server.
                A trailing ``/`` is appended when missing, because endpoint
                names are concatenated directly onto this value.
            timeout: Value forwarded to every ``requests.post`` call. The
                default ``None`` means no timeout.
        """
        if url is None:
            url = self.DEFAULT_URL
        if not url.endswith('/'):
            url += '/'
        self.url = url
        self.timeout = timeout

    def _post_json(self, endpoint, payload):
        """POST *payload* as a JSON body to *endpoint* and decode the JSON reply."""
        response = requests.post(
            self.url + endpoint,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'},
            timeout=self.timeout,
        )
        # Force UTF-8 so ``response.text`` does not depend on charset guessing.
        response.encoding = 'utf-8'
        return json.loads(response.text)

    def top_k_search(self, query_feature, attribute='clip_feature', top_k=15):
        """Call the ``top_k_search`` endpoint.

        Args:
            query_feature: Picklable feature object; sent serialized via
                ``serialize_feature`` (``None`` is sent as JSON ``null``).
            attribute: Sent as the ``attribute`` field.
            top_k: Sent as the ``top_k`` field.

        Returns:
            The decoded JSON response.
        """
        return self._post_json('top_k_search', {
            "feature": serialize_feature(query_feature),
            "attribute": attribute,
            "top_k": top_k,
        })

    def search_by_en_keyword(self, keyword):
        """Call the ``search_by_en_keyword`` endpoint with *keyword*.

        Returns:
            The decoded JSON response.
        """
        return self._post_json('search_by_en_keyword', {"keyword": keyword})

    def random_sample(self, n):
        """Call the ``random_sample`` endpoint and download the listed images.

        *n* is sent as the ``number`` field. The response is iterated, and
        for each item the base name of its ``image_name`` value is fetched
        with ``download_file`` into the local ``temp_images`` directory.

        Returns:
            The decoded JSON response, unchanged.
        """
        result = self._post_json('random_sample', {"number": n})
        if result:
            # The target directory may not exist yet on a fresh checkout;
            # without it every successful download would fail on open().
            os.makedirs('temp_images', exist_ok=True)
        for r in result:
            image_name = os.path.basename(r['image_name'])
            self.download_file(image_name, os.path.join('temp_images', image_name))
        return result

    def get_top_founders(self, word="", top_k=20):
        """Call the ``get_top_founders`` endpoint.

        Args:
            word: Sent as the ``word`` field.
            top_k: Sent as the ``top_k`` field.

        Returns:
            The decoded JSON response.
        """
        return self._post_json('get_top_founders', {'top_k': top_k, 'word': word})

    def set_founders(self, word, founder, enforce=False):
        """Call the ``set_founder`` endpoint with *word* and *founder*.

        Args:
            word: Sent as the ``word`` field.
            founder: Sent as the ``founder`` field.
            enforce: Accepted for interface compatibility but not sent.
                NOTE(review): confirm whether the server expects this flag.

        Returns:
            The decoded JSON response.
        """
        return self._post_json('set_founder', {'founder': founder, 'word': word})

    def add_data(self, img_data, text_data, img_feature, text_feature):
        """Call the ``add_data`` endpoint.

        Args:
            img_data: Sent as-is in the ``img_data`` field (must be
                JSON-serializable).
            text_data: Sent as-is in the ``text_data`` field (must be
                JSON-serializable).
            img_feature: Picklable object; sent via ``serialize_feature``.
            text_feature: Picklable object; sent via ``serialize_feature``.

        Returns:
            The decoded JSON response.
        """
        return self._post_json('add_data', {
            "img_data": img_data,
            "text_data": text_data,
            "img_feature": serialize_feature(img_feature),
            "text_feature": serialize_feature(text_feature),
        })

    def add_file(self, file_path):
        """Upload the file at *file_path* to the ``add_file`` endpoint.

        The file is sent as a multipart upload under the field name ``file``.

        Returns:
            The decoded JSON response.

        Raises:
            OSError: If *file_path* cannot be opened for reading.
        """
        url = self.url + 'add_file'
        # Open inside ``with`` so the handle is closed even if the request
        # raises; it used to be left open.
        with open(file_path, 'rb') as upload:
            response = requests.post(
                url,
                data={},
                files={'file': upload},
                stream=True,
                timeout=self.timeout,
            )
        response.encoding = 'utf-8'
        return json.loads(response.text)

    def download_file(self, file_name, save_path):
        """Fetch *file_name* from the ``download_file`` endpoint into *save_path*.

        The name is sent as the form field ``file_name``. The body is written
        to *save_path* only when the server answers with status 200; any
        other status is ignored and no file is created.

        Returns:
            None.
        """
        response = requests.post(
            self.url + 'download_file',
            data={'file_name': file_name},
            stream=True,
            timeout=self.timeout,
        )
        try:
            if response.status_code != 200:
                return
            with open(save_path, 'wb') as out_file:
                for chunk in response.iter_content(1024):
                    if chunk:
                        out_file.write(chunk)
        finally:
            # A streamed response holds its connection until it is fully
            # read or closed, so close it on every path.
            response.close()
if __name__ == '__main__':
    # Manual smoke test: request one image through download_file and save
    # it next to the script as ./test.jpg.
    client = RemoteDatabase()
    client.download_file(file_name='xyxmi1lr55pm.jpg', save_path='./test.jpg')
|