# Page residue from the hosting site (kept for provenance):
# author: trongld
# commit 9e2c057: Update get_tools function to include WikipediaAPIWrapper and modify WikipediaQuery tool
import os
from langchain_community.tools import Tool, BraveSearch, YouTubeSearchTool
from langchain_community.tools import DuckDuckGoSearchResults, GoogleSearchResults
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper, WolframAlphaAPIWrapper
from langchain_community.tools import WolframAlphaQueryRun
from typing import Any, Dict, List, Optional
import json
import re
from datetime import datetime, timedelta
import io # for BytesIO
# Structured tools
try:
    from langchain_core.tools import tool
except Exception:
    def tool(*args, **kwargs):
        """No-op stand-in for ``langchain_core.tools.tool``.

        Used only when langchain_core cannot be imported, so that the
        decorated functions in this module remain plain callables.

        Supports both decorator spellings:
        - ``@tool("name", return_direct=False)`` -> returns a pass-through
          decorator (all arguments are ignored).
        - ``@tool`` (bare) -> returns the decorated function itself.
        """
        # Bare usage: the only positional argument is the function being
        # decorated, so hand it straight back instead of a wrapper factory.
        if len(args) == 1 and not kwargs and callable(args[0]):
            return args[0]

        def _wrap(fn):
            return fn

        return _wrap
# Optional deps
try:
from youtube_transcript_api import (
YouTubeTranscriptApi,
TranscriptsDisabled,
NoTranscriptFound,
)
except Exception:
YouTubeTranscriptApi = None # type: ignore
TranscriptsDisabled = Exception # type: ignore
NoTranscriptFound = Exception # type: ignore
try:
from dateutil import parser as date_parser
from dateutil.relativedelta import relativedelta
except Exception:
date_parser = None # type: ignore
relativedelta = None # type: ignore
try:
from zoneinfo import ZoneInfo # py>=3.9
except Exception:
ZoneInfo = None # type: ignore
try:
import pandas as pd
except Exception:
pd = None # type: ignore
try:
import requests
except Exception:
requests = None # type: ignore
def _parse_video_id(url_or_id: str) -> Optional[str]:
s = (url_or_id or "").strip()
if re.fullmatch(r"[0-9A-Za-z_-]{11}", s):
return s
try:
from urllib.parse import urlparse, parse_qs
u = urlparse(s)
if u.netloc.endswith(("youtube.com", "m.youtube.com", "music.youtube.com")):
qs = parse_qs(u.query)
v = (qs.get("v") or [""])[0]
if re.fullmatch(r"[0-9A-Za-z_-]{11}", v):
return v
if u.netloc.endswith("youtu.be"):
vid = u.path.lstrip("/").split("/")[0]
if re.fullmatch(r"[0-9A-Za-z_-]{11}", vid):
return vid
except Exception:
pass
return None
def _to_dt(value: str, tz: Optional[str] = None) -> datetime:
    """Parse *value* into a datetime, optionally placing it in zone *tz*.

    Parsing uses dateutil when it was importable; otherwise
    ``datetime.fromisoformat`` with a ``%Y-%m-%d`` fallback.

    When *tz* is given and zoneinfo is available, a naive result is tagged
    with that zone and an aware result is converted to it. A zone that
    cannot be applied is ignored and the parsed value is returned as-is.
    Parse errors propagate to the caller.
    """
    if date_parser is None:
        try:
            parsed = datetime.fromisoformat(value)
        except Exception:
            parsed = datetime.strptime(value, "%Y-%m-%d")
    else:
        parsed = date_parser.parse(value)

    # Nothing to apply: no zone requested, or zoneinfo is unavailable.
    if not tz or ZoneInfo is None:
        return parsed

    try:
        zone = ZoneInfo(tz)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=zone)
        else:
            parsed = parsed.astimezone(zone)
    except Exception:
        # Best effort: keep the parsed value unchanged.
        pass
    return parsed
@tool("youtube_transcript", return_direct=False)
def youtube_transcript(video: str, languages: Optional[List[str]] = None, max_chars: int = 8000) -> Dict[str, Any]:
    """
    Get YouTube transcript for a video URL or ID.
    Params:
    - video: URL or 11-char video ID
    - languages: preferred languages, e.g. ["vi","en"]
    - max_chars: truncate long transcripts (0 or less disables truncation)
    """
    # Result shape: {"ok": True, "data": {"video_id", "text", "segments"}}
    # on success, {"ok": False, "error": str} otherwise. Never raises.
    if YouTubeTranscriptApi is None:
        return {"ok": False, "error": "youtube-transcript-api not installed. pip install youtube-transcript-api"}
    vid = _parse_video_id(video)
    if not vid:
        return {"ok": False, "error": "Invalid YouTube video id/url."}
    langs = languages or ["vi", "en"]

    def _as_dicts(fetched: Any) -> Any:
        # Newer youtube-transcript-api releases return a transcript object
        # exposing to_raw_data(); older ones already return a list of dicts.
        to_raw = getattr(fetched, "to_raw_data", None)
        return to_raw() if callable(to_raw) else fetched

    def _fetch(codes: List[str]) -> Any:
        # Prefer the legacy static call when the installed release still has
        # it; otherwise use the instance-based API that replaced it.
        legacy = getattr(YouTubeTranscriptApi, "get_transcript", None)
        if legacy is not None:
            return legacy(vid, languages=codes)
        return _as_dicts(YouTubeTranscriptApi().fetch(vid, languages=codes))

    def _list_available() -> Any:
        legacy = getattr(YouTubeTranscriptApi, "list_transcripts", None)
        if legacy is not None:
            return legacy(vid)
        return YouTubeTranscriptApi().list(vid)

    try:
        segs = None
        try:
            segs = _fetch(langs)
        except NoTranscriptFound:
            # Fallback 1: plain English transcript. Deliberately best effort;
            # any failure just moves on to the next fallback.
            try:
                segs = _fetch(["en"])
            except Exception:
                pass
        # Fallback 2: translate the first translatable transcript to English,
        # but only when English was among the requested languages. The check
        # is hoisted so no listing request is made when it cannot succeed.
        if not segs and "en" in langs:
            try:
                for tr in _list_available():
                    if tr.is_translatable:
                        segs = _as_dicts(tr.translate("en").fetch())
                        break
            except Exception:
                pass
        if not segs:
            return {"ok": False, "error": "No transcript available."}
        text = " ".join(s.get("text", "") for s in segs).strip()
        # Guard against negative limits: text[:-n] would silently drop the
        # tail of the transcript instead of truncating to a length.
        if max_chars and max_chars > 0 and len(text) > max_chars:
            text = text[:max_chars] + " ...[truncated]..."
        return {"ok": True, "data": {"video_id": vid, "text": text, "segments": segs}}
    except TranscriptsDisabled:
        return {"ok": False, "error": "Transcripts are disabled for this video."}
    except Exception as e:
        return {"ok": False, "error": f"Transcript fetch failed: {e}"}
def _srt_timestamp(sec: Any) -> str:
    """Format a non-negative number of seconds as an SRT ``HH:MM:SS,mmm`` stamp."""
    # Round once on whole milliseconds so the carry propagates into the
    # seconds/minutes/hours fields (e.g. 1.9996 -> 00:00:02,000).
    total_ms = int(round(max(0.0, float(sec or 0.0)) * 1000))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    seconds, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"


@tool("youtube_transcript_srt", return_direct=False)
def youtube_transcript_srt(video: str, languages: Optional[List[str]] = None, max_segments: Optional[int] = None) -> Dict[str, Any]:
    """
    Return the YouTube transcript as SRT captions.
    Params:
    - video: URL or 11-char video ID
    - languages: preferred languages, e.g. ["vi","en"]
    - max_segments: limit number of caption segments (optional)
    """
    # Result shape: {"ok": True, "data": {"srt": str, "segments": int}} on
    # success; the transcript tool's error dict is passed through unchanged
    # when the transcript cannot be fetched. Never raises.
    try:
        # When langchain's @tool decorator is active, youtube_transcript is a
        # tool object and cannot be called with keyword arguments; its plain
        # function is exposed as `.func`. With the no-op fallback decorator
        # it is already a plain function.
        fetch = getattr(youtube_transcript, "func", None) or youtube_transcript
        res = fetch(video=video, languages=languages, max_chars=0)
        if not res.get("ok"):
            return res
        segs = (res.get("data") or {}).get("segments") or []
        if max_segments is not None and max_segments > 0:
            segs = segs[:max_segments]
        lines: List[str] = []
        for i, seg in enumerate(segs, 1):
            start = float(seg.get("start", 0.0))
            end = start + float(seg.get("duration", 0.0))
            text = str(seg.get("text", "")).strip()
            lines.append(str(i))
            lines.append(f"{_srt_timestamp(start)} --> {_srt_timestamp(end)}")
            lines.append(text)
            lines.append("")  # blank line between blocks
        srt = "\n".join(lines).strip() + ("\n" if lines else "")
        return {"ok": True, "data": {"srt": srt, "segments": len(segs)}}
    except Exception as e:
        return {"ok": False, "error": f"SRT generation failed: {e}"}
@tool("date_today", return_direct=False)
def date_today(tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Return today's datetime fields.
    """
    # Result shape: {"ok": True, "data": {"iso", "date", "time"}}, or
    # {"ok": False, "error": str} when the time zone cannot be resolved.
    # Local time is used when no tz is given or zoneinfo is unavailable.
    try:
        now = datetime.now(
            ZoneInfo(tz)) if tz and ZoneInfo is not None else datetime.now()
    except Exception as e:
        # An unknown or malformed zone name used to escape as an exception;
        # report it the same way the other date tools report failures.
        return {"ok": False, "error": f"date_today failed: {e}"}
    return {"ok": True, "data": {"iso": now.isoformat(), "date": now.date().isoformat(), "time": now.time().isoformat(timespec="seconds")}}
@tool("date_parse", return_direct=False)
def date_parse(date_str: str, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Parse a date/time string into ISO fields.
    """
    # The docstring doubles as the tool description, so its wording is kept.
    # Any parsing/formatting error is reported as {"ok": False, "error": ...}.
    try:
        parsed = _to_dt(date_str, tz)
        fields = {
            "iso": parsed.isoformat(),
            "date": parsed.date().isoformat(),
            "time": parsed.time().isoformat(timespec="seconds"),
        }
    except Exception as exc:
        return {"ok": False, "error": f"Parse failed: {exc}"}
    return {"ok": True, "data": fields}
@tool("date_add", return_direct=False)
def date_add(date_str: str, days: int = 0, months: int = 0, years: int = 0, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Add/subtract days/months/years to a date/time.
    """
    # The docstring doubles as the tool description, so its wording is kept.
    try:
        base = _to_dt(date_str, tz)
        if relativedelta is None:
            # Without dateutil only whole-day offsets can be applied.
            if months or years:
                return {"ok": False, "error": "Month/year arithmetic needs python-dateutil. pip install python-dateutil"}
            shifted = base + timedelta(days=days)
        else:
            shifted = base + relativedelta(days=days, months=months, years=years)
        fields = {
            "iso": shifted.isoformat(),
            "date": shifted.date().isoformat(),
            "time": shifted.time().isoformat(timespec="seconds"),
        }
        return {"ok": True, "data": fields}
    except Exception as exc:
        return {"ok": False, "error": f"Add failed: {exc}"}
@tool("date_diff", return_direct=False)
def date_diff(start: str, end: str, unit: str = "days", tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Difference between two date/times. unit: days|hours|minutes|seconds.
    """
    # The docstring doubles as the tool description, so its wording is kept.
    try:
        first = _to_dt(start, tz)
        second = _to_dt(end, tz)
        elapsed = (second - first).total_seconds()
        unit = (unit or "days").lower()
        if unit == "seconds":
            value = elapsed
        else:
            per_unit = {"minutes": 60, "hours": 3600}
            if unit not in per_unit:
                # Anything unrecognised is reported in days.
                unit = "days"
            value = elapsed / per_unit.get(unit, 86400)
        return {"ok": True, "data": {"value": value, "unit": unit}}
    except Exception as exc:
        return {"ok": False, "error": f"Diff failed: {exc}"}
@tool("next_weekday", return_direct=False)
def next_weekday(date_str: str, weekday: int, include_today: bool = False, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Next date matching weekday (0=Mon..6=Sun).
    """
    # The docstring doubles as the tool description, so its wording is kept.
    try:
        anchor = _to_dt(date_str, tz).date()
        wanted = int(weekday) % 7
        offset = (wanted - anchor.weekday()) % 7
        # Same weekday as the anchor: jump a full week unless today counts.
        if not include_today and offset == 0:
            offset = 7
        hit = anchor + timedelta(days=offset)
    except Exception as exc:
        return {"ok": False, "error": f"next_weekday failed: {exc}"}
    return {"ok": True, "data": {"date": hit.isoformat(), "weekday": wanted}}
@tool("date_format", return_direct=False)
def date_format(date_str: str, fmt: str = "%Y-%m-%d %H:%M:%S", tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Format a date/time string with strftime.
    """
    # The docstring doubles as the tool description, so its wording is kept.
    try:
        rendered = _to_dt(date_str, tz).strftime(fmt)
    except Exception as exc:
        return {"ok": False, "error": f"Format failed: {exc}"}
    return {"ok": True, "data": {"formatted": rendered}}
@tool("read_excel", return_direct=False)
def read_excel(path_or_url: str, sheet: Optional[str] = None, nrows: int = 100, usecols: Optional[str] = None, header: Optional[int] = 0) -> Dict[str, Any]:
    """
    Read a worksheet from an Excel file (.xlsx/.xls/.xlsm) from a local path or HTTP(S) URL.
    Params:
    - path_or_url: local file path or URL.
    - sheet: sheet name or 0-based index (default: first sheet).
    - nrows: max number of rows to return (default: 100).
    - usecols: Excel-style column selection, e.g., 'A:D' or 'A,C:E'.
    - header: row index to use as header (default: 0). Use None for no header.
    """
    # Result shape: {"ok": True, "data": {"sheet", "columns", "records",
    # "info": {"rows", "cols"}}} or {"ok": False, "error": str}.
    if pd is None:
        return {"ok": False, "error": "pandas not installed. pip install pandas openpyxl"}
    src = (path_or_url or "").strip()
    if not src:
        return {"ok": False, "error": "Missing path_or_url"}
    try:
        data_src: Any
        if re.match(r"^https?://", src, re.I):
            if requests is None:
                return {"ok": False, "error": "requests not installed for URL fetching. pip install requests"}
            resp = requests.get(src, timeout=30)
            resp.raise_for_status()
            data_src = io.BytesIO(resp.content)
        else:
            if not os.path.exists(src):
                return {"ok": False, "error": f"File not found: {src}"}
            data_src = src
        sheet_name: Any = 0 if sheet is None else sheet
        limit = nrows if (nrows is not None and nrows > 0) else None

        def _load(target: Any) -> Any:
            if hasattr(data_src, "seek"):
                data_src.seek(0)  # rewind the in-memory download before each read
            return pd.read_excel(
                data_src,
                sheet_name=target,
                nrows=limit,
                usecols=usecols,
                header=header,
            )

        try:
            df = _load(sheet_name)
        except ValueError:
            # `sheet` is typed as a string, so an index arrives as e.g. "0".
            # Try it as a sheet name first (above); if that fails and it is
            # all digits, honour the documented 0-based index form.
            if isinstance(sheet_name, str) and sheet_name.strip().isdigit():
                sheet_name = int(sheet_name.strip())
                df = _load(sheet_name)
            else:
                raise
        if isinstance(df, dict):  # safety if engine returns multiple sheets
            first_key = next(iter(df))
            df = df[first_key]
            sheet_used = first_key
        else:
            sheet_used = sheet_name
        if limit is not None:
            df = df.head(limit)
        # Use the same string labels for `columns` and for the record keys;
        # with header=None (or numeric/date headers) they used to disagree.
        columns = [str(c) for c in df.columns.tolist()]
        df = df.copy()
        df.columns = columns
        # Missing cells become None rather than NaN/NaT so the payload stays
        # JSON-friendly.
        records = df.astype(object).where(df.notna(), None).to_dict(orient="records")
        return {
            "ok": True,
            "data": {
                "sheet": sheet_used,
                "columns": columns,
                "records": records,
                "info": {"rows": len(records), "cols": len(columns)}
            }
        }
    except Exception as e:
        return {"ok": False, "error": f"Excel read failed: {e}"}
@tool("read_text", return_direct=False)
def read_text(path_or_url: str, max_chars: int = 20000, encoding: Optional[str] = None) -> Dict[str, Any]:
    """
    Read a text file from a local path or HTTP(S) URL.
    Params:
    - path_or_url: local file path or URL.
    - max_chars: maximum characters to return (default: 20000).
    - encoding: optional text encoding override; if omitted, try to detect.
    """
    # Result shape: {"ok": True, "data": {"path", "encoding", "truncated",
    # "length", "text"}} or {"ok": False, "error": str}. Never raises.
    src = (path_or_url or "").strip()
    if not src:
        return {"ok": False, "error": "Missing path_or_url"}
    try:
        text: str = ""
        used_encoding: str = "utf-8"
        if re.match(r"^https?://", src, re.I):
            if requests is None:
                return {"ok": False, "error": "requests not installed for URL fetching. pip install requests"}
            resp = requests.get(src, timeout=30)
            resp.raise_for_status()
            # For text/* responses with no charset, requests reports
            # ISO-8859-1 as a protocol default rather than something the
            # server declared; trusting it garbles UTF-8 bodies. In that
            # case prefer the encoding detected from the content.
            header_encoding = resp.encoding
            content_type = str(resp.headers.get("Content-Type") or "").lower()
            if (header_encoding or "").lower() == "iso-8859-1" and "charset" not in content_type:
                header_encoding = None
            used_encoding = encoding or header_encoding or getattr(
                resp, "apparent_encoding", None) or "utf-8"
            text = resp.content.decode(used_encoding, errors="replace")
        else:
            if not os.path.exists(src):
                return {"ok": False, "error": f"File not found: {src}"}
            enc_candidates = [encoding] if encoding else [
                "utf-8", "utf-16", "utf-16-le", "utf-16-be", "latin-1"]
            for enc_try in enc_candidates:
                try:
                    with open(src, "r", encoding=enc_try, errors="strict") as f:
                        text = f.read()
                    used_encoding = enc_try or "utf-8"
                    break
                except Exception:
                    continue
            else:
                # No candidate decoded cleanly (only possible with an explicit
                # `encoding`): fall back to latin-1, which accepts any bytes.
                with open(src, "rb") as f:
                    raw = f.read()
                used_encoding = "latin-1"
                text = raw.decode(used_encoding, errors="replace")
        # A byte-order mark is an encoding signature, not content.
        if text.startswith("\ufeff"):
            text = text[1:]
        truncated = False
        if max_chars and max_chars > 0 and len(text) > max_chars:
            text = text[:max_chars] + " ...[truncated]..."
            truncated = True
        return {
            "ok": True,
            "data": {
                "path": src,
                "encoding": used_encoding,
                "truncated": truncated,
                "length": len(text),
                "text": text,
            },
        }
    except Exception as e:
        return {"ok": False, "error": f"Text read failed: {e}"}
def get_tools():
    """
    Build the list of tools handed to the agent.

    The list starts with three ``Tool`` wrappers (YouTube search, DuckDuckGo
    search, Wikipedia query) and is followed by the structured tools defined
    in this module. A new list is built on every call.
    """
    wiki_wrapper = WikipediaAPIWrapper()

    toolbox = []
    toolbox.append(
        Tool(
            name="YouTubeSearch",
            func=YouTubeSearchTool().run,
            description="Search YouTube for videos.",
        )
    )
    toolbox.append(
        Tool(
            name="DuckDuckGoSearch",
            func=DuckDuckGoSearchResults().run,
            description="Search the web using DuckDuckGo.",
        )
    )
    # GoogleSearch (GoogleSearchResults) is currently disabled.
    toolbox.append(
        Tool(
            name="WikipediaQuery",
            func=WikipediaQueryRun(api_wrapper=wiki_wrapper).run,
            description="Query Wikipedia for information.",
        )
    )
    # WolframAlphaQuery (WolframAlphaQueryRun) is currently disabled.

    # Structured tools declared with LangChain's @tool decorator.
    toolbox += [
        youtube_transcript,
        date_today,
        date_parse,
        date_add,
        date_diff,
        next_weekday,
        date_format,
        read_text,
        read_excel,
        youtube_transcript_srt,
    ]
    return toolbox