|
|
""" |
|
|
Log Parser for Extracting Candidates and Feedback |
|
|
|
|
|
Parses optimization logs to extract candidate prompts, feedback, and scores. |
|
|
""" |
|
|
|
|
|
import re |
|
|
from typing import List, Dict, Optional, Tuple |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
class OptimizationLogParser:
    """Parse GEPA/LLEGO optimization logs to extract candidate prompts,
    feedback, and scores.

    NOTE(review): the marker strings inside the regex patterns below
    (e.g. 'π', '𧬠', 'π²', 'π¬') appear to be mojibake of the emoji the
    log writer originally emitted. They are deliberately kept byte-identical
    here, because they must match whatever bytes actually appear in the log
    files — confirm against a real log before "fixing" them.
    """

    def __init__(self, log_file: str):
        """
        Initialize parser with log file path.

        A missing file is not an error: ``self.content`` is simply left
        empty, and every extract method then returns an empty list.

        Args:
            log_file: Path to log file
        """
        self.log_file = Path(log_file)
        self.content = ""
        if self.log_file.exists():
            with open(self.log_file, 'r', encoding='utf-8') as f:
                self.content = f.read()

    def extract_iterations(self) -> List[Dict]:
        """Extract iteration information from logs.

        Returns:
            List of dicts with keys 'iteration' (int), 'content' (the slice
            of the log belonging to that iteration), and 'start_pos'
            (character offset of the iteration marker in the log).
        """
        iteration_pattern = r'Iteration\s+(\d+)|Starting GEPA optimization|π Starting GEPA optimization'

        # Scan for all iteration markers exactly once. The previous version
        # re-ran finditer over the whole log for every marker to locate the
        # "next" one, which was O(n^2) in the number of markers; since
        # finditer yields matches in order, each iteration simply ends where
        # the next marker begins (or at end of file).
        markers = list(re.finditer(iteration_pattern, self.content))

        iterations = []
        for idx, match in enumerate(markers):
            # Markers without a captured number (the "Starting ..." banner
            # lines) default to iteration 1.
            iter_num = int(match.group(1)) if match.group(1) else 1

            start_pos = match.start()
            end_pos = markers[idx + 1].start() if idx + 1 < len(markers) else len(self.content)

            iterations.append({
                'iteration': iter_num,
                'content': self.content[start_pos:end_pos],
                'start_pos': start_pos
            })

        return iterations

    def _collect_prompts(self, patterns: List[str], source: str,
                         iteration_content: str, candidates: List[Dict]) -> None:
        """Append to *candidates* every prompt captured by any of *patterns*.

        Group 1 of each pattern is the prompt text; stripped prompts of 20
        characters or fewer are discarded as noise.
        """
        for pattern in patterns:
            for match in re.finditer(pattern, iteration_content, re.DOTALL):
                prompt = match.group(1).strip()
                if prompt and len(prompt) > 20:
                    candidates.append({
                        'source': source,
                        'prompt': prompt,
                        'position': match.start()
                    })

    def extract_candidates(self, iteration_content: str) -> List[Dict]:
        """
        Extract candidate prompts from iteration content.

        Recognizes GEPA reflection blocks, LLEGO crossover/mutation blocks,
        and falls back to generic quoted/fenced text that looks like a
        prompt. Results are sorted by position in the log.

        Args:
            iteration_content: Content for a single iteration

        Returns:
            List of candidate dictionaries with keys 'source', 'prompt',
            'position' (and later, optionally, 'score'/'feedback' added by
            parse_all)
        """
        candidates = []

        # GEPA reflection / proposed-prompt sections, delimited by dashes.
        gepa_patterns = [
            r'π PROPOSED PROMPT.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            r'PROPOSED PROMPT.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            r'GEPA REFLECTION.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
        ]
        self._collect_prompts(gepa_patterns, 'GEPA_Reflection',
                              iteration_content, candidates)

        # LLEGO crossover sections.
        crossover_patterns = [
            r'𧬠Crossover.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            r'Crossover.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
        ]
        self._collect_prompts(crossover_patterns, 'LLEGO_Crossover',
                              iteration_content, candidates)

        # LLEGO mutation sections.
        mutation_patterns = [
            r'π² Mutation.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            r'Mutation.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
        ]
        self._collect_prompts(mutation_patterns, 'LLEGO_Mutation',
                              iteration_content, candidates)

        # Fallback: long quoted strings or fenced blocks that contain
        # prompt-like keywords, deduplicated against what we already found.
        generic_patterns = [
            r'"([^"]{50,})"',
            r'```\s*(.*?)\s*```',
        ]

        for pattern in generic_patterns:
            for match in re.finditer(pattern, iteration_content, re.DOTALL):
                prompt = match.group(1).strip()

                if (len(prompt) > 50 and
                    any(keyword in prompt.lower() for keyword in
                        ['you are', 'task', 'instruction', 'element', 'identify', 'select'])):

                    if not any(c['prompt'] == prompt for c in candidates):
                        candidates.append({
                            'source': 'Unknown',
                            'prompt': prompt,
                            'position': match.start()
                        })

        candidates.sort(key=lambda x: x['position'])

        return candidates

    def extract_feedback(self, iteration_content: str) -> List[Dict]:
        """
        Extract feedback from iteration content.

        Recognizes inline "FEEDBACK:" lines and dash-delimited
        LLM-as-Judge sections; results are sorted by position.

        Args:
            iteration_content: Content for a single iteration

        Returns:
            List of feedback dictionaries with keys 'feedback', 'position',
            and — for judge sections only — 'source'
        """
        feedback_list = []

        # Inline feedback lines; capture runs to a blank line, the next
        # marker, or end of content. Stripped text of 10 chars or fewer is
        # discarded as noise.
        feedback_patterns = [
            r'π¬ FEEDBACK:\s*(.*?)(?=\n\n|\nπ|\nπ|\nπ‘|$)',
            r'FEEDBACK:\s*(.*?)(?=\n\n|\nπ|\nπ|\nπ‘|$)',
            r'Feedback:\s*(.*?)(?=\n\n|\nπ|\nπ|\nπ‘|$)',
        ]

        for pattern in feedback_patterns:
            for match in re.finditer(pattern, iteration_content, re.DOTALL):
                feedback_text = match.group(1).strip()
                if feedback_text and len(feedback_text) > 10:
                    feedback_list.append({
                        'feedback': feedback_text,
                        'position': match.start()
                    })

        # Dash-delimited judge sections carry an explicit 'source' tag.
        judge_patterns = [
            r'LLM-as-Judge.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            r'Judge Feedback.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
        ]

        for pattern in judge_patterns:
            for match in re.finditer(pattern, iteration_content, re.DOTALL):
                feedback_text = match.group(1).strip()
                if feedback_text and len(feedback_text) > 10:
                    feedback_list.append({
                        'feedback': feedback_text,
                        'position': match.start(),
                        'source': 'LLM-as-Judge'
                    })

        feedback_list.sort(key=lambda x: x['position'])

        return feedback_list

    def extract_scores(self, iteration_content: str) -> List[Dict]:
        """
        Extract scores from iteration content.

        Args:
            iteration_content: Content for a single iteration

        Returns:
            List of score dictionaries with keys 'score' (float) and
            'position', sorted by position. Note that overlapping patterns
            (e.g. 'Score:' inside 'π Score:') may yield duplicate entries,
            matching the log format's redundancy.
        """
        scores = []

        score_patterns = [
            r'Score:\s*([\d.]+)',
            r'Average score:\s*([\d.]+)',
            r'π― SCORE:\s*([\d.]+)',
            r'π Score:\s*([\d.]+)',
        ]

        for pattern in score_patterns:
            for match in re.finditer(pattern, iteration_content):
                scores.append({
                    'score': float(match.group(1)),
                    'position': match.start()
                })

        scores.sort(key=lambda x: x['position'])

        return scores

    def parse_all(self) -> Dict:
        """
        Parse entire log file and extract all information.

        Each candidate is annotated with the nearest score that follows it
        in the log, and with the nearest subsequent feedback within 5000
        characters (a heuristic window so feedback from a later candidate
        isn't attributed to an earlier one).

        Returns:
            Dictionary with keys 'iterations', 'total_iterations',
            'all_candidates', 'all_feedback'
        """
        iterations = self.extract_iterations()

        result = {
            'iterations': [],
            'total_iterations': len(iterations),
            'all_candidates': [],
            'all_feedback': []
        }

        for iter_info in iterations:
            iter_num = iter_info['iteration']
            iter_content = iter_info['content']

            candidates = self.extract_candidates(iter_content)
            feedback = self.extract_feedback(iter_content)
            scores = self.extract_scores(iter_content)

            for candidate in candidates:
                candidate_pos = candidate['position']

                # Nearest score strictly after the candidate.
                later_scores = [s for s in scores if s['position'] > candidate_pos]
                if later_scores:
                    candidate['score'] = min(
                        later_scores, key=lambda s: s['position'])['score']

                # Nearest feedback strictly after the candidate, within a
                # 5000-character window.
                later_feedback = [
                    f for f in feedback
                    if candidate_pos < f['position'] < candidate_pos + 5000
                ]
                if later_feedback:
                    candidate['feedback'] = min(
                        later_feedback, key=lambda f: f['position'])['feedback']

            result['iterations'].append({
                'iteration': iter_num,
                'candidates': candidates,
                'feedback': feedback,
                'scores': scores
            })

            result['all_candidates'].extend(candidates)
            result['all_feedback'].extend(feedback)

        return result
|
|
|
|
|
|