philipp-zettl committed on
Commit
da365b2
·
verified ·
1 Parent(s): 7afed3f

Add vrom_hub/fetcher.py

Browse files
Files changed (1) hide show
  1. vrom_hub/fetcher.py +199 -0
vrom_hub/fetcher.py ADDED
@@ -0,0 +1,199 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Documentation page fetcher.
3
+
4
+ Supports:
5
+ - HF documentation pages (explore_hf_docs style)
6
+ - Raw markdown content
7
+ - URLs to fetch markdown/HTML
8
+ """
9
+
10
+ from __future__ import annotations
11
+
12
+ import logging
13
+ import re
14
+ from typing import Optional
15
+ from urllib.parse import urlparse
16
+
17
+ import requests
18
+
19
+ from vrom_hub.chunker import DocPage
20
+
21
+ logger = logging.getLogger(__name__)
22
+
23
+
24
+ def _url_to_source_file(url: str) -> str:
25
+ """Convert a URL to a source_file path (e.g. 'trl/index.md')."""
26
+ parsed = urlparse(url)
27
+ path = parsed.path.rstrip("/")
28
+
29
+ # HF docs pattern: /docs/{lib}/{page}
30
+ hf_match = re.match(r'/docs/([^/]+)/(.*)', path)
31
+ if hf_match:
32
+ lib = hf_match.group(1)
33
+ page = hf_match.group(2) or "index"
34
+ return f"{lib}/{page}.md"
35
+
36
+ # Generic: use last path segments
37
+ segments = [s for s in path.split("/") if s]
38
+ if segments:
39
+ return "/".join(segments[-2:]) + ".md" if len(segments) >= 2 else segments[-1] + ".md"
40
+
41
+ return "unknown.md"
42
+
43
+
44
+ def _extract_title_from_markdown(content: str) -> str:
45
+ """Extract the first heading from markdown content."""
46
+ match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
47
+ if match:
48
+ return match.group(1).strip()
49
+ # Fallback: first non-empty line
50
+ for line in content.split('\n'):
51
+ line = line.strip()
52
+ if line:
53
+ return line[:100]
54
+ return "Untitled"
55
+
56
+
57
+ def _html_to_markdown(html: str) -> str:
58
+ """Basic HTML to markdown conversion (strip tags, preserve structure)."""
59
+ # Remove script and style tags
60
+ html = re.sub(r'<script[\s\S]*?</script>', '', html, flags=re.IGNORECASE)
61
+ html = re.sub(r'<style[\s\S]*?</style>', '', html, flags=re.IGNORECASE)
62
+
63
+ # Convert common tags
64
+ html = re.sub(r'<h1[^>]*>(.*?)</h1>', r'# \1', html, flags=re.IGNORECASE | re.DOTALL)
65
+ html = re.sub(r'<h2[^>]*>(.*?)</h2>', r'## \1', html, flags=re.IGNORECASE | re.DOTALL)
66
+ html = re.sub(r'<h3[^>]*>(.*?)</h3>', r'### \1', html, flags=re.IGNORECASE | re.DOTALL)
67
+ html = re.sub(r'<h4[^>]*>(.*?)</h4>', r'#### \1', html, flags=re.IGNORECASE | re.DOTALL)
68
+ html = re.sub(r'<code[^>]*>(.*?)</code>', r'`\1`', html, flags=re.IGNORECASE | re.DOTALL)
69
+ html = re.sub(r'<pre[^>]*>(.*?)</pre>', r'```\n\1\n```', html, flags=re.IGNORECASE | re.DOTALL)
70
+ html = re.sub(r'<br\s*/?>', '\n', html, flags=re.IGNORECASE)
71
+ html = re.sub(r'<p[^>]*>', '\n\n', html, flags=re.IGNORECASE)
72
+ html = re.sub(r'</p>', '', html, flags=re.IGNORECASE)
73
+ html = re.sub(r'<li[^>]*>', '\n- ', html, flags=re.IGNORECASE)
74
+
75
+ # Strip remaining tags
76
+ html = re.sub(r'<[^>]+>', '', html)
77
+
78
+ # Clean up whitespace
79
+ html = re.sub(r'\n{3,}', '\n\n', html)
80
+ return html.strip()
81
+
82
+
83
class DocFetcher:
    """
    Fetches documentation pages and converts them to DocPage objects.

    Pages can come from an arbitrary URL (``fetch_url``), from the Hugging
    Face docs site (``fetch_hf_docs``), or from markdown that is already in
    memory (``from_markdown`` / ``from_pages``). All HTTP requests go through
    a single ``requests.Session``; call ``close()`` or use the fetcher as a
    context manager to release it.
    """

    def __init__(self, timeout: int = 30):
        """
        Args:
            timeout: Timeout, in seconds, passed to every HTTP request.
        """
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "vROM-Hub-Backend/0.1.0"
        })

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "DocFetcher":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def fetch_url(self, url: str, title: str | None = None) -> DocPage:
        """
        Fetch a documentation page from a URL.

        Handles both markdown and HTML responses: a response whose
        Content-Type mentions "html" is converted to markdown, anything else
        is used as-is.

        Args:
            url: Page URL to request.
            title: Title to store; when None it is extracted from the content.

        Returns:
            A DocPage whose source_file is derived from the URL.

        Raises:
            requests.HTTPError: If the response status indicates an error.
        """
        logger.info("Fetching: %s", url)
        resp = self.session.get(url, timeout=self.timeout)
        resp.raise_for_status()

        content = resp.text
        content_type = resp.headers.get("content-type", "")

        # Convert HTML to markdown if needed
        if "html" in content_type.lower():
            content = _html_to_markdown(content)

        return self.from_markdown(
            content,
            url=url,
            source_file=_url_to_source_file(url),
            title=title,
        )

    def from_markdown(
        self,
        content: str,
        url: str = "",
        source_file: str = "doc.md",
        title: str | None = None,
    ) -> DocPage:
        """
        Create a DocPage from raw markdown content.

        Args:
            content: Markdown text of the page.
            url: URL to record on the page (may be empty).
            source_file: Source path to record on the page.
            title: Title to store; when None it is extracted from the content.
        """
        if title is None:
            title = _extract_title_from_markdown(content)
        return DocPage(
            content=content,
            source_file=source_file,
            url=url,
            title=title,
        )

    def fetch_hf_docs(self, endpoint: str, pages: list[str] | None = None) -> list[DocPage]:
        """
        Fetch documentation pages from Hugging Face docs.

        For each page the raw markdown URL (``{page}.md``) is tried first and
        the regular page URL is used as a fallback. A page for which both
        requests fail with an HTTP error is logged and skipped.

        Args:
            endpoint: Library name (e.g. "trl", "transformers", "peft")
            pages: Specific page paths to fetch (e.g. ["index", "sft_trainer"]).
                If None, fetches the index page.

        Returns:
            List of DocPage objects
        """
        if pages is None:
            pages = ["index"]

        doc_pages = []
        base_url = f"https://huggingface.co/docs/{endpoint}"

        for page in pages:
            url = f"{base_url}/{page}"
            try:
                # Try markdown version first
                doc_page = self.fetch_url(f"{url}.md", title=None)
            except requests.HTTPError:
                try:
                    # Fallback to HTML
                    doc_page = self.fetch_url(url, title=None)
                except requests.HTTPError as e:
                    logger.warning("Failed to fetch %s: %s", url, e)
                    continue
            else:
                # Record the clean page URL, and derive source_file from it
                # rather than from the ".md" URL so the stored path does not
                # depend on which variant was fetched.
                doc_page.url = url
                doc_page.source_file = _url_to_source_file(url)
            doc_pages.append(doc_page)

        return doc_pages

    def from_pages(self, pages: list[dict]) -> list[DocPage]:
        """
        Convert a list of page dicts to DocPage objects.

        Each dict should have:
        - content: str (markdown)
        - url: str (optional)
        - source_file: str (optional)
        - title: str (optional)

        An optional key that is missing or explicitly None gets a default:
        url becomes "", source_file is derived from url (or "doc.md" when
        there is no url), and title is extracted from the content.

        Raises:
            KeyError: If a dict has no "content" key.
        """
        doc_pages = []
        for p in pages:
            content = p["content"]

            url = p.get("url")
            if url is None:
                url = ""

            source_file = p.get("source_file")
            if source_file is None:
                source_file = _url_to_source_file(url) if url else "doc.md"

            # from_markdown fills in the title when it is None, so the same
            # defaulting rule applies here as for direct callers.
            doc_pages.append(self.from_markdown(
                content,
                url=url,
                source_file=source_file,
                title=p.get("title"),
            ))
        return doc_pages