|
|
import requests |
|
|
import ipaddress |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
from . import api_folder |
|
|
|
|
|
|
|
|
def get_url_port(server_url_port=""):
    """Split a server address into a base URL and a port number.

    The returned URL always uses the ``http`` scheme, whatever scheme the
    input carried, and an IPv6 literal host is wrapped in square brackets.

    Args:
        server_url_port (str): Address such as ``"http://localhost:41595"``,
            ``"http://[::1]:41595"`` or, without a scheme,
            ``"localhost:41595"``.

    Returns:
        tuple: ``(url, port)``. ``port`` is ``None`` when the address has no
        port. Both items are ``None`` when the input is empty or no host can
        be extracted from it.

    Raises:
        ValueError: If the port part is not a valid port number (raised by
            ``urllib.parse`` when the port is read).
    """
    if not server_url_port:
        return None, None

    parsed = urlparse(server_url_port)
    if parsed.hostname is None and "://" not in server_url_port:
        # Without a scheme, urlparse does not treat "host:port" as a network
        # location; prefixing "//" makes it parse the host and port.
        parsed = urlparse(f"//{server_url_port}")

    host = parsed.hostname
    if not host:
        return None, None

    try:
        is_ipv6 = ipaddress.ip_address(host).version == 6
    except ValueError:
        # Not a plain IP literal (e.g. a DNS name such as "example.com").
        # A colon can only come from a bracketed IPv6 literal, so keep the
        # brackets in that case and leave ordinary host names untouched.
        is_ipv6 = ":" in host

    _url = f"http://[{host}]" if is_ipv6 else f"http://{host}"
    return _url, parsed.port
|
|
|
|
|
|
|
|
|
|
|
def findFolderByID(r_posts, target_id):
    """Look up a folder by ID; thin wrapper around ``findFolderByName``.

    Forwards ``target_id`` as the search value with ``findByID=True`` and
    returns whatever ``findFolderByName`` returns.

    Args:
        r_posts: Folder-list response, passed through unchanged.
        target_id: Value to search for.
    """
    match = findFolderByName(r_posts, target_name=target_id, findByID=True)
    return match
|
|
|
|
|
|
|
|
def findFolderByName(r_posts, target_name, findByID=False):
    """Find the first folder whose name (or ID) equals ``target_name``.

    Folders are taken from ``getAllFolder(r_posts)`` and checked in the order
    that function returns them.

    Args:
        r_posts: Folder-list response handed to ``getAllFolder``.
        target_name (str): Folder name to look for, or the folder ID when
            ``findByID`` is True.
        findByID (bool, optional): Compare against each folder's ``"id"``
            instead of its ``"name"``. Defaults to False.

    Returns:
        The matching folder dict; ``[]`` when no folder matches; ``None``
        when ``target_name`` or ``r_posts`` is empty/falsy or the folder list
        could not be read.
    """
    if not target_name or not r_posts:
        return None

    folders = getAllFolder(r_posts)
    if folders is None:
        # getAllFolder signals an unusable response with None; iterating it
        # would raise TypeError.
        return None

    # Compare exactly one field: an ID search must not also match a folder
    # that merely happens to be *named* like the requested ID.
    key = "id" if findByID else "name"
    for folder in folders:
        if folder.get(key, "") == target_name:
            return folder
    return []
|
|
|
|
|
|
|
|
def getAllFolder(r_posts):
    """Return every folder in a folder-list response as one flat list.

    The JSON body is expected to look like
    ``{"status": "success", "data": [folder, ...]}`` where each folder dict
    may hold nested folders under ``"children"``. Each folder is listed
    before its children, and at most 10 nesting levels are followed.

    Args:
        r_posts: Response-like object providing ``json()``.

    Returns:
        list | None: Flat list of folder dicts (possibly empty), or ``None``
        when ``r_posts`` is falsy, its body is not valid JSON, or its
        ``"status"`` is not ``"success"``.
    """

    def dig_folder(data, dig_count, dig_limit=10):
        """Collect ``data`` and its descendants, parents before children."""
        dig_count += 1
        if dig_count > dig_limit:
            # Depth cap guards against runaway or cyclic nesting.
            return []
        found = [data]
        # "children" may be missing or null; both mean "no sub-folders".
        for child in data.get("children") or []:
            found += dig_folder(child, dig_count, dig_limit)
        return found

    if not r_posts:
        return None

    try:
        payload = r_posts.json()
    except ValueError:
        # JSON decoding errors derive from ValueError; report them the same
        # way as any other unusable response.
        return None

    if not isinstance(payload, dict) or payload.get("status") != "success":
        return None

    folders = []
    for top_level in payload.get("data") or []:
        folders += dig_folder(top_level, 0)
    return folders
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def print_response(_res: requests.Response):
    """Print the main fields of a response to stdout (debugging aid).

    Prints status code, headers, text, encoding, cookies, raw content and
    the content decoded with the response's apparent encoding.

    Args:
        _res (requests.Response): Response to dump.
    """
    print(f"status code : {_res.status_code}")
    print(f"headers : {_res.headers}")
    print(f"text : {_res.text}")
    print(f"encoding : {_res.encoding}")
    print(f"cookies : {_res.cookies}")

    print(f"content : {_res.content}")

    # apparent_encoding may be None when no encoding can be detected;
    # bytes.decode(encoding=None) would raise TypeError, so fall back to UTF-8.
    _encoding = _res.apparent_encoding or "utf-8"
    try:
        # errors="replace": a wrong guess must not crash a debug print.
        _decoded = _res.content.decode(encoding=_encoding, errors="replace")
    except LookupError:
        # Detected codec name is unknown to this Python build.
        _decoded = _res.content.decode(encoding="utf-8", errors="replace")
    print(f"content decode: {_decoded}")
|
|
|
|
|
|
|
|
def get_json_from_response(_res: requests.Response):
    """Return the decoded JSON body of ``_res``, or ``_res`` itself on failure.

    Args:
        _res (requests.Response): Response whose body should be decoded.

    Returns:
        The value returned by ``_res.json()``; if that call raises any
        ordinary exception, ``_res`` is returned unchanged.
    """
    try:
        return _res.json()
    except Exception:
        # Deliberate best-effort fallback. ``Exception`` rather than a bare
        # ``except`` so KeyboardInterrupt / SystemExit are not swallowed.
        return _res
|
|
|
|
|
|
|
|
def find_or_create_folder(
    folder_name_or_id,
    allow_create_new_folder=False,
    server_url="http://localhost",
    port=41595,
    timeout_connect=3,
    timeout_read=10,
):
    """
    Find or Create folder on Eagle, by folderId or FolderName

    The folder list is fetched once via ``api_folder.list``; it is searched
    by name first and then by ID. If nothing matches and
    ``allow_create_new_folder`` is True, a folder named ``folder_name_or_id``
    is created via ``api_folder.create``.

    Args:
        folder_name_or_id (str): Folder name or folder ID to look for. An
            empty value returns "" without contacting the server.
        allow_create_new_folder (bool, optional): if True, create new folder on Eagle. Defaults to False.
        server_url (str, optional): Defaults to "http://localhost".
        port (int, optional): Defaults to 41595.
        timeout_connect (int, optional): Defaults to 3.
        timeout_read (int, optional): Defaults to 10.

    Return:
        folderId or ""
    """
    if not folder_name_or_id:
        return ""

    _connection = {
        "server_url": server_url,
        "port": port,
        "timeout_connect": timeout_connect,
        "timeout_read": timeout_read,
    }
    _ret_folder_list = api_folder.list(**_connection)

    # Search by name first, then fall back to searching by ID.
    for _finder in (findFolderByName, findFolderByID):
        _ret = _finder(_ret_folder_list, folder_name_or_id)
        if _ret:
            _eagle_folderid = _ret.get("id", "")
            if _eagle_folderid:
                return _eagle_folderid

    if not allow_create_new_folder:
        return ""

    _r_get = api_folder.create(folder_name_or_id, **_connection)
    try:
        _created = _r_get.json().get("data") or {}
        # Normalise a missing/null id to "" so the documented contract holds.
        return _created.get("id") or ""
    except Exception:
        # Best effort: an unusable create response means "no folder id".
        # ``Exception`` (not a bare except) lets KeyboardInterrupt propagate.
        return ""
|
|
|