# Source: KiemkeKhinhakinh / data_handler.py
# Hosting-page residue kept for provenance: "gaialive's picture", "Upload 11 files", "22836fe verified"
"""
Data handler for CarbonFootprint by GXS application.
Manages data import, export, and processing.
"""
import pandas as pd
import json
import os
from datetime import datetime
import csv
from io import StringIO
from fpdf import FPDF
import matplotlib.pyplot as plt
import seaborn as sns
from emission_factors import get_emission_factor, get_categories, get_activities
# Storage locations. DATA_DIR is a relative path, so it is resolved against
# the current working directory of the running process.
DATA_DIR = "data"
EMISSIONS_FILE = os.path.join(DATA_DIR, "emissions.json")
COMPANY_INFO_FILE = os.path.join(DATA_DIR, "company_info.json")
# Create the data directory at import time (a no-op when it already exists)
# so that writing the two files above does not fail on a missing directory.
os.makedirs(DATA_DIR, exist_ok=True)
class DataHandler:
    """Store, persist, filter and export emission records and company info.

    Attributes:
        emissions_data (pandas.DataFrame): One row per emission entry, using
            the columns listed in ``EMISSION_COLUMNS``.
        company_info (dict): Company profile fields (name, industry, ...).

    Both attributes are read from the JSON files ``EMISSIONS_FILE`` and
    ``COMPANY_INFO_FILE`` when the handler is constructed.
    """

    # Canonical columns of the emissions table, in storage order.
    EMISSION_COLUMNS = [
        'date', 'scope', 'category', 'activity', 'quantity',
        'unit', 'emission_factor', 'emissions_kgCO2e', 'notes',
    ]
    # Date format used in the JSON file, CSV exports and the PDF report.
    DATE_FORMAT = '%Y-%m-%d'

    def __init__(self):
        """Initialize the DataHandler class by loading persisted state."""
        self.load_emissions_data()
        self.load_company_info()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @classmethod
    def _format_dates(cls, dates):
        """Return the date column *dates* as 'YYYY-MM-DD' strings.

        The column is passed through ``pd.to_datetime`` first because the
        ``.dt`` accessor raises on non-datetime columns, which is what an
        empty table's ``date`` column is (object dtype).
        """
        return pd.to_datetime(dates).dt.strftime(cls.DATE_FORMAT)

    @staticmethod
    def _pdf_text(value):
        """Return *value* as text restricted to Latin-1.

        The report uses a built-in PDF font, which only covers Latin-1;
        unsupported characters are replaced with '?' so that one such
        character does not abort the whole report.
        """
        return str(value).encode('latin-1', 'replace').decode('latin-1')

    def _dated_copy(self, start_date=None, end_date=None):
        """Return a copy of the emissions table limited to a date range.

        Each bound is optional and applied independently, so a caller may
        pass only ``start_date`` or only ``end_date``. Bounds are inclusive.
        """
        data = self.emissions_data.copy()
        if start_date:
            data = data.loc[pd.to_datetime(data['date']) >= pd.Timestamp(start_date)]
        if end_date:
            data = data.loc[pd.to_datetime(data['date']) <= pd.Timestamp(end_date)]
        # Copy so callers can assign columns without writing into a slice.
        return data.copy()

    def _append_and_save(self, rows):
        """Append *rows* to the table and persist it.

        If saving fails the previous in-memory table is restored and the
        exception is re-raised, so memory and disk never disagree.
        """
        previous = self.emissions_data
        if len(previous) == 0:
            # Do not concatenate with an empty frame: its object-typed
            # columns would take part in dtype inference. Keep its column
            # order and append any extra columns the new rows bring.
            columns = list(previous.columns)
            columns += [col for col in rows.columns if col not in columns]
            combined = rows.reindex(columns=columns).reset_index(drop=True)
        else:
            combined = pd.concat([previous, rows], ignore_index=True)

        self.emissions_data = combined
        try:
            self.save_emissions_data()
        except Exception:
            self.emissions_data = previous
            raise

    # ------------------------------------------------------------------
    # Loading and saving
    # ------------------------------------------------------------------
    def load_emissions_data(self):
        """Load emissions data from file.

        Falls back to an empty table with the canonical columns when the
        file is missing, is not valid JSON, or contains no records.
        """
        if not os.path.exists(EMISSIONS_FILE):
            self.create_empty_emissions_data()
            return

        try:
            with open(EMISSIONS_FILE, 'r', encoding='utf-8') as f:
                records = json.load(f)
        except json.JSONDecodeError:
            self.create_empty_emissions_data()
            return

        loaded = pd.DataFrame(records)
        if loaded.empty:
            # A saved empty table is the JSON list [], which would load as a
            # frame without columns and break later column lookups.
            self.create_empty_emissions_data()
            return

        # Convert date strings to datetime objects
        if 'date' in loaded.columns:
            loaded['date'] = pd.to_datetime(loaded['date'])
        self.emissions_data = loaded

    def create_empty_emissions_data(self):
        """Create empty emissions dataframe with the canonical columns."""
        self.emissions_data = pd.DataFrame(columns=self.EMISSION_COLUMNS)

    def load_company_info(self):
        """Load company information from file.

        Starts from the default (empty) profile and overlays the stored
        values, so keys missing from the file still exist. A missing file,
        invalid JSON, or a JSON value that is not an object leaves the
        defaults in place.
        """
        self.create_empty_company_info()
        if not os.path.exists(COMPANY_INFO_FILE):
            return

        try:
            with open(COMPANY_INFO_FILE, 'r', encoding='utf-8') as f:
                stored = json.load(f)
        except json.JSONDecodeError:
            return

        if isinstance(stored, dict):
            self.company_info.update(stored)

    def create_empty_company_info(self):
        """Create empty company information."""
        self.company_info = {
            "name": "",
            "industry": "",
            "location": "",
            "export_markets": [],
            "contact_person": "",
            "email": "",
            "phone": "",
            "address": "",
            "registration_number": "",
            "reporting_year": datetime.now().year
        }

    def save_emissions_data(self):
        """Save emissions data to file as a JSON list of records."""
        data_to_save = self.emissions_data.copy()
        # Convert datetime objects to strings
        if 'date' in data_to_save.columns:
            data_to_save['date'] = self._format_dates(data_to_save['date'])

        # Serialize before opening the file: opening with 'w' truncates it,
        # so a serialization error must not happen after that point.
        payload = json.dumps(data_to_save.to_dict('records'), indent=2)
        with open(EMISSIONS_FILE, 'w', encoding='utf-8') as f:
            f.write(payload)

    def save_company_info(self):
        """Save company information to file."""
        # Serialize first so a failure cannot leave a truncated file behind.
        payload = json.dumps(self.company_info, indent=2)
        with open(COMPANY_INFO_FILE, 'w', encoding='utf-8') as f:
            f.write(payload)

    # ------------------------------------------------------------------
    # Adding data
    # ------------------------------------------------------------------
    def add_emission_entry(self, date, scope, category, activity, quantity, unit, emission_factor, notes=""):
        """
        Add a new emission entry and persist the table.

        Args:
            date (datetime): Date of the emission
            scope (str): Emission scope (Scope 1, Scope 2, or Scope 3)
            category (str): Emission category
            activity (str): Specific activity
            quantity (float): Quantity of activity
            unit (str): Unit of measurement
            emission_factor (float): Emission factor
            notes (str, optional): Additional notes

        Returns:
            bool: True if successful, False otherwise. On failure the
            in-memory table is left unchanged.
        """
        try:
            quantity = float(quantity)
            emission_factor = float(emission_factor)

            new_entry = pd.DataFrame([{
                'date': pd.Timestamp(date),
                'scope': scope,
                'category': category,
                'activity': activity,
                'quantity': quantity,
                'unit': unit,
                'emission_factor': emission_factor,
                # Emissions are quantity multiplied by the emission factor.
                'emissions_kgCO2e': quantity * emission_factor,
                'notes': notes
            }])

            self._append_and_save(new_entry)
            return True
        except Exception as e:
            print(f"Error adding emission entry: {str(e)}")
            return False

    def import_csv(self, file_path_or_buffer):
        """
        Import emissions data from CSV and persist the table.

        The CSV must contain the columns date, scope, category, activity,
        quantity, unit and emission_factor. ``emissions_kgCO2e`` is computed
        as quantity * emission_factor when absent, and ``notes`` defaults to
        an empty string when absent.

        Args:
            file_path_or_buffer: Path to CSV file or file-like object

        Returns:
            tuple: (success, message). On failure the in-memory table is
            left unchanged.
        """
        try:
            df = pd.read_csv(file_path_or_buffer)

            # Check required columns
            required_columns = ['date', 'scope', 'category', 'activity', 'quantity', 'unit', 'emission_factor']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                return False, f"Missing required columns: {', '.join(missing_columns)}"

            # Convert date strings to datetime objects
            df['date'] = pd.to_datetime(df['date'])

            # Calculate emissions if not provided
            if 'emissions_kgCO2e' not in df.columns:
                df['emissions_kgCO2e'] = df['quantity'].astype(float) * df['emission_factor'].astype(float)

            # Add notes column if not present
            if 'notes' not in df.columns:
                df['notes'] = ""

            self._append_and_save(df)
            return True, f"Successfully imported {len(df)} entries"
        except Exception as e:
            return False, f"Error importing CSV: {str(e)}"

    # ------------------------------------------------------------------
    # Exporting
    # ------------------------------------------------------------------
    def export_csv(self, file_path=None, start_date=None, end_date=None):
        """
        Export emissions data to CSV.

        Args:
            file_path (str, optional): Path to save CSV file
            start_date (datetime, optional): Inclusive start date for filtering
            end_date (datetime, optional): Inclusive end date for filtering

        Returns:
            str or bool: CSV string if file_path is None, otherwise True if
            successful. False if the export failed. An empty table exports
            as a header-only CSV.
        """
        try:
            data = self._dated_copy(start_date, end_date)

            # Convert datetime objects to strings
            if 'date' in data.columns:
                data['date'] = self._format_dates(data['date'])

            if file_path:
                data.to_csv(file_path, index=False)
                return True

            csv_buffer = StringIO()
            data.to_csv(csv_buffer, index=False)
            return csv_buffer.getvalue()
        except Exception as e:
            print(f"Error exporting CSV: {str(e)}")
            return False

    def generate_pdf_report(self, file_path=None, start_date=None, end_date=None):
        """
        Generate PDF report with a summary and the full data table.

        Args:
            file_path (str, optional): Path to save PDF file
            start_date (datetime, optional): Inclusive start date for filtering
            end_date (datetime, optional): Inclusive end date for filtering

        Returns:
            bytes or bool: PDF bytes if file_path is None, otherwise True if
            successful. False if the report could not be generated.
        """
        try:
            data = self._dated_copy(start_date, end_date)
            # Force a numeric dtype: an empty table has object columns, which
            # nlargest() below rejects.
            data['emissions_kgCO2e'] = data['emissions_kgCO2e'].astype(float)
            total_emissions = data['emissions_kgCO2e'].sum()
            text = self._pdf_text

            def period_bound(bound):
                # An unset bound means the report is open-ended on that side.
                return pd.Timestamp(bound).strftime(self.DATE_FORMAT) if bound else 'All'

            def share(value):
                # Percentage of the total; 0 when there are no emissions at all.
                return value / total_emissions * 100 if total_emissions else 0.0

            # Create PDF
            pdf = FPDF()
            pdf.add_page()

            # Title
            pdf.set_font("Arial", "B", 16)
            pdf.cell(0, 10, "Carbon Emissions Report", 0, 1, "C")

            # Company info
            pdf.set_font("Arial", "", 12)
            pdf.cell(0, 10, text(f"Company: {self.company_info.get('name', '')}"), 0, 1)
            pdf.cell(0, 10, f"Reporting Period: {period_bound(start_date)} to {period_bound(end_date)}", 0, 1)
            pdf.cell(0, 10, f"Generated on: {datetime.now().strftime('%Y-%m-%d')}", 0, 1)

            # Summary
            pdf.ln(10)
            pdf.set_font("Arial", "B", 14)
            pdf.cell(0, 10, "Summary", 0, 1)
            pdf.set_font("Arial", "", 12)
            pdf.cell(0, 10, f"Total Emissions: {total_emissions:.2f} kgCO2e", 0, 1)

            # Emissions by scope
            scope_data = data.groupby('scope')['emissions_kgCO2e'].sum().reset_index()
            pdf.ln(5)
            pdf.cell(0, 10, "Emissions by Scope:", 0, 1)
            for _, row in scope_data.iterrows():
                amount = row['emissions_kgCO2e']
                pdf.cell(0, 10, text(f"{row['scope']}: {amount:.2f} kgCO2e ({share(amount):.1f}%)"), 0, 1)

            # Emissions by category (five largest)
            category_data = data.groupby('category')['emissions_kgCO2e'].sum().reset_index()
            pdf.ln(5)
            pdf.cell(0, 10, "Top Categories:", 0, 1)
            for _, row in category_data.nlargest(5, 'emissions_kgCO2e').iterrows():
                amount = row['emissions_kgCO2e']
                pdf.cell(0, 10, text(f"{row['category']}: {amount:.2f} kgCO2e ({share(amount):.1f}%)"), 0, 1)

            # Data table
            pdf.ln(10)
            pdf.set_font("Arial", "B", 14)
            pdf.cell(0, 10, "Emissions Data", 0, 1)

            # Table header
            pdf.set_font("Arial", "B", 10)
            col_widths = [25, 25, 30, 30, 20, 15, 25, 30]
            headers = ['Date', 'Scope', 'Category', 'Activity', 'Quantity', 'Unit', 'Factor', 'Emissions (kgCO2e)']
            for width, header in zip(col_widths, headers):
                pdf.cell(width, 10, header, 1)
            pdf.ln()

            # Table data
            pdf.set_font("Arial", "", 8)
            for _, row in data.iterrows():
                date_value = row['date']
                if isinstance(date_value, pd.Timestamp):
                    date_text = date_value.strftime(self.DATE_FORMAT)
                else:
                    date_text = str(date_value)
                cells = [
                    date_text,
                    str(row['scope']),
                    str(row['category']),
                    str(row['activity']),
                    f"{row['quantity']:.2f}",
                    str(row['unit']),
                    f"{row['emission_factor']:.4f}",
                    f"{row['emissions_kgCO2e']:.2f}",
                ]
                for width, cell in zip(col_widths, cells):
                    pdf.cell(width, 10, text(cell), 1)
                pdf.ln()

            if file_path:
                pdf.output(file_path)
                return True

            # Depending on the installed FPDF implementation, output() hands
            # back either a Latin-1 str or a bytearray; normalise to bytes.
            raw = pdf.output(dest='S')
            return raw.encode('latin1') if isinstance(raw, str) else bytes(raw)
        except Exception as e:
            print(f"Error generating PDF report: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_emissions_summary(self):
        """
        Get emissions summary statistics.

        Returns:
            dict: ``total_emissions`` (sum of emissions_kgCO2e),
            ``scope_breakdown`` and ``category_breakdown`` (label -> summed
            emissions) and ``time_series`` ('YYYY-MM' -> {scope -> summed
            emissions}). All are zero/empty for an empty table.
        """
        data = self.emissions_data
        if data.empty:
            return {
                "total_emissions": 0,
                "scope_breakdown": {},
                "category_breakdown": {},
                "time_series": {}
            }

        total_emissions = data['emissions_kgCO2e'].sum()
        scope_breakdown = data.groupby('scope')['emissions_kgCO2e'].sum().to_dict()
        category_breakdown = data.groupby('category')['emissions_kgCO2e'].sum().to_dict()

        # Monthly totals per scope
        time_series = {}
        if 'date' in data.columns:
            monthly = data.assign(month=pd.to_datetime(data['date']).dt.strftime('%Y-%m'))
            per_month = monthly.groupby(['month', 'scope'])['emissions_kgCO2e'].sum()
            for (month, scope), amount in per_month.items():
                time_series.setdefault(month, {})[scope] = amount

        return {
            "total_emissions": total_emissions,
            "scope_breakdown": scope_breakdown,
            "category_breakdown": category_breakdown,
            "time_series": time_series
        }

    def get_filtered_data(self, start_date=None, end_date=None, scope=None, category=None):
        """
        Get filtered emissions data.

        Every filter is optional and the given ones are combined with AND.

        Args:
            start_date (datetime, optional): Inclusive start date for filtering
            end_date (datetime, optional): Inclusive end date for filtering
            scope (str, optional): Scope for filtering
            category (str, optional): Category for filtering

        Returns:
            pandas.DataFrame: Filtered data (a copy; the stored table is
            not modified)
        """
        data = self._dated_copy(start_date, end_date)
        if scope:
            data = data[data['scope'] == scope]
        if category:
            data = data[data['category'] == category]
        return data