# GradioDemo/shared/utils/review_logger.py
# Author: eigentom — "Initial Update" (commit 90c099b)
"""
Review Logger Utility
Captures and logs all intermediate outputs from the review pipeline
"""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, List
class ReviewLogger:
    """
    Logger for capturing complete review pipeline execution logs.

    Records, for a single "run": the user input, every intermediate artifact
    produced by the pipeline (keywords, retrieved papers, summaries, prompts,
    raw LLM responses, parsed reviews), the final output, and any errors.
    The whole record can then be serialized to a JSON file with save_run().
    """

    def __init__(self, log_dir: Optional[str] = None, enabled: bool = True):
        """
        Initialize Review Logger.

        Args:
            log_dir: Directory to save log files. If None, uses current directory.
            enabled: Whether logging is enabled; when False all methods are no-ops.
        """
        self.enabled = enabled
        self.log_dir = Path(log_dir) if log_dir else Path.cwd()
        self.log_dir.mkdir(parents=True, exist_ok=True)
        # State for the run currently being recorded (None until start_run()).
        self.current_run_id: Optional[str] = None
        self.current_run_data: Optional[Dict[str, Any]] = None

    @staticmethod
    def _default_intermediate_outputs() -> Dict[str, Any]:
        """Return a fresh skeleton for the intermediate-outputs section.

        Built anew on every call so that the mutable list defaults are never
        shared between runs.
        """
        return {
            "generated_keywords": None,
            "retrieved_papers": [],
            "paper_summaries": [],
            "related_work_json_list": None,
            "paper_results_analyzer_output": None,
            "paper_insight_miner_output": None,
            "review_prompt": None,
            "review_llm_response": None,
            "parsed_review": None,
            "refiner_prompt": None,
            "refiner_llm_response": None,
            "parsed_refined_review": None,
        }

    def _outputs(self) -> Dict[str, Any]:
        """Return the current run's intermediate_outputs dict, creating it if missing.

        Callers must have already verified that current_run_data exists.
        """
        return self.current_run_data.setdefault("intermediate_outputs", {})

    def start_run(
        self,
        title: str,
        abstract: str,
        content: Optional[str] = None,
        keywords: Optional[List[str]] = None,
        publication_date_range: Optional[str] = None,
        venues: Optional[str] = None,
        review_format: str = "detailed",
    ) -> str:
        """
        Start a new review run and generate its UUID.

        IMPORTANT: if a run is already in progress, its intermediate outputs,
        final output, and errors are preserved (not reset) to prevent data
        loss when start_run() is called multiple times; only the input data
        and run metadata are replaced.

        Args:
            title: Paper title
            abstract: Paper abstract
            content: Paper content (optional)
            keywords: Existing keywords (optional)
            publication_date_range: Date range filter (optional)
            venues: Venue filter (optional)
            review_format: Review format

        Returns:
            Run UUID string, or "" when logging is disabled.
        """
        if not self.enabled:
            return ""
        timestamp = datetime.now()
        # UUID1 embeds the host MAC address and a timestamp, so ids are
        # roughly ordered by creation time.
        run_id = str(uuid.uuid1())

        # Carry over any data already recorded for the current run.
        intermediate_outputs: Dict[str, Any] = {}
        final_output = None
        errors: List[Dict[str, Any]] = []
        if self.current_run_data is not None:
            intermediate_outputs = self.current_run_data.get("intermediate_outputs") or {}
            final_output = self.current_run_data.get("final_output")
            errors = self.current_run_data.get("errors", []) or []

        # Fill any missing slots with defaults without clobbering data that
        # was already logged.
        for key, default in self._default_intermediate_outputs().items():
            intermediate_outputs.setdefault(key, default)

        self.current_run_id = run_id
        self.current_run_data = {
            "run_id": run_id,
            "timestamp": timestamp.isoformat(),
            "input": {
                "title": title,
                "abstract": abstract,
                "content": content,
                "keywords": keywords,
                "publication_date_range": publication_date_range,
                "venues": venues,
                "review_format": review_format,
            },
            "intermediate_outputs": intermediate_outputs,
            "final_output": final_output,
            "errors": errors,
        }
        return run_id

    def log_keywords(self, keywords: List[str]):
        """Log generated search keywords."""
        if self.enabled and self.current_run_data:
            self._outputs()["generated_keywords"] = keywords

    def log_retrieved_papers(self, papers: List[Dict[str, Any]]):
        """Log retrieved papers, keeping only essential, size-bounded metadata."""
        if self.enabled and self.current_run_data:
            self._outputs()["retrieved_papers"] = [
                {
                    "paper_id": p.get("paper_id"),
                    "title": p.get("title"),
                    "authors": p.get("authors", [])[:10],  # cap author list size
                    "year": p.get("year"),
                    "venue": p.get("venue"),
                    "abstract": p.get("abstract", "")[:500],  # truncate abstract
                    "citation_counts": p.get("citation_counts", 0),
                }
                for p in papers
            ]

    def log_paper_summary(self, paper_title: str, summary: str, paper_index: int):
        """Log a single paper summary."""
        if self.enabled and self.current_run_data:
            self._outputs().setdefault("paper_summaries", []).append({
                "paper_index": paper_index,
                "paper_title": paper_title,
                "summary": summary,
            })

    def log_related_work_json_list(self, related_work_json_list: List[Dict[str, Any]]):
        """Log the final related work JSON list."""
        if self.enabled and self.current_run_data:
            self._outputs()["related_work_json_list"] = related_work_json_list

    def log_paper_results_analyzer_output(self, results_analyzer_output: str):
        """Log the paper results analyzer JSON output."""
        if self.enabled and self.current_run_data:
            self._outputs()["paper_results_analyzer_output"] = results_analyzer_output

    def log_paper_insight_miner_output(self, insight_miner_output: str):
        """Log the paper insight miner JSON output."""
        if self.enabled and self.current_run_data:
            self._outputs()["paper_insight_miner_output"] = insight_miner_output

    def log_review_prompt(self, prompt: str, system_message: Optional[str] = None):
        """Log the review prompt sent to the LLM."""
        if self.enabled and self.current_run_data:
            self._outputs()["review_prompt"] = {
                "system_message": system_message,
                "user_prompt": prompt,
            }

    def log_review_llm_response(self, response: str):
        """Log the raw LLM response for the review."""
        if self.enabled and self.current_run_data:
            self._outputs()["review_llm_response"] = response

    def log_parsed_review(self, parsed_review: Dict[str, Any]):
        """Log the parsed review dictionary."""
        if self.enabled and self.current_run_data:
            self._outputs()["parsed_review"] = parsed_review

    def log_refiner_prompt(self, prompt: str, system_message: Optional[str] = None):
        """Log the refiner prompt sent to the LLM."""
        if self.enabled and self.current_run_data:
            self._outputs()["refiner_prompt"] = {
                "system_message": system_message,
                "user_prompt": prompt,
            }

    def log_refiner_llm_response(self, response: str):
        """Log the raw LLM response for the refiner."""
        if self.enabled and self.current_run_data:
            self._outputs()["refiner_llm_response"] = response

    def log_parsed_refined_review(self, parsed_review: Dict[str, Any]):
        """Log the parsed refined review dictionary."""
        if self.enabled and self.current_run_data:
            self._outputs()["parsed_refined_review"] = parsed_review

    def log_final_output(self, final_output: Dict[str, Any]):
        """Log the final review output."""
        if self.enabled and self.current_run_data:
            self.current_run_data["final_output"] = final_output

    def log_error(self, error: str, step: Optional[str] = None):
        """Log an error that occurred during execution, with a timestamp."""
        if self.enabled and self.current_run_data:
            self.current_run_data.setdefault("errors", []).append({
                "step": step,
                "error": error,
                "timestamp": datetime.now().isoformat(),
            })

    def save_run(self) -> Optional[str]:
        """
        Save the current run to a JSON file.

        Returns:
            Path to saved log file, or None if logging is disabled or no run
            has been started.
        """
        if not self.enabled or not self.current_run_data:
            return None
        # Filename combines a human-readable timestamp with a short run-id
        # prefix; guard against a None run id so we never raise here.
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        run_prefix = (self.current_run_id or "")[:8]
        filename = f"review_log_{timestamp_str}_{run_prefix}.json"
        log_path = self.log_dir / filename
        with open(log_path, 'w', encoding='utf-8') as f:
            json.dump(self.current_run_data, f, indent=2, ensure_ascii=False)
        return str(log_path)

    def get_current_run_id(self) -> Optional[str]:
        """Get the current run ID (None when logging is disabled)."""
        return self.current_run_id if self.enabled else None