import json
import os
import re
from typing import List, Dict

from fuzzywuzzy import fuzz


# Header phrases that identify a "portfolio company" column in a table.
PORTFOLIO_COMPANY_LIST_IDENTIFIER = [
    "portfolio company or platforms",
    "€m",
    "$m",
    "Unrealised fair market valuation",
    "Realised proceeds in the period",
    "Portfolio Company or Platforms",
    "portfolio company",
    "active investment",
    "realized/unrealized company",
    "Realized Company",
    "Unrealized Company",
    "quoted/unquoted company",
    "portfolio investment",
]
FUZZY_MATCH_THRESHOLD = 70
# Row values that look like company names but are really totals or labels.
EXCLUDE_COMPANY_NAMES = ["total", "subtotal", "Total", "Investments", "Fund"]
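
# The functions below expect "intermediate" h2h-extraction data: a list of
# section dicts with keys such as "header", "content", "label_name",
# "page_number", "pdf_name" and an optional "table_content" list of table
# dicts ("table_column_header", "table_info", "metadata"). Only the key names
# come from this module; the exact layout of the source JSON is assumed.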


def get_file_name_without_extension(file_path: str) -> str:
    """Extract the file name without its extension from a path."""
    return os.path.splitext(os.path.basename(file_path))[0]


def fuzzy_match(text: str, patterns: List[str], threshold: int = FUZZY_MATCH_THRESHOLD) -> bool:
    """Return True if `text` fuzzy-matches any of the given patterns."""
    text = str(text).lower()
    for pattern in patterns:
        if fuzz.partial_ratio(text, pattern.lower()) >= threshold:
            return True
    return False
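
# Illustrative use of fuzzy_match (a sketch, not called by the pipeline):
# "Portfolio Company" matches the identifier list because the lowercased
# phrase is an exact member, and "Total" matches the exclusion list the same
# way; exact partial-ratio scores otherwise depend on the fuzzywuzzy version.
#   fuzzy_match("Portfolio Company", PORTFOLIO_COMPANY_LIST_IDENTIFIER)  # -> True
#   fuzzy_match("Total", EXCLUDE_COMPANY_NAMES)                          # -> True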


def extract_portfolio_companies_from_table(table_data: Dict) -> List[str]:
    """Extract company names from a portfolio company table."""
    companies = []
    if not table_data.get("table_info"):
        return companies

    # Find the column whose header looks like a portfolio-company column.
    company_column = None
    for i, header in enumerate(table_data.get("table_column_header", [])):
        if fuzzy_match(header, PORTFOLIO_COMPANY_LIST_IDENTIFIER):
            company_column = i
            break

    if company_column is None:
        return companies

    company_column_name = table_data["table_column_header"][company_column]
    print("company_column::", company_column)
    print("company_column_name::", company_column_name)

    # Collect company names, skipping summary rows such as totals.
    for row in table_data["table_info"]:
        if not isinstance(row, dict):
            continue
        company_name = str(row.get(company_column_name, "")).strip()
        if company_name and not fuzzy_match(company_name, EXCLUDE_COMPANY_NAMES):
            companies.append(company_name)

    return companies
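
# Illustrative table shape assumed by extract_portfolio_companies_from_table
# (field values are hypothetical; only the key names come from this module):
#   {
#       "table_column_header": ["Portfolio Company or Platforms", "€m"],
#       "table_info": [
#           {"Portfolio Company or Platforms": "Company A", "€m": "12.3"},
#           {"Portfolio Company or Platforms": "Total", "€m": "12.3"},
#       ],
#       "metadata": {"table_page_id": 7},
#   }
# Here "Company A" would be extracted and the "Total" row skipped.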


def get_portfolio_company_list(intermediate_data: List[Dict]) -> List[str]:
    """Extract portfolio companies from all tables in the document."""
    portfolio_companies = set()

    for entry in intermediate_data:
        if "table_content" not in entry:
            continue
        for table in entry["table_content"]:
            companies = extract_portfolio_companies_from_table(table)
            portfolio_companies.update(companies)

    return list(portfolio_companies)
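
# Usage sketch (assumes `intermediate_data` is the parsed h2h-extraction JSON,
# i.e. a list of section dicts, as loaded in the __main__ block at the bottom):
#   companies = get_portfolio_company_list(intermediate_data)
#   # e.g. ["Company A", "Company B", ...] depending on the tables found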


def merge_content_under_same_header(
    intermediate_data: List[Dict],
    portfolio_company_list: List[str],
    start_index: int
) -> tuple[Dict, int]:
    """
    Merge content under the same header until the next company match is found.
    Returns the merged entry and the next index to process.
    """
    merged_entry = {
        "header": intermediate_data[start_index]["header"],
        "content": intermediate_data[start_index].get("content", ""),
        "table_content": intermediate_data[start_index].get("table_content", []),
        "label_name": intermediate_data[start_index]["label_name"],
        "page_number": intermediate_data[start_index]["page_number"],
        "pdf_name": intermediate_data[start_index]["pdf_name"]
    }

    current_index = start_index + 1
    while current_index < len(intermediate_data):
        current_entry = intermediate_data[current_index]

        # Stop merging once the header changes.
        if current_entry["header"] != merged_entry["header"]:
            break

        # Stop merging once a new company appears in the content or tables.
        content_match = any(company in current_entry.get("content", "")
                            for company in portfolio_company_list)
        table_match = False
        for table in current_entry.get("table_content", []):
            if extract_portfolio_companies_from_table(table):
                table_match = True
                break

        if content_match or table_match:
            break

        # Append the entry's text to the merged content.
        if "content" in current_entry:
            if merged_entry["content"]:
                merged_entry["content"] += "\n" + current_entry["content"]
            else:
                merged_entry["content"] = current_entry["content"]

        # Carry over any tables.
        if "table_content" in current_entry:
            merged_entry["table_content"].extend(current_entry["table_content"])

        current_index += 1

    return merged_entry, current_index


def process_table_page_ids(merged_output):
    """
    Update each merged entry's page_number by combining its existing page numbers
    with the unique page numbers found in its table_content metadata.

    Args:
        merged_output (list): Merged section dicts, each optionally carrying a
            comma-separated "page_number" string and a "table_content" list.

    Returns:
        list: The same entries with "page_number" extended by table page ids.
    """
    for current_merged_entry in merged_output:

        if 'table_content' in current_merged_entry:
            # Start from the pages already recorded on the entry, if any.
            existing_page_numbers = set(current_merged_entry.get('page_number', '').split(',')) if current_merged_entry.get('page_number') else set()

            # Add the page id recorded in each table's metadata.
            for table in current_merged_entry['table_content']:
                if 'metadata' in table and 'table_page_id' in table['metadata']:
                    existing_page_numbers.add(str(table['metadata']['table_page_id']))

            if existing_page_numbers:
                current_merged_entry['page_number'] = ','.join(sorted(existing_page_numbers, key=int))

    return merged_output
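
# Worked example for process_table_page_ids (values are hypothetical):
# an entry with page_number "3,5" and one table whose metadata carries
# table_page_id 4 ends up with page_number "3,4,5" (numeric sort, no duplicates).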


def merge_portfolio_company_sections(intermediate_data: List[Dict]) -> tuple[List[Dict], List[str], List[str]]:
    """Merge all content and tables under the same portfolio company header until the next company is found.
    Returns:
        - merged_output: List of merged document sections
        - fuzzy_matched_companies: List of companies that were fuzzy matched in headers
        - portfolio_companies: List of all portfolio companies found in tables
    """
    portfolio_companies = get_portfolio_company_list(intermediate_data)
    print(f"Extracted portfolio companies: {portfolio_companies}")

    merged_output = []
    fuzzy_matched_companies = []

    current_chunk = None
    active_company = None

    for entry in intermediate_data:

        header_companies, entry_fuzzy_matches = match_company_names(entry["header"], portfolio_companies)
        for company in entry_fuzzy_matches:
            if company not in fuzzy_matched_companies:
                fuzzy_matched_companies.append(company)

        if header_companies:
            print("&" * 100)
            print("*" * 100)
            print("entry_header::", entry["header"])
            print("page number of header::", entry["page_number"])
            print("*" * 100)
            print("header_companies::", header_companies)
            print("*" * 100)

            # A new company header starts a new chunk; flush the previous one.
            if current_chunk:
                merged_output.append(current_chunk)
                current_chunk = None
                active_company = None

            active_company = header_companies[0]
            current_chunk = {
                "page_number": entry["page_number"],
                "pdf_name": entry["pdf_name"],
                "header": entry["header"],
                "label_name": entry["label_name"],
                "content": entry.get("content", ""),
                "table_content": entry.get("table_content", []),
                "matched_company": active_company
            }

            # If the header matched several companies, emit one section per extra company.
            for additional_company in header_companies[1:]:
                merged_output.append({
                    "page_number": entry["page_number"],
                    "pdf_name": entry["pdf_name"],
                    "header": entry["header"],
                    "label_name": entry["label_name"],
                    "content": entry.get("content", ""),
                    "table_content": entry.get("table_content", []),
                    "matched_company": additional_company
                })

        elif current_chunk:
            # No new company header: fold this entry into the active chunk.
            if "content" in entry:
                if current_chunk["content"]:
                    current_chunk["content"] += "\n\n" + entry["content"]
                    current_chunk["page_number"] += "," + str(entry["page_number"])
                    page_numbers_list = list(dict.fromkeys(str(current_chunk["page_number"]).split(",")))
                    page_numbers_list = [num.strip() for num in page_numbers_list if num.strip()]
                    current_chunk["page_number"] = ",".join(page_numbers_list)
                else:
                    current_chunk["content"] = entry["content"]
                    current_chunk["page_number"] = str(entry["page_number"])

            if "table_content" in entry:
                current_chunk["table_content"].extend(entry["table_content"])
                if current_chunk["page_number"]:
                    for table in entry["table_content"]:
                        if "metadata" in table and "table_page_id" in table["metadata"]:
                            current_chunk["page_number"] += "," + str(table["metadata"]["table_page_id"])

                current_chunk["page_number"] += "," + str(entry["page_number"])
                page_numbers_list = list(dict.fromkeys(str(current_chunk["page_number"]).split(",")))
                page_numbers_list = [num.strip() for num in page_numbers_list if num.strip()]
                current_chunk["page_number"] = ",".join(page_numbers_list)

        else:
            # No active chunk: keep the entry as-is, with a de-duplicated page_number.
            entry_copy = entry.copy()
            if "page_number" in entry_copy:
                page_numbers_list = list(dict.fromkeys(str(entry_copy["page_number"]).split(",")))
                page_numbers_list = [num.strip() for num in page_numbers_list if num.strip()]
                entry_copy["page_number"] = ",".join(page_numbers_list)

            merged_output.append(entry_copy)

    # Flush the final chunk, de-duplicating its page numbers.
    if current_chunk:
        page_numbers_list = list(dict.fromkeys(str(current_chunk["page_number"]).split(",")))
        page_numbers_list = [num.strip() for num in page_numbers_list if num.strip()]
        current_chunk["page_number"] = ",".join(page_numbers_list)
        merged_output.append(current_chunk)

    merged_output_new = process_table_page_ids(merged_output=merged_output)

    return merged_output_new, fuzzy_matched_companies, portfolio_companies


def match_company_names(header_text: str, companies: List[str], threshold: int = FUZZY_MATCH_THRESHOLD) -> tuple[List[str], List[str]]:
    """Match company names in a header, first checking header abbreviations, then company abbreviations."""
    header_text = str(header_text).lower().strip()
    matched_companies = []
    fuzzy_matched_companies = []

    # Candidate abbreviations built from the header: initials, vowels stripped, spaces removed.
    header_abbreviations = [
        ''.join(word[0] for word in header_text.split() if word),
        re.sub(r'[aeiou\s]', '', header_text),
        header_text.replace(' ', '')
    ]

    for company in companies:
        company_lower = company.lower()

        # First pass: compare the company name against the header and its abbreviations.
        for header_pattern in [header_text] + header_abbreviations:
            if fuzz.partial_ratio(header_pattern, company_lower) >= threshold:
                matched_companies.append(company)
                fuzzy_matched_companies.append(company)
                break
        else:
            # Second pass: compare the header against abbreviations of the company name.
            company_abbreviations = [
                ''.join(word[0] for word in company_lower.split() if word),
                re.sub(r'[aeiou\s]', '', company_lower),
                company_lower.replace(' ', '')
            ]
            for company_pattern in company_abbreviations:
                if fuzz.partial_ratio(header_text, company_pattern) >= threshold:
                    matched_companies.append(company)
                    fuzzy_matched_companies.append(company)
                    break

    # Preserve order while removing duplicates.
    matched_companies = list(dict.fromkeys(matched_companies))
    fuzzy_matched_companies = list(dict.fromkeys(fuzzy_matched_companies))

    return matched_companies, fuzzy_matched_companies
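
# Worked example for match_company_names (names are hypothetical):
# for a company "Alpha Beta Capital", the generated abbreviations are
# "abc" (initials), "lphbtcptl" (vowels and spaces stripped) and
# "alphabetacapital" (spaces removed), so a header such as
# "ABC portfolio update" matches it, because "abc" is an exact substring
# of the lowercased header (partial_ratio 100 >= FUZZY_MATCH_THRESHOLD).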


def process_document_company_wise(
    intermediate_str_chunk_json: List[Dict],
    output_directory: str,
    file_name: str
) -> List[Dict]:
    """Process the document and return the merged content in the original format."""
    if isinstance(intermediate_str_chunk_json, str):
        intermediate_str_chunk_json = json.loads(intermediate_str_chunk_json)

    merged_content, matched_company_list, portfolio_company_list = merge_portfolio_company_sections(intermediate_str_chunk_json)

    # Record the matched and extracted company lists on the first merged section.
    if merged_content:
        merged_content[0]["portfolio_companies_list_fuzzy_matched"] = matched_company_list
        merged_content[0]["portfolio_companies_list_before"] = portfolio_company_list

    print("matched_company_list::", matched_company_list)
    print("portfolio_company_list::", portfolio_company_list)

    os.makedirs(output_directory, exist_ok=True)

    output_path = os.path.join(output_directory, f"{file_name}_h2h_merged_output.json")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(merged_content, f, indent=4, ensure_ascii=False)
    print(f"Saved merged output to {output_path}")

    return merged_content


def read_json(file_path):
    """Read a JSON file and return the parsed data."""
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data


if __name__ == "__main__":
    input_str_chunk_json_path = "/shared_disk/kushal/db_str_chunking/new_ws_structured_code/Triton2023Q4_patria_sample_output/Triton2023Q4_patria_sample_json_output/Triton2023Q4_patria_sample_final_h2h_extraction.json"
    input_json = read_json(input_str_chunk_json_path)

    result = process_document_company_wise(
        intermediate_str_chunk_json=input_json,
        output_directory="db_structured_chunking/structure_chunking/src/iqeq_modification/testing_sample/output",
        file_name="sample_report"
    )

    print("Processing complete.")