#!/usr/bin/env python3 """ Normalize Layer 2 structured financial datasets into ChatML JSONL format. Handles 5 structured financial datasets: - stocks_balance_sheet (paperswithbacktest/Stocks-Quarterly-BalanceSheet) - balance sheet data - stocks_income_statement (paperswithbacktest/Stocks-Quarterly-IncomeStatement) - P&L data - stocks_earnings (paperswithbacktest/Stocks-Quarterly-Earnings) - earnings data - risk_analysis (gretelai/gretel-financial-risk-analysis-v1) - risk analysis data - fingpt_forecaster (FinGPT/fingpt-forecaster-dow30-202305-202405) - stock forecasting data Converts structured data rows to JSON and pairs them with CFO-style analysis prompts and template responses. Includes 4-5 prompt variations per category for diversity. """ import json import os import random from pathlib import Path from datasets import load_from_disk from typing import Optional, Dict, List, Tuple def get_system_prompt(data_dir: Path) -> str: """Load the CFO system prompt from file.""" prompt_path = data_dir / "cfo_system_prompt.txt" with open(prompt_path, "r", encoding="utf-8") as f: return f.read().strip() def convert_row_to_json(row: Dict) -> str: """Convert a data row to a pretty-printed JSON string.""" return json.dumps(row, indent=2, ensure_ascii=False) # CFO-style prompt variations for balance sheet data BALANCE_SHEET_PROMPTS = [ "Analyze this quarterly balance sheet data and provide a CFO-level assessment.", "Review the following balance sheet figures and identify key financial health indicators.", "Given this balance sheet snapshot, what are the critical liquidity and solvency metrics?", "Assess the financial position based on this balance sheet data. What metrics matter most?", "Conduct a balance sheet analysis for strategic financial planning.", ] # CFO-style prompt variations for income statement data INCOME_STATEMENT_PROMPTS = [ "Analyze this quarterly income statement and provide insights on profitability and operational efficiency.", "Review the P&L statement and identify trends in revenue, expenses, and margins.", "Based on this income statement, what are the key performance drivers and areas of concern?", "Conduct a detailed analysis of revenue streams, operating costs, and net profitability.", "Assess this income statement for CEO/Board-level reporting. What's the narrative?", ] # CFO-style prompt variations for earnings data EARNINGS_PROMPTS = [ "Analyze these quarterly earnings results and provide strategic interpretation.", "Review the earnings data and identify performance drivers versus market expectations.", "Based on these earnings metrics, what guidance would you provide for the next quarter?", "Conduct a comprehensive earnings analysis including EPS, growth rates, and forward guidance.", "Assess earnings quality and sustainability. What risks or opportunities do you see?", ] # CFO-style prompt variations for risk analysis data RISK_ANALYSIS_PROMPTS = [ "Analyze this financial risk assessment and provide mitigation recommendations.", "Review the identified risks and prioritize them by impact and likelihood.", "Based on this risk analysis, what hedging or insurance strategies would you recommend?", "Conduct a comprehensive financial risk evaluation and outline a risk management plan.", "Assess the risk profile and identify key metrics to monitor for early warning signals.", ] # CFO-style prompt variations for forecasting data FORECASTING_PROMPTS = [ "Analyze this stock forecast and explain the underlying methodology and assumptions.", "Review the forecasted metrics and assess their reasonableness based on historical trends.", "Given these forecasting results, what is your confidence level and what could invalidate this outlook?", "Conduct a sensitivity analysis on the key forecasting assumptions.", "Assess this forecast for board presentation. What are the key takeaways and risks?", ] def generate_balance_sheet_response(row_json: str, ticker: str = "") -> str: """Generate a template CFO-level response for balance sheet analysis.""" return f"""Based on the balance sheet data provided{' for ' + ticker if ticker else ''}: **Key Financial Metrics:** The balance sheet reflects the company's financial position across assets, liabilities, and equity. Key metrics to monitor include: - Current ratio and quick ratio for liquidity assessment - Debt-to-equity and debt-to-asset ratios for leverage evaluation - Working capital trends for operational efficiency - Asset quality and composition (tangible vs intangible) **Balance Sheet Analysis:** From the provided figures, we can identify: 1. Liquidity position - ability to meet short-term obligations 2. Solvency position - long-term financial stability 3. Capital structure efficiency - optimal balance of debt and equity 4. Asset utilization - return on assets and asset turnover **Recommendations:** - Monitor cash and liquid assets against upcoming obligations - Evaluate working capital efficiency and optimize inventory/receivables cycles - Assess debt refinancing opportunities and covenant compliance - Review asset allocation for strategic alignment with business objectives **Forward Outlook:** The balance sheet position suggests [specific metrics warrant monitoring]. Recommend quarterly tracking of key ratios and balance sheet composition.""" def generate_income_statement_response(row_json: str, ticker: str = "") -> str: """Generate a template CFO-level response for income statement analysis.""" return f"""Based on the income statement analysis{' for ' + ticker if ticker else ''}: **Financial Performance Summary:** The P&L statement reveals important trends in operational and financial performance: - Revenue growth/decline trajectory and market share implications - Gross margin trends indicating pricing power and cost management - Operating margin reflecting operational efficiency - Net profitability and cash conversion quality **Detailed P&L Analysis:** Key observations from the provided figures: 1. Revenue Performance - growth rate, product/segment mix, pricing trends 2. Cost Structure - COGS and operating expense ratios, fixed vs variable split 3. Operating Income - EBITDA margins and sustainability 4. Bottom Line - net income quality, non-recurring items, effective tax rate **Profitability Drivers:** - Revenue growth momentum and growth sustainability - Gross margin management and cost of goods sold trends - Operating leverage and operating expense control - Impact of financing costs and tax efficiency **Strategic Implications:** The income statement demonstrates [specific performance characteristics]. Key action items: - Address [specific cost or revenue concern] if applicable - Leverage [specific strength] for strategic advantage - Monitor [specific metric] for forward guidance accuracy""" def generate_earnings_response(row_json: str, ticker: str = "") -> str: """Generate a template CFO-level response for earnings analysis.""" return f"""Based on the earnings analysis{' for ' + ticker if ticker else ''}: **Earnings Performance Overview:** The quarterly/annual earnings results provide critical insight into financial performance: - Earnings per share (EPS) - headline metric for investor communications - Earnings growth rate - operational momentum and scaling - Earnings quality - cash generation and sustainability - Guidance accuracy - management credibility and forecasting capability **Key Earnings Metrics:** 1. Net Income - absolute profitability and year-over-year growth 2. EPS - per-share profitability and shareholder value creation 3. Operating Cash Flow - earnings quality and working capital management 4. EBITDA - operational earnings independent of capital structure **Performance Assessment:** The earnings results indicate: - Strong/weak demand environment reflected in revenue - Operational leverage working as expected/diverging from forecast - Cost management effectiveness in current environment - Balance sheet impact from current earnings level **Forward Guidance & Outlook:** Based on current earnings trajectory: - Next quarter/year guidance reflects [specific assumptions] - Key variables affecting forecast: [revenue growth, margin, tax rate] - Risk factors that could impact earnings: [operational, market, regulatory] - Upside/downside scenarios and probability weighting""" def generate_risk_analysis_response(row_json: str) -> str: """Generate a template CFO-level response for risk analysis.""" return """Based on the financial risk analysis: **Risk Assessment Summary:** The identified risks span operational, financial, and market dimensions: - Operational risks - business continuity and efficiency threats - Financial risks - credit, liquidity, and interest rate exposure - Market risks - competitive, demand, and pricing pressures - Regulatory/Compliance risks - legal and regulatory changes **Risk Prioritization Matrix:** Risks assessed by impact and likelihood: 1. Critical risks requiring immediate mitigation strategies 2. Important risks with medium-term planning requirements 3. Emerging risks requiring monitoring and contingency planning **Mitigation Strategies:** For each identified risk: - Risk reduction actions - operational changes to minimize exposure - Risk transfer - insurance, hedging, or outsourcing strategies - Risk acceptance - tolerance levels and monitoring protocols - Contingency planning - response actions if risk materializes **Risk Monitoring Framework:** Key performance indicators to track risk levels: - Operational metrics reflecting business health - Financial metrics indicating stress conditions - Market indicators signaling external shifts - Early warning indicators triggering escalation **Board Reporting:** The current risk profile suggests [overall assessment]. Recommended governance oversight: - Monthly monitoring of critical risk indicators - Quarterly deep-dive reviews of major risk categories - Annual comprehensive risk assessment update""" def generate_forecasting_response(row_json: str, ticker: str = "") -> str: """Generate a template CFO-level response for forecasting analysis.""" return f"""Based on the forecasting analysis{' for ' + ticker if ticker else ''}: **Forecast Overview:** The stock price and fundamental forecast projects future financial performance: - Stock price targets based on discounted cash flow or comparable multiples - Revenue and earnings forecasts reflecting growth assumptions - Margin assumptions reflecting competitive positioning - Terminal value assumptions reflecting long-term stability **Methodology Assessment:** The forecast employs: - DCF methodology with specific discount rate and growth assumptions - Comparable company multiples reflecting peer valuation - Historical trend analysis adjusted for forward drivers - Scenario analysis covering base, upside, and downside cases **Key Assumptions Review:** Critical assumptions underlying the forecast: 1. Revenue Growth - [specific rate] based on [market/operational drivers] 2. Operating Margins - [specific levels] reflecting [cost structure/scale] 3. CapEx Requirements - [specific levels] for [growth/maintenance] 4. Discount Rate - [specific rate] reflecting [risk/cost of capital] **Sensitivity Analysis:** Forecast sensitivity to key variables: - [Variable 1]: [Range of outcomes] - [Variable 2]: [Range of outcomes] - [Variable 3]: [Range of outcomes] - [Variable 4]: [Range of outcomes] **Risk & Confidence Assessment:** Confidence level: [High/Medium/Low] based on: - Assumption confidence and historical accuracy - Market dynamics and competitive positioning - Regulatory and macroeconomic risks - Valuation reasonableness relative to peers **Investment Perspective:** The forecast suggests [valuation assessment]. Key catalysts: - Positive: [upside drivers] - Negative: [downside risks] - Timeline: [key inflection points]""" def normalize_balance_sheet(sample: Dict) -> Optional[Dict]: """ Normalize balance sheet data. Converts row to JSON and pairs with CFO analysis prompt/response. """ row_json = convert_row_to_json(sample) ticker = sample.get("ticker", sample.get("symbol", "")) # Select a random prompt for diversity prompt = random.choice(BALANCE_SHEET_PROMPTS) response = generate_balance_sheet_response(row_json, ticker) return { "messages": [ {"role": "system", "content": ""}, # Will be filled later {"role": "user", "content": f"{prompt}\n\n```json\n{row_json}\n```"}, {"role": "assistant", "content": response} ] } def normalize_income_statement(sample: Dict) -> Optional[Dict]: """ Normalize income statement data. Converts row to JSON and pairs with CFO analysis prompt/response. """ row_json = convert_row_to_json(sample) ticker = sample.get("ticker", sample.get("symbol", "")) prompt = random.choice(INCOME_STATEMENT_PROMPTS) response = generate_income_statement_response(row_json, ticker) return { "messages": [ {"role": "system", "content": ""}, {"role": "user", "content": f"{prompt}\n\n```json\n{row_json}\n```"}, {"role": "assistant", "content": response} ] } def normalize_earnings(sample: Dict) -> Optional[Dict]: """ Normalize earnings data. Converts row to JSON and pairs with CFO analysis prompt/response. """ row_json = convert_row_to_json(sample) ticker = sample.get("ticker", sample.get("symbol", "")) prompt = random.choice(EARNINGS_PROMPTS) response = generate_earnings_response(row_json, ticker) return { "messages": [ {"role": "system", "content": ""}, {"role": "user", "content": f"{prompt}\n\n```json\n{row_json}\n```"}, {"role": "assistant", "content": response} ] } def normalize_risk_analysis(sample: Dict) -> Optional[Dict]: """ Normalize risk analysis data. Extracts input/output if available, otherwise converts row to JSON. """ # Try to extract structured input/output columns if "risk_scenario" in sample and "mitigation" in sample: user_content = sample.get("risk_scenario", "") response = sample.get("mitigation", "") elif "input" in sample and "output" in sample: user_content = sample.get("input", "") response = sample.get("output", "") else: # Fall back to JSON conversion with analysis prompt row_json = convert_row_to_json(sample) prompt = random.choice(RISK_ANALYSIS_PROMPTS) user_content = f"{prompt}\n\n```json\n{row_json}\n```" response = generate_risk_analysis_response(row_json) if len(user_content) < 10 or len(response) < 20: return None return { "messages": [ {"role": "system", "content": ""}, {"role": "user", "content": user_content}, {"role": "assistant", "content": response} ] } def normalize_fingpt_forecaster(sample: Dict) -> Optional[Dict]: """ Normalize FinGPT forecaster data. Extracts input/output if available, otherwise converts row to JSON. """ # Try to extract structured input/output columns if "input" in sample and "output" in sample: user_content = sample.get("input", "") response = sample.get("output", "") else: # Fall back to JSON conversion with analysis prompt row_json = convert_row_to_json(sample) ticker = sample.get("ticker", sample.get("symbol", "")) prompt = random.choice(FORECASTING_PROMPTS) user_content = f"{prompt}\n\n```json\n{row_json}\n```" response = generate_forecasting_response(row_json, ticker) if len(user_content) < 10 or len(response) < 20: return None return { "messages": [ {"role": "system", "content": ""}, {"role": "user", "content": user_content}, {"role": "assistant", "content": response} ] } def process_dataset(dataset_name: str, dataset, normalize_fn) -> Tuple[int, int]: """ Process a single dataset and return (valid_count, filtered_count). Args: dataset_name: Name of the dataset for logging dataset: The loaded dataset object normalize_fn: Function to normalize samples from this dataset Returns: Tuple of (number of valid samples, number of filtered samples) """ normalized_samples = [] valid_count = 0 filtered_count = 0 # Handle both single split and multiple splits if isinstance(dataset, dict): splits = list(dataset.keys()) else: splits = ["train"] if hasattr(dataset, "__len__") else [] for split in splits: split_data = dataset[split] if isinstance(dataset, dict) else dataset for sample in split_data: normalized = normalize_fn(sample) if normalized is not None: normalized_samples.append(normalized) valid_count += 1 else: filtered_count += 1 print(f" {dataset_name}: {valid_count} valid, {filtered_count} filtered") return normalized_samples, valid_count, filtered_count def main(): """Main normalization pipeline for Layer 2 datasets.""" # Setup paths script_dir = Path(__file__).parent raw_dir = script_dir / "raw" processed_dir = script_dir / "processed" processed_dir.mkdir(exist_ok=True) # Load system prompt system_prompt = get_system_prompt(script_dir) print(f"Loading datasets from: {raw_dir}") print(f"Output will be saved to: {processed_dir / 'layer2.jsonl'}") print("-" * 60) # Define datasets and their normalization functions datasets_config = [ ("stocks_balance_sheet", normalize_balance_sheet), ("stocks_income_statement", normalize_income_statement), ("stocks_earnings", normalize_earnings), ("risk_analysis", normalize_risk_analysis), ("fingpt_forecaster", normalize_fingpt_forecaster), ] all_samples = [] total_valid = 0 total_filtered = 0 for dataset_name, normalize_fn in datasets_config: dataset_path = raw_dir / dataset_name if not dataset_path.exists(): print(f" {dataset_name}: SKIPPED (directory not found)") continue try: print(f"\nProcessing {dataset_name}...") # Load dataset from disk dataset = load_from_disk(str(dataset_path)) # Process the dataset normalized_samples, valid_count, filtered_count = process_dataset( dataset_name, dataset, normalize_fn ) all_samples.extend(normalized_samples) total_valid += valid_count total_filtered += filtered_count except Exception as e: print(f" {dataset_name}: ERROR - {type(e).__name__}: {str(e)[:100]}") # Add system prompt to all samples print("\nAdding system prompt to all samples...") for sample in all_samples: sample["messages"][0]["content"] = system_prompt # Write to output file output_path = processed_dir / "layer2.jsonl" print(f"\nWriting {len(all_samples)} samples to {output_path}...") with open(output_path, "w", encoding="utf-8") as f: for sample in all_samples: f.write(json.dumps(sample, ensure_ascii=False) + "\n") # Print summary print("\n" + "=" * 60) print("LAYER 2 NORMALIZATION SUMMARY") print("=" * 60) print(f"Total valid samples: {total_valid}") print(f"Total filtered samples: {total_filtered}") print(f"Output file: {output_path}") if output_path.exists(): print(f"Output file size: {output_path.stat().st_size / (1024*1024):.2f} MB") if __name__ == "__main__": main()