# DailyRA / src/daily_ra/app.py
# Commit d3c6fdd (bivav): implement UPSERT functionality for the daily_ra
# table and enforce a unique constraint on work_date in schema.sql.
import json
import os
import sys
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
import streamlit as st
from dotenv import load_dotenv

# Must extend sys.path before importing the project-local package below.
sys.path.append(os.path.join(os.getcwd(), "src"))
from daily_ra.services.llm_service import llm_service, DailyRAInput
# ============================
# 🔧 1. Configuration
# ============================
load_dotenv()

DB_HOST = os.environ.get("DB_HOST")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_USER = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_NAME = os.environ.get("DB_NAME", "postgres")

if all([DB_HOST, DB_USER, DB_PASSWORD, DB_NAME]):
    # URL-encode the credentials: characters such as "@", ":", "/" or "%"
    # in a raw password would otherwise corrupt the connection URL.
    DB_URL = (
        f"postgresql://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
        f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    )
else:
    st.error("❌ Database connection details not found in environment variables!")
    st.stop()
# ============================
# DB connection
# ============================
# Open one module-level connection/cursor shared by the whole script run.
try:
    conn = psycopg2.connect(DB_URL)
    # autocommit means every statement takes effect immediately; the
    # conn.commit() calls later in the file are harmless no-ops under it.
    conn.autocommit = True
    cur = conn.cursor()
except Exception as e:
    # Any connection failure is fatal for the app: report and halt the script.
    st.error(f"❌ Database connection failed: {str(e)}")
    st.stop()
# ============================
# Excel file loading
# ============================
@st.cache_data
def load_category_data(filename: str = "All process.xlsx") -> pd.DataFrame:
    """Load the work-category master data from an Excel workbook.

    Args:
        filename: Workbook file name, resolved relative to this script's
            directory. Defaults to the original hard-coded workbook, so
            existing callers are unaffected.

    Returns:
        DataFrame with headers stripped of whitespace and the columns
        renamed: "章" -> "大分類", "工種" -> "小分類".
    """
    # Resolve relative to this file so the app works regardless of the
    # process's current working directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    excel_path = os.path.join(script_dir, filename)
    df = pd.read_excel(excel_path)
    # Guard against stray whitespace in header cells.
    df.columns = [col.strip() for col in df.columns]
    df = df.rename(columns={"章": "大分類", "工種": "小分類"})
    return df

df_categories = load_category_data()
# ============================
# Streamlit UI
# ============================
st.title("日次RA入力")
st.subheader("作業内容の選択")

# Major category (chapter) dropdown, with a placeholder as the first entry.
major_categories = sorted(df_categories["大分類"].dropna().unique())
selected_major = st.selectbox(
    "章(大分類)を選択してください",
    ["--選択してください--"] + list(major_categories),
)

# Minor category (work type) dropdown appears only once a chapter is chosen.
selected_sub = "--選択してください--"
if selected_major != "--選択してください--":
    filtered_df = df_categories[df_categories["大分類"] == selected_major]
    sub_categories = sorted(filtered_df["小分類"].dropna().unique())
    selected_sub = st.selectbox(
        "工種(小分類)を選択してください",
        ["--選択してください--"] + list(sub_categories),
    )

# Both selections made -> compose the work-content label; otherwise empty.
if selected_major != "--選択してください--" and selected_sub != "--選択してください--":
    work_content = f"{selected_major} - {selected_sub}"
else:
    work_content = ""

if work_content:
    st.success(f"作業内容: {work_content}")
else:
    st.warning("章と工種を選択してください。")
# ============================
# Form (remaining inputs)
# ============================
with st.form("ra_form"):
    # Work date plus free-text risk-assessment fields. Values are only
    # consumed by the submit handling further down, after the submit
    # button has been pressed.
    work_date = st.date_input("作業日")
    hazard_points = st.text_area("作業危険ポイント")
    general_comments = st.text_area("元請コメント")
    risk_identification = st.text_area("危険性・有害性の特定")
    mitigation_measures = st.text_area("危険性・有害性の低減策")
    inspection_items = st.text_area("点検事項")
    submitted = st.form_submit_button("保存")
# ============================
# Form-submission handling
# ============================
if submitted:
    if not work_content:
        st.error("❌ 作業内容が未選択です。")
    else:
        # Payload both for the LLM input model and the JSON snapshot below.
        form_data = {
            "work_date": str(work_date),
            "work_content": work_content,
            "hazard_points": hazard_points,
            "general_comments": general_comments,
            "risk_identification": risk_identification,
            "mitigation_measures": mitigation_measures,
            "inspection_items": inspection_items,
        }

        # --- DB save (atomic UPSERT keyed on work_date) ---
        # A single INSERT ... ON CONFLICT replaces the previous
        # check-then-write sequence, which was racy: two concurrent
        # submissions for the same work_date could both pass the SELECT
        # and then collide on the unique constraint. "(xmax = 0)" is true
        # only for freshly inserted rows, letting us distinguish insert
        # from update for the success message.
        upsert_sql = """INSERT INTO daily_ra
            (work_date, work_content, hazard_points, general_comments,
             risk_identification, mitigation_measures, inspection_items, created_at)
            VALUES (%s,%s,%s,%s,%s,%s,%s,NOW())
            ON CONFLICT (work_date) DO UPDATE SET
                work_content = EXCLUDED.work_content,
                hazard_points = EXCLUDED.hazard_points,
                general_comments = EXCLUDED.general_comments,
                risk_identification = EXCLUDED.risk_identification,
                mitigation_measures = EXCLUDED.mitigation_measures,
                inspection_items = EXCLUDED.inspection_items
            RETURNING id, (xmax = 0) AS inserted"""
        cur.execute(upsert_sql, (
            work_date, work_content, hazard_points, general_comments,
            risk_identification, mitigation_measures, inspection_items,
        ))
        daily_id, inserted = cur.fetchone()
        if inserted:
            st.success("✅ 入力内容を保存しました!")
        else:
            # Updated an existing day: drop its previously generated rules
            # before regenerating and re-inserting them below.
            cur.execute("DELETE FROM rule_base WHERE daily_ra_id = %s", (daily_id,))
            st.success("✅ 既存データを更新しました!")
        conn.commit()  # no-op under autocommit; kept for clarity

        # --- 🔥 Generate safety rules with the LLM ---
        with st.spinner("🤖 LLMで安全ルールを生成中..."):
            input_data = DailyRAInput(**form_data)
            # Guard with "or []": a None result would otherwise crash the
            # JSON list comprehension further down.
            rules = llm_service.generate_rules(input_data) or []

        # --- Persist generated rules ---
        if rules:
            rule_sql = """INSERT INTO rule_base (daily_ra_id, object1, object2, risk, created_at)
                VALUES (%s,%s,%s,%s,NOW())"""
            for r in rules:
                cur.execute(rule_sql, (daily_id, r.object1, r.object2, r.risk))
            conn.commit()
            st.success("✅ LLM生成ルールを保存しました!")
            st.subheader("🔍 LLMが生成した安全ルール")
            st.dataframe([r.dict() for r in rules])
        else:
            st.warning("⚠️ LLMによるルール生成に失敗しました。")

        # --- Build & save a JSON snapshot of this submission ---
        json_data = {"daily_id": daily_id, "rules": [r.dict() for r in rules]}
        # /app exists inside the container deployment; fall back to a local
        # directory when running outside Docker.
        json_dir = "/app/json_data" if os.path.exists("/app") else "json_data"
        os.makedirs(json_dir, exist_ok=True)
        json_path = os.path.join(
            json_dir,
            f"daily_ra_{daily_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
        )
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(json_data, f, ensure_ascii=False, indent=2)
        st.success(f"✅ JSONファイルを保存しました: {json_path}")
        st.download_button(
            label="📥 Download JSON File",
            data=json.dumps(json_data, ensure_ascii=False, indent=2),
            file_name=f"daily_ra_{daily_id}.json",
            mime="application/json",
        )