fj198602's picture
download
raw
7.45 kB
import json
import os
import re
from pathlib import Path
from tqdm import tqdm
import yaml
from src.utils import get_llm_response
from src.prompt_generator import (
retrieve_relevant_skills_prompt,
generate_overall_procedure_prompt,
generate_overall_procedure_code_prompt
)
class SkillModule:
    """Select skills and turn them into an overall procedure via an LLM.

    Skills live in sub-directories of ``skills_dir``. Each skill directory is
    expected to contain a ``SKILL.md`` whose YAML front matter (delimited by
    ``---`` lines) provides at least ``name`` and ``description``.
    """

    def __init__(self, **kwargs):
        """Configure the module and eagerly load skill metadata.

        Keyword Args:
            skills_dir: Directory holding one sub-directory per skill
                (default ``"skills"``).
            overall_procedure_examples_path: Optional text file with example
                procedures (default ``""``, meaning none).
            procedure_code_template_path: Optional text file with a code
                template (default ``None``, meaning none).
            model: Model name forwarded to ``get_llm_response``
                (default ``"gpt-4o"``).
        """
        self.skills_dir = Path(kwargs.get("skills_dir", "skills"))
        self.overall_procedure_examples_path = kwargs.get("overall_procedure_examples_path", "")
        self.procedure_code_template_path = kwargs.get("procedure_code_template_path", None)
        self.model = kwargs.get("model", "gpt-4o")
        self.metadata = self._load_metadata()
        # Both auxiliary files are optional; a missing file yields ''.
        self.procedure_code_template = self._read_optional_text(self.procedure_code_template_path)
        self.overall_procedure_examples = self._read_optional_text(self.overall_procedure_examples_path)

    @staticmethod
    def _read_optional_text(path):
        """Return the UTF-8 text of ``path``, or ``''`` if it is unset or not a file."""
        if path is None or not os.path.isfile(path):
            return ""
        with open(path, "r", encoding="utf-8") as f:
            return f.read()

    @staticmethod
    def _extract_tag_content(response, tag):
        """Return the text between the first ``<tag>`` and the next ``</tag>``.

        If the closing tag is absent, everything after the opening tag is
        returned.

        Raises:
            IndexError: If the opening tag does not occur in ``response``.
        """
        _, opener, remainder = response.partition(f"<{tag}>")
        if not opener:
            raise IndexError(f"<{tag}> tag not found in LLM response")
        return remainder.partition(f"</{tag}>")[0]

    @staticmethod
    def _strip_code_fence(text):
        """Remove a surrounding Markdown code fence (```lang ... ```) from ``text``.

        ``str.strip(chars)`` treats its argument as a set of characters, so it
        must not be used for this: it would also eat leading/trailing letters
        of the payload itself (e.g. the ``p`` of ``print``).
        """
        text = text.strip()
        # Opening fence with an optional language tag, e.g. ```python or ```json.
        text = re.sub(r"^```[A-Za-z0-9_+-]*[ \t]*\r?\n?", "", text, count=1)
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    @staticmethod
    def _parse_procedure_code(response):
        """Extract the code from the last ``<Overall_Procedure_Code>`` section.

        Returns ``''`` when no complete section is present.
        """
        open_tag = "<Overall_Procedure_Code>"
        pattern = r"<Overall_Procedure_Code>(.*?)</Overall_Procedure_Code>"
        matches = re.findall(pattern, response, re.DOTALL)
        if not matches:
            return ""
        # A non-greedy match starts at the first opening tag, so the captured
        # text may itself contain further opening tags; keep only what follows
        # the last one.
        raw_content = matches[-1].rpartition(open_tag)[2]
        return SkillModule._strip_code_fence(raw_content)

    def _load_metadata(self):
        """Scan ``skills_dir`` and collect the front matter of every skill.

        Returns:
            dict mapping skill name to ``{'description': ..., 'skill_dir': ...}``.
            Empty if ``skills_dir`` does not exist or contains no valid skill.
        """
        metadata = {}
        if not self.skills_dir.is_dir():
            print(f"[WARNING] Skills directory {self.skills_dir} not found, no skills loaded.")
            return metadata

        # Sorted so that the result (and which duplicate name wins) does not
        # depend on filesystem enumeration order.
        for skill_dir in sorted(self.skills_dir.iterdir()):
            if not skill_dir.is_dir():
                continue
            skill_md_path = skill_dir / "SKILL.md"
            if not skill_md_path.exists():
                print(f"[WARNING] SKILL.md not found for {skill_dir.name}, skipping.")
                continue
            try:
                content = skill_md_path.read_text(encoding="utf-8")
                if not content.strip().startswith('---'):
                    print(f"[WARNING] No metadata header found in {skill_dir.name}, skipping.")
                    continue
                # parts: [text before header, header, body]
                parts = content.split('---', 2)
                if len(parts) < 3:
                    print(f"[WARNING] No valid metadata found in {skill_dir.name}, skipping.")
                    continue
                header_data = yaml.safe_load(parts[1])
                if not (isinstance(header_data, dict)
                        and header_data.get('name')
                        and header_data.get('description')):
                    print(f"[WARNING] Invalid metadata format in {skill_dir.name}, skipping.")
                    continue
                metadata[header_data['name']] = {
                    'description': header_data['description'],
                    'skill_dir': str(skill_dir)
                }
            except Exception as e:
                # Best effort: one broken skill must not prevent loading the rest.
                print(f"[ERROR] Failed to read or parse SKILL.md for {skill_dir.name}: {e}")

        print(f"[INFO] Loaded metadata for {len(metadata)} skills.")
        return metadata

    def retrieve_relevant_skills(self, task):
        """Ask the LLM which known skills are relevant to ``task``.

        Returns:
            The JSON value found inside ``<Relevant_Skill_Names>`` in the LLM
            response (the prompt presumably requests a list of names).

        Raises:
            IndexError: If the response lacks the ``<Relevant_Skill_Names>`` tag.
            json.JSONDecodeError: If the tagged content is not valid JSON.
        """
        response = get_llm_response(
            retrieve_relevant_skills_prompt(self.metadata, task),
            is_string=True,
            model=self.model
        )
        payload = self._extract_tag_content(response, "Relevant_Skill_Names")
        return json.loads(self._strip_code_fence(payload))

    def _read_skill_text(self, skill_name, skill_dir):
        """Concatenate SKILL.md and every other readable text file of a skill."""
        sections = [f"=== Skill: {skill_name} ===\n"]

        # Core instructions first, if present.
        main_file = skill_dir / "SKILL.md"
        if main_file.exists():
            sections.append("\n[File: SKILL.md]\n")
            sections.append(main_file.read_text(encoding='utf-8') + "\n")

        # Then every other file below the skill directory, in a stable order.
        for file_path in sorted(skill_dir.rglob('*')):
            if not file_path.is_file() or file_path.name == "SKILL.md":
                continue
            try:
                content = file_path.read_text(encoding='utf-8')
            except (UnicodeDecodeError, OSError):
                # Binary or unreadable files are skipped on purpose.
                continue
            sections.append(f"\n[File: {file_path.relative_to(skill_dir)}]\n")
            sections.append(content + "\n")
        return "".join(sections)

    def _compile_skill_contents(self, skill_names):
        """Return ``[(skill_name, combined_text), ...]`` for the known skills.

        Unknown names and skills whose directory is gone are skipped; a failure
        on one skill does not discard the others.
        """
        skill_contents = []
        for skill_name in skill_names or []:
            try:
                entry = self.metadata.get(skill_name)
                if entry is None:
                    print(f"[WARNING] Unknown skill {skill_name!r}, skipping.")
                    continue
                skill_dir = Path(entry['skill_dir'])
                if not skill_dir.is_dir():
                    continue
                skill_contents.append((skill_name, self._read_skill_text(skill_name, skill_dir)))
            except Exception as e:
                # Best effort per skill, mirroring the metadata loader.
                print(f"[ERROR] Failed to compile skill data: {e}")
        return skill_contents

    def generate_overall_procedure(self, task, skill_names):
        """Generate an overall procedure for ``task`` from the given skills.

        Returns:
            The stripped text inside ``<Overall_Procedure>`` in the LLM response.

        Raises:
            IndexError: If the response lacks the ``<Overall_Procedure>`` tag.
        """
        skill_contents = self._compile_skill_contents(skill_names)
        response = get_llm_response(
            generate_overall_procedure_prompt(task, self.overall_procedure_examples, skill_contents),
            is_string=True,
            model=self.model
        )
        return self._extract_tag_content(response, "Overall_Procedure").strip()

    def generate_overall_procedure_code(self, task, overall_procedure):
        """Generate code implementing ``overall_procedure``.

        Returns:
            The code inside the last ``<Overall_Procedure_Code>`` section of the
            LLM response with any Markdown fence removed, or ``''`` if the
            response contains no such section.
        """
        response = get_llm_response(
            generate_overall_procedure_code_prompt(task, overall_procedure, self.procedure_code_template),
            is_string=True,
            model=self.model
        )
        return self._parse_procedure_code(response)

Xet Storage Details

Size:
7.45 kB
·
Xet hash:
690cb7980b71613cee5dfca8b9582dad6341cb3059a94687a87fd0ef6f4a52d8

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.