| """ |
| 統一ログ機能モジュール |
| 画像生成の全パラメータと結果を包括的に記録する統一ログシステム |
| |
| 設計原則: |
| - 生成に使用された全パラメータの記録 |
| - 生成画像との確実な紐づけ |
| - JSON形式での構造化データ保存 |
| - 検索・分析しやすい形式 |
| - パフォーマンス情報の詳細記録 |
| """ |


import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import torch
from PIL import Image


class UnifiedLogger:
    """Unified logging class."""

    def __init__(self, log_dir: str = "logs"):
        """
        Initialize the logger.

        Args:
            log_dir: Directory in which log files are stored
        """
        self.log_dir = log_dir
        self.json_log_file = os.path.join(log_dir, "generation_history.json")

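        # Make sure the log directory exists before anything is written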
        os.makedirs(log_dir, exist_ok=True)

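        # Load any existing history so new entries are appended to it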
        self._load_existing_logs()

    def _load_existing_logs(self):
        """Load existing log data, or start a fresh history."""
        if os.path.exists(self.json_log_file):
            try:
                with open(self.json_log_file, 'r', encoding='utf-8') as f:
                    self.log_data = json.load(f)
            except (json.JSONDecodeError, FileNotFoundError):
                # Corrupt or vanished history file: start over
                self.log_data = {"metadata": self._create_metadata(), "generations": []}
        else:
            self.log_data = {"metadata": self._create_metadata(), "generations": []}

    def _create_metadata(self) -> Dict[str, Any]:
        """Create the log file's metadata block."""
        return {
            "format_version": "2.0",
            "created_at": datetime.now().isoformat(),
            "last_updated": datetime.now().isoformat(),
            "description": "Unified generation history for emix-0-5 UI project",
            "model_info": {
                "model_name": "aipicasso/emix-0-5",
                "model_type": "StableDiffusionXL",
                "specialized_for": "Japanese anime / illustration style"
            },
            "log_schema": {
                "timestamp": "ISO format timestamp",
                "generation_id": "Unique identifier for each generation",
                "prompts": "All text prompts used",
                "parameters": "Complete parameter set used for generation",
                "output": "Generated image information",
                "performance": "Execution metrics",
                "system_info": "Hardware and software environment"
            }
        }

    def _get_image_info(self, filepath: str) -> Dict[str, Any]:
        """Collect detailed information about an image file."""
        if not os.path.exists(filepath):
            return {"error": "File not found"}

        try:
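            # File size on disk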
            file_size_bytes = os.path.getsize(filepath)
            file_size_mb = round(file_size_bytes / (1024 * 1024), 3)

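            # Basic image attributes read via Pillow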
            with Image.open(filepath) as img:
                width, height = img.size
                mode = img.mode
                format_type = img.format

            return {
                "filepath": os.path.abspath(filepath),
                # as_uri() yields a well-formed file:// URL on all platforms
                "file_url": Path(filepath).resolve().as_uri(),
                "filename": os.path.basename(filepath),
                "file_size_bytes": file_size_bytes,
                "file_size_mb": file_size_mb,
                "image_width": width,
                "image_height": height,
                "image_mode": mode,
                "image_format": format_type,
                # Note: getctime is creation time on Windows but metadata
                # change time on most Unix systems
                "created_at": datetime.fromtimestamp(os.path.getctime(filepath)).isoformat()
            }
        except Exception as e:
            return {"error": f"Failed to get image info: {str(e)}"}

    def _get_system_info(self) -> Dict[str, Any]:
        """Collect information about the hardware and software environment."""
        system_info = {
            "python_version": None,
            "torch_version": None,
            "cuda_available": False,
            "cuda_version": None,
            "gpu_name": None,
            "vram_total_gb": 0,
            "vram_allocated_gb": 0
        }

        try:
            system_info["python_version"] = sys.version.split()[0]

            system_info["torch_version"] = torch.__version__
            system_info["cuda_available"] = torch.cuda.is_available()

            if torch.cuda.is_available():
                system_info["cuda_version"] = torch.version.cuda
                system_info["gpu_name"] = torch.cuda.get_device_name(0)
                system_info["vram_total_gb"] = round(
                    torch.cuda.get_device_properties(0).total_memory / (1024**3), 2
                )
                system_info["vram_allocated_gb"] = round(
                    torch.cuda.memory_allocated(0) / (1024**3), 2
                )
        except Exception as e:
            system_info["error"] = f"Failed to get system info: {str(e)}"

        return system_info

    def log_generation(
        self,
        prompt: str,
        negative_prompt: str = "",
        parameters: Optional[Dict[str, Any]] = None,
        output_filepath: str = "",
        execution_time: float = 0.0,
        additional_info: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Record a complete log entry for one image generation.

        Args:
            prompt: Main prompt
            negative_prompt: Negative prompt
            parameters: All parameters used for generation
            output_filepath: Path to the generated image file
            execution_time: Execution time in seconds
            additional_info: Extra information to store with the record

        Returns:
            generation_id: Unique ID of the created record
        """

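        # Build a unique, timestamp-based ID for this record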
        timestamp = datetime.now()
        generation_id = f"gen_{timestamp.strftime('%Y%m%d_%H%M%S')}_{int(time.time() * 1000) % 100000}"

        if parameters is None:
            parameters = {}

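        # Record explicit defaults alongside any extra caller-supplied keys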
        complete_parameters = {
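            # Core generation parameters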
| "num_inference_steps": parameters.get("num_inference_steps", 20), |
| "guidance_scale": parameters.get("guidance_scale", 7.5), |
| "width": parameters.get("width", 1024), |
| "height": parameters.get("height", 1024), |
| "seed": parameters.get("seed", None), |
| |
| |
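            # Scheduler settings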
| "scheduler_type": parameters.get("scheduler_type", "default"), |
| "eta": parameters.get("eta", 0.0), |
| |
| |
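            # Batch settings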
| "num_images": parameters.get("num_images", 1), |
| "batch_size": parameters.get("batch_size", 1), |
| |
| |
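            # Precision and memory options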
| "torch_dtype": str(parameters.get("torch_dtype", "float16")), |
| "enable_xformers": parameters.get("enable_xformers", False), |
| "enable_cpu_offload": parameters.get("enable_cpu_offload", False), |
| |
| |
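            # Pass through any other caller-supplied parameters unchanged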
            **{k: v for k, v in parameters.items() if k not in [
                "num_inference_steps", "guidance_scale", "width", "height",
                "seed", "scheduler_type", "eta", "num_images", "batch_size",
                "torch_dtype", "enable_xformers", "enable_cpu_offload"
            ]}
        }

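        # Assemble the structured log entry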
        log_entry = {
            "generation_id": generation_id,
            "timestamp": timestamp.isoformat(),
            "prompts": {
                "main_prompt": prompt,
                "negative_prompt": negative_prompt,
                "prompt_length": len(prompt),
                "negative_prompt_length": len(negative_prompt)
            },
            "parameters": complete_parameters,
            "output": self._get_image_info(output_filepath) if output_filepath else {},
            "performance": {
                "execution_time_seconds": round(execution_time, 3),
                "estimated_speed_sec_per_step": round(
                    execution_time / max(complete_parameters.get("num_inference_steps", 1), 1), 3
                ) if execution_time > 0 else 0
            },
            "system_info": self._get_system_info(),
            "additional_info": additional_info or {}
        }

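        # Append the entry and refresh the metadata timestamp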
| self.log_data["generations"].append(log_entry) |
| self.log_data["metadata"]["last_updated"] = timestamp.isoformat() |
| |
| |
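        # Persist immediately so records survive an unexpected exit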
        self._save_logs()

        return generation_id

    def _save_logs(self):
        """Write the log data to the JSON file."""
        try:
            with open(self.json_log_file, 'w', encoding='utf-8') as f:
                json.dump(self.log_data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Failed to save logs: {e}")

    def get_generation_by_id(self, generation_id: str) -> Optional[Dict[str, Any]]:
        """Return the generation record with the given generation_id, if any."""
        for generation in self.log_data["generations"]:
            if generation["generation_id"] == generation_id:
                return generation
        return None

    def get_recent_generations(self, count: int = 10) -> List[Dict[str, Any]]:
        """Return the most recent generation records."""
        return self.log_data["generations"][-count:] if self.log_data["generations"] else []

    def search_by_prompt(self, search_term: str, case_sensitive: bool = False) -> List[Dict[str, Any]]:
        """Search generation records by main prompt."""
        results = []
        search_term = search_term if case_sensitive else search_term.lower()

        for generation in self.log_data["generations"]:
            main_prompt = generation["prompts"]["main_prompt"]
            if not case_sensitive:
                main_prompt = main_prompt.lower()

            if search_term in main_prompt:
                results.append(generation)

        return results

    def get_statistics(self) -> Dict[str, Any]:
        """Compute aggregate statistics over all generation records."""
        generations = self.log_data["generations"]

        if not generations:
            return {"total_generations": 0}

        total_time = sum(g["performance"]["execution_time_seconds"] for g in generations)
        avg_time = total_time / len(generations)

        schedulers = {}
        for g in generations:
            scheduler = g["parameters"].get("scheduler_type", "unknown")
            schedulers[scheduler] = schedulers.get(scheduler, 0) + 1

        return {
            "total_generations": len(generations),
            "total_execution_time_hours": round(total_time / 3600, 2),
            "average_execution_time_seconds": round(avg_time, 2),
            "scheduler_usage": schedulers,
            "date_range": {
                "first": generations[0]["timestamp"],
                "last": generations[-1]["timestamp"]
            }
        }

    def cleanup_old_logs(self, keep_days: int = 30) -> int:
        """Delete log entries older than keep_days and return the removed count."""
        cutoff_date = datetime.now().timestamp() - (keep_days * 24 * 3600)

        original_count = len(self.log_data["generations"])
        self.log_data["generations"] = [
            g for g in self.log_data["generations"]
            if datetime.fromisoformat(g["timestamp"]).timestamp() > cutoff_date
        ]

        removed_count = original_count - len(self.log_data["generations"])

        if removed_count > 0:
            self._save_logs()
            print(f"Removed {removed_count} old log entries")

        return removed_count


# Module-level singleton so the whole application shares one log file
_global_logger: Optional[UnifiedLogger] = None


def get_logger(log_dir: str = "logs") -> UnifiedLogger:
    """Return the global logger instance, creating it on first use."""
    global _global_logger
    if _global_logger is None:
        _global_logger = UnifiedLogger(log_dir)
    return _global_logger


def log_generation(**kwargs) -> str:
    """Log a generation using the global logger."""
    logger = get_logger()
    return logger.log_generation(**kwargs)


def get_statistics() -> Dict[str, Any]:
    """Return generation statistics from the global logger."""
    logger = get_logger()
    return logger.get_statistics()
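

# Minimal usage sketch. The prompt, parameter values, and timing below are
# illustrative only, not outputs of a real generation run.
if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as demo_dir:
        demo_logger = UnifiedLogger(log_dir=demo_dir)

        # output_filepath is omitted because no image exists in this sketch,
        # so the record's "output" field stays empty.
        gen_id = demo_logger.log_generation(
            prompt="1girl, cherry blossoms, watercolor style",
            negative_prompt="low quality, blurry",
            parameters={"num_inference_steps": 28, "guidance_scale": 7.0, "seed": 42},
            execution_time=12.4,
        )
        print(f"Logged generation: {gen_id}")

        # Look the record up again and summarize the history
        record = demo_logger.get_generation_by_id(gen_id)
        print(record["prompts"]["main_prompt"])
        print(demo_logger.get_statistics())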