Ilia Tambovtsev committed on
Commit
2c56f66
·
1 Parent(s): 531f639

feat: setup mlflow evaluation

Browse files
src/config/navigator.py CHANGED
@@ -1,10 +1,11 @@
1
- from pathlib import Path
2
  from dataclasses import dataclass
 
3
  from typing import List, Optional, Union
4
- import logging
5
 
6
  logger = logging.getLogger(__name__)
7
 
 
8
  @dataclass
9
  class Navigator:
10
  """Project paths manager"""
@@ -22,7 +23,10 @@ class Navigator:
22
  self.raw = self.data / "raw"
23
  self.interim = self.data / "interim"
24
  self.processed = self.data / "processed"
 
25
  self.eval = self.processed / "eval"
 
 
26
 
27
  # src paths
28
  self.src = self.root / "src"
@@ -44,12 +48,12 @@ class Navigator:
44
  return self.processed / filename
45
 
46
  def find_file_by_substr(
47
- self,
48
- substr: str,
49
- extension: Optional[str] = None,
50
- base_dir: Optional[Path] = None,
51
- return_first: bool = True
52
- ) -> Optional[Union[List[Path], Path]] :
53
  """
54
  Find file by substring.
55
 
@@ -64,10 +68,10 @@ class Navigator:
64
  if extension is None:
65
  extension = ""
66
 
67
- search_pattern = fr"*{substr}*"
68
 
69
  if base_dir is None:
70
- base_dir = self.data
71
 
72
  # find results matching pattern
73
  results = base_dir.rglob(search_pattern)
@@ -77,11 +81,12 @@ class Navigator:
77
 
78
  # sort by length so that the shortest is the first
79
  # thus we avoid picking modified file
80
- results = list(sorted(
81
- results,
82
- key=lambda path: len(path.name),
83
- ))
84
-
 
85
 
86
  if extension is not None:
87
  results = [path for path in results if path.name.endswith(extension)]
 
1
+ import logging
2
  from dataclasses import dataclass
3
+ from pathlib import Path
4
  from typing import List, Optional, Union
 
5
 
6
  logger = logging.getLogger(__name__)
7
 
8
+
9
  @dataclass
10
  class Navigator:
11
  """Project paths manager"""
 
23
  self.raw = self.data / "raw"
24
  self.interim = self.data / "interim"
25
  self.processed = self.data / "processed"
26
+
27
  self.eval = self.processed / "eval"
28
+ self.eval_artifacts = self.eval / "artifacts"
29
+ self.eval_runs = self.eval / "runs"
30
 
31
  # src paths
32
  self.src = self.root / "src"
 
48
  return self.processed / filename
49
 
50
  def find_file_by_substr(
51
+ self,
52
+ substr: str,
53
+ extension: Optional[str] = None,
54
+ base_dir: Optional[Path] = None,
55
+ return_first: bool = True,
56
+ ) -> Optional[Union[List[Path], Path]]:
57
  """
58
  Find file by substring.
59
 
 
68
  if extension is None:
69
  extension = ""
70
 
71
+ search_pattern = rf"*{substr}*"
72
 
73
  if base_dir is None:
74
+ base_dir = self.data
75
 
76
  # find results matching pattern
77
  results = base_dir.rglob(search_pattern)
 
81
 
82
  # sort by length so that the shortest is the first
83
  # thus we avoid picking modified file
84
+ results = list(
85
+ sorted(
86
+ results,
87
+ key=lambda path: len(path.name),
88
+ )
89
+ )
90
 
91
  if extension is not None:
92
  results = [path for path in results if path.name.endswith(extension)]
src/config/spreadsheets.py CHANGED
@@ -5,7 +5,7 @@ import pandas as pd
5
  from dotenv import load_dotenv
6
 
7
 
8
- def load_spreadsheet(sheet_id: Optional[str] = None) -> pd.DataFrame:
9
  if sheet_id is None:
10
  load_dotenv()
11
  sheet_id = os.environ.get("BENCHMARK_SPREADSHEET_ID")
@@ -13,5 +13,7 @@ def load_spreadsheet(sheet_id: Optional[str] = None) -> pd.DataFrame:
13
  csv_load_url = (
14
  f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"
15
  )
 
 
16
  df = pd.read_csv(csv_load_url)
17
  return df
 
5
  from dotenv import load_dotenv
6
 
7
 
8
+ def load_spreadsheet(sheet_id: Optional[str] = None, gid: Optional[str] = None) -> pd.DataFrame:
9
  if sheet_id is None:
10
  load_dotenv()
11
  sheet_id = os.environ.get("BENCHMARK_SPREADSHEET_ID")
 
13
  csv_load_url = (
14
  f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"
15
  )
16
+ if gid is not None:
17
+ csv_load_url = f"{csv_load_url}&gid={gid}"
18
  df = pd.read_csv(csv_load_url)
19
  return df
src/eval/eval_mlflow.py CHANGED
@@ -1,10 +1,10 @@
1
  import os
2
- from tempfile import NamedTemporaryFile
3
  import time
4
  from pathlib import Path
 
5
  from typing import Dict, List, Optional, Union
6
 
7
- import mlflow
8
  import pandas as pd
9
  from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
10
  from langchain_core.prompts import PromptTemplate
@@ -12,19 +12,15 @@ from langchain_openai import ChatOpenAI
12
  from pydantic import BaseModel, ConfigDict, Field
13
 
14
  from src.config import Config, load_spreadsheet
15
- from src.rag import (
16
- ChromaSlideStore,
17
- HyperbolicScorer,
18
- MinScorer,
19
- PresentationRetriever,
20
- ScorerTypes,
21
- )
22
 
23
 
24
  class RetrievalMetrics:
25
  """Metrics calculators for retrieval evaluation"""
26
 
27
  @staticmethod
 
28
  def presentation_match(run_output: Dict, ground_truth: Dict) -> float:
29
  """Check if top-1 retrieved presentation matches ground truth"""
30
  best_pres_info = run_output["contexts"][0]
@@ -130,19 +126,25 @@ Format output as JSON:
130
  | JsonOutputParser(pydantic_object=self.RelevanceOutput)
131
  )
132
 
133
- def evaluate(self, run_output: Dict, ground_truth: Dict) -> Dict[str, Union[float, str]]:
 
 
134
  """Evaluate relevance of retrieved content"""
135
  time.sleep(1.05) # Rate limiting
136
  question = ground_truth["question"]
137
  pres = run_output["contexts"][0]
138
 
139
- contexts_used = pres["contexts"] if self.n_contexts <= 0 else pres["contexts"][:self.n_contexts]
 
 
 
 
140
  pres_context = "\n\n---\n\n".join(contexts_used)
141
 
142
  llm_out = self.chain.invoke(dict(query=question, context=pres_context))
143
  return {
144
  "llm_relevance_score": float(llm_out["relevance_score"]),
145
- "llm_relevance_explanation": llm_out["explanation"]
146
  }
147
 
148
 
@@ -150,7 +152,8 @@ class EvaluationConfig(BaseModel):
150
  """Configuration for RAG evaluation"""
151
 
152
  experiment_name: str = "RAG_test"
153
- tracking_uri: str = "sqlite:///data/processed/eval/mlruns.db"
 
154
 
155
  scorers: List[ScorerTypes] = [MinScorer(), HyperbolicScorer()]
156
  n_contexts: int = 2
@@ -166,7 +169,7 @@ class RAGEvaluator:
166
  self,
167
  storage: ChromaSlideStore,
168
  config: EvaluationConfig,
169
- llm: Optional[ChatOpenAI] = None
170
  ):
171
  self.storage = storage
172
  self.config = config
@@ -174,31 +177,27 @@ class RAGEvaluator:
174
 
175
  # Setup MLFlow
176
  mlflow.set_tracking_uri(config.tracking_uri)
 
177
 
178
  # Initialize metrics calculators
179
  self.metrics = {
180
- name: getattr(RetrievalMetrics, name)
181
- for name in self.config.metrics
182
  }
183
 
184
  if llm:
185
  self.llm_evaluator = LLMRelevanceEvaluator(
186
- llm=self.llm,
187
- n_contexts=self.config.n_contexts
188
  )
189
 
190
  @staticmethod
191
- def load_questions_from_sheet(sheet_id: str) -> pd.DataFrame:
192
  """Load evaluation questions from spreadsheet"""
193
- df = load_spreadsheet(sheet_id)
194
  df.fillna(dict(page=""), inplace=True)
195
  return df
196
 
197
  def evaluate_single(
198
- self,
199
- retriever: PresentationRetriever,
200
- question: str,
201
- ground_truth: Dict
202
  ) -> Dict:
203
  """Evaluate single query"""
204
  # Run retrieval
@@ -218,7 +217,18 @@ class RAGEvaluator:
218
 
219
  def run_evaluation(self, questions_df: pd.DataFrame) -> None:
220
  """Run evaluation for all configured scorers"""
221
- mlflow.set_experiment(self.config.experiment_name)
 
 
 
 
 
 
 
 
 
 
 
222
 
223
  for scorer in self.config.scorers:
224
  with mlflow.start_run(run_name=f"scorer_{scorer.id}"):
@@ -229,7 +239,7 @@ class RAGEvaluator:
229
  retriever = PresentationRetriever(
230
  storage=self.storage,
231
  scorer=scorer,
232
- n_contexts=self.config.n_contexts
233
  )
234
 
235
  # Run evaluation for each question
@@ -242,14 +252,14 @@ class RAGEvaluator:
242
  ground_truth = {
243
  "question": row["question"],
244
  "pres_name": row["pres_name"],
245
- "pages": [int(x) if x else -1 for x in row["page"].split(",")]
246
  }
247
 
248
  output = retriever(dict(question=row["question"]))
249
  results = self.evaluate_single(
250
  retriever=retriever,
251
  question=row["question"],
252
- ground_truth=ground_truth
253
  )
254
 
255
  for name, value in results.items():
@@ -265,29 +275,31 @@ class RAGEvaluator:
265
  p["pres_name"] for p in output["contexts"]
266
  ],
267
  "retrieved_pages": [
268
- ",".join(map(str, p["pages"]))
269
- for p in output["contexts"]
270
  ],
271
  **{
272
  f"metric_{name}": value
273
  for name, value in results.items()
274
  if isinstance(value, (int, float))
275
- }
276
  }
277
 
278
  # Add LLM explanation if available
279
  if "llm_relevance_explanation" in results:
280
- result_row["llm_explanation"] = results["llm_relevance_explanation"]
 
 
281
 
282
  results_log.append(result_row)
283
 
284
  # Save metrics results
285
  results_df = pd.DataFrame(results_log)
286
 
287
- # Save whith file
 
288
  with NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
289
  results_df.to_csv(f.name, index=False)
290
- fpath = str(Config().navigator.eval / "detailed_results.csv")
291
  mlflow.log_artifact(f.name, fpath)
292
 
293
  # Log average metrics
@@ -296,8 +308,6 @@ class RAGEvaluator:
296
  mlflow.log_metric(f"mean_{name}", sum(values) / len(values))
297
 
298
 
299
-
300
-
301
  def main():
302
  from dotenv import load_dotenv
303
 
@@ -312,25 +322,29 @@ def main():
312
  llm = project_config.model_config.load_vsegpt(model="openai/gpt-4o-mini")
313
  embeddings = project_config.embedding_config.load_vsegpt()
314
 
315
- storage = ChromaSlideStore(collection_name="pres0", embedding_model=embeddings)
316
 
 
 
317
  eval_config = EvaluationConfig(
318
- experiment_name="PresRetrieve_mlflow",
319
  metrics=["presentation_match", "page_match"],
320
  scorers=[MinScorer(), HyperbolicScorer()],
 
 
321
  )
322
 
323
  evaluator = RAGEvaluator(
324
  storage=storage,
325
  config=eval_config,
326
- llm=llm
327
  )
328
 
329
  # Load questions
330
  sheet_id = os.environ["BENCHMARK_SPREADSHEET_ID"]
331
  questions_df = evaluator.load_questions_from_sheet(sheet_id)
332
 
333
- questions_df.sample(5)
334
 
335
  # Run evaluation
336
  evaluator.run_evaluation(questions_df)
 
1
  import os
 
2
  import time
3
  from pathlib import Path
4
+ from tempfile import NamedTemporaryFile
5
  from typing import Dict, List, Optional, Union
6
 
7
+ import mlflow, mlflow.config
8
  import pandas as pd
9
  from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
10
  from langchain_core.prompts import PromptTemplate
 
12
  from pydantic import BaseModel, ConfigDict, Field
13
 
14
  from src.config import Config, load_spreadsheet
15
+ from src.rag import (ChromaSlideStore, HyperbolicScorer, MinScorer,
16
+ PresentationRetriever, ScorerTypes)
 
 
 
 
 
17
 
18
 
19
  class RetrievalMetrics:
20
  """Metrics calculators for retrieval evaluation"""
21
 
22
  @staticmethod
23
+ @mlflow.trace
24
  def presentation_match(run_output: Dict, ground_truth: Dict) -> float:
25
  """Check if top-1 retrieved presentation matches ground truth"""
26
  best_pres_info = run_output["contexts"][0]
 
126
  | JsonOutputParser(pydantic_object=self.RelevanceOutput)
127
  )
128
 
129
+ def evaluate(
130
+ self, run_output: Dict, ground_truth: Dict
131
+ ) -> Dict[str, Union[float, str]]:
132
  """Evaluate relevance of retrieved content"""
133
  time.sleep(1.05) # Rate limiting
134
  question = ground_truth["question"]
135
  pres = run_output["contexts"][0]
136
 
137
+ contexts_used = (
138
+ pres["contexts"]
139
+ if self.n_contexts <= 0
140
+ else pres["contexts"][: self.n_contexts]
141
+ )
142
  pres_context = "\n\n---\n\n".join(contexts_used)
143
 
144
  llm_out = self.chain.invoke(dict(query=question, context=pres_context))
145
  return {
146
  "llm_relevance_score": float(llm_out["relevance_score"]),
147
+ "llm_relevance_explanation": llm_out["explanation"],
148
  }
149
 
150
 
 
152
  """Configuration for RAG evaluation"""
153
 
154
  experiment_name: str = "RAG_test"
155
+ tracking_uri: str = f"sqlite:///{Config().navigator.eval_runs / 'mlruns.db'}"
156
+ artifacts_uri: str = f"file:////{Config().navigator.eval_artifacts}"
157
 
158
  scorers: List[ScorerTypes] = [MinScorer(), HyperbolicScorer()]
159
  n_contexts: int = 2
 
169
  self,
170
  storage: ChromaSlideStore,
171
  config: EvaluationConfig,
172
+ llm: Optional[ChatOpenAI] = None,
173
  ):
174
  self.storage = storage
175
  self.config = config
 
177
 
178
  # Setup MLFlow
179
  mlflow.set_tracking_uri(config.tracking_uri)
180
+ mlflow.config.enable_async_logging(True)
181
 
182
  # Initialize metrics calculators
183
  self.metrics = {
184
+ name: getattr(RetrievalMetrics, name) for name in self.config.metrics
 
185
  }
186
 
187
  if llm:
188
  self.llm_evaluator = LLMRelevanceEvaluator(
189
+ llm=self.llm, n_contexts=self.config.n_contexts
 
190
  )
191
 
192
  @staticmethod
193
+ def load_questions_from_sheet(*args, **kwargs) -> pd.DataFrame:
194
  """Load evaluation questions from spreadsheet"""
195
+ df = load_spreadsheet(*args, **kwargs)
196
  df.fillna(dict(page=""), inplace=True)
197
  return df
198
 
199
  def evaluate_single(
200
+ self, retriever: PresentationRetriever, question: str, ground_truth: Dict
 
 
 
201
  ) -> Dict:
202
  """Evaluate single query"""
203
  # Run retrieval
 
217
 
218
  def run_evaluation(self, questions_df: pd.DataFrame) -> None:
219
  """Run evaluation for all configured scorers"""
220
+
221
+ # Load the existing experiment or create a new one
222
+ experiment = mlflow.get_experiment_by_name(self.config.experiment_name)
223
+ if experiment is not None:
224
+ experiment_id = experiment.experiment_id
225
+ else:
226
+ experiment_id = mlflow.create_experiment(
227
+ self.config.experiment_name,
228
+ artifact_location=self.config.artifacts_uri,
229
+ )
230
+
231
+ mlflow.set_experiment(experiment_id=experiment_id)
232
 
233
  for scorer in self.config.scorers:
234
  with mlflow.start_run(run_name=f"scorer_{scorer.id}"):
 
239
  retriever = PresentationRetriever(
240
  storage=self.storage,
241
  scorer=scorer,
242
+ n_contexts=self.config.n_contexts,
243
  )
244
 
245
  # Run evaluation for each question
 
252
  ground_truth = {
253
  "question": row["question"],
254
  "pres_name": row["pres_name"],
255
+ "pages": [int(x) if x else -1 for x in row["page"].split(",")],
256
  }
257
 
258
  output = retriever(dict(question=row["question"]))
259
  results = self.evaluate_single(
260
  retriever=retriever,
261
  question=row["question"],
262
+ ground_truth=ground_truth,
263
  )
264
 
265
  for name, value in results.items():
 
275
  p["pres_name"] for p in output["contexts"]
276
  ],
277
  "retrieved_pages": [
278
+ ",".join(map(str, p["pages"])) for p in output["contexts"]
 
279
  ],
280
  **{
281
  f"metric_{name}": value
282
  for name, value in results.items()
283
  if isinstance(value, (int, float))
284
+ },
285
  }
286
 
287
  # Add LLM explanation if available
288
  if "llm_relevance_explanation" in results:
289
+ result_row["llm_explanation"] = results[
290
+ "llm_relevance_explanation"
291
+ ]
292
 
293
  results_log.append(result_row)
294
 
295
  # Save metrics results
296
  results_df = pd.DataFrame(results_log)
297
 
298
+ # Save with file
299
+
300
  with NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
301
  results_df.to_csv(f.name, index=False)
302
+ fpath = str("detailed_results")
303
  mlflow.log_artifact(f.name, fpath)
304
 
305
  # Log average metrics
 
308
  mlflow.log_metric(f"mean_{name}", sum(values) / len(values))
309
 
310
 
 
 
311
  def main():
312
  from dotenv import load_dotenv
313
 
 
322
  llm = project_config.model_config.load_vsegpt(model="openai/gpt-4o-mini")
323
  embeddings = project_config.embedding_config.load_vsegpt()
324
 
325
+ storage = ChromaSlideStore(collection_name="pres1", embedding_model=embeddings)
326
 
327
+ db_path = project_config.navigator.eval_runs / "mlruns.db"
328
+ artifacts_path = project_config.navigator.eval_artifacts
329
  eval_config = EvaluationConfig(
330
+ experiment_name="PresRetrieve_mlflow_5",
331
  metrics=["presentation_match", "page_match"],
332
  scorers=[MinScorer(), HyperbolicScorer()],
333
+ tracking_uri=f"sqlite:////{db_path}",
334
+ artifacts_uri=f"file:////{artifacts_path}",
335
  )
336
 
337
  evaluator = RAGEvaluator(
338
  storage=storage,
339
  config=eval_config,
340
+ # llm=llm
341
  )
342
 
343
  # Load questions
344
  sheet_id = os.environ["BENCHMARK_SPREADSHEET_ID"]
345
  questions_df = evaluator.load_questions_from_sheet(sheet_id)
346
 
347
+ questions_df = questions_df.sample(5)
348
 
349
  # Run evaluation
350
  evaluator.run_evaluation(questions_df)
src/eval/evaluate.py CHANGED
@@ -255,9 +255,9 @@ class RAGEvaluatorLangsmith:
255
  self.llm = LangchainLLMWrapper(llm_unwrapped)
256
 
257
  @classmethod
258
- def load_questions_from_sheet(cls, sheet_id: str) -> pd.DataFrame:
259
  """Load evaluation questions from Google Sheets and preprocess dataset"""
260
- df = load_spreadsheet(sheet_id)
261
  df.fillna(dict(page=""), inplace=True)
262
  return df
263
 
 
255
  self.llm = LangchainLLMWrapper(llm_unwrapped)
256
 
257
  @classmethod
258
+ def load_questions_from_sheet(cls, *args, **kwargs) -> pd.DataFrame:
259
  """Load evaluation questions from Google Sheets and preprocess dataset"""
260
+ df = load_spreadsheet(*args, **kwargs)
261
  df.fillna(dict(page=""), inplace=True)
262
  return df
263