import json
import os
from typing import Dict, Iterator, List, Union
from urllib.parse import urljoin

import requests
class AppSearchClient:
    """Thin client for the Elastic App Search REST API.

    Endpoint and private key default to the historical hard-coded values but
    can now be overridden through the ``APPSEARCH_ENDPOINT`` /
    ``APPSEARCH_PRIVATE_KEY`` environment variables, so deployments no longer
    have to ship the secret in source.
    """

    # Seconds before an App Search request is abandoned. Without an explicit
    # timeout, `requests` waits forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self) -> None:
        # NOTE(review): the fallback key below is a secret committed to source
        # control — it should be rotated and supplied only via the environment.
        self.appsearch_endpoint = os.environ.get(
            "APPSEARCH_ENDPOINT",
            "https://fgm-v2.ent.eastus2.azure.elastic-cloud.com",
        )
        self.appsearch_private_key = os.environ.get(
            "APPSEARCH_PRIVATE_KEY",
            "private-dzf1pbcssw97hxkm3wxbdrpu",
        )
        # `assert` is stripped under `python -O`; validate explicitly instead.
        if not self.appsearch_endpoint or not self.appsearch_private_key:
            raise ValueError(
                "App Search endpoint and private key must be configured"
            )
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.appsearch_private_key}",
        }

    def list_all_engines(self) -> Iterator[str]:
        """Yield the name of every engine, paging through the engines API.

        Raises:
            requests.HTTPError: if App Search returns an error status.
        """
        request_url = urljoin(self.appsearch_endpoint, "/api/as/v1/engines/")
        page_size = 10
        current_page = 1
        while True:
            params = (
                ("page[size]", f"{page_size}"),
                ("page[current]", f"{current_page}"),
            )
            r = requests.get(
                request_url,
                headers=self.headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            r.raise_for_status()
            results = r.json()["results"]
            # An empty page means we walked past the last engine.
            if not results:
                break
            for item in results:
                yield item["name"]
            current_page += 1

    def create_engine(self, name: str) -> requests.Response:
        """Create an engine called *name* and return the raw HTTP response.

        The response is returned unchecked so callers can inspect failures
        (e.g. "engine already exists") themselves.
        """
        request_url = urljoin(self.appsearch_endpoint, "/api/as/v1/engines/")
        return requests.post(
            request_url,
            headers=self.headers,
            data=json.dumps({"name": name}),
            timeout=self.REQUEST_TIMEOUT,
        )

    def index_documents(
        self, data: Union[Dict, List[Dict]], engine_name: str
    ) -> None:
        """Index one document (dict) or a batch (list of dicts) into *engine_name*.

        Raises:
            requests.HTTPError: if App Search rejects the request. (The
            previous implementation silently discarded the response, so
            indexing failures went unnoticed.)
        """
        request_url = urljoin(
            self.appsearch_endpoint,
            f"/api/as/v1/engines/{engine_name}/documents",
        )
        r = requests.post(
            request_url,
            headers=self.headers,
            data=json.dumps(data),
            timeout=self.REQUEST_TIMEOUT,
        )
        r.raise_for_status()

    def list_existing_docs(self, engine_name: str) -> List[Dict]:
        """Return every document in *engine_name*, walking the list API pages.

        Raises:
            requests.HTTPError: if App Search returns an error status.
        """
        request_url = urljoin(
            self.appsearch_endpoint,
            f"/api/as/v1/engines/{engine_name}/documents/list",
        )
        page_size = 100
        current_page = 1
        docs: List[Dict] = []
        while True:
            params = (
                ("page[size]", f"{page_size}"),
                ("page[current]", f"{current_page}"),
            )
            r = requests.get(
                request_url,
                headers=self.headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            r.raise_for_status()
            page_content = r.json()["results"]
            if not page_content:
                break
            docs.extend(page_content)
            current_page += 1
        return docs

    def list_existing_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of documents flagged as manual.

        NOTE(review): ``is_manual`` is compared against the *string* "true" —
        presumably the field is stored as a string in the engine schema;
        confirm against the indexing code before changing this.
        """
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "true":
                yield doc["id"]

    def list_existing_non_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of documents NOT flagged as manual (string "false")."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "false":
                yield doc["id"]

    def list_existing_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the id of every document in *engine_name*."""
        for doc in self.list_existing_docs(engine_name):
            yield doc["id"]

    def get_elastic_query(
        self, data: str, size: int, engine_name: str = "us-speeches-s"
    ) -> requests.Response:
        """Run a raw Elasticsearch query through App Search's passthrough API.

        Args:
            data: JSON-encoded Elasticsearch query body.
            size: maximum number of hits to return (``?size=`` parameter).
            engine_name: target engine; defaults to the previously
                hard-coded "us-speeches-s" for backward compatibility.
        """
        request_url = urljoin(
            self.appsearch_endpoint,
            f"/api/as/v0/engines/{engine_name}/elasticsearch/_search",
        )
        return requests.post(
            url=request_url,
            headers=self.headers,
            params={"size": size},
            data=data,
            timeout=self.REQUEST_TIMEOUT,
        )

    def delete_existing_non_manual_docs(self, engine_name: str) -> None:
        """Delete all non-manual documents from *engine_name*.

        The documents API caps the batch size, so ids are deleted in chunks
        of 100 per request.

        Raises:
            requests.HTTPError: if any delete request fails. (Previously
            failures were silently discarded.)
        """
        non_manual_doc_ids = list(self.list_existing_non_manual_urls(engine_name))
        request_url = urljoin(
            self.appsearch_endpoint,
            f"/api/as/v1/engines/{engine_name}/documents",
        )
        batch_size = 100
        for start in range(0, len(non_manual_doc_ids), batch_size):
            batch = non_manual_doc_ids[start : start + batch_size]
            r = requests.delete(
                request_url,
                headers=self.headers,
                data=json.dumps(batch),
                timeout=self.REQUEST_TIMEOUT,
            )
            r.raise_for_status()