""" Data handler for CarbonFootprint by GXS application. Manages data import, export, and processing. """ import pandas as pd import json import os from datetime import datetime import csv from io import StringIO from fpdf import FPDF import matplotlib.pyplot as plt import seaborn as sns from emission_factors import get_emission_factor, get_categories, get_activities # Constants DATA_DIR = "data" EMISSIONS_FILE = os.path.join(DATA_DIR, "emissions.json") COMPANY_INFO_FILE = os.path.join(DATA_DIR, "company_info.json") # Ensure data directory exists os.makedirs(DATA_DIR, exist_ok=True) class DataHandler: def __init__(self): """Initialize the DataHandler class.""" self.load_emissions_data() self.load_company_info() def load_emissions_data(self): """Load emissions data from file.""" if os.path.exists(EMISSIONS_FILE): with open(EMISSIONS_FILE, 'r') as f: try: self.emissions_data = pd.DataFrame(json.load(f)) # Convert date strings to datetime objects if 'date' in self.emissions_data.columns: self.emissions_data['date'] = pd.to_datetime(self.emissions_data['date']) except json.JSONDecodeError: self.create_empty_emissions_data() else: self.create_empty_emissions_data() def create_empty_emissions_data(self): """Create empty emissions dataframe.""" self.emissions_data = pd.DataFrame(columns=[ 'date', 'scope', 'category', 'activity', 'quantity', 'unit', 'emission_factor', 'emissions_kgCO2e', 'notes' ]) def load_company_info(self): """Load company information from file.""" if os.path.exists(COMPANY_INFO_FILE): with open(COMPANY_INFO_FILE, 'r') as f: try: self.company_info = json.load(f) except json.JSONDecodeError: self.create_empty_company_info() else: self.create_empty_company_info() def create_empty_company_info(self): """Create empty company information.""" self.company_info = { "name": "", "industry": "", "location": "", "export_markets": [], "contact_person": "", "email": "", "phone": "", "address": "", "registration_number": "", "reporting_year": datetime.now().year } def save_emissions_data(self): """Save emissions data to file.""" # Convert datetime objects to strings data_to_save = self.emissions_data.copy() if 'date' in data_to_save.columns: data_to_save['date'] = data_to_save['date'].dt.strftime('%Y-%m-%d') with open(EMISSIONS_FILE, 'w') as f: json.dump(data_to_save.to_dict('records'), f, indent=2) def save_company_info(self): """Save company information to file.""" with open(COMPANY_INFO_FILE, 'w') as f: json.dump(self.company_info, f, indent=2) def add_emission_entry(self, date, scope, category, activity, quantity, unit, emission_factor, notes=""): """ Add a new emission entry. Args: date (datetime): Date of the emission scope (str): Emission scope (Scope 1, Scope 2, or Scope 3) category (str): Emission category activity (str): Specific activity quantity (float): Quantity of activity unit (str): Unit of measurement emission_factor (float): Emission factor notes (str, optional): Additional notes Returns: bool: True if successful, False otherwise """ try: # Calculate emissions emissions_kgCO2e = float(quantity) * float(emission_factor) # Create new entry new_entry = pd.DataFrame([{ 'date': pd.Timestamp(date), 'scope': scope, 'category': category, 'activity': activity, 'quantity': float(quantity), 'unit': unit, 'emission_factor': float(emission_factor), 'emissions_kgCO2e': emissions_kgCO2e, 'notes': notes }]) # Append to existing data self.emissions_data = pd.concat([self.emissions_data, new_entry], ignore_index=True) # Save data self.save_emissions_data() return True except Exception as e: print(f"Error adding emission entry: {str(e)}") return False def import_csv(self, file_path_or_buffer): """ Import emissions data from CSV. Args: file_path_or_buffer: Path to CSV file or file-like object Returns: tuple: (success, message) """ try: # Read CSV df = pd.read_csv(file_path_or_buffer) # Check required columns required_columns = ['date', 'scope', 'category', 'activity', 'quantity', 'unit', 'emission_factor'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: return False, f"Missing required columns: {', '.join(missing_columns)}" # Convert date strings to datetime objects df['date'] = pd.to_datetime(df['date']) # Calculate emissions if not provided if 'emissions_kgCO2e' not in df.columns: df['emissions_kgCO2e'] = df['quantity'].astype(float) * df['emission_factor'].astype(float) # Add notes column if not present if 'notes' not in df.columns: df['notes'] = "" # Append to existing data self.emissions_data = pd.concat([self.emissions_data, df], ignore_index=True) # Save data self.save_emissions_data() return True, f"Successfully imported {len(df)} entries" except Exception as e: return False, f"Error importing CSV: {str(e)}" def export_csv(self, file_path=None, start_date=None, end_date=None): """ Export emissions data to CSV. Args: file_path (str, optional): Path to save CSV file start_date (datetime, optional): Start date for filtering end_date (datetime, optional): End date for filtering Returns: str or bool: CSV string if file_path is None, otherwise True if successful """ try: # Filter data by date range if specified data = self.emissions_data.copy() if start_date and end_date: mask = (data['date'] >= pd.Timestamp(start_date)) & (data['date'] <= pd.Timestamp(end_date)) data = data.loc[mask] # Convert datetime objects to strings if 'date' in data.columns: data['date'] = data['date'].dt.strftime('%Y-%m-%d') if file_path: # Save to file data.to_csv(file_path, index=False) return True else: # Return CSV string csv_buffer = StringIO() data.to_csv(csv_buffer, index=False) return csv_buffer.getvalue() except Exception as e: print(f"Error exporting CSV: {str(e)}") return False def generate_pdf_report(self, file_path=None, start_date=None, end_date=None): """ Generate PDF report. Args: file_path (str, optional): Path to save PDF file start_date (datetime, optional): Start date for filtering end_date (datetime, optional): End date for filtering Returns: bytes or bool: PDF bytes if file_path is None, otherwise True if successful """ try: # Filter data by date range if specified data = self.emissions_data.copy() if start_date and end_date: mask = (data['date'] >= pd.Timestamp(start_date)) & (data['date'] <= pd.Timestamp(end_date)) data = data.loc[mask] # Create PDF pdf = FPDF() pdf.add_page() # Set font pdf.set_font("Arial", "B", 16) # Title pdf.cell(0, 10, "Carbon Emissions Report", 0, 1, "C") pdf.set_font("Arial", "", 12) # Company info pdf.cell(0, 10, f"Company: {self.company_info['name']}", 0, 1) pdf.cell(0, 10, f"Reporting Period: {start_date.strftime('%Y-%m-%d') if start_date else 'All'} to {end_date.strftime('%Y-%m-%d') if end_date else 'All'}", 0, 1) pdf.cell(0, 10, f"Generated on: {datetime.now().strftime('%Y-%m-%d')}", 0, 1) # Summary pdf.ln(10) pdf.set_font("Arial", "B", 14) pdf.cell(0, 10, "Summary", 0, 1) pdf.set_font("Arial", "", 12) total_emissions = data['emissions_kgCO2e'].sum() pdf.cell(0, 10, f"Total Emissions: {total_emissions:.2f} kgCO2e", 0, 1) # Emissions by scope scope_data = data.groupby('scope')['emissions_kgCO2e'].sum().reset_index() pdf.ln(5) pdf.cell(0, 10, "Emissions by Scope:", 0, 1) for _, row in scope_data.iterrows(): pdf.cell(0, 10, f"{row['scope']}: {row['emissions_kgCO2e']:.2f} kgCO2e ({row['emissions_kgCO2e'] / total_emissions * 100:.1f}%)", 0, 1) # Emissions by category category_data = data.groupby('category')['emissions_kgCO2e'].sum().reset_index() pdf.ln(5) pdf.cell(0, 10, "Top Categories:", 0, 1) for _, row in category_data.nlargest(5, 'emissions_kgCO2e').iterrows(): pdf.cell(0, 10, f"{row['category']}: {row['emissions_kgCO2e']:.2f} kgCO2e ({row['emissions_kgCO2e'] / total_emissions * 100:.1f}%)", 0, 1) # Data table pdf.ln(10) pdf.set_font("Arial", "B", 14) pdf.cell(0, 10, "Emissions Data", 0, 1) pdf.set_font("Arial", "B", 10) # Table header col_widths = [25, 25, 30, 30, 20, 15, 25, 30] headers = ['Date', 'Scope', 'Category', 'Activity', 'Quantity', 'Unit', 'Factor', 'Emissions (kgCO2e)'] for i, header in enumerate(headers): pdf.cell(col_widths[i], 10, header, 1) pdf.ln() # Table data pdf.set_font("Arial", "", 8) for _, row in data.iterrows(): pdf.cell(col_widths[0], 10, row['date'].strftime('%Y-%m-%d') if isinstance(row['date'], pd.Timestamp) else str(row['date']), 1) pdf.cell(col_widths[1], 10, str(row['scope']), 1) pdf.cell(col_widths[2], 10, str(row['category']), 1) pdf.cell(col_widths[3], 10, str(row['activity']), 1) pdf.cell(col_widths[4], 10, f"{row['quantity']:.2f}", 1) pdf.cell(col_widths[5], 10, str(row['unit']), 1) pdf.cell(col_widths[6], 10, f"{row['emission_factor']:.4f}", 1) pdf.cell(col_widths[7], 10, f"{row['emissions_kgCO2e']:.2f}", 1) pdf.ln() if file_path: # Save to file pdf.output(file_path) return True else: # Return PDF bytes return pdf.output(dest='S').encode('latin1') except Exception as e: print(f"Error generating PDF report: {str(e)}") return False def get_emissions_summary(self): """ Get emissions summary statistics. Returns: dict: Summary statistics """ if len(self.emissions_data) == 0: return { "total_emissions": 0, "scope_breakdown": {}, "category_breakdown": {}, "time_series": {} } # Total emissions total_emissions = self.emissions_data['emissions_kgCO2e'].sum() # Emissions by scope scope_data = self.emissions_data.groupby('scope')['emissions_kgCO2e'].sum().to_dict() # Emissions by category category_data = self.emissions_data.groupby('category')['emissions_kgCO2e'].sum().to_dict() # Time series data (monthly) time_data = self.emissions_data.copy() if 'date' in time_data.columns and len(time_data) > 0: time_data['month'] = time_data['date'].dt.strftime('%Y-%m') time_series = time_data.groupby(['month', 'scope'])['emissions_kgCO2e'].sum().reset_index() time_series_dict = {} for _, row in time_series.iterrows(): if row['month'] not in time_series_dict: time_series_dict[row['month']] = {} time_series_dict[row['month']][row['scope']] = row['emissions_kgCO2e'] else: time_series_dict = {} return { "total_emissions": total_emissions, "scope_breakdown": scope_data, "category_breakdown": category_data, "time_series": time_series_dict } def get_filtered_data(self, start_date=None, end_date=None, scope=None, category=None): """ Get filtered emissions data. Args: start_date (datetime, optional): Start date for filtering end_date (datetime, optional): End date for filtering scope (str, optional): Scope for filtering category (str, optional): Category for filtering Returns: pandas.DataFrame: Filtered data """ data = self.emissions_data.copy() # Apply filters if start_date and end_date: mask = (data['date'] >= pd.Timestamp(start_date)) & (data['date'] <= pd.Timestamp(end_date)) data = data.loc[mask] if scope: data = data[data['scope'] == scope] if category: data = data[data['category'] == category] return data