"""Agent tool collection.

Exposes ``get_tools()``, which returns classic search ``Tool`` wrappers
(YouTube, DuckDuckGo, Wikipedia) plus structured tools for YouTube
transcripts, date arithmetic, and reading text/Excel files.

Every structured tool returns ``{"ok": True, "data": ...}`` on success or
``{"ok": False, "error": "..."}`` on failure.
"""
import codecs
import io  # for BytesIO
import json
import os
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlparse

# LangChain community tools. The import is guarded so that the structured
# helpers below stay importable without LangChain (matching the fallback
# ``tool`` decorator); get_tools() re-raises the original failure.
_LANGCHAIN_IMPORT_ERROR: Optional[ImportError] = None
try:
    from langchain_community.tools import Tool, BraveSearch, YouTubeSearchTool
    from langchain_community.tools import DuckDuckGoSearchResults, GoogleSearchResults
    from langchain_community.tools import WikipediaQueryRun
    from langchain_community.utilities import WikipediaAPIWrapper, WolframAlphaAPIWrapper
    from langchain_community.tools import WolframAlphaQueryRun
except ImportError as _exc:
    Tool = BraveSearch = YouTubeSearchTool = None  # type: ignore
    DuckDuckGoSearchResults = GoogleSearchResults = None  # type: ignore
    WikipediaQueryRun = WolframAlphaQueryRun = None  # type: ignore
    WikipediaAPIWrapper = WolframAlphaAPIWrapper = None  # type: ignore
    _LANGCHAIN_IMPORT_ERROR = _exc

# Structured tools
try:
    from langchain_core.tools import tool
except Exception:
    def tool(*args, **kwargs):
        """No-op stand-in for ``langchain_core.tools.tool``.

        Supports both ``@tool`` and ``@tool("name", ...)`` and returns the
        decorated function unchanged.
        """
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return args[0]

        def _wrap(fn):
            return fn
        return _wrap

# Optional deps
try:
    from youtube_transcript_api import (
        YouTubeTranscriptApi,
        TranscriptsDisabled,
        NoTranscriptFound,
    )
except Exception:
    YouTubeTranscriptApi = None  # type: ignore
    TranscriptsDisabled = Exception  # type: ignore
    NoTranscriptFound = Exception  # type: ignore

try:
    from dateutil import parser as date_parser
    from dateutil.relativedelta import relativedelta
except Exception:
    date_parser = None  # type: ignore
    relativedelta = None  # type: ignore

try:
    from zoneinfo import ZoneInfo  # py>=3.9
except Exception:
    ZoneInfo = None  # type: ignore

try:
    import pandas as pd
except Exception:
    pd = None  # type: ignore

try:
    import requests
except Exception:
    requests = None  # type: ignore


_VIDEO_ID_RE = re.compile(r"[0-9A-Za-z_-]{11}")
_YOUTUBE_DOMAINS = ("youtube.com", "youtube-nocookie.com")
# First path segment of URL forms that carry the id in the path.
_PATH_ID_PREFIXES = ("shorts", "embed", "live", "v")

# Checked in order: the UTF-32 LE BOM starts with the UTF-16 LE BOM, so the
# longer marks must come first.
_BOM_ENCODINGS = (
    (codecs.BOM_UTF8, "utf-8-sig"),
    (codecs.BOM_UTF32_LE, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32"),
    (codecs.BOM_UTF16_LE, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16"),
)


def _host_matches(host: str, domain: str) -> bool:
    """Return True if *host* is *domain* or one of its subdomains."""
    return host == domain or host.endswith("." + domain)


def _parse_video_id(url_or_id: str) -> Optional[str]:
    """Extract the 11-character YouTube video id from a URL or bare id.

    Accepts a bare id, ``watch?v=`` URLs, ``youtu.be/<id>`` short links and
    ``/shorts/``, ``/embed/``, ``/live/``, ``/v/`` paths, with or without a
    scheme. Returns None when no valid id is found or the host is not a
    YouTube domain.
    """
    s = (url_or_id or "").strip()
    if _VIDEO_ID_RE.fullmatch(s):
        return s
    if "://" not in s:
        # urlparse only fills in the host when a scheme is present.
        s = "https://" + s
    try:
        u = urlparse(s)
        host = (u.hostname or "").lower()
    except ValueError:
        return None

    parts = [p for p in u.path.split("/") if p]
    if _host_matches(host, "youtu.be"):
        candidate = parts[0] if parts else ""
    elif any(_host_matches(host, d) for d in _YOUTUBE_DOMAINS):
        candidate = (parse_qs(u.query).get("v") or [""])[0]
        if not _VIDEO_ID_RE.fullmatch(candidate):
            if len(parts) >= 2 and parts[0] in _PATH_ID_PREFIXES:
                candidate = parts[1]
    else:
        return None
    return candidate if _VIDEO_ID_RE.fullmatch(candidate) else None


def _to_dt(value: str, tz: Optional[str] = None) -> datetime:
    """Parse *value* into a datetime, optionally placing it in zone *tz*.

    Uses python-dateutil when available, otherwise ISO-8601 and then
    ``%Y-%m-%d``. A naive result is tagged with *tz*; an aware result is
    converted to it. Raises whatever the parser raises on bad input.
    """
    if date_parser is not None:
        dt = date_parser.parse(value)
    else:
        try:
            dt = datetime.fromisoformat(value)
        except Exception:
            dt = datetime.strptime(value, "%Y-%m-%d")
    if tz and ZoneInfo is not None:
        try:
            z = ZoneInfo(tz)
            dt = dt.replace(tzinfo=z) if dt.tzinfo is None else dt.astimezone(z)
        except Exception:
            # Best effort: an unknown zone leaves the parsed value untouched.
            pass
    return dt


def _dt_fields(dt: datetime) -> Dict[str, str]:
    """Return the iso/date/time field dict shared by the date tools."""
    return {
        "iso": dt.isoformat(),
        "date": dt.date().isoformat(),
        "time": dt.time().isoformat(timespec="seconds"),
    }


def _fetch_transcript(video: str, languages: Optional[List[str]] = None,
                      max_chars: int = 8000) -> Dict[str, Any]:
    """Fetch a transcript; plain-function core of the transcript tools.

    Kept separate from the ``@tool``-decorated entry points because a
    decorated tool is no longer a plain callable, so one tool cannot call
    another by name with keyword arguments.
    """
    vid = _parse_video_id(video)
    if not vid:
        return {"ok": False, "error": "Invalid YouTube video id/url."}
    if YouTubeTranscriptApi is None:
        return {"ok": False, "error": "youtube-transcript-api not installed. pip install youtube-transcript-api"}

    langs = list(languages) if languages else ["vi", "en"]
    try:
        segs = None
        try:
            segs = YouTubeTranscriptApi.get_transcript(vid, languages=langs)
        except NoTranscriptFound:
            # Retrying with English only helps if it was not already requested.
            if "en" not in langs:
                try:
                    segs = YouTubeTranscriptApi.get_transcript(vid, languages=["en"])
                except Exception:
                    pass

        if not segs and "en" in langs:
            # Last resort: machine-translate the first translatable track.
            try:
                for tr in YouTubeTranscriptApi.list_transcripts(vid):
                    if tr.is_translatable:
                        segs = tr.translate("en").fetch()
                        break
            except Exception:
                pass

        if not segs:
            return {"ok": False, "error": "No transcript available."}

        text = " ".join(s.get("text", "") for s in segs).strip()
        if max_chars and len(text) > max_chars:
            text = text[:max_chars] + " ...[truncated]..."
        return {"ok": True, "data": {"video_id": vid, "text": text, "segments": segs}}
    except TranscriptsDisabled:
        return {"ok": False, "error": "Transcripts are disabled for this video."}
    except Exception as e:
        return {"ok": False, "error": f"Transcript fetch failed: {e}"}


def _srt_time(sec: float) -> str:
    """Format seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Rounds the total in milliseconds so a fraction such as .9996 carries
    into the seconds field instead of producing ``,1000``.
    """
    total_ms = int(round(max(0.0, float(sec or 0.0)) * 1000))
    h, rem = divmod(total_ms, 3600000)
    m, rem = divmod(rem, 60000)
    s, ms = divmod(rem, 1000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


@tool("youtube_transcript", return_direct=False)
def youtube_transcript(video: str, languages: Optional[List[str]] = None, max_chars: int = 8000) -> Dict[str, Any]:
    """
    Get YouTube transcript for a video URL or ID.
    Params:
      - video: URL or 11-char video ID
      - languages: preferred languages, e.g. ["vi","en"]
      - max_chars: truncate long transcripts
    """
    return _fetch_transcript(video, languages, max_chars)


@tool("youtube_transcript_srt", return_direct=False)
def youtube_transcript_srt(video: str, languages: Optional[List[str]] = None, max_segments: Optional[int] = None) -> Dict[str, Any]:
    """
    Return the YouTube transcript as SRT captions.
    Params:
      - video: URL or 11-char video ID
      - languages: preferred languages, e.g. ["vi","en"]
      - max_segments: limit number of caption segments (optional)
    """
    try:
        # max_chars=0 disables text truncation; only the segments are used.
        res = _fetch_transcript(video, languages, max_chars=0)
        if not res.get("ok"):
            return res
        segs = (res.get("data") or {}).get("segments") or []
        if max_segments is not None and max_segments > 0:
            segs = segs[:max_segments]

        lines: List[str] = []
        for i, seg in enumerate(segs, 1):
            start = float(seg.get("start", 0.0))
            end = start + float(seg.get("duration", 0.0))
            lines.append(str(i))
            lines.append(f"{_srt_time(start)} --> {_srt_time(end)}")
            lines.append(str(seg.get("text", "")).strip())
            lines.append("")  # blank line between blocks
        srt = "\n".join(lines).strip() + ("\n" if lines else "")
        return {"ok": True, "data": {"srt": srt, "segments": len(segs)}}
    except Exception as e:
        return {"ok": False, "error": f"SRT generation failed: {e}"}


@tool("date_today", return_direct=False)
def date_today(tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Return today's datetime fields.
    """
    try:
        zone = ZoneInfo(tz) if tz and ZoneInfo is not None else None
        return {"ok": True, "data": _dt_fields(datetime.now(zone))}
    except Exception as e:
        # e.g. unknown zone name: report it like the other tools do.
        return {"ok": False, "error": f"date_today failed: {e}"}


@tool("date_parse", return_direct=False)
def date_parse(date_str: str, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Parse a date/time string into ISO fields.
    """
    try:
        return {"ok": True, "data": _dt_fields(_to_dt(date_str, tz))}
    except Exception as e:
        return {"ok": False, "error": f"Parse failed: {e}"}


@tool("date_add", return_direct=False)
def date_add(date_str: str, days: int = 0, months: int = 0, years: int = 0, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Add/subtract days/months/years to a date/time.
    """
    try:
        dt = _to_dt(date_str, tz)
        if relativedelta is not None:
            dt2 = dt + relativedelta(days=days, months=months, years=years)
        else:
            # timedelta has no calendar-aware month/year arithmetic.
            if months or years:
                return {"ok": False, "error": "Month/year arithmetic needs python-dateutil. pip install python-dateutil"}
            dt2 = dt + timedelta(days=days)
        return {"ok": True, "data": _dt_fields(dt2)}
    except Exception as e:
        return {"ok": False, "error": f"Add failed: {e}"}


@tool("date_diff", return_direct=False)
def date_diff(start: str, end: str, unit: str = "days", tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Difference between two date/times. unit: days|hours|minutes|seconds.
    """
    try:
        seconds = (_to_dt(end, tz) - _to_dt(start, tz)).total_seconds()
        divisors = {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}
        unit = (unit or "days").lower()
        if unit not in divisors:
            unit = "days"  # unknown units fall back to days
        value = seconds if unit == "seconds" else seconds / divisors[unit]
        return {"ok": True, "data": {"value": value, "unit": unit}}
    except Exception as e:
        return {"ok": False, "error": f"Diff failed: {e}"}


@tool("next_weekday", return_direct=False)
def next_weekday(date_str: str, weekday: int, include_today: bool = False, tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Next date matching weekday (0=Mon..6=Sun).
    """
    try:
        base = _to_dt(date_str, tz).date()
        wd = int(weekday) % 7
        delta = (wd - base.weekday()) % 7
        if delta == 0 and not include_today:
            delta = 7  # same weekday: jump a full week ahead
        target = base + timedelta(days=delta)
        return {"ok": True, "data": {"date": target.isoformat(), "weekday": wd}}
    except Exception as e:
        return {"ok": False, "error": f"next_weekday failed: {e}"}


@tool("date_format", return_direct=False)
def date_format(date_str: str, fmt: str = "%Y-%m-%d %H:%M:%S", tz: Optional[str] = None) -> Dict[str, Any]:
    """
    Format a date/time string with strftime.
    """
    try:
        dt = _to_dt(date_str, tz)
        return {"ok": True, "data": {"formatted": dt.strftime(fmt)}}
    except Exception as e:
        return {"ok": False, "error": f"Format failed: {e}"}


def _resolve_sheet(sheet: Any, sheet_names: List[Any]) -> Any:
    """Map the user-supplied *sheet* to what pandas should load.

    None selects the first sheet. An exact sheet-name match always wins;
    otherwise a digit-only string is treated as a 0-based index, since the
    tool signature only lets callers pass strings.
    """
    if sheet is None:
        return 0
    if isinstance(sheet, int) or sheet in sheet_names:
        return sheet
    stripped = str(sheet).strip()
    if re.fullmatch(r"\d+", stripped):
        return int(stripped)
    return sheet


@tool("read_excel", return_direct=False)
def read_excel(path_or_url: str, sheet: Optional[str] = None, nrows: int = 100, usecols: Optional[str] = None, header: Optional[int] = 0) -> Dict[str, Any]:
    """
    Read a worksheet from an Excel file (.xlsx/.xls/.xlsm) from a local path or HTTP(S) URL.
    Params:
      - path_or_url: local file path or URL.
      - sheet: sheet name or 0-based index (default: first sheet).
      - nrows: max number of rows to return (default: 100).
      - usecols: Excel-style column selection, e.g., 'A:D' or 'A,C:E'.
      - header: row index to use as header (default: 0). Use None for no header.
    """
    if pd is None:
        return {"ok": False, "error": "pandas not installed. pip install pandas openpyxl"}
    src = (path_or_url or "").strip()
    if not src:
        return {"ok": False, "error": "Missing path_or_url"}

    try:
        data_src: Any
        if re.match(r"^https?://", src, re.I):
            if requests is None:
                return {"ok": False, "error": "requests not installed for URL fetching. pip install requests"}
            resp = requests.get(src, timeout=30)
            resp.raise_for_status()
            data_src = io.BytesIO(resp.content)
        else:
            if not os.path.exists(src):
                return {"ok": False, "error": f"File not found: {src}"}
            data_src = src

        # Open the workbook once so the sheet names are known before parsing.
        with pd.ExcelFile(data_src) as workbook:
            sheet_name = _resolve_sheet(sheet, list(workbook.sheet_names))
            df = workbook.parse(
                sheet_name=sheet_name,
                nrows=None if (nrows is None or nrows <= 0) else nrows,
                usecols=usecols,
                header=header,
            )

        if isinstance(df, dict):  # safety if engine returns multiple sheets
            sheet_used = next(iter(df))
            df = df[sheet_used]
        else:
            sheet_used = sheet_name

        columns = [str(c) for c in df.columns.tolist()]
        records = df.to_dict(orient="records")
        return {
            "ok": True,
            "data": {
                "sheet": sheet_used,
                "columns": columns,
                "records": records,
                "info": {"rows": len(records), "cols": len(columns)},
            },
        }
    except Exception as e:
        return {"ok": False, "error": f"Excel read failed: {e}"}


def _encoding_candidates(sample: bytes) -> List[str]:
    """Return encodings to try, in order, for a file starting with *sample*.

    A byte-order mark is authoritative. Without one, UTF-8 is tried first;
    BOM-less UTF-16 is considered only when NUL bytes are present, because
    UTF-16 decoding "succeeds" on almost any even-length byte string and
    would otherwise garble single-byte (e.g. Latin-1) files. Latin-1 is the
    final candidate since it can decode any byte sequence.
    """
    for bom, enc in _BOM_ENCODINGS:
        if sample.startswith(bom):
            return [enc]
    candidates = ["utf-8"]
    nul_even = sample[0::2].count(0)
    nul_odd = sample[1::2].count(0)
    if nul_even or nul_odd:
        # ASCII-range text in UTF-16-LE has its NUL bytes at odd offsets.
        if nul_odd >= nul_even:
            candidates += ["utf-16-le", "utf-16-be"]
        else:
            candidates += ["utf-16-be", "utf-16-le"]
    candidates.append("latin-1")
    return candidates


def _read_local_text(path: str, encoding: Optional[str]) -> Tuple[str, str]:
    """Read *path* as text and return ``(text, encoding_used)``.

    An explicit *encoding* is tried alone; otherwise encodings are detected
    from the file's leading bytes. If every strict decode fails, the bytes
    are decoded as Latin-1 with replacement.
    """
    if encoding:
        candidates = [encoding]
    else:
        with open(path, "rb") as fh:
            candidates = _encoding_candidates(fh.read(4096))

    for enc in candidates:
        try:
            with open(path, "r", encoding=enc, errors="strict") as fh:
                return fh.read(), enc
        except (UnicodeError, LookupError):
            # Wrong guess or unknown codec name: try the next candidate.
            continue

    with open(path, "rb") as fh:
        return fh.read().decode("latin-1", errors="replace"), "latin-1"


@tool("read_text", return_direct=False)
def read_text(path_or_url: str, max_chars: int = 20000, encoding: Optional[str] = None) -> Dict[str, Any]:
    """
    Read a text file from a local path or HTTP(S) URL.
    Params:
      - path_or_url: local file path or URL.
      - max_chars: maximum characters to return (default: 20000).
      - encoding: optional text encoding override; if omitted, try to detect.
    """
    src = (path_or_url or "").strip()
    if not src:
        return {"ok": False, "error": "Missing path_or_url"}

    try:
        if re.match(r"^https?://", src, re.I):
            if requests is None:
                return {"ok": False, "error": "requests not installed for URL fetching. pip install requests"}
            resp = requests.get(src, timeout=30)
            resp.raise_for_status()
            used_encoding = encoding or resp.encoding or getattr(
                resp, "apparent_encoding", None) or "utf-8"
            text = resp.content.decode(used_encoding, errors="replace")
        else:
            if not os.path.exists(src):
                return {"ok": False, "error": f"File not found: {src}"}
            text, used_encoding = _read_local_text(src, encoding)

        truncated = False
        if max_chars and max_chars > 0 and len(text) > max_chars:
            text = text[:max_chars] + " ...[truncated]..."
            truncated = True

        return {
            "ok": True,
            "data": {
                "path": src,
                "encoding": used_encoding,
                "truncated": truncated,
                "length": len(text),
                "text": text,
            },
        }
    except Exception as e:
        return {"ok": False, "error": f"Text read failed: {e}"}


def get_tools():
    """
    Returns a list of tools that can be used by the agent.

    Raises ImportError if langchain-community could not be imported.
    """
    if _LANGCHAIN_IMPORT_ERROR is not None:
        raise ImportError(
            "langchain-community is required for get_tools(). "
            "pip install langchain-community"
        ) from _LANGCHAIN_IMPORT_ERROR

    wikipedia_api_wrapper = WikipediaAPIWrapper()
    tools = [
        Tool(
            name="YouTubeSearch",
            func=YouTubeSearchTool().run,
            description="Search YouTube for videos."
        ),
        Tool(
            name="DuckDuckGoSearch",
            func=DuckDuckGoSearchResults().run,
            description="Search the web using DuckDuckGo."
        ),
        # Tool(
        #     name="GoogleSearch",
        #     func=GoogleSearchResults().run,
        #     description="Search the web using Google."
        # ),
        Tool(
            name="WikipediaQuery",
            func=WikipediaQueryRun(api_wrapper=wikipedia_api_wrapper).run,
            description="Query Wikipedia for information."
        ),
        # Tool(
        #     name="WolframAlphaQuery",
        #     func=WolframAlphaQueryRun().run,
        #     description="Query Wolfram Alpha for computational knowledge."
        # )
    ]
    # Add structured tools (LangChain @tool)
    tools.extend([
        youtube_transcript,
        date_today,
        date_parse,
        date_add,
        date_diff,
        next_weekday,
        date_format,
        read_text,
        read_excel,
        youtube_transcript_srt,  # new
    ])
    return tools