"""L2 Generator module for handling L2 level data processing and model operations. This module provides the L2Generator class which is responsible for data preprocessing, subjective data generation, model conversion, and inference with the trained model. """ from typing import Dict, List import os from openai import OpenAI from lpm_kernel.L1.bio import Note from lpm_kernel.L2.data import L2DataProcessor import yaml import logging from lpm_kernel.L2.data_pipeline.data_prep.preference.preference_QA_generate import PreferenceQAGenerator from lpm_kernel.L2.data_pipeline.data_prep.diversity.diversity_data_generator import DiversityDataGenerator from lpm_kernel.L2.data_pipeline.data_prep.selfqa.selfqa_generator import SelfQA import json class L2Generator: """L2 level generator for handling data and model operations. This class manages operations related to L2 processing, including data preprocessing, subjective data generation, model conversion, and model inference. Attributes: data_path: Path to the raw data directory. data_processor: Instance of L2DataProcessor for handling data processing. """ def __init__(self, data_path: str = "../raw_data", preferred_lang: str = "English", is_cot: bool = True): """Initialize the L2Generator with data path and preferred language. Args: data_path: Path to the raw data directory. Defaults to "../raw_data". preferred_lang: Preferred language for data processing. Defaults to "English". is_cot: Whether to use Chain of Thought reasoning. Can be bool or string. """ self.data_path = data_path self.data_processor = L2DataProcessor(data_path, preferred_lang) self.preferred_lang = preferred_lang # Convert is_cot to bool if it's a string if isinstance(is_cot, str): self.is_cot = is_cot.lower() == 'true' else: self.is_cot = bool(is_cot) logging.info(f"L2Generator initialized with is_cot={self.is_cot}") def data_preprocess(self, note_list: List[Note], basic_info: Dict): """Preprocess the input notes and basic information. Args: note_list: List of Note objects to process. basic_info: Dictionary containing basic user information. """ self.data_processor(note_list, basic_info) def gen_subjective_data( self, note_list: List[Note], basic_info: Dict, data_output_base_dir: str, topics_path: str, entities_path: str, graph_path: str, config_path: str, ): """Generate subjective data for personalization. This method orchestrates the generation of subjective data including preferences, diversity, self-Q&A data, and graph indexing. Args: note_list: List of Note objects to process. basic_info: Dictionary containing user information. data_output_base_dir: Base directory for output data. topics_path: Path to topics data. entities_path: Path to entity data. graph_path: Path to graph data. config_path: Path to configuration file. """ if not os.path.exists(data_output_base_dir): os.makedirs(data_output_base_dir) # Check if the file exists if not os.path.exists(topics_path): # Create an empty file with open(topics_path, "w") as f: f.write(json.dumps([])) # Generate subjective data self.data_processor.gen_subjective_data( note_list=note_list, data_output_base_dir=data_output_base_dir, preference_output_path="preference.json", diversity_output_path="diversity.json", selfqa_output_path="selfqa.json", global_bio=basic_info["globalBio"], topics_path=topics_path, entitys_path=entities_path, graph_path=graph_path, user_name=basic_info["username"], config_path=config_path, user_intro=basic_info["aboutMe"], ) # Merge JSON files for training self.merge_json_files(data_output_base_dir) # Release Ollama models from memory after data synthesis is complete self._release_ollama_models() def gen_preference_data( self, note_list: List[Note], basic_info: Dict, data_output_base_dir: str, topics_path: str, entities_path: str, graph_path: str, config_path: str, ): global_bio = basic_info["globalBio"] preference_output_path = os.path.join(data_output_base_dir, "preference.json") processor = PreferenceQAGenerator( filename=topics_path, bio=global_bio, preference_language=self.preferred_lang, is_cot=self.is_cot ) processor.process_clusters(preference_output_path) def gen_diversity_data( self, note_list: List[Note], basic_info: Dict, data_output_base_dir: str, topics_path: str, entities_path: str, graph_path: str, config_path: str ): global_bio = basic_info["globalBio"] user_name = basic_info["username"] output_path = os.path.join(data_output_base_dir, "diversity.json") processor = DiversityDataGenerator(self.preferred_lang, is_cot=self.is_cot) processor.generate_data( entities_path, note_list, config_path, graph_path, user_name, global_bio, output_path ) def gen_selfqa_data( self, note_list: List[Note], basic_info: Dict, data_output_base_dir: str, topics_path: str, entities_path: str, graph_path: str, config_path: str ): global_bio = basic_info["globalBio"] user_name = basic_info["username"] user_intro = basic_info["aboutMe"] output_path = os.path.join(data_output_base_dir, "selfqa.json") selfqa = SelfQA( user_name=user_name, user_input_introduction=user_intro, user_global_bio= global_bio, preferred_language=self.preferred_lang, is_cot=self.is_cot ) q_a_list = selfqa.generate_qa() with open(output_path, "w", encoding="utf-8") as f: json.dump(q_a_list, f, ensure_ascii=False, indent=4) def merge_json_files(self, data_output_base_dir: str): preference_output_path = os.path.join(data_output_base_dir, "preference.json") diversity_output_path = os.path.join(data_output_base_dir, "diversity.json") selfqa_output_path = os.path.join(data_output_base_dir, "selfqa.json") json_files_to_merge = [ preference_output_path, diversity_output_path, selfqa_output_path, ] merged_data = [] for file_path in json_files_to_merge: if file_path and os.path.exists(file_path): try: with open(file_path, 'r', encoding='utf-8') as f: file_data = json.load(f) if isinstance(file_data, list): merged_data.extend(file_data) else: merged_data.append(file_data) except Exception as e: logging.error(f"Error merging file {file_path}: {str(e)}") # Save the merged data merged_output_path = os.path.join(data_output_base_dir, "merged.json") with open(merged_output_path, 'w', encoding='utf-8') as f: json.dump(merged_data, f, ensure_ascii=False, indent=2) def _release_ollama_models(self): """Release Ollama models from memory to free up VRAM for training. This method calls the release function defined in the train module. It's important to release models after data synthesis and before training to ensure VRAM is properly freed. """ try: from lpm_kernel.L2.train import release_ollama_models release_ollama_models() except Exception as e: import logging logging = logging.getLogger(__name__) logging.warning(f"Failed to release Ollama models: {str(e)}") def clean_graphrag_keys(self): GRAPH_CONFIG = os.path.join( os.getcwd(), "lpm_kernel/L2/data_pipeline/graphrag_indexing/settings.yaml" ) with open(GRAPH_CONFIG, "r", encoding="utf-8") as file: settings = yaml.safe_load(file) settings["input"]["base_dir"] = "/your_dir" settings["output"]["base_dir"] = "/your_dir" settings["reporting"]["base_dir"] = "/your_dir" settings["models"]["default_chat_model"]["api_key"] = "sk-xxxxxx" ENV_CONFIG = os.path.join( os.getcwd(), "lpm_kernel/L2/data_pipeline/graphrag_indexing/.env" ) with open(ENV_CONFIG, "w", encoding="utf-8") as file: file.write("GRAPHRAG_API_KEY=sk-xxxxxx") with open(GRAPH_CONFIG, "w", encoding="utf-8") as file: yaml.dump(settings, file, default_flow_style=False, allow_unicode=True) logging.info("Graphrag config updated successfully")