LifeLog / data.py
arunsa's picture
LifeLog v1.0 - A Debugger for Your Life Decisions
8f74b6b
Raw
History Blame Contribute Delete
4.06 kB
import json
import hashlib
from datetime import datetime
from pathlib import Path
from filelock import FileLock
# Root folder (relative to the current working directory) for persisted data.
DATA_DIR = Path("data")
# Sub-folder of DATA_DIR; created alongside it by ensure_dirs().
CARDS_DIR = DATA_DIR.joinpath("cards")
# JSON file holding the list of decision records.
DECISIONS_FILE = DATA_DIR.joinpath("decisions.json")
# Path handed to FileLock by the functions that rewrite DECISIONS_FILE.
LOCK_FILE = DATA_DIR.joinpath(".decisions.lock")
def ensure_dirs():
    """Create DATA_DIR and CARDS_DIR if they do not exist yet.

    The parent is listed first so that it exists by the time the child
    directory is created (``mkdir`` is called without ``parents=True``).
    """
    for directory in (DATA_DIR, CARDS_DIR):
        directory.mkdir(exist_ok=True)
def generate_commit_hash(timestamp: str, raw_input: str) -> str:
    """Return a 7-character hex identifier for a decision.

    The value is the first seven hex digits of the SHA-256 digest of
    ``"<timestamp>:<raw_input>"`` (encoded with the default UTF-8 codec).
    """
    hasher = hashlib.sha256()
    hasher.update("{}:{}".format(timestamp, raw_input).encode())
    full_digest = hasher.hexdigest()
    return full_digest[:7]
def generate_id() -> str:
    """Build the next decision ID of the form ``dec_YYYYMMDD_NNN``.

    ``NNN`` is one more than the number of stored decisions whose
    ``"timestamp"`` string starts with today's ``YYYY-MM-DD`` date,
    zero-padded to three digits.

    NOTE(review): the count is read without holding the decisions file
    lock, so two concurrent callers could compute the same ID — confirm
    whether callers need to guard against that.
    """
    now = datetime.now()
    day_prefix = now.strftime("%Y-%m-%d")
    logged_today = [
        entry for entry in load_decisions()
        if entry["timestamp"].startswith(day_prefix)
    ]
    sequence = len(logged_today) + 1
    return "dec_{}_{:03d}".format(now.strftime("%Y%m%d"), sequence)
def load_decisions() -> list[dict]:
    """Read and return the parsed contents of DECISIONS_FILE.

    The data directories are created first. An empty list is returned
    when the file is missing, cannot be read, or does not contain valid
    JSON, so callers cannot tell a corrupt file from an empty history.
    """
    ensure_dirs()
    if DECISIONS_FILE.exists():
        try:
            return json.loads(DECISIONS_FILE.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            # Best-effort read: fall through to the empty default below.
            pass
    return []
def _write_decisions(decisions: list[dict]):
    """Persist ``decisions`` to DECISIONS_FILE as indented JSON.

    The data is first written to a sibling ``.tmp`` file and then moved
    over DECISIONS_FILE with ``Path.replace``. Writing the live file in
    place would truncate it before serialization finished, so a failure
    inside ``json.dump`` (or a reader that does not hold the lock) would
    see an empty or half-written file, which ``load_decisions`` reports
    as an empty history.

    Values that ``json`` cannot serialize natively are converted with
    ``str`` (``default=str``).

    Args:
        decisions: The complete list of decision records to store.

    Raises:
        OSError: If the temporary file cannot be written or moved.
        TypeError, ValueError: If ``json.dump`` rejects the data; the
            previous DECISIONS_FILE is left untouched in that case.

    NOTE(review): the temp file name is fixed, which relies on callers
    holding the decisions file lock while writing (as save_decision and
    update_decision do).
    """
    ensure_dirs()
    tmp_path = DECISIONS_FILE.with_name(DECISIONS_FILE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(decisions, f, indent=2, default=str)
        # Swap the fully written file into place in one rename step.
        tmp_path.replace(DECISIONS_FILE)
    finally:
        # After a successful replace the temp file is gone and this is a
        # no-op; after a failure it removes the partial output.
        tmp_path.unlink(missing_ok=True)
def save_decision(record: dict):
    """Append ``record`` to the stored decisions.

    The load-append-write sequence runs while holding the file lock at
    LOCK_FILE.
    """
    with FileLock(str(LOCK_FILE)):
        all_records = load_decisions()
        all_records.append(record)
        _write_decisions(all_records)
def update_decision(decision_id: str, updates: dict):
    """Merge ``updates`` into the first stored decision with ``decision_id``.

    Runs while holding the file lock at LOCK_FILE. The decisions file is
    rewritten even when no record matches.
    """
    with FileLock(str(LOCK_FILE)):
        records = load_decisions()
        target = next(
            (entry for entry in records if entry["id"] == decision_id),
            None,
        )
        if target is not None:
            target.update(updates)
        _write_decisions(records)
def get_open_decisions() -> list[dict]:
    """Return the stored decisions whose ``"status"`` equals ``"open"``."""
    still_open = []
    for entry in load_decisions():
        if entry.get("status") == "open":
            still_open.append(entry)
    return still_open
def get_decision_by_id(decision_id: str) -> dict | None:
    """Return the first stored decision whose ``"id"`` matches, else ``None``."""
    return next(
        (entry for entry in load_decisions() if entry["id"] == decision_id),
        None,
    )
def resolve_decision(decision_id: str, description: str, valence: str):
    """Mark a decision as resolved and record its outcome.

    Does nothing when no decision with ``decision_id`` is found.

    Prediction accuracy is the fraction of predictions with
    ``"probability" == "high"`` whose ``"valence"`` equals ``valence``.
    When there are no high-probability predictions it defaults to 0.5.
    The stored value is rounded to two decimals.

    Args:
        decision_id: ID of the decision to resolve.
        description: Text stored as the outcome description.
        valence: Observed valence, stored as ``"actual_valence"``.
    """
    decision = get_decision_by_id(decision_id)
    if not decision:
        return

    confident = [
        pred
        for pred in decision.get("consequence_predictions", [])
        if pred.get("probability") == "high"
    ]
    if not confident:
        accuracy = 0.5
    else:
        hits = [pred for pred in confident if pred.get("valence") == valence]
        accuracy = len(hits) / len(confident)

    update_decision(
        decision_id,
        {
            "status": "resolved",
            "outcome": {
                "timestamp": datetime.now().isoformat(),
                "description": description,
                "actual_valence": valence,
                "prediction_accuracy": round(accuracy, 2),
            },
        },
    )
def export_decisions() -> str:
    """Return all stored decisions as a 2-space-indented JSON string.

    Values that ``json`` cannot serialize natively are converted with
    ``str``.
    """
    records = load_decisions()
    return json.dumps(records, indent=2, default=str)
def create_decision_record(
    raw_input: str,
    input_type: str,
    follow_up_qa: list[dict],
    category: str,
    subcategory: str,
    severity: int,
    status_emoji: str,
    consequence_predictions: list[dict],
    moment_card_prompt: str,
    moment_card_path: str | None = None,
    image_description: str | None = None,
) -> dict:
    """Assemble a new decision record, persist it, and return it.

    The record is created with ``"status": "open"`` and no outcome. Its
    ID comes from ``generate_id()``. ``"debug_metadata"`` holds a commit
    hash derived from the creation timestamp and ``raw_input``, the
    category under ``"branch"``, and ``status_emoji``.

    Returns:
        The record dict that was passed to ``save_decision``.
    """
    created_at = datetime.now()
    decision_id = generate_id()
    iso_stamp = created_at.isoformat()

    debug_metadata = {
        "commit_hash": generate_commit_hash(iso_stamp, raw_input),
        "branch": category,
        "status_emoji": status_emoji,
    }
    new_record = {
        "id": decision_id,
        "timestamp": iso_stamp,
        "input_type": input_type,
        "raw_input": raw_input,
        "image_description": image_description,
        "follow_up_qa": follow_up_qa,
        "category": category,
        "subcategory": subcategory,
        "severity": severity,
        "consequence_predictions": consequence_predictions,
        "moment_card_prompt": moment_card_prompt,
        "moment_card_path": moment_card_path,
        "status": "open",
        "outcome": None,
        "debug_metadata": debug_metadata,
    }

    save_decision(new_record)
    return new_record