not-sure / utils /data_io.py
mabuseif's picture
Upload 25 files
6cc22a6 verified
"""
Data import/export module for HVAC Load Calculator.
This module provides functionality for importing and exporting building data and results.
"""
import json
import csv
import os
import datetime
import pandas as pd
import numpy as np
import base64
import io
import zipfile
import streamlit as st
from typing import Dict, List, Any, Optional, Tuple
from models.building import Building, CoolingLoadResult
def export_building_to_json(building: Building, file_path: str = None) -> str:
"""
Export building data to JSON format.
Args:
building: Building model to export
file_path: Optional file path to save JSON data
Returns:
JSON string representation of the building
"""
# Convert building to dictionary
building_dict = building.to_dict()
# Convert to JSON string
json_str = json.dumps(building_dict, indent=2)
# Save to file if path provided
if file_path:
with open(file_path, 'w') as f:
f.write(json_str)
return json_str
def import_building_from_json(json_str: str) -> Building:
"""
Import building data from JSON string.
Args:
json_str: JSON string representation of the building
Returns:
Building model
"""
# Parse JSON string
building_dict = json.loads(json_str)
# Create building from dictionary
building = Building.from_dict(building_dict)
return building
def export_result_to_json(result: CoolingLoadResult, file_path: str = None) -> str:
"""
Export cooling load result to JSON format.
Args:
result: Cooling load result to export
file_path: Optional file path to save JSON data
Returns:
JSON string representation of the result
"""
# Convert result to dictionary
result_dict = result.to_dict()
# Convert to JSON string
json_str = json.dumps(result_dict, indent=2)
# Save to file if path provided
if file_path:
with open(file_path, 'w') as f:
f.write(json_str)
return json_str
def import_result_from_json(json_str: str) -> CoolingLoadResult:
"""
Import cooling load result from JSON string.
Args:
json_str: JSON string representation of the result
Returns:
Cooling load result
"""
# Parse JSON string
result_dict = json.loads(json_str)
# Create result from dictionary
result = CoolingLoadResult.from_dict(result_dict)
return result
def export_to_csv(data: Dict[str, Any], file_path: str = None) -> str:
"""
Export data to CSV format.
Args:
data: Dictionary of data to export
file_path: Optional file path to save CSV data
Returns:
CSV string representation of the data
"""
# Convert to DataFrame
df = pd.DataFrame(data)
# Convert to CSV string
csv_str = df.to_csv(index=False)
# Save to file if path provided
if file_path:
with open(file_path, 'w') as f:
f.write(csv_str)
return csv_str
def export_monthly_breakdown_to_csv(monthly_breakdown: Dict[str, Dict[str, Any]], file_path: str = None) -> str:
"""
Export monthly breakdown data to CSV format.
Args:
monthly_breakdown: Monthly breakdown data
file_path: Optional file path to save CSV data
Returns:
CSV string representation of the monthly breakdown
"""
# Create data for CSV
csv_data = []
for month, data in monthly_breakdown.items():
if month != "annual":
csv_data.append({
"Month": month,
"Peak Load (W)": data.get("peak_load_w", 0),
"Average Load (W)": data.get("average_load_w", 0),
"Energy (kWh)": data.get("energy_kwh", 0),
"Average Temperature (°C)": data.get("avg_temp_c", 0),
"Cooling Degree Days": data.get("cooling_degree_days", 0)
})
# Add annual total
if "annual" in monthly_breakdown:
annual = monthly_breakdown["annual"]
csv_data.append({
"Month": "Annual",
"Peak Load (W)": annual.get("peak_load_w", 0),
"Average Load (W)": annual.get("average_load_w", 0),
"Energy (kWh)": annual.get("energy_kwh", 0),
"Average Temperature (°C)": annual.get("avg_temp_c", 0),
"Cooling Degree Days": annual.get("cooling_degree_days", 0)
})
# Convert to DataFrame
df = pd.DataFrame(csv_data)
# Convert to CSV string
csv_str = df.to_csv(index=False)
# Save to file if path provided
if file_path:
with open(file_path, 'w') as f:
f.write(csv_str)
return csv_str
def export_building_comparison_to_csv(buildings: Dict[str, Building], results: Dict[str, CoolingLoadResult],
monthly_breakdowns: Dict[str, Dict[str, Any]], file_path: str = None) -> str:
"""
Export building comparison data to CSV format.
Args:
buildings: Dictionary of building models {name: Building}
results: Dictionary of cooling load results {name: CoolingLoadResult}
monthly_breakdowns: Dictionary of monthly breakdowns {name: monthly_breakdown}
file_path: Optional file path to save CSV data
Returns:
CSV string representation of the building comparison
"""
# Create data for CSV
csv_data = []
for name, building in buildings.items():
result = results.get(name)
monthly_breakdown = monthly_breakdowns.get(name)
if result and monthly_breakdown:
annual = monthly_breakdown.get("annual", {})
# Building properties
walls_u_avg = np.mean([wall.u_value for wall in building.walls]) if building.walls else 0
glass_u_avg = np.mean([glass.u_value for glass in building.glass]) if building.glass else 0
glass_shgc_avg = np.mean([glass.shgc for glass in building.glass]) if building.glass else 0
# Calculate wall-to-window ratio
total_wall_area = sum([wall.area for wall in building.walls]) if building.walls else 0
total_glass_area = sum([glass.area for glass in building.glass]) if building.glass else 0
wwr = total_glass_area / (total_wall_area + total_glass_area) if (total_wall_area + total_glass_area) > 0 else 0
csv_data.append({
"Building Name": building.settings.name,
"Location": building.location.city,
"Floor Area (m²)": building.settings.floor_area,
"Indoor Design Temperature (°C)": building.settings.indoor_temp,
"Wall Average U-Value (W/m²·K)": walls_u_avg,
"Roof U-Value (W/m²·K)": building.roof.u_value,
"Glass Average U-Value (W/m²·K)": glass_u_avg,
"Glass Average SHGC": glass_shgc_avg,
"Window-to-Wall Ratio": wwr,
"Number of Occupants": building.people.count,
"Lighting Power (W)": building.lighting.power,
"Equipment Power (W)": building.equipment.power,
"Peak Cooling Load (kW)": result.peak_total_load / 1000,
"Peak Cooling Load (W/m²)": result.peak_total_load / building.settings.floor_area,
"Sensible Heat Ratio": result.peak_sensible_load / result.peak_total_load if result.peak_total_load > 0 else 0,
"Annual Energy Consumption (kWh)": annual.get("energy_kwh", 0),
"Annual Energy Consumption (kWh/m²)": annual.get("energy_kwh", 0) / building.settings.floor_area
})
# Convert to DataFrame
df = pd.DataFrame(csv_data)
# Convert to CSV string
csv_str = df.to_csv(index=False)
# Save to file if path provided
if file_path:
with open(file_path, 'w') as f:
f.write(csv_str)
return csv_str
def generate_pdf_report(building: Building, result: CoolingLoadResult, monthly_breakdown: Dict[str, Any] = None) -> bytes:
"""
Generate PDF report for building and cooling load results.
Args:
building: Building model
result: Cooling load result
monthly_breakdown: Optional monthly breakdown data
Returns:
PDF report as bytes
"""
try:
from reportlab.lib.pagesizes import letter
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
import matplotlib.pyplot as plt
import io
except ImportError:
# If reportlab is not installed, return empty bytes
return b""
# Create PDF buffer
buffer = io.BytesIO()
# Create PDF document
doc = SimpleDocTemplate(buffer, pagesize=letter)
styles = getSampleStyleSheet()
# Create content
content = []
# Title
title_style = styles["Title"]
content.append(Paragraph(f"HVAC Cooling Load Report", title_style))
content.append(Spacer(1, 0.25 * inch))
# Building information
heading_style = styles["Heading1"]
normal_style = styles["Normal"]
content.append(Paragraph("Building Information", heading_style))
content.append(Spacer(1, 0.1 * inch))
building_info = [
["Building Name", building.settings.name],
["Location", building.location.city],
["Floor Area", f"{building.settings.floor_area} m²"],
["Indoor Design Temperature", f"{building.settings.indoor_temp} °C"],
["Indoor Design Humidity", f"{building.settings.indoor_humidity} %"],
["Number of Occupants", str(building.people.count)]
]
building_table = Table(building_info, colWidths=[2.5 * inch, 3.5 * inch])
building_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), colors.lightgrey),
('TEXTCOLOR', (0, 0), (0, -1), colors.black),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
('BACKGROUND', (1, 0), (-1, -1), colors.white),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
content.append(building_table)
content.append(Spacer(1, 0.25 * inch))
# Peak cooling load
content.append(Paragraph("Peak Cooling Load", heading_style))
content.append(Spacer(1, 0.1 * inch))
peak_load_info = [
["Sensible Load", f"{result.peak_sensible_load:.1f} W"],
["Latent Load", f"{result.peak_latent_load:.1f} W"],
["Total Load", f"{result.peak_total_load:.1f} W"],
["Peak Hour", f"{result.peak_hour}:00"],
["Sensible Heat Ratio", f"{result.peak_sensible_load / result.peak_total_load:.2f}" if result.peak_total_load > 0 else "0.00"]
]
peak_load_table = Table(peak_load_info, colWidths=[2.5 * inch, 3.5 * inch])
peak_load_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), colors.lightgrey),
('TEXTCOLOR', (0, 0), (0, -1), colors.black),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
('BACKGROUND', (1, 0), (-1, -1), colors.white),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
content.append(peak_load_table)
content.append(Spacer(1, 0.25 * inch))
# Load breakdown
content.append(Paragraph("Load Breakdown", heading_style))
content.append(Spacer(1, 0.1 * inch))
# Create pie charts for load breakdown
external_loads = result.external_loads
internal_loads = result.internal_loads
# External loads breakdown
external_breakdown = {
"Roof": external_loads.get("roof", 0),
"Walls": external_loads.get("walls_total", 0),
"Glass Conduction": external_loads.get("glass_conduction_total", 0),
"Glass Solar": external_loads.get("glass_solar_total", 0)
}
# Internal loads breakdown
internal_breakdown = {
"People (Sensible)": internal_loads.get("people_sensible", 0),
"People (Latent)": internal_loads.get("people_latent", 0),
"Lighting": internal_loads.get("lighting", 0),
"Equipment (Sensible)": internal_loads.get("equipment_sensible", 0),
"Equipment (Latent)": internal_loads.get("equipment_latent", 0)
}
# Create pie charts
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))
ax1.pie(
external_breakdown.values(),
labels=external_breakdown.keys(),
autopct='%1.1f%%',
startangle=90
)
ax1.axis('equal')
ax1.set_title("External Loads")
ax2.pie(
internal_breakdown.values(),
labels=internal_breakdown.keys(),
autopct='%1.1f%%',
startangle=90
)
ax2.axis('equal')
ax2.set_title("Internal Loads")
# Save figure to buffer
img_buffer = io.BytesIO()
plt.tight_layout()
plt.savefig(img_buffer, format='png')
img_buffer.seek(0)
# Add image to content
img = Image(img_buffer, width=6 * inch, height=3 * inch)
content.append(img)
content.append(Spacer(1, 0.25 * inch))
# Monthly breakdown
if monthly_breakdown:
content.append(Paragraph("Monthly Breakdown", heading_style))
content.append(Spacer(1, 0.1 * inch))
# Create monthly breakdown table
monthly_table_data = [["Month", "Peak Load (kW)", "Avg. Load (kW)", "Energy (kWh)"]]
for month, data in monthly_breakdown.items():
if month != "annual":
monthly_table_data.append([
month,
f"{data.get('peak_load_w', 0) / 1000:.2f}",
f"{data.get('average_load_w', 0) / 1000:.2f}",
f"{data.get('energy_kwh', 0):.1f}"
])
# Add annual total
if "annual" in monthly_breakdown:
annual = monthly_breakdown["annual"]
monthly_table_data.append([
"Annual",
f"{annual.get('peak_load_w', 0) / 1000:.2f}",
f"{annual.get('average_load_w', 0) / 1000:.2f}",
f"{annual.get('energy_kwh', 0):.1f}"
])
monthly_table = Table(monthly_table_data, colWidths=[1.5 * inch, 1.5 * inch, 1.5 * inch, 1.5 * inch])
monthly_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 10),
('BOTTOMPADDING', (0, 0), (-1, 0), 6),
('BACKGROUND', (0, 1), (-1, -1), colors.white),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
content.append(monthly_table)
content.append(Spacer(1, 0.25 * inch))
# Create monthly bar chart
months = [month for month in monthly_breakdown.keys() if month != "annual"]
peak_loads = [monthly_breakdown[month]["peak_load_w"] / 1000 for month in months]
plt.figure(figsize=(8, 4))
plt.bar(months, peak_loads)
plt.xlabel("Month")
plt.ylabel("Peak Cooling Load (kW)")
plt.title("Monthly Peak Cooling Load")
plt.xticks(rotation=45)
plt.tight_layout()
# Save figure to buffer
img_buffer = io.BytesIO()
plt.savefig(img_buffer, format='png')
img_buffer.seek(0)
# Add image to content
img = Image(img_buffer, width=6 * inch, height=3 * inch)
content.append(img)
content.append(Spacer(1, 0.25 * inch))
# Footer
content.append(Paragraph(f"Report generated on {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", normal_style))
# Build PDF
doc.build(content)
# Get PDF data
pdf_data = buffer.getvalue()
buffer.close()
return pdf_data
def create_project_archive(building: Building, result: CoolingLoadResult, monthly_breakdown: Dict[str, Any] = None) -> bytes:
"""
Create a ZIP archive containing all project data.
Args:
building: Building model
result: Cooling load result
monthly_breakdown: Optional monthly breakdown data
Returns:
ZIP archive as bytes
"""
# Create ZIP buffer
buffer = io.BytesIO()
# Create ZIP file
with zipfile.ZipFile(buffer, 'w') as zip_file:
# Add building data
building_json = export_building_to_json(building)
zip_file.writestr('building.json', building_json)
# Add result data
result_json = export_result_to_json(result)
zip_file.writestr('result.json', result_json)
# Add monthly breakdown data if available
if monthly_breakdown:
monthly_csv = export_monthly_breakdown_to_csv(monthly_breakdown)
zip_file.writestr('monthly_breakdown.csv', monthly_csv)
# Add PDF report if reportlab is available
try:
pdf_data = generate_pdf_report(building, result, monthly_breakdown)
if pdf_data:
zip_file.writestr('report.pdf', pdf_data)
except:
pass
# Get ZIP data
zip_data = buffer.getvalue()
buffer.close()
return zip_data
def get_download_link(data: bytes, filename: str, text: str) -> str:
"""
Generate a download link for binary data.
Args:
data: Binary data to download
filename: Name of the file to download
text: Text to display for the download link
Returns:
HTML string for the download link
"""
b64 = base64.b64encode(data).decode()
href = f'<a href="data:application/octet-stream;base64,{b64}" download="{filename}">{text}</a>'
return href
def display_export_options(building: Building, result: CoolingLoadResult, monthly_breakdown: Dict[str, Any] = None):
"""
Display export options in Streamlit UI.
Args:
building: Building model
result: Cooling load result
monthly_breakdown: Optional monthly breakdown data
"""
st.subheader("Export Options")
export_tabs = st.tabs(["Building Data", "Results", "Monthly Data", "Complete Project"])
with export_tabs[0]:
st.write("Export building data to JSON format.")
# Export building data
building_json = export_building_to_json(building)
# Create download button
st.download_button(
label="Download Building Data (JSON)",
data=building_json,
file_name=f"{building.settings.name}_building.json",
mime="application/json"
)
with export_tabs[1]:
st.write("Export calculation results to JSON format.")
# Export result data
result_json = export_result_to_json(result)
# Create download button
st.download_button(
label="Download Results (JSON)",
data=result_json,
file_name=f"{building.settings.name}_results.json",
mime="application/json"
)
with export_tabs[2]:
st.write("Export monthly breakdown data to CSV format.")
if monthly_breakdown:
# Export monthly breakdown data
monthly_csv = export_monthly_breakdown_to_csv(monthly_breakdown)
# Create download button
st.download_button(
label="Download Monthly Data (CSV)",
data=monthly_csv,
file_name=f"{building.settings.name}_monthly.csv",
mime="text/csv"
)
else:
st.info("Monthly breakdown data not available.")
with export_tabs[3]:
st.write("Export complete project as ZIP archive.")
# Create ZIP archive
zip_data = create_project_archive(building, result, monthly_breakdown)
# Create download button
st.download_button(
label="Download Complete Project (ZIP)",
data=zip_data,
file_name=f"{building.settings.name}_project.zip",
mime="application/zip"
)
st.info("The ZIP archive contains all project data, including building information, calculation results, and monthly breakdown data.")
def display_import_options():
"""
Display import options in Streamlit UI.
Returns:
Tuple of (building, result, monthly_breakdown) if import successful, None otherwise
"""
st.subheader("Import Options")
import_tabs = st.tabs(["Building Data", "Complete Project"])
with import_tabs[0]:
st.write("Import building data from JSON file.")
# File uploader
uploaded_file = st.file_uploader("Upload Building JSON", type=["json"])
if uploaded_file is not None:
try:
# Read JSON data
json_str = uploaded_file.getvalue().decode('utf-8')
# Import building
building = import_building_from_json(json_str)
st.success(f"Successfully imported building: {building.settings.name}")
return (building, None, None)
except Exception as e:
st.error(f"Error importing building data: {str(e)}")
with import_tabs[1]:
st.write("Import complete project from ZIP archive.")
# File uploader
uploaded_file = st.file_uploader("Upload Project ZIP", type=["zip"])
if uploaded_file is not None:
try:
# Read ZIP data
zip_data = uploaded_file.getvalue()
# Create ZIP buffer
buffer = io.BytesIO(zip_data)
# Extract ZIP file
with zipfile.ZipFile(buffer, 'r') as zip_file:
# Extract building data
if 'building.json' in zip_file.namelist():
building_json = zip_file.read('building.json').decode('utf-8')
building = import_building_from_json(building_json)
else:
st.error("Building data not found in ZIP archive.")
return None
# Extract result data
if 'result.json' in zip_file.namelist():
result_json = zip_file.read('result.json').decode('utf-8')
result = import_result_from_json(result_json)
else:
result = None
# Extract monthly breakdown data
if 'monthly_breakdown.csv' in zip_file.namelist():
monthly_csv = zip_file.read('monthly_breakdown.csv').decode('utf-8')
monthly_df = pd.read_csv(io.StringIO(monthly_csv))
# Convert to dictionary
monthly_breakdown = {}
for _, row in monthly_df.iterrows():
month = row['Month']
monthly_breakdown[month] = {
'peak_load_w': row['Peak Load (W)'],
'average_load_w': row['Average Load (W)'],
'energy_kwh': row['Energy (kWh)']
}
if 'Average Temperature (°C)' in row:
monthly_breakdown[month]['avg_temp_c'] = row['Average Temperature (°C)']
if 'Cooling Degree Days' in row:
monthly_breakdown[month]['cooling_degree_days'] = row['Cooling Degree Days']
else:
monthly_breakdown = None
st.success(f"Successfully imported project: {building.settings.name}")
return (building, result, monthly_breakdown)
except Exception as e:
st.error(f"Error importing project: {str(e)}")
return None