import os import json import logging from typing import Any, Dict, List, Optional from pydantic import BaseModel, ValidationError from pydantic_settings import BaseSettings import pandas as pd from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment, Border, Side from openpyxl.utils import get_column_letter # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class Settings(BaseSettings): """Application settings loaded from environment variables or .env file.""" input_file: str = "data/output2/notes_output.json" output_folder: str = "data/output3" output_file: str = "data/final_notes_output.xlsx" settings = Settings() class BreakdownItem(BaseModel): description: str amount: float amount_lakhs: Optional[float] = None class MatchedAccount(BaseModel): account: str amount: float amount_lakhs: Optional[float] = None group: Optional[str] = None class NoteData(BaseModel): note_number: Optional[str] = None note_title: Optional[str] = None full_title: Optional[str] = None table_data: Optional[List[Dict[str, Any]]] = [] breakdown: Optional[Dict[str, BreakdownItem]] = {} matched_accounts: Optional[List[MatchedAccount]] = [] total_amount: Optional[float] = None total_amount_lakhs: Optional[float] = None matched_accounts_count: Optional[int] = None comparative_data: Optional[Dict[str, Any]] = {} notes_and_disclosures: Optional[List[str]] = [] markdown_content: Optional[str] = "" def create_output_folder(folder_path: str) -> None: """Create output folder if it doesn't exist.""" if not os.path.exists(folder_path): os.makedirs(folder_path) logger.info(f"Created folder: {folder_path}") def read_json_file(file_path: str) -> Optional[Dict[str, Any]]: """Read and parse JSON file.""" try: with open(file_path, 'r', encoding='utf-8') as file: data = json.load(file) logger.info(f"Successfully read JSON file: {file_path}") return data except FileNotFoundError: logger.error(f"File '{file_path}' not found.") return None except json.JSONDecodeError as e: logger.error(f"Invalid JSON format in '{file_path}': {e}") return None except Exception as e: logger.error(f"Error reading file '{file_path}': {e}") return None def normalize_llm_note_json(llm_json: Dict[str, Any]) -> Dict[str, Any]: """ Convert LLM note JSON (single note, custom structure) to the standard notes_output.json format. """ if "note_number" in llm_json or "full_title" in llm_json or "table_data" in llm_json: return llm_json normalized = { "note_number": llm_json.get("metadata", {}).get("note_number", ""), "note_title": llm_json.get("title", ""), "full_title": llm_json.get("full_title", ""), "table_data": [], "breakdown": {}, "matched_accounts": [], "total_amount": None, "total_amount_lakhs": None, "matched_accounts_count": None, "comparative_data": {}, "notes_and_disclosures": [], "markdown_content": "", } if "structure" in llm_json: for item in llm_json["structure"]: if "category" in item and "subcategories" in item: for sub in item["subcategories"]: row = { "particulars": sub.get("label", ""), "current_year": sub.get("value", ""), "previous_year": "" } normalized["table_data"].append(row) return normalized def create_financial_table_sheet(workbook: Workbook, sheet_name: str, note_data: Dict[str, Any]) -> None: """Create a properly formatted financial table sheet.""" ws = workbook.create_sheet(title=sheet_name) header_font = Font(bold=True, color="FFFFFF") header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") bold_font = Font(bold=True) center_alignment = Alignment(horizontal="center", vertical="center") right_alignment = Alignment(horizontal="right", vertical="center") thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) current_row = 1 # Add Note Title note_title = note_data.get('full_title', note_data.get('note_title', 'Note')) ws.cell(row=current_row, column=1, value=note_title) ws.cell(row=current_row, column=1).font = Font(bold=True, size=14) current_row += 2 # Process table_data if available if 'table_data' in note_data and note_data['table_data']: table_data = note_data['table_data'] df = pd.DataFrame(table_data) for col_num, column_name in enumerate(df.columns, 1): cell = ws.cell(row=current_row, column=col_num, value=column_name.replace('_', ' ').title()) cell.font = header_font cell.fill = header_fill cell.alignment = center_alignment cell.border = thin_border current_row += 1 for _, row in df.iterrows(): for col_num, value in enumerate(row, 1): cell = ws.cell(row=current_row, column=col_num, value=value) cell.border = thin_border if col_num > 1: cell.alignment = right_alignment if isinstance(value, str) and ('**' in value or 'Total' in value or 'Particulars' in value): cell.font = bold_font cell.value = value.replace('**', '') current_row += 1 current_row += 1 # Add breakdown information if available if 'breakdown' in note_data and note_data['breakdown']: ws.cell(row=current_row, column=1, value="Breakdown Details:") ws.cell(row=current_row, column=1).font = bold_font current_row += 1 ws.cell(row=current_row, column=1, value="Description") ws.cell(row=current_row, column=2, value="Amount") ws.cell(row=current_row, column=3, value="Amount (Lakhs)") for col in range(1, 4): cell = ws.cell(row=current_row, column=col) cell.font = header_font cell.fill = header_fill cell.alignment = center_alignment cell.border = thin_border current_row += 1 for key, value in note_data['breakdown'].items(): if isinstance(value, dict): desc = value.get('description', key) amount = value.get('amount', 0) amount_lakhs = value.get('amount_lakhs', 0) ws.cell(row=current_row, column=1, value=desc).border = thin_border ws.cell(row=current_row, column=2, value=amount).border = thin_border ws.cell(row=current_row, column=3, value=amount_lakhs).border = thin_border ws.cell(row=current_row, column=2).alignment = right_alignment ws.cell(row=current_row, column=3).alignment = right_alignment current_row += 1 current_row += 1 # Add matched accounts if available if 'matched_accounts' in note_data and note_data['matched_accounts']: ws.cell(row=current_row, column=1, value="Account-wise Breakdown:") ws.cell(row=current_row, column=1).font = bold_font current_row += 1 headers = ["Account", "Amount", "Amount (Lakhs)", "Group"] for col_num, header in enumerate(headers, 1): cell = ws.cell(row=current_row, column=col_num, value=header) cell.font = header_font cell.fill = header_fill cell.alignment = center_alignment cell.border = thin_border current_row += 1 for account in note_data['matched_accounts']: ws.cell(row=current_row, column=1, value=account.get('account', '')).border = thin_border ws.cell(row=current_row, column=2, value=account.get('amount', 0)).border = thin_border ws.cell(row=current_row, column=3, value=account.get('amount_lakhs', 0)).border = thin_border ws.cell(row=current_row, column=4, value=account.get('group', '')).border = thin_border ws.cell(row=current_row, column=2).alignment = right_alignment ws.cell(row=current_row, column=3).alignment = right_alignment current_row += 1 current_row += 1 # Add summary information if 'total_amount' in note_data: ws.cell(row=current_row, column=1, value="Summary:") ws.cell(row=current_row, column=1).font = bold_font current_row += 1 ws.cell(row=current_row, column=1, value="Total Amount:") ws.cell(row=current_row, column=2, value=note_data.get('total_amount', 0)) ws.cell(row=current_row, column=2).alignment = right_alignment current_row += 1 ws.cell(row=current_row, column=1, value="Total Amount (Lakhs):") ws.cell(row=current_row, column=2, value=note_data.get('total_amount_lakhs', 0)) ws.cell(row=current_row, column=2).alignment = right_alignment current_row += 1 ws.cell(row=current_row, column=1, value="Matched Accounts Count:") ws.cell(row=current_row, column=2, value=note_data.get('matched_accounts_count', 0)) ws.cell(row=current_row, column=2).alignment = right_alignment current_row += 1 # Auto-adjust column widths for column in ws.columns: max_length = 0 column_letter = get_column_letter(column[0].column) for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except Exception: pass adjusted_width = min(max_length + 2, 60) ws.column_dimensions[column_letter].width = adjusted_width def convert_json_to_excel(input_file: str, output_file: str) -> bool: """Main function to convert JSON to Excel.""" json_data = read_json_file(input_file) if json_data is None: return False # Normalize if needed if isinstance(json_data, dict) and "notes" not in json_data: normalized_note = normalize_llm_note_json(json_data) json_data = {"notes": [normalized_note]} elif isinstance(json_data, list): json_data = {"notes": json_data} workbook = Workbook() default_sheet = workbook.active workbook.remove(default_sheet) if 'notes' in json_data: notes_data = json_data['notes'] for note in notes_data: try: validated_note = NoteData(**note) except ValidationError as ve: logger.warning(f"Validation error for note: {ve}") validated_note = note # fallback to raw dict note_title = note.get('full_title', note.get('note_title', f"Note {note.get('note_number', '')}")) clean_sheet_name = str(note_title).replace('/', '_').replace('\\', '_').replace('*', '_') clean_sheet_name = clean_sheet_name.replace('?', '_').replace('[', '_').replace(']', '_') clean_sheet_name = clean_sheet_name[:31] logger.info(f"Processing: {clean_sheet_name}") create_financial_table_sheet(workbook, clean_sheet_name, note) else: for note_key, note_data in json_data.items(): clean_sheet_name = str(note_key).replace('/', '_').replace('\\', '_').replace('*', '_') clean_sheet_name = clean_sheet_name.replace('?', '_').replace('[', '_').replace(']', '_') clean_sheet_name = clean_sheet_name[:31] logger.info(f"Processing: {clean_sheet_name}") if isinstance(note_data, dict): create_financial_table_sheet(workbook, clean_sheet_name, note_data) else: simple_data = {"value": note_data} create_financial_table_sheet(workbook, clean_sheet_name, simple_data) try: workbook.save(output_file) logger.info(f"Successfully saved Excel file: {output_file}") return True except Exception as e: logger.error(f"Error saving Excel file: {e}") return False def json_to_xlsx(input_json: str, output_xlsx: str) -> None: """ Convert the given JSON file to Excel using the existing logic. """ convert_json_to_excel(input_json, output_xlsx) def main() -> None: """Main execution function.""" input_file = settings.input_file output_folder = settings.output_folder output_file = os.path.join(output_folder, settings.output_file) create_output_folder(output_folder) if not os.path.exists(input_file): logger.error(f"Input file '{input_file}' not found. Please ensure the file exists in the correct location.") return success = convert_json_to_excel(input_file, output_file) if success: logger.info("=" * 50) logger.info("CONVERSION COMPLETED SUCCESSFULLY!") logger.info("=" * 50) logger.info(f"Input file: {input_file}") logger.info(f"Output file: {output_file}") logger.info("The Excel file has been created with:") logger.info("- Each note as a separate sheet") logger.info("- Proper financial table formatting") logger.info("- Table data displayed in tabular format") logger.info("- Breakdown and account details included") logger.info("- Professional styling and formatting") else: logger.error("=" * 50) logger.error("CONVERSION FAILED!") logger.error("=" * 50) logger.error("Please check the error messages above.") if __name__ == "__main__": main()