| from faker import Faker |
| import pandas as pd |
| import re |
| import sys |
| import json |
| import unicodedata |
| from bs4 import BeautifulSoup |
| from urllib.parse import urljoin |
| import cpca |
|
|
| from selenium import webdriver |
| import helium as hm |
| from selenium.webdriver.chrome.options import Options |
| from selenium.webdriver.chrome.service import Service |
|
|
| from keywordInfo import key_pat, zwlx_list |
|
|
| |
# Default announcement page to scrape.
# NOTE(review): an earlier Beijing URL was assigned and immediately
# overwritten; kept as a comment instead of a dead assignment.
# sub_url = "http://www.beijing.gov.cn/gongkai/rsxx/gwyzk/202211/t20221120_2862819.html"
sub_url = "https://www.js.msa.gov.cn/art/2023/2/24/art_11436_1391666.html"
|
|
|
|
def getDriver():
    """Build a headless Chrome webdriver with basic anti-bot-detection tweaks.

    Reads ``./stealth.min.js`` and injects it into every new document via CDP.

    Returns:
        selenium.webdriver.Chrome: a configured headless driver.
    """
    uas = Faker()
    # NOTE(review): the original assigned '/usr/bin/chromedriver' and then
    # immediately overwrote it with './chromedriver'; only the latter is kept.
    CHROMEDRIVER_PATH = './chromedriver'
    service = Service(CHROMEDRIVER_PATH)
    WINDOW_SIZE = "1920,1080"

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=%s" % WINDOW_SIZE)
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-infobars')
    chrome_options.add_argument('--disable-gpu')
    # Hide the usual automation fingerprints and use a realistic user agent.
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
    chrome_options.add_argument(f'user-agent={uas.chrome()}')

    with open('./stealth.min.js') as f:
        js = f.read()
    # Bug fix: the Service object was created but never used, and the
    # ``executable_path`` keyword was removed in Selenium 4 — pass the
    # service object instead.
    driver = webdriver.Chrome(service=service, options=chrome_options)
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": js})
    return driver
|
|
def content_with_date(lines):
    """Return the prefix of *lines* up to and including the first dated line.

    A line is "dated" when it contains either "M月D日" or "YYYY年M月D日".

    Args:
        lines: sequence of strings (may be empty).

    Returns:
        list: ``lines[:i+1]`` where ``i`` is the first dated line's index,
        or ``[]`` when *lines* is empty or contains no date.
    """
    if not lines:
        return []
    # NOTE(review): the original also defined an unused (and malformed,
    # ``[上|下]午``) second pattern; it never affected the result and is dropped.
    date_pattern1 = r'\d{1,2}月\d{1,2}日'          # e.g. 3月15日
    date_pattern3 = r'\d{4}年\d{1,2}月\d{1,2}日'   # e.g. 2023年3月15日
    date_re = re.compile(f'({date_pattern1}|{date_pattern3})')
    for inx_, line in enumerate(lines):
        if date_re.search(line):
            return lines[:inx_ + 1]
    return []
|
|
|
|
| |
def find_key_paragrap(search_text, paragraphs):
    """Locate *search_text* among *paragraphs* and return a relevant window.

    Scans the paragraph elements in order; at each hit, takes the texts of up
    to 7 consecutive paragraphs starting at the hit and accepts that window
    only if it contains at least one dated line (per ``content_with_date``).

    Args:
        search_text: keyword to look for in each paragraph's ``.text``.
        paragraphs: sequence of objects exposing a ``.text`` attribute.

    Returns:
        list[str] | None: the accepted window's texts, or None if no hit
        yields a dated window.
    """
    window = 7
    for pos, para in enumerate(paragraphs):
        if search_text not in para.text:
            continue
        tail = min(len(paragraphs), pos + window)
        texts = [p.text for p in paragraphs[pos:tail]]
        if content_with_date(texts):
            return texts
    return None
|
|
def titleLocInfo(title):
    """Extract year, location and exam type (zwlx) from an announcement title.

    Uses ``cpca`` to resolve the Chinese administrative area mentioned in the
    title and ``zwlx_list`` (project keyword table) for the exam type.

    Args:
        title: page title string.

    Returns:
        list: ``[year, province, city, adcode, zwlx]`` where ``year`` is like
        "2023年" (default "2023" when absent) and area fields are "" when
        cpca finds no match.
    """
    matches = re.findall(r'\d{4}年', title)
    # NOTE(review): default deliberately lacks the 年 suffix in the original.
    zwk_year = matches[0] if matches else "2023"

    area_df = cpca.transform([title])
    has_row = area_df.shape[0] > 0
    zwk_sheng = list(area_df["省"])[0] if has_row else ""
    zwk_diqu = list(area_df["市"])[0] if has_row else ""
    # Bug fix: the original also read the 区 column into an unused local.
    zwk_zip = list(area_df["adcode"])[0] if has_row else ""

    # Last matching exam type wins; fall back to the first list entry.
    zwlx = zwlx_list[0]
    for candidate in zwlx_list:
        if candidate in title:
            zwlx = candidate
    return [zwk_year, zwk_sheng, zwk_diqu, zwk_zip, zwlx]
|
|
def extract_from_driver(driver):
    """Scrape the currently loaded announcement page for schedule information.

    Collects the title, every non-empty <p> paragraph and all .doc/.xls/.xlsx
    attachment links, then finds the paragraph windows most relevant to
    sign-up (报名), payment (缴费), exam (考试) and admission ticket (准考证).

    Args:
        driver: a selenium webdriver with the target page already loaded.

    Returns:
        dict: keys ``title``, ``zwk_year``, ``zwk_sheng``, ``zwk_diqu``,
        ``zwk_zip``, ``zwlx``, the four raw ``*_sj`` paragraph lists,
        ``fn_list`` (attachment name -> url), and the four date-trimmed
        ``tidy_*_sj`` lists.
    """
    title = driver.title
    title_info = titleLocInfo(title)

    # Non-empty paragraphs form the search space for the keyword windows.
    # NOTE(review): an unused second xpath query that joined all paragraph
    # text was removed — its result was never read.
    paragraphs = driver.find_elements_by_tag_name("p")
    paragraphs = [p for p in paragraphs if p.text.strip() != ""]

    def get_key_info(pt: list):
        # First keyword in *pt* that yields a dated paragraph window wins;
        # "" (not None) signals "nothing found", matching the original.
        for item in pt:
            res_ = find_key_paragrap(item, paragraphs)
            if res_ is not None:
                return res_
        return ""

    bm_sj = get_key_info(key_pat["报名"])     # sign-up
    fee_sj = get_key_info(key_pat["缴费"])    # payment
    ks_sj = get_key_info(key_pat["考试"])     # exam
    zkz_sj = get_key_info(key_pat["准考证"])  # admission ticket

    # Collect office-document attachments, matched by URL or by link text.
    doc_suffixes = (".doc", ".xls", ".xlsx")
    unique_link = {}
    for link in driver.find_elements_by_tag_name("a"):
        url_ = link.get_attribute("href")
        content_ = link.get_attribute("textContent")
        url_con = url_ and url_.endswith(doc_suffixes)
        name_con = content_ and content_.endswith(doc_suffixes)
        if url_con or name_con:
            unique_link[content_] = url_

    name = ["title", "zwk_year", "zwk_sheng", "zwk_diqu", "zwk_zip", "zwlx",
            "bm_sj", "fee_sj", "ks_sj", "zkz_sj",
            "fn_list",
            "tidy_bm_sj", "tidy_fee_sj", "tidy_ks_sj", "tidy_zkz_sj"]
    doc_item = [title]
    doc_item.extend(title_info)
    doc_item.extend([bm_sj, fee_sj, ks_sj, zkz_sj, unique_link])
    doc_item.extend([content_with_date(bm_sj), content_with_date(fee_sj),
                     content_with_date(ks_sj), content_with_date(zkz_sj)])
    return dict(zip(name, doc_item))
|
|
| |
def table_record_doc(doc):
    """Flatten an ``extract_from_driver`` result dict into one flat row.

    Each paragraph list is joined with newlines and the attachment dict is
    rendered as "name:url" lines.

    Args:
        doc: dict produced by ``extract_from_driver``.

    Returns:
        list: 15 strings ordered title, year, province, city, adcode, zwlx,
        the four raw ``*_sj`` texts, the four ``tidy_*_sj`` texts, fn_list.
    """
    fn_dc = doc["fn_list"]
    # Bug fix: the original also built a single-row pandas DataFrame that was
    # never used or returned; that dead work is dropped.
    return [
        doc["title"], doc["zwk_year"], doc["zwk_sheng"], doc["zwk_diqu"], doc["zwk_zip"],
        doc["zwlx"],
        "\n".join(doc["bm_sj"]),
        "\n".join(doc["fee_sj"]),
        "\n".join(doc["ks_sj"]),
        "\n".join(doc["zkz_sj"]),
        "\n".join(doc["tidy_bm_sj"]),
        "\n".join(doc["tidy_fee_sj"]),
        "\n".join(doc["tidy_ks_sj"]),
        "\n".join(doc["tidy_zkz_sj"]),
        "\n".join(f"{k}:{v}" for k, v in fn_dc.items()),
    ]
|
|
if __name__ == '__main__':
    import time

    mydriver = getDriver()

    # Default page; a different URL may be supplied as the first CLI argument.
    url = "https://www.js.msa.gov.cn/art/2023/2/24/art_11436_1391666.html"
    if len(sys.argv) > 1:
        url = sys.argv[1]
    hm.set_driver(mydriver)
    # Bug fix: the original navigated to the module-level ``sub_url``,
    # silently ignoring the command-line URL computed above.
    hm.go_to(url)
    time.sleep(2)  # give the page time to render before scraping
    res = extract_from_driver(mydriver)
    print("-raw, mostly contains----------------------------")
    print(res)
    print("报名,缴费,考试,准考证最相关信息")
    # Bug fix: the original read from an undefined name ``doc`` (NameError);
    # the extracted record lives in ``res``.
    bm_sj = content_with_date(res["bm_sj"])
    fee_sj = content_with_date(res["fee_sj"])
    ks_sj = content_with_date(res["ks_sj"])
    zkz_sj = content_with_date(res["zkz_sj"])
    print(bm_sj)
    print(fee_sj)
    print(ks_sj)
    print(zkz_sj)
    mydriver.close()