tsunami / reports /generate_reports.py
Gitdeeper4's picture
رفع جميع ملفات TSU-WAVE مع YAML
12834b7
#!/usr/bin/env python3
"""
TSU-WAVE Report Generator
Generates daily, weekly, monthly reports (.txt, .md)
JSON data stored separately in json_data folder
"""
import os
import json
import numpy as np
from datetime import datetime, timedelta
import sys
class TSUWAVEReportGenerator:
    """Generate TSU-WAVE monitoring reports.

    Writes daily, weekly and monthly reports (``.txt`` and ``.md``), alert
    reports (``.txt``) and the underlying sample data (``.json``) into
    sub-directories of a base directory.  All files are written as UTF-8
    because the report text contains emoji and other non-ASCII symbols.
    """

    # CHI thresholds shared by status classification, summaries and alerts.
    MODERATE_CHI = 0.3
    HIGH_CHI = 0.6
    SEVERE_CHI = 0.8

    # Marker shown in front of each status in the Markdown zone table.
    _STATUS_ICONS = {
        "SEVERE": "🔴",
        "HIGH": "🟠",
        "MODERATE": "🟡",
        "LOW": "🟢",
    }

    def __init__(self, base_dir=None):
        """Create the output directories and the table of monitored zones.

        Args:
            base_dir: Directory that will hold the ``json_data``, ``daily``,
                ``weekly``, ``monthly`` and ``alerts`` folders.  Defaults to
                the directory containing this file.
        """
        if base_dir is None:
            base_dir = os.path.dirname(os.path.abspath(__file__))
        self.base_dir = base_dir
        self.json_dir = os.path.join(self.base_dir, "json_data")
        self.daily_dir = os.path.join(self.base_dir, "daily")
        self.weekly_dir = os.path.join(self.base_dir, "weekly")
        self.monthly_dir = os.path.join(self.base_dir, "monthly")
        self.alerts_dir = os.path.join(self.base_dir, "alerts")

        # Create directories if they don't exist
        for dir_path in (self.json_dir, self.daily_dir, self.weekly_dir,
                         self.monthly_dir, self.alerts_dir):
            os.makedirs(dir_path, exist_ok=True)

        # Coastal zones data from research paper
        self.zones = {
            "hilo_bay_hawaii": {
                "name": "Hilo Bay, Hawaii",
                "becf": 4.8,
                "risk": "high",
                "population": 45000,
            },
            "miyako_japan": {
                "name": "Miyako, Japan",
                "becf": 7.3,
                "risk": "critical",
                "population": 50000,
            },
            "banda_aceh_indonesia": {
                "name": "Banda Aceh, Indonesia",
                "becf": 7.3,
                "risk": "critical",
                "population": 250000,
            },
            "khao_lak_thailand": {
                "name": "Khao Lak, Thailand",
                "becf": 4.1,
                "risk": "high",
                "population": 20000,
            },
            "crescent_city_california": {
                "name": "Crescent City, California",
                "becf": 3.7,
                "risk": "medium",
                "population": 7000,
            },
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _write_text(path, content):
        """Write *content* to *path*, always encoded as UTF-8."""
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)

    def _load_or_generate_data(self):
        """Return today's stored data, generating sample data if none exists."""
        date_str = datetime.now().strftime("%Y%m%d")
        json_file = os.path.join(self.json_dir, f"data_{date_str}.json")
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                return json.load(f)['data']
        except FileNotFoundError:
            return self.generate_sample_data()

    def _count_alert_levels(self, data):
        """Return ``(critical, high)`` zone counts based on each zone's max CHI."""
        critical = 0
        high = 0
        for zone_data in data.values():
            max_chi = zone_data['max_chi']
            if max_chi >= self.SEVERE_CHI:
                critical += 1
            elif max_chi >= self.HIGH_CHI:
                high += 1
        return critical, high

    def _alert_action(self, chi):
        """Return the recommended action text for an alert-level CHI value."""
        if chi >= self.SEVERE_CHI:
            return "EVACUATE IMMEDIATELY"
        return "Prepare for evacuation"

    # ------------------------------------------------------------------
    # Data generation
    # ------------------------------------------------------------------
    def generate_sample_data(self):
        """Generate sample monitoring data and save it as JSON.

        For every zone, 24 hourly CHI values are drawn around ``becf / 10``
        with Gaussian noise, clamped to ``[0, 1.2]`` and rounded to three
        decimals.  The result is written to ``json_data/data_YYYYMMDD.json``.

        Returns:
            dict: Mapping of zone id to a dict with ``zone_info``,
            ``hourly_data``, ``max_chi`` and ``avg_chi``.
        """
        data = {}
        for zone_id, zone_info in self.zones.items():
            # Generate random but realistic CHI values based on BECF
            base_chi = zone_info['becf'] / 10  # approximate scaling
            hourly = []
            for hour in range(24):  # 24 hours of data
                noise = np.random.normal(0, 0.05)
                # Round before classifying so the stored value and its status
                # can never disagree at a threshold boundary.
                chi = round(float(min(1.2, max(0.0, base_chi + noise))), 3)
                hourly.append({
                    'hour': hour,
                    'chi': chi,
                    'status': self.get_chi_status(chi),
                })
            chi_values = [entry['chi'] for entry in hourly]
            data[zone_id] = {
                'zone_info': zone_info,
                'hourly_data': hourly,
                'max_chi': max(chi_values),
                'avg_chi': sum(chi_values) / len(chi_values),
            }

        # Save as JSON in json_data folder; one timestamp keeps the file name
        # and the embedded generation time consistent.
        now = datetime.now()
        json_file = os.path.join(
            self.json_dir, f"data_{now.strftime('%Y%m%d')}.json")
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump({
                'generated_at': now.isoformat(),
                'data': data,
            }, f, indent=2)
        print(f"✅ JSON data saved to: {json_file}")
        return data

    def get_chi_status(self, chi):
        """Map a CHI value to ``"LOW"``, ``"MODERATE"``, ``"HIGH"`` or ``"SEVERE"``."""
        if chi < self.MODERATE_CHI:
            return "LOW"
        if chi < self.HIGH_CHI:
            return "MODERATE"
        if chi < self.SEVERE_CHI:
            return "HIGH"
        return "SEVERE"

    # ------------------------------------------------------------------
    # Daily report
    # ------------------------------------------------------------------
    def generate_daily_report(self, data=None):
        """Generate the daily report in ``.txt`` and ``.md`` form.

        Args:
            data: Zone data as returned by :meth:`generate_sample_data`.  When
                omitted, today's JSON file is loaded, or sample data is
                generated if that file does not exist.

        Returns:
            tuple: ``(txt_file, md_file)`` paths of the written reports.
        """
        if data is None:
            data = self._load_or_generate_data()

        date_str = datetime.now().strftime("%Y-%m-%d")

        # Generate TXT report
        txt_file = os.path.join(self.daily_dir, f"daily_report_{date_str}.txt")
        self._write_text(
            txt_file, self._generate_txt_report(data, "DAILY", date_str))

        # Generate MD report
        md_file = os.path.join(self.daily_dir, f"daily_report_{date_str}.md")
        self._write_text(
            md_file, self._generate_md_report(data, "Daily", date_str))

        print(f"✅ Daily reports saved:")
        print(f" - {txt_file}")
        print(f" - {md_file}")
        return txt_file, md_file

    def _generate_txt_report(self, data, report_type, date_str):
        """Build the plain-text report body.

        Args:
            data: Mapping of zone id to zone data (needs ``zone_info`` and
                ``max_chi`` per zone).
            report_type: Label inserted into the title line.
            date_str: Date shown in the header.

        Returns:
            str: The report text, lines joined with ``"\\n"``.
        """
        critical_zones, high_zones = self._count_alert_levels(data)
        lines = [
            "=" * 70,
            f"TSU-WAVE {report_type} REPORT",
            f"Date: {date_str}",
            "=" * 70,
            "",
            # Summary
            "📊 SUMMARY",
            "-" * 40,
            f"Zones monitored: {len(data)}",
            f"Critical zones (CHI ≥ 0.8): {critical_zones}",
            f"High risk zones (CHI ≥ 0.6): {high_zones}",
            "",
            # Zone details
            "📋 ZONE DETAILS",
            "-" * 70,
            f"{'Zone':<25} {'Max CHI':<10} {'Status':<12} {'Population':<12}",
            "-" * 70,
        ]
        for zone_data in data.values():
            zone_info = zone_data['zone_info']
            name = zone_info['name'][:24]  # keep the column width intact
            max_chi = zone_data['max_chi']
            status = self.get_chi_status(max_chi)
            pop = zone_info['population']
            lines.append(
                f"{name:<25} {max_chi:<10.3f} {status:<12} {pop:<12,}")
        lines.append("-" * 70)
        lines.append("")

        # Alerts
        lines.append("⚠️ ACTIVE ALERTS")
        lines.append("-" * 40)
        alerts_found = False
        for zone_data in data.values():
            max_chi = zone_data['max_chi']
            if max_chi >= self.HIGH_CHI:
                alerts_found = True
                zone_info = zone_data['zone_info']
                status = self.get_chi_status(max_chi)
                lines.append(
                    f"• {zone_info['name']}: {status} (CHI={max_chi:.3f})")
        if not alerts_found:
            lines.append("No active alerts")

        lines.append("")
        lines.append("=" * 70)
        lines.append("End of Report")
        lines.append("=" * 70)
        return "\n".join(lines)

    def _generate_md_report(self, data, report_type, date_str):
        """Build the Markdown report body.

        Args:
            data: Mapping of zone id to zone data (needs ``zone_info`` and
                ``max_chi`` per zone).
            report_type: Label inserted into the title.
            date_str: Date shown under the title.

        Returns:
            str: The Markdown text, lines joined with ``"\\n"``.
        """
        critical_zones, high_zones = self._count_alert_levels(data)
        lines = [
            f"# 🌊 TSU-WAVE {report_type} Report",
            f"**Date:** {date_str}",
            "",
            # Summary
            "## 📊 Summary",
            f"- **Zones monitored:** {len(data)}",
            f"- **Critical zones (CHI ≥ 0.8):** {critical_zones}",
            f"- **High risk zones (CHI ≥ 0.6):** {high_zones}",
            "",
            # Zone details table
            "## 📋 Zone Details",
            "",
            "| Zone | Max CHI | Status | Population |",
            "|------|---------|--------|------------|",
        ]
        for zone_data in data.values():
            zone_info = zone_data['zone_info']
            name = zone_info['name']
            max_chi = zone_data['max_chi']
            status = self.get_chi_status(max_chi)
            pop = f"{zone_info['population']:,}"
            # Add emoji based on status
            status_display = f"{self._STATUS_ICONS[status]} {status}"
            lines.append(
                f"| {name} | {max_chi:.3f} | {status_display} | {pop} |")
        lines.append("")

        # Alerts section
        lines.append("## ⚠️ Active Alerts")
        lines.append("")
        alerts_found = False
        for zone_data in data.values():
            max_chi = zone_data['max_chi']
            if max_chi < self.HIGH_CHI:
                continue
            alerts_found = True
            zone_info = zone_data['zone_info']
            heading = "🔴 SEVERE" if max_chi >= self.SEVERE_CHI else "🟠 HIGH"
            lines.append(f"### {heading}: {zone_info['name']}")
            lines.append(f"- CHI: {max_chi:.3f}")
            lines.append(f"- Population: {zone_info['population']:,}")
            lines.append(f"- **ACTION: {self._alert_action(max_chi)}**")
            lines.append("")
        if not alerts_found:
            lines.append("*No active alerts*")
            lines.append("")

        # Footer
        lines.append("---")
        lines.append(
            f"*Report generated by TSU-WAVE on "
            f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*")
        lines.append("*Data source: TSU-WAVE monitoring network*")
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Weekly / monthly reports
    # ------------------------------------------------------------------
    def generate_weekly_report(self):
        """Generate the weekly report in ``.txt`` and ``.md`` form.

        The file name carries the covered range in chronological order
        (``<seven days ago>_to_<today>``).

        Returns:
            tuple: ``(txt_file, md_file)`` paths of the written reports.
        """
        now = datetime.now()
        date_str = now.strftime("%Y-%m-%d")
        week_start = now - timedelta(days=7)
        week_range = (
            f"{week_start.strftime('%Y%m%d')}_to_{now.strftime('%Y%m%d')}")
        txt_file = os.path.join(
            self.weekly_dir, f"weekly_report_{week_range}.txt")
        md_file = os.path.join(
            self.weekly_dir, f"weekly_report_{week_range}.md")

        self._write_text(txt_file, "".join([
            f"TSU-WAVE WEEKLY REPORT - Week ending {date_str}\n",
            "=" * 50 + "\n",
            "Weekly statistics aggregated from daily data\n",
        ]))
        self._write_text(md_file, "".join([
            f"# 🌊 TSU-WAVE Weekly Report\n",
            f"**Week ending:** {date_str}\n\n",
            "Weekly statistics aggregated from daily data\n",
        ]))
        print(f"✅ Weekly reports saved to weekly/")
        return txt_file, md_file

    def generate_monthly_report(self):
        """Generate the monthly report in ``.txt`` and ``.md`` form.

        Returns:
            tuple: ``(txt_file, md_file)`` paths of the written reports.
        """
        date_str = datetime.now().strftime("%Y-%m")
        txt_file = os.path.join(
            self.monthly_dir, f"monthly_report_{date_str}.txt")
        md_file = os.path.join(
            self.monthly_dir, f"monthly_report_{date_str}.md")

        self._write_text(txt_file, "".join([
            f"TSU-WAVE MONTHLY REPORT - {date_str}\n",
            "=" * 50 + "\n",
            "Monthly statistics aggregated from daily data\n",
        ]))
        self._write_text(md_file, "".join([
            f"# 🌊 TSU-WAVE Monthly Report\n",
            f"**Month:** {date_str}\n\n",
            "Monthly statistics aggregated from daily data\n",
        ]))
        print(f"✅ Monthly reports saved to monthly/")
        return txt_file, md_file

    # ------------------------------------------------------------------
    # Alerts
    # ------------------------------------------------------------------
    def check_alerts(self, data=None):
        """Collect alerts for zones whose max CHI is at least 0.6.

        When at least one alert exists, an ``alert_<timestamp>.txt`` report is
        written to the alerts directory.

        Args:
            data: Zone data as returned by :meth:`generate_sample_data`.  When
                omitted, today's JSON file is loaded, or sample data is
                generated if that file does not exist.

        Returns:
            list: One dict per alert with ``zone``, ``chi``, ``status``,
            ``time`` and ``population``; empty when nothing is alerting.
        """
        if data is None:
            data = self._load_or_generate_data()

        now = datetime.now()
        alerts = [
            {
                'zone': zone_data['zone_info']['name'],
                'chi': zone_data['max_chi'],
                'status': self.get_chi_status(zone_data['max_chi']),
                'time': now.isoformat(),
                'population': zone_data['zone_info']['population'],
            }
            for zone_data in data.values()
            if zone_data['max_chi'] >= self.HIGH_CHI
        ]

        if not alerts:
            print("✅ No active alerts")
            return []

        # Generate alert report
        timestamp = now.strftime("%Y%m%d_%H%M%S")
        alert_file = os.path.join(self.alerts_dir, f"alert_{timestamp}.txt")
        parts = [
            "=" * 60 + "\n",
            "🚨 TSU-WAVE ALERT REPORT\n",
            "=" * 60 + "\n\n",
        ]
        for alert in alerts:
            parts.append(f"⚠️ {alert['status']} ALERT\n")
            parts.append(f" Zone: {alert['zone']}\n")
            parts.append(f" CHI: {alert['chi']:.3f}\n")
            parts.append(f" Population: {alert['population']:,}\n")
            parts.append(f" ACTION: {self._alert_action(alert['chi'])}\n")
            parts.append("\n")
        self._write_text(alert_file, "".join(parts))

        print(f"🚨 {len(alerts)} alerts generated in alerts/")
        return alerts

    # ------------------------------------------------------------------
    # Everything at once
    # ------------------------------------------------------------------
    def generate_all(self):
        """Generate sample data, then every report type, then check alerts."""
        print("📊 TSU-WAVE Report Generator")
        print("=" * 40)

        # Generate sample data
        data = self.generate_sample_data()

        # Generate daily reports
        print("\n📅 Generating daily reports...")
        self.generate_daily_report(data)

        # Generate weekly reports
        print("\n📆 Generating weekly reports...")
        self.generate_weekly_report()

        # Generate monthly reports
        print("\n🗓️ Generating monthly reports...")
        self.generate_monthly_report()

        # Check alerts
        print("\n🚨 Checking alerts...")
        self.check_alerts(data)

        print("\n" + "=" * 40)
        print("✅ All reports generated successfully!")
        print(f"📁 Reports saved in:")
        print(f" - daily/ (.txt, .md)")
        print(f" - weekly/ (.txt, .md)")
        print(f" - monthly/ (.txt, .md)")
        print(f" - alerts/ (.txt)")
        print(f" - json_data/ (.json)")
# Maps each command-line command to the generator method that implements it.
_COMMANDS = {
    "daily": "generate_daily_report",
    "weekly": "generate_weekly_report",
    "monthly": "generate_monthly_report",
    "alerts": "check_alerts",
    "all": "generate_all",
}


def main(argv=None):
    """Run the report generator from the command line.

    Args:
        argv: Command-line arguments without the program name.  Defaults to
            ``sys.argv[1:]``.  Only the first argument is used; with no
            arguments every report is generated.

    Returns:
        int: ``0`` on success, ``2`` when the command is not recognised.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    # Default: generate all
    command = args[0] if args else "all"

    # Validate before constructing the generator so that a typo neither
    # creates output directories nor reports success to the calling shell.
    method_name = _COMMANDS.get(command)
    if method_name is None:
        print(f"Unknown command: {command}", file=sys.stderr)
        print("Usage: python generate_reports.py [daily|weekly|monthly|alerts|all]",
              file=sys.stderr)
        return 2

    generator = TSUWAVEReportGenerator()
    getattr(generator, method_name)()
    return 0


if __name__ == "__main__":
    sys.exit(main())