Gemini
feat: add detailed logging
01d5a5d
import logging
import os
from datetime import datetime
import numpy as np
from flask import Blueprint, jsonify
from lpm_kernel.L1.bio import Bio
# from lpm_kernel.L1.l1_repository import L1Repository
from lpm_kernel.L1.l1_generator import L1Generator
from lpm_kernel.L1.serializers import NotesStorage, NoteSerializer
from lpm_kernel.L1.utils import save_true_topics
from lpm_kernel.api.common.responses import APIResponse
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.kernel.chunk_service import ChunkService
from lpm_kernel.kernel.l1.l1_manager import (
generate_l1_from_l0,
generate_and_store_status_bio,
get_latest_status_bio,
extract_notes_from_documents,
document_service,
)
from lpm_kernel.kernel.note_service import NoteService
from lpm_kernel.models.l1 import (
L1Version,
L1Bio,
L1Shade,
L1Cluster,
L1ChunkTopic,
L1GenerationResult,
)
logger = logging.getLogger(__name__)
kernel_bp = Blueprint("kernel", __name__, url_prefix="/api/kernel")
# l1_repository = L1Repository()
l1_generator = L1Generator()
def __store_version(
session, new_version_number: int, description: str = None
) -> L1Version:
"""Store L1 version information"""
version = L1Version(
version=new_version_number,
create_time=datetime.now(),
status="active",
description=description or f"L1 data version {new_version_number}",
)
session.add(version)
return version
def __store_bio(session, new_version: int, bio_data: Bio) -> None:
"""Store Bio data"""
if not bio_data:
logger.warning("No bio data found")
return
bio_record = L1Bio(
version=new_version,
content=bio_data.content_second_view,
content_third_view=bio_data.content_third_view,
summary=bio_data.summary_second_view,
summary_third_view=bio_data.summary_third_view,
create_time=datetime.now(),
)
session.add(bio_record)
def __store_shades(session, new_version: int, shades_list: list) -> None:
"""Store Shades data"""
if not shades_list:
logger.warning("No shades data found")
return
for shade in shades_list:
shade_data = L1Shade(
version=new_version,
name=shade.name,
aspect=shade.aspect,
icon=shade.icon,
desc_third_view=shade.desc_third_view,
content_third_view=shade.content_third_view,
desc_second_view=shade.desc_second_view,
content_second_view=shade.content_second_view,
create_time=datetime.now(),
)
session.add(shade_data)
def __store_clusters(session, new_version: int, cluster_list: list) -> None:
"""Store Clusters data"""
if not cluster_list:
logger.warning("No clusters data found")
return
for cluster in cluster_list:
cluster_data = L1Cluster(
version=new_version,
cluster_id=cluster.get("clusterId"),
memory_ids=[m.get("memoryId") for m in cluster.get("memoryList", [])],
cluster_center=cluster.get("clusterCenter"),
create_time=datetime.now(),
)
session.add(cluster_data)
def __store_chunk_topics(session, new_version: int, chunk_topics_dict: dict) -> None:
"""Store Chunk Topics data"""
if not isinstance(chunk_topics_dict, dict):
logger.warning(f"Invalid chunk_topics format: {type(chunk_topics_dict)}")
return
logger.info(f"Found chunk topics dict: {chunk_topics_dict}")
for cluster_id, cluster_data in chunk_topics_dict.items():
# for each chunkId, create an unique record
for chunk_id in cluster_data.get("chunkIds", []):
topic_data = L1ChunkTopic(
version=new_version,
chunk_id=chunk_id,
topic=cluster_data.get("topic"),
tags=cluster_data.get("tags"),
create_time=datetime.now(),
)
session.add(topic_data)
def store_l1_data(session, l1_data: L1GenerationResult) -> int:
"""Store L1 data based on L0 data"""
try:
# 1. Get current latest version
latest_version = (
session.query(L1Version).order_by(L1Version.version.desc()).first()
)
new_version_number = (latest_version.version + 1) if latest_version else 1
logger.info(f"Creating new version: {new_version_number}")
# 2. Create new version record
version = __store_version(session, new_version_number)
# 3. Store Bio data
__store_bio(session, new_version_number, l1_data.bio)
# 4. Store Shades data
if hasattr(l1_data.bio, "shades_list"):
__store_shades(session, new_version_number, l1_data.bio.shades_list)
# 5. Store Clusters data
cluster_list = l1_data.clusters.get("clusterList", [])
__store_clusters(session, new_version_number, cluster_list)
# 6. Store Chunk Topics data
__store_chunk_topics(session, new_version_number, l1_data.chunk_topics)
# 7. Commit transaction
session.commit()
logger.info(f"Successfully stored L1 data version {new_version_number}")
return new_version_number
except Exception as e:
session.rollback()
logger.error(f"Error creating L1 version: {str(e)}")
raise
def serialize_value(value):
"""iterate over the nested dictionary and serialize the values"""
if isinstance(value, np.ndarray):
return {
"type": "ndarray",
"shape": list(value.shape), # turn the shape into a list
"dtype": str(value.dtype),
}
elif isinstance(value, Bio):
return str(value)
elif isinstance(value, dict):
return {k: serialize_value(v) for k, v in value.items()}
elif isinstance(value, (list, tuple)):
return [serialize_value(item) for item in value]
elif isinstance(value, (int, float, str, bool)) or value is None:
return value
else:
return str(value)
@kernel_bp.route("/l1/global/generate", methods=["POST"])
def generate_l1():
"""Generate L1 data from L0 data and store"""
try:
# 1. Generate L1 data
result = generate_l1_from_l0()
if result is None:
return jsonify(APIResponse.error("No valid L1 data generated"))
# 2. Store L1 data
with DatabaseSession.session() as session:
version_number = store_l1_data(session, result)
# 3. Convert result to serializable format
serializable_result = serialize_value(result.to_dict())
# 4. return the result
response_data = {
"version": version_number,
"message": f"L1 data generated and stored successfully with version {version_number}",
"data": serializable_result,
}
return jsonify(APIResponse.success(data=response_data))
except Exception as e:
logger.error(f"Error generating L1: {str(e)}", exc_info=True)
return jsonify(APIResponse.error(str(e)))
@kernel_bp.route("/l1/global/versions", methods=["GET"])
def list_l1_versions():
"""Get all L1 data versions"""
try:
with DatabaseSession.session() as session:
versions = session.query(L1Version).order_by(L1Version.version.desc()).all()
version_list = [
{
"version": v.version,
"create_time": v.create_time.isoformat(),
"status": v.status,
"description": v.description,
}
for v in versions
]
return jsonify(APIResponse.success(data=version_list))
except Exception as e:
logger.error(f"Error listing L1 versions: {str(e)}", exc_info=True)
return jsonify(APIResponse.error(str(e)))
@kernel_bp.route("/l1/global/version/<int:version>", methods=["GET"])
def get_l1_version(version):
"""Get specified version of L1 data"""
try:
with DatabaseSession.session() as session:
# Get all data for this version
bio = session.query(L1Bio).filter(L1Bio.version == version).first()
shades = session.query(L1Shade).filter(L1Shade.version == version).all()
clusters = (
session.query(L1Cluster).filter(L1Cluster.version == version).all()
)
chunk_topics = (
session.query(L1ChunkTopic)
.filter(L1ChunkTopic.version == version)
.all()
)
if not bio:
return jsonify(APIResponse.error(f"Version {version} not found"))
# Build response data
data = {
"version": version,
"bio": {
"content": bio.content,
"content_third_view": bio.content_third_view,
"summary": bio.summary,
"summary_third_view": bio.summary_third_view,
"shades": [
{
"name": s.name,
"aspect": s.aspect,
"icon": s.icon,
"desc_third_view": s.desc_third_view,
"content_third_view": s.content_third_view,
"desc_second_view": s.desc_second_view,
"content_second_view": s.content_second_view,
}
for s in shades
],
},
"clusters": [
{
"cluster_id": c.cluster_id,
"memory_ids": c.memory_ids,
"cluster_center": c.cluster_center,
}
for c in clusters
],
"chunk_topics": [
{"chunk_id": t.chunk_id, "topic": t.topic, "tags": t.tags}
for t in chunk_topics
],
}
return jsonify(APIResponse.success(data=data))
except Exception as e:
logger.error(f"Error getting L1 version {version}: {str(e)}", exc_info=True)
return jsonify(APIResponse.error(str(e)))
@kernel_bp.route("/l1/status_bio/generate", methods=["POST"])
def generate_status_biography():
"""Generate status biography"""
# Call l1_manager method to generate and store status biography
status_bio = generate_and_store_status_bio()
# Build response data
response_data = {
"content": status_bio.content_second_view,
"content_third_view": status_bio.content_third_view,
"summary": status_bio.summary_second_view,
"summary_third_view": status_bio.summary_third_view,
"shades": [
{
"name": shade.name,
"aspect": shade.aspect,
"icon": shade.icon,
"desc_third_view": shade.desc_third_view,
"content_third_view": shade.content_third_view,
"desc_second_view": shade.desc_second_view,
"content_second_view": shade.content_second_view,
}
for shade in status_bio.shades_list
],
}
return jsonify(APIResponse.success(data=response_data))
@kernel_bp.route("/l1/status_bio/get", methods=["GET"])
def get_status_biography():
"""Get status biography"""
# Call l1_manager method to get status biography
status_bio = get_latest_status_bio()
return jsonify(APIResponse.success(data=status_bio))
@kernel_bp.route("/l1/latest/save_topics", methods=["GET"])
def save_latest_topics():
"""get the latest L1 topics and save to the pointed directory"""
# use the related resources directory
base_dir = os.path.join(os.getcwd(), "resources/L2/data_pipeline/raw_data")
os.makedirs(base_dir, exist_ok=True)
chunk_service = ChunkService()
topics_data = chunk_service.query_topics_data()
save_true_topics(topics_data, os.path.join(base_dir, "topics.json"))
return jsonify(APIResponse.success(data={}))
@kernel_bp.route("/l1/latest/save_notes", methods=["GET"])
def save_latest_notes():
"""Get latest version of notes and save to file"""
documents = document_service.list_documents_with_l0()
notes_list, _ = extract_notes_from_documents(documents)
if not notes_list:
return jsonify(APIResponse.error("No notes found"))
# Get latest topics information
note_service = NoteService()
note_service.prepareNotes(notes_list)
# use NotesStorage to save notes
storage = NotesStorage()
result = storage.save_notes(notes_list)
return jsonify(APIResponse.success(data=result))
@kernel_bp.route("/l1/notes", methods=["GET"])
def get_notes():
"""Get notes from file"""
storage = NotesStorage()
try:
notes_list = storage.load_notes()
except FileNotFoundError:
return jsonify(
APIResponse.error("Notes file not found. Please save notes first.")
)
# serialize notes
serializable_notes = [NoteSerializer.to_dict(note) for note in notes_list]
return jsonify(
APIResponse.success(
data={"notes": serializable_notes, "count": len(serializable_notes)}
)
)