"""Generic client for any Grobid application and its sub-modules.

At the moment, it supports only single document processing.

Source: https://github.com/kermitt2/grobid-client-python
"""
import json
import os
import time
from copy import deepcopy

import requests
import yaml

try:
    from urlparse import urljoin
except ImportError:
    from urllib.parse import urljoin


class ApiClient(object):
    """Client to interact with a generic REST API.

    Subclasses should implement functionality accordingly with the provided
    service methods, i.e. ``get``, ``post``, ``put`` and ``delete``.
    """

    # Default value of the ``Accept`` header when the caller supplies none.
    accept_type = "application/xml"
    api_base = None

    def __init__(self, base_url, username=None, api_key=None, status_endpoint=None, timeout=60):
        """Initialise client.

        Args:
            base_url (str): The base URL to the service being used.
            username (str): The username to authenticate with.
            api_key (str): The API key to authenticate with.
            status_endpoint (str or None): Location of the status resource,
                joined onto ``base_url`` with ``urljoin``.
            timeout (int): Maximum time before timing out. Stored on the
                instance; it is not applied automatically by ``call_api``.
        """
        self.base_url = base_url
        self.username = username
        self.api_key = api_key
        self.status_endpoint = urljoin(self.base_url, status_endpoint)
        self.timeout = timeout

    @staticmethod
    def encode(request, data):
        """Add request content data to request body, set Content-type header.

        Should be overridden by subclasses if not using JSON encoding.

        Args:
            request (HTTPRequest): The request object.
            data (dict, None): Data to be encoded.

        Returns:
            HTTPRequest: The request object.
        """
        if data is None:
            return request

        request.add_header("Content-Type", "application/json")
        request.extracted_data = json.dumps(data)

        return request

    @staticmethod
    def decode(response):
        """Decode the returned data in the response.

        Should be overridden by subclasses if something else than JSON is
        expected.

        Args:
            response (HTTPResponse): The response object.

        Returns:
            The decoded JSON payload, or the decoding error message (str)
            when the body is not valid JSON.
        """
        try:
            return response.json()
        except ValueError as e:
            # Python 3 exceptions have no ``message`` attribute.
            return str(e)

    def get_credentials(self):
        """Return parameters to be added to authenticate the request.

        This lives on its own to make it easier to re-implement it if needed.

        Returns:
            dict: A dictionary containing the credentials.
        """
        return {"username": self.username, "api_key": self.api_key}

    def call_api(
        self,
        method,
        url,
        headers=None,
        params=None,
        data=None,
        files=None,
        timeout=None,
    ):
        """Send an HTTP request and return the raw response with its status.

        Args:
            method (str): The HTTP method to use.
            url (str): Resource location. It is handed to ``requests``
                unchanged, so it must already be a complete URL.
            headers (dict or None): Extra request headers to set. ``Accept``
                defaults to ``accept_type`` when absent.
            params (dict or None): Query-string parameters.
            data (dict or None): Request body contents for POST or PUT
                requests.
            files (dict or None): Files to be passed to the request.
            timeout (int or None): Passed straight to ``requests``; ``None``
                means the instance-level ``timeout`` is NOT used.

        Returns:
            tuple: ``(response, status_code)``.
        """
        headers = deepcopy(headers) or {}
        headers.setdefault("Accept", self.accept_type)
        params = deepcopy(params) or {}
        data = data or {}
        files = files or {}

        # Credentials are deliberately not injected into the query string;
        # see get_credentials() if authentication needs to be added.
        r = requests.request(
            method,
            url,
            headers=headers,
            params=params,
            files=files,
            data=data,
            timeout=timeout,
        )

        return r, r.status_code

    def get(self, url, params=None, **kwargs):
        """Call the API with a GET request.

        Args:
            url (str): Resource location (complete URL).
            params (dict or None): Query-string parameters.

        Returns:
            tuple: ``(response, status_code)``.
        """
        return self.call_api("GET", url, params=params, **kwargs)

    def delete(self, url, params=None, **kwargs):
        """Call the API with a DELETE request.

        Args:
            url (str): Resource location (complete URL).
            params (dict or None): Query-string parameters.

        Returns:
            tuple: ``(response, status_code)``.
        """
        return self.call_api("DELETE", url, params=params, **kwargs)

    def put(self, url, params=None, data=None, files=None, **kwargs):
        """Call the API with a PUT request.

        Args:
            url (str): Resource location (complete URL).
            params (dict or None): Query-string parameters.
            data (dict or None): Request body contents.
            files (dict or None): Files to be passed to the request.

        Returns:
            tuple: ``(response, status_code)``.
        """
        return self.call_api("PUT", url, params=params, data=data, files=files, **kwargs)

    def post(self, url, params=None, data=None, files=None, **kwargs):
        """Call the API with a POST request.

        Args:
            url (str): Resource location (complete URL).
            params (dict or None): Query-string parameters.
            data (dict or None): Request body contents.
            files (dict or None): Files to be passed to the request.

        Returns:
            tuple: ``(response, status_code)``.
        """
        return self.call_api(method="POST", url=url, params=params, data=data, files=files, **kwargs)

    def service_status(self, **kwargs):
        """Call the API to get the status of the service.

        Returns:
            tuple: ``(response, status_code)``.
        """
        return self.call_api("GET", self.status_endpoint, params={"format": "json"}, **kwargs)


class NERClientGeneric(ApiClient):
    """Grobid NER client driven by a configuration mapping.

    The methods below read the following configuration keys:
    ``grobid.server`` (base URL), ``grobid.url_mapping`` (action name to
    path), ``sleep_time`` (seconds between retries) and ``max_retry``
    (used by ``process_pdf``).
    """

    def __init__(self, config_path=None, ping=False):
        """Create the client, optionally loading a YAML configuration.

        Args:
            config_path (str or None): Path of the YAML configuration. When
                ``None`` the client is left unconfigured and ``set_config``
                must be called before any request is made.
            ping (bool): When a configuration is loaded, check that the
                service answers.

        Raises:
            Exception: If ``ping`` is set and the service does not answer
                with HTTP 200.
        """
        self.config = None
        if config_path is not None:
            self.config = self._load_yaml_config_from_file(path=config_path)
            super().__init__(self.config["grobid"]["server"])

            if ping:
                result = self.ping_service()
                if not result:
                    raise Exception("Grobid is down.")

        # NOTE(review): this overwrites any NO_PROXY value already set in the
        # process environment - confirm that this is intended.
        os.environ["NO_PROXY"] = "nims.go.jp"

    @staticmethod
    def _load_json_config_from_file(path="./config.json"):
        """Load the JSON configuration stored at ``path``."""
        with open(path, "r") as fp:
            config = json.load(fp)

        return config

    @staticmethod
    def _load_yaml_config_from_file(path="./config.yaml"):
        """Load the YAML configuration stored at ``path``.

        Any failure is reported on stdout and terminates the process with
        exit status 1.
        """
        config = {}
        try:
            with open(path, "r") as the_file:
                raw_configuration = the_file.read()

            config = yaml.safe_load(raw_configuration)
        except Exception as e:
            print("Configuration could not be loaded: ", str(e))
            # Same effect as exit(1), without relying on the ``site`` builtin.
            raise SystemExit(1)

        return config

    def set_config(self, config, ping=False):
        """Replace the configuration, optionally checking the service.

        Args:
            config (dict): The configuration mapping.
            ping (bool): Check that the service answers after the update.

        Raises:
            Exception: If ``ping`` is set and the service is unreachable or
                does not answer with HTTP 200.
        """
        self.config = config
        if ping:
            try:
                result = self.ping_service()
                if not result:
                    raise Exception("Grobid is down.")
            except Exception as e:
                raise Exception("Grobid is down or other problems were encountered. ", e) from e

    def ping_service(self):
        """Test whether the server is up and running.

        Returns:
            bool: ``True`` when the ``ping`` action answers with HTTP 200.
        """
        ping_url = self.get_url("ping")

        r = requests.get(ping_url)
        status = r.status_code

        if status != 200:
            print("GROBID server does not appear up and running " + str(status))
            return False

        print("GROBID server is up and running")
        return True

    def get_url(self, action):
        """Return the full URL configured for ``action``.

        Raises:
            KeyError: If ``action`` has no entry in ``grobid.url_mapping``.
        """
        grobid_config = self.config["grobid"]
        base_url = grobid_config["server"]
        action_url = base_url + grobid_config["url_mapping"][action]

        return action_url

    def _resolve_request(self, method_name, params):
        """Return ``(url, form_data)`` for an action.

        Parameters embedded in the configured URL are the baseline; entries
        supplied by the caller take precedence.
        """
        url_params, the_url = self.get_params_from_url(self.get_url(method_name))
        form_data = dict(url_params)
        form_data.update(params or {})

        return the_url, form_data

    def _post_text_payload(self, files, method_name, params, headers):
        """POST a text payload, waiting and retrying while the server answers 503."""
        the_url, form_data = self._resolve_request(method_name, params)

        while True:
            res, status = self.post(url=the_url, files=files, data=form_data, headers=headers)
            if status != 503:
                break
            time.sleep(self.config["sleep_time"])

        if status != 200:
            print("Processing failed with error " + str(status))
            return status, None

        return status, json.loads(res.text)

    # The dict defaults below are never mutated, so sharing them between
    # calls is harmless; they are kept to preserve the public signatures.
    def process_texts(self, input, method_name="superconductors", params={}, headers={"Accept": "application/json"}):
        """Send ``input`` as the ``texts`` form field of the given action.

        Args:
            input: Value sent under the ``texts`` form field.
            method_name (str): Key of the action in ``grobid.url_mapping``.
            params (dict): Extra form fields; they override parameters
                embedded in the configured URL.
            headers (dict): Request headers.

        Returns:
            tuple: ``(status, parsed_json)``, or ``(status, None)`` when the
            status is not 200. A 503 answer is retried after ``sleep_time``
            seconds until another status is received.
        """
        return self._post_text_payload({"texts": input}, method_name, params, headers)

    def process_text(self, input, method_name="superconductors", params={}, headers={"Accept": "application/json"}):
        """Send ``input`` as the ``text`` form field of the given action.

        Args:
            input: Value sent under the ``text`` form field.
            method_name (str): Key of the action in ``grobid.url_mapping``.
            params (dict): Extra form fields; they override parameters
                embedded in the configured URL.
            headers (dict): Request headers.

        Returns:
            tuple: ``(status, parsed_json)``, or ``(status, None)`` when the
            status is not 200. A 503 answer is retried after ``sleep_time``
            seconds until another status is received.
        """
        return self._post_text_payload({"text": input}, method_name, params, headers)

    def process_pdfs(self, pdf_files, params={}):
        """Placeholder for batch processing; currently does nothing."""
        pass

    def process_pdf(self, pdf_file, method_name, params={}, headers={"Accept": "application/json"}, verbose=False,
                    retry=None):
        """Upload one PDF to the given action.

        Args:
            pdf_file (str): Path of the PDF to upload.
            method_name (str): Key of the action in ``grobid.url_mapping``.
            params (dict): Extra form fields; they override parameters
                embedded in the configured URL.
            headers (dict): Request headers.
            verbose (bool): Print retry information.
            retry (int or None): Remaining retry budget. Callers normally
                leave it to ``None``; it is then initialised from the
                ``max_retry`` configuration value.

        Returns:
            tuple: ``(response_text, status)`` on HTTP 200. For any other
            status ``(description, status)``, where ``description`` is the
            ``description`` field of a JSON error body when there is one and
            ``None`` otherwise (including 204 and exhausted retries).
        """
        the_url, form_data = self._resolve_request(method_name, params)

        # The handle is only needed while the request is being sent.
        with open(pdf_file, "rb") as pdf_stream:
            files = {"input": (pdf_file, pdf_stream, "application/pdf", {"Expires": "0"})}
            res, status = self.post(url=the_url, files=files, data=form_data, headers=headers)

        if status in (503, 429):
            if retry is None:
                retry = self.config["max_retry"] - 1
            elif retry - 1 <= 0:
                # "<=" rather than "==" so that a budget of zero (max_retry
                # of 1) stops instead of counting down forever.
                if verbose:
                    print("re-try exhausted. Aborting request")
                return None, status
            else:
                retry -= 1

            sleep_time = self.config["sleep_time"]
            if verbose:
                print("Server is saturated, waiting", sleep_time, "seconds and trying again. ")
            time.sleep(sleep_time)
            return self.process_pdf(pdf_file, method_name, params, headers, verbose=verbose, retry=retry)

        if status == 200:
            return res.text, status

        # Everything else, including 204 (empty body): report the server's
        # description when the body is a JSON object that carries one.
        desc = None
        if res.content:
            try:
                payload = json.loads(res.text)
            except ValueError:
                payload = None
            if isinstance(payload, dict):
                desc = payload.get("description")

        return desc, status

    def get_params_from_url(self, the_url):
        """Split the predefined parameters off a configured URL.

        Args:
            the_url (str): URL, optionally followed by ``?key=value&...``.

        Returns:
            tuple: ``(params, url)`` where ``params`` maps each key to its
            raw (not URL-decoded) value and ``url`` is the part before the
            first ``?``. A key without ``=`` maps to an empty string.
        """
        base_url, _, query = the_url.partition("?")

        params = {}
        for pair in query.split("&"):
            key, _, value = pair.partition("=")
            if key:
                params[key] = value

        return params, base_url