felixmortas committed on
Commit
7791360
·
1 Parent(s): 9b140cb

Make wiki_search tool take into account article historical version and language

Browse files
Files changed (2) hide show
  1. custom_tools.py +44 -71
  2. utils.py +71 -2
custom_tools.py CHANGED
@@ -1,4 +1,4 @@
1
- from utils import download_file, read_file, sum_pandas_df_cols, download_yt_video, extract_frames, encode_image, analyze_frame, generate_prompt_for_video_frame_analysis, get_response_from_frames_analysis, transcript_audio_file
2
 
3
  import os
4
  import requests
@@ -72,70 +72,43 @@ def url_search(url: str) -> str:
72
  except RequestException as e:
73
  return f"Failed to access the URL. Error: {e}"
74
 
75
-
76
  @tool
77
- def wiki_search(query: str) -> str:
78
  """
79
- Search Wikipedia for a query and return maximum 1 result.
80
- Before starting any search, you must first think about the TRUE necessary steps that are required to answer the question.
81
- If you need to search for information, the query should be a 1 to 3 keywords that can be used to find the most information about the subject.
82
- If the question specifies a date, do not put the date into the query.
83
- THEN you should analyze the result to answer the question.
84
 
85
  Args:
86
- query (str): The search query with a few keywords.
 
 
87
 
88
  Returns:
89
- str: The main content of the Wikipedia page or an error message.
90
  """
91
- try:
92
- # Step 1: Search for Wikipedia pages
93
- search_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={query}&format=json"
94
- try:
95
- response = requests.get(search_url, timeout=10)
96
- response.raise_for_status()
97
- data = response.json()
98
-
99
- search_results = data.get('query', {}).get('search', [])
100
- title = search_results[0]['title'] if search_results else None
101
-
102
- if not title:
103
- return "No relevant Wikipedia page found."
104
-
105
- # Step 2: Fetch the HTML content of the page
106
- page_url = f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}"
107
- try:
108
- page_response = requests.get(page_url, timeout=10)
109
- page_response.raise_for_status()
110
- html_content = page_response.text
111
-
112
- # Step 3: Parse the HTML content using Beautiful Soup
113
- soup = BeautifulSoup(html_content, 'html.parser')
114
-
115
- # Extract the main content of the page
116
- content_div = soup.find('div', {'id': 'mw-content-text'})
117
- if content_div:
118
- parsed_content = content_div.get_text(separator='\n', strip=True)
119
- return parsed_content
120
- else:
121
- return "No main content found on the Wikipedia page."
122
-
123
- except Timeout:
124
- return "Request timed out while trying to fetch the Wikipedia page."
125
- except TooManyRedirects:
126
- return "Too many redirects while trying to fetch the Wikipedia page."
127
- except RequestException as e:
128
- return f"Failed to fetch the Wikipedia page. Error: {e}"
129
-
130
- except Timeout:
131
- return "Request timed out while searching for Wikipedia pages."
132
- except TooManyRedirects:
133
- return "Too many redirects while searching for Wikipedia pages."
134
- except RequestException as e:
135
- return f"Failed to search Wikipedia. Error: {e}"
136
 
137
- except Exception as e:
138
- return f"An unexpected error occurred: {e}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
 
140
  @tool
141
  def sum_excel_cols(file_name: str, column_names: List[str]) -> float:
@@ -153,7 +126,7 @@ def sum_excel_cols(file_name: str, column_names: List[str]) -> float:
153
  Example:
154
  sum_excel_cols("data.xlsx", ["Column1", "Column2"]) -> 100.0
155
  """
156
- file_status = download_file(file_name)
157
 
158
  if not os.path.exists(file_name):
159
  return f"File {file_name} does not exist."
@@ -169,7 +142,7 @@ def sum_excel_cols(file_name: str, column_names: List[str]) -> float:
169
  df = pd.read_excel(file_name)
170
 
171
  try:
172
- total_sum = sum_pandas_df_cols(df, column_names)
173
  return total_sum
174
  except Exception as e:
175
  return f"Error summing columns: {e}"
@@ -221,10 +194,10 @@ def read_file_content(file_name: str) -> str:
221
  Returns:
222
  str: The content of the file, or a detailed error message.
223
  """
224
- download_state = download_file(file_name)
225
 
226
  if download_state.startswith("Success") or "already exists" in download_state:
227
- return read_file(file_name)
228
  else:
229
  return download_state # Return the error message from downloading
230
 
@@ -244,8 +217,8 @@ def analyse_youtube_video(url: str, video_question: str):
244
  if url=="https://www.youtube.com/watch?v=L1vXCYZAYYM":
245
  return "3"
246
 
247
- file_name = download_yt_video(url=url)
248
- frames_path = extract_frames(video_path=file_name)
249
 
250
  load_dotenv()
251
  MISTRAL_API_KEY = os.getenv("MISTRAL")
@@ -256,12 +229,12 @@ def analyse_youtube_video(url: str, video_question: str):
256
 
257
  frames_answers = []
258
  for frame_path in frames_path:
259
- encoded_image = encode_image(image_path=frame_path)
260
  # If generate_prompt_for_video_frame_analysis() is used, replace video_question with frame_question
261
- image_answer = analyze_frame(client=client, question=video_question, base64_image=encoded_image)
262
  frames_answers.append(image_answer)
263
 
264
- video_answer = get_response_from_frames_analysis(client=client, video_question=video_question, frames_answers=frames_answers)
265
 
266
  return video_answer
267
 
@@ -278,18 +251,18 @@ def analyze_image(file_name: str, question: str) -> str:
278
  """
279
  try:
280
  if not os.path.exists(file_name):
281
- file_status = download_file(file_name)
282
 
283
  if not os.path.exists(file_name):
284
  return f"File {file_name} does not exist : {file_status}"
285
 
286
- base64_image = encode_image(image_path=file_name)
287
 
288
  load_dotenv()
289
  MISTRAL_API_KEY = os.getenv("MISTRAL")
290
  client = Mistral(api_key=MISTRAL_API_KEY)
291
 
292
- response = analyze_frame(client=client, question=question, base64_image=base64_image, model="pixtral-large-latest")
293
 
294
  return response
295
 
@@ -308,7 +281,7 @@ def transcript_audio(file_name: str) -> str:
308
  """
309
  # Download the image file if not already present
310
  if not os.path.exists(file_name):
311
- file_status = download_file(file_name)
312
 
313
  # Check if the file exists
314
  if not os.path.exists(file_name):
@@ -317,7 +290,7 @@ def transcript_audio(file_name: str) -> str:
317
  load_dotenv()
318
  GROQ_API_KEY = os.getenv("GROQ")
319
  client = Groq(api_key=GROQ_API_KEY)
320
- transcript = transcript_audio_file(client=client, file_path=file_name)
321
 
322
  return transcript
323
 
 
1
+ import utils
2
 
3
  import os
4
  import requests
 
72
  except RequestException as e:
73
  return f"Failed to access the URL. Error: {e}"
74
 
 
75
  @tool
76
+ def wiki_search(query: str, lang_tag: str = 'en', date: str = None) -> str:
77
  """
78
+ Search and extract content from a Wikipedia page, optionally retrieving a historical version.
 
 
 
 
79
 
80
  Args:
81
+ query (str): The search query to look up on Wikipedia.
82
+ lang_tag (str, optional): The language of the Wikipedia version to search from. Expected format: 'en' for English, 'fr' for French, 'it' for Italian etc.
83
+ date (str, optional): A precise description of the desired historical version. Expected format: "End of 2022", "last day of January 2023", "first day of last June" etc.
84
 
85
  Returns:
86
+ str: The textual content of the most relevant Wikipedia page.
87
  """
88
+ page_title = utils.search_wikipedia(query, lang_tag)
89
+ if not page_title:
90
+ return f"No results found on Wikipedia for query: {query}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
 
92
+ if not date:
93
+ content_url = f"https://{lang_tag}.wikipedia.org/wiki/{page_title}"
94
+ content = utils.fetch_page_content(content_url)
95
+ return content if content else f"Failed to retrieve Wikipedia page: {page_title}"
96
+
97
+ versions = utils.get_history_versions(page_title, lang_tag)
98
+ if not versions:
99
+ return f"No historical versions found for {page_title}"
100
+
101
+ load_dotenv()
102
+ MISTRAL_API_KEY = os.getenv("MISTRAL")
103
+ client = Mistral(api_key=MISTRAL_API_KEY)
104
+
105
+ print(f"date: {date}")
106
+ selected_id = utils.select_historical_version(client, versions, date)
107
+ if not selected_id:
108
+ return "Could not determine a valid historical version from the date provided."
109
+
110
+ historical_content = utils.fetch_page_content(f"https://{lang_tag}.wikipedia.org/w/index.php?title={page_title}&oldid={selected_id}")
111
+ return historical_content if historical_content else f"Failed to access the historical Wikipedia page: {selected_id}"
112
 
113
  @tool
114
  def sum_excel_cols(file_name: str, column_names: List[str]) -> float:
 
126
  Example:
127
  sum_excel_cols("data.xlsx", ["Column1", "Column2"]) -> 100.0
128
  """
129
+ file_status = utils.download_file(file_name)
130
 
131
  if not os.path.exists(file_name):
132
  return f"File {file_name} does not exist."
 
142
  df = pd.read_excel(file_name)
143
 
144
  try:
145
+ total_sum = utils.sum_pandas_df_cols(df, column_names)
146
  return total_sum
147
  except Exception as e:
148
  return f"Error summing columns: {e}"
 
194
  Returns:
195
  str: The content of the file, or a detailed error message.
196
  """
197
+ download_state = utils.download_file(file_name)
198
 
199
  if download_state.startswith("Success") or "already exists" in download_state:
200
+ return utils.read_file(file_name)
201
  else:
202
  return download_state # Return the error message from downloading
203
 
 
217
  if url=="https://www.youtube.com/watch?v=L1vXCYZAYYM":
218
  return "3"
219
 
220
+ file_name = utils.download_yt_video(url=url)
221
+ frames_path = utils.extract_frames(video_path=file_name)
222
 
223
  load_dotenv()
224
  MISTRAL_API_KEY = os.getenv("MISTRAL")
 
229
 
230
  frames_answers = []
231
  for frame_path in frames_path:
232
+ encoded_image = utils.encode_image(image_path=frame_path)
233
  # If generate_prompt_for_video_frame_analysis() is used, replace video_question with frame_question
234
+ image_answer = utils.analyze_frame(client=client, question=video_question, base64_image=encoded_image)
235
  frames_answers.append(image_answer)
236
 
237
+ video_answer = utils.get_response_from_frames_analysis(client=client, video_question=video_question, frames_answers=frames_answers)
238
 
239
  return video_answer
240
 
 
251
  """
252
  try:
253
  if not os.path.exists(file_name):
254
+ file_status = utils.download_file(file_name)
255
 
256
  if not os.path.exists(file_name):
257
  return f"File {file_name} does not exist : {file_status}"
258
 
259
+ base64_image = utils.encode_image(image_path=file_name)
260
 
261
  load_dotenv()
262
  MISTRAL_API_KEY = os.getenv("MISTRAL")
263
  client = Mistral(api_key=MISTRAL_API_KEY)
264
 
265
+ response = utils.analyze_frame(client=client, question=question, base64_image=base64_image, model="pixtral-large-latest")
266
 
267
  return response
268
 
 
281
  """
282
  # Download the image file if not already present
283
  if not os.path.exists(file_name):
284
+ file_status = utils.download_file(file_name)
285
 
286
  # Check if the file exists
287
  if not os.path.exists(file_name):
 
290
  load_dotenv()
291
  GROQ_API_KEY = os.getenv("GROQ")
292
  client = Groq(api_key=GROQ_API_KEY)
293
+ transcript = utils.transcript_audio_file(client=client, file_path=file_name)
294
 
295
  return transcript
296
 
utils.py CHANGED
@@ -1,14 +1,16 @@
1
  import errno
2
  import os
 
3
  import requests
4
  from requests.exceptions import RequestException, Timeout, TooManyRedirects
5
  import pandas as pd
 
6
  from yt_dlp import YoutubeDL
7
  from yt_dlp.utils import DownloadError
8
  import cv2
9
  import numpy as np
10
  import base64
11
- from typing import List
12
 
13
 
14
 
@@ -365,4 +367,71 @@ def transcript_audio_file(client, file_path: str) -> str:
365
  model="distil-whisper-large-v3-en", # Required model to use for transcription
366
  language="en", # Optional
367
  )
368
- return transcription
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import errno
2
  import os
3
+ from bs4 import BeautifulSoup
4
  import requests
5
  from requests.exceptions import RequestException, Timeout, TooManyRedirects
6
  import pandas as pd
7
+ import urllib
8
  from yt_dlp import YoutubeDL
9
  from yt_dlp.utils import DownloadError
10
  import cv2
11
  import numpy as np
12
  import base64
13
+ from typing import Dict, List, Optional
14
 
15
 
16
 
 
367
  model="distil-whisper-large-v3-en", # Required model to use for transcription
368
  language="en", # Optional
369
  )
370
+ return transcription
371
+
372
def search_wikipedia(query: str, lang_tag: str) -> Optional[str]:
    """
    Find the title of the most relevant Wikipedia page for a query.

    Args:
        query (str): Free-text search terms.
        lang_tag (str): Wikipedia language subdomain ('en', 'fr', ...).

    Returns:
        Optional[str]: The top result's title with spaces replaced by
        underscores (URL-ready), or None when the search yields nothing.
    """
    search_url = f"https://{lang_tag}.wikipedia.org/w/api.php"
    # Let requests URL-encode the query instead of interpolating it raw
    # into the URL (spaces, '&', non-ASCII would otherwise break the call).
    params = {"action": "query", "list": "search", "srsearch": query, "format": "json"}
    response = requests.get(search_url, params=params, timeout=10)
    response.raise_for_status()
    data = response.json()

    search_results = data.get('query', {}).get('search', [])
    if not search_results:
        # Return None, not an error string: callers test truthiness
        # (`if not page_title`), and the annotation promises Optional[str].
        return None
    return search_results[0]['title'].replace(' ', '_')
384
+
385
def fetch_page_content(url: str) -> Optional[str]:
    """
    Download a Wikipedia page and return its main body text.

    Args:
        url (str): Full URL of the page (current or an oldid revision).

    Returns:
        Optional[str]: Text of the 'mw-content-text' div, or None when the
        page cannot be fetched or has no recognizable content area.
    """
    try:
        # timeout keeps a dead server from hanging the tool forever.
        page_resp = requests.get(url, timeout=10)
    except RequestException:
        # Network failure is reported the same way as a bad status: None.
        return None
    if page_resp.status_code != 200:
        return None

    content_soup = BeautifulSoup(page_resp.text, 'html.parser')
    content_div = content_soup.find("div", id="mw-content-text")
    return content_div.get_text(separator="\n", strip=True) if content_div else None
393
+
394
+
395
def get_history_versions(page_title: str, lang_tag: str) -> List[Dict[str, str]]:
    """
    Scrape the revision history of a Wikipedia page.

    Args:
        page_title (str): URL-ready page title (underscores for spaces).
        lang_tag (str): Wikipedia language subdomain ('en', 'fr', ...).

    Returns:
        List[Dict[str, str]]: Up to 100 revisions as listed on the history
        page, each as {"id": <oldid>, "date": <human-readable timestamp>}.
        Empty list on any fetch or parse failure.
    """
    history_url = f"https://{lang_tag}.wikipedia.org/w/index.php?title={page_title}&action=history&limit=100"
    try:
        # timeout keeps a dead server from hanging the tool forever.
        history_resp = requests.get(history_url, timeout=10)
    except RequestException:
        return []
    if history_resp.status_code != 200:
        return []

    history_soup = BeautifulSoup(history_resp.text, 'html.parser')
    history_items = history_soup.find_all("a", class_="mw-changeslist-date")

    versions = []
    for item in history_items:
        # .get() avoids a KeyError on anchors that carry no href attribute.
        href = item.get('href', '')
        if 'oldid=' in href:
            versions.append({
                "id": href.split('oldid=')[-1],
                "date": item.get_text(),
            })
    return versions
415
+
416
+
417
def select_historical_version(client, versions: List[Dict[str, str]], date: str) -> Optional[str]:
    """
    Ask an LLM to pick the revision ID best matching a described date.

    Args:
        client: Mistral chat client (must expose ``chat.complete``).
        versions (List[Dict[str, str]]): Revisions as {"id": ..., "date": ...}.
        date (str): Natural-language description of the wanted date,
            e.g. "End of 2022".

    Returns:
        Optional[str]: One of the IDs from ``versions``, or None when the
        model's answer is not a known ID.
    """
    formatted_versions = "\n".join(f"{v['date']} -> {v['id']}" for v in versions)
    prompt = f"""
You are an AI assistant. I am trying to retrieve the most relevant version of a Wikipedia page for the date described as: "{date}".

Here is a list of available version timestamps and their IDs:
{formatted_versions}

Which ID best matches the given date? Return ONLY the ID.
"""

    resp = client.chat.complete(
        model="mistral-small-latest",
        messages=[
            {"role": "user", "content": prompt}
        ]
    )

    selected_id = resp.choices[0].message.content.strip()

    # Guard against the model answering with prose instead of a bare ID:
    # only hand back values that are actually revision IDs we offered, so
    # the caller never splices garbage into an oldid= URL.
    valid_ids = {v["id"] for v in versions}
    return selected_id if selected_id in valid_ids else None
437
+