Spaces:
Sleeping
Sleeping
| import re | |
| from functools import lru_cache | |
| import gradio as gr | |
| import requests | |
| import os | |
| TOKEN_VALUE=os.getenv("TOKEN_VALUE") | |
| LOGIN = os.getenv("LOGIN") | |
| TOKEN = os.getenv("TOKEN") | |
| BASE_URL = 'https://nethunt.com/api/v1/zapier' | |
| FIELDS = { | |
| 'company_code': 'ЄДРПОУ', | |
| 'company_codes': 'Всі ЄДРПОУ', | |
| 'domain': 'Корпоративний домен', | |
| 'linkedin': 'LinkedIn', | |
| 'email': 'Email', | |
| 'phone': 'Телефон', | |
| 'company_name': 'Name', | |
| 'first_name': 'Имя', | |
| 'last_name': 'Фамилия', | |
| 'manager': 'Відповідальний'} | |
| FOLDERS = { | |
| 'companies': 'Companies', | |
| 'contacts': 'Contacts'} | |
| OUTPUT_LIMIT = 5 | |
| def _get_query( | |
| endpoint: str, | |
| params: dict[str, str] = None, | |
| json: dict[str, str] = None, | |
| login: str = LOGIN, | |
| token: str = TOKEN, | |
| base_url: str = BASE_URL | |
| ) -> list: | |
| url = f'{base_url}{endpoint}' | |
| responce = requests.get( | |
| url=url, params=params, json=json, auth=(login, token)) | |
| responce.raise_for_status() | |
| return responce.json() | |
| def _get_readable_folders(**kwargs) -> list[dict]: | |
| endpoint = '/triggers/writable-folder' | |
| return _get_query(endpoint, **kwargs) | |
| def _get_folder_id(folder_name: str, **kwargs) -> str: | |
| readable_folders = _get_readable_folders(**kwargs) | |
| for folder in readable_folders: | |
| if folder['name'] == folder_name: | |
| return folder['id'] | |
| raise ValueError('Wrong folder name.') | |
| def _get_nethunt_records( | |
| folder_name: str, | |
| query: str, | |
| limit: int = OUTPUT_LIMIT, | |
| **kwargs | |
| ) -> list[dict]: | |
| folder_id = _get_folder_id(folder_name) | |
| endpoint = f'/searches/find-record/{folder_id}' | |
| params = { | |
| 'query': query, | |
| 'limit': limit} | |
| return _get_query(endpoint, params, **kwargs) | |
| def _clean_found_records( | |
| folder_name: str, | |
| found_records: str, | |
| check_type: str | |
| ) -> list: | |
| result_list = [] | |
| for v in found_records: | |
| record = { | |
| 'folder_name': folder_name, | |
| 'duplicate_type': check_type, | |
| 'record_id': v.get('id'), | |
| 'link': v.get('link'), | |
| 'manager': v.get('fields', {}).get(FIELDS['manager'], '')} | |
| if folder_name == FOLDERS['companies']: | |
| company_name = v.get('fields', {}) \ | |
| .get(FIELDS['company_name'], '') | |
| record['name'] = company_name | |
| if folder_name == FOLDERS['contacts']: | |
| first_name = v.get('fields', {}) \ | |
| .get(FIELDS['first_name'], '') | |
| last_name = v.get('fields', {}) \ | |
| .get(FIELDS['last_name'], '') | |
| record['name'] = (first_name + ' ' + last_name).strip() | |
| result_list.append(record) | |
| return result_list | |
| def _check_field( | |
| folder_name: str, | |
| query: str, | |
| check_type: str | |
| ) -> list: | |
| found_records = _get_nethunt_records(folder_name, query=query) | |
| if len(found_records) == 0: | |
| return [] | |
| return _clean_found_records(folder_name, found_records, check_type) | |
| def _check_company_code(code: str) -> list: | |
| code = code.strip() | |
| folder = FOLDERS['companies'] | |
| query = f''' | |
| ("{FIELDS['company_code']}":"{code}") | |
| OR ("{FIELDS['company_codes']}":"{code}")''' | |
| return _check_field(folder, query, 'company_code') | |
| def _check_company_domain(domain: str) -> list: | |
| domain = domain.replace('@', '').strip() | |
| folder = FOLDERS['companies'] | |
| query = f'''"{FIELDS['domain']}":"{domain}"''' | |
| return _check_field(folder, query, 'domain') | |
| def _check_company_linkedin(linkedin: str) -> list: | |
| linkedin = linkedin.strip() | |
| folder = FOLDERS['companies'] | |
| query = f'''"{FIELDS['linkedin']}":"{linkedin}"''' | |
| return _check_field(folder, query, 'company_linkedin') | |
| def _check_contact_email(email: str) -> list: | |
| email = email.lower().strip() | |
| folder = FOLDERS['contacts'] | |
| query = f'''"{FIELDS['email']}":"{email}"''' | |
| return _check_field(folder, query, 'email') | |
| def _check_contact_phone(phone: str) -> list: | |
| phone = re.sub(r'[^\d+]', '', phone) | |
| folder = FOLDERS['contacts'] | |
| query = f'''"{FIELDS['phone']}":"{phone}"''' | |
| return _check_field(folder, query, 'phone') | |
| def _check_contact_linkedin(linkedin: str) -> list: | |
| linkedin = linkedin.strip() | |
| folder = FOLDERS['contacts'] | |
| query = f'''"{FIELDS['linkedin']}":"{linkedin}"''' | |
| return _check_field(folder, query, 'contact_linkedin') | |
| def check_record( | |
| company_code: str = None, | |
| domain: str = None, | |
| company_linkedin: str = None, | |
| email: str = None, | |
| phone: str = None, | |
| contact_linkedin: str = None | |
| ) -> list: | |
| found_records = [] | |
| checks = { | |
| _check_company_code: company_code, | |
| _check_company_domain: domain, | |
| _check_company_linkedin: company_linkedin, | |
| _check_contact_email: email, | |
| _check_contact_phone: phone, | |
| _check_contact_linkedin: contact_linkedin} | |
| for check, value in checks.items(): | |
| if value: | |
| found_records.extend(check(value)) | |
| # Апгрейд для вывода | |
| result = '<span>' | |
| count = 0 | |
| for item in found_records: | |
| if item['folder_name'] == 'Companies': | |
| icon = '🏢' | |
| elif item['folder_name'] == 'Contacts': | |
| icon = '👨💼' | |
| count = count + 1 | |
| result += ( | |
| f"{icon} " | |
| f"<b>{item['name']}</b> | " | |
| f"Manager: {item['manager']} | " | |
| f"Found by: {item['duplicate_type']} | " | |
| f"Link: <a href=\"{str(item['link'])}\">" | |
| f"{item['record_id']}</a><br><br>") | |
| if count == OUTPUT_LIMIT: | |
| break | |
| if count == 0: | |
| result += "<h2>Nothing found</h2>" | |
| result += "</span>" | |
| return result | |
| def generate( | |
| company_code, domain, company_linkedin, | |
| email, phone, contact_linkedin, token): | |
| if (TOKEN_VALUE == token): return check_record( | |
| company_code=company_code, | |
| domain=domain, | |
| company_linkedin=company_linkedin, | |
| email=email, | |
| phone=phone, | |
| contact_linkedin=contact_linkedin) | |
| else: return "Token error" | |
| title = "NetHunt search" | |
| css = """ | |
| footer {visibility: hidden} | |
| """ | |
| with gr.Blocks(css=css, title=title) as demo: | |
| gr.Markdown( | |
| """ | |
| # NetHuntCrm Search | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_7 = gr.Text(label="Token") | |
| input_1 = gr.Text(label="Сompany Code") | |
| input_2 = gr.Textbox(label="Corp Domain") | |
| gr.Markdown(""" | |
| """) | |
| greet_btn = gr.Button("Search") | |
| with gr.Column(): | |
| input_4 = gr.Textbox(label="Email") | |
| input_5 = gr.Textbox(label="Phone") | |
| input_6 = gr.Textbox(label="Contact LinkedIn") | |
| input_3 = gr.Textbox(label="Company LinkedIn") | |
| output = gr.outputs.HTML() | |
| input = [input_1, input_2, input_3, input_4, input_5, input_6, input_7] | |
| greet_btn.click(fn=generate, inputs=input, outputs=output) | |
| demo.launch(debug=True, share=False) | |