| | import os |
| | import gradio as gr |
| | import faiss |
| | import pickle |
| | import requests |
| | from bs4 import BeautifulSoup |
| | from urllib.parse import urljoin, urlparse |
| | from sentence_transformers import SentenceTransformer |
| | from huggingface_hub import InferenceClient, HfApi |
| |
|
| | |
# Repository on the Hugging Face Hub that uploads are sent to.
HF_REPO_ID = "MoslemBot/kajiweb"
# Hub access token taken from the environment; None when HF_TOKEN is unset.
HF_API_TOKEN = os.environ.get("HF_TOKEN")
# Shared Hub API client.
api = HfApi()
| |
|
def upload_to_hub(local_path, remote_path):
    """Upload one local file to the ``HF_REPO_ID`` Space repository.

    Args:
        local_path: Path of the file on disk to upload.
        remote_path: Destination path of the file inside the repository.

    Any exception raised by ``api.upload_file`` propagates to the caller.
    """
    api.upload_file(
        path_or_fileobj=local_path,
        path_in_repo=remote_path,
        repo_id=HF_REPO_ID,
        repo_type="space",
        token=HF_API_TOKEN,
    )
    # The status emoji had been mis-decoded into bytes that split this string
    # literal across two lines; it is restored to a single valid literal.
    print(f"✅ Uploaded to Hub: {remote_path}")
| |
|
| | |
# Sentence-embedding model used for both indexing and querying.
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
# Inference client used for chat completions, authenticated via HF_TOKEN.
llm = InferenceClient(token=os.environ.get("HF_TOKEN"))

# Local directory holding one sub-folder per indexed title.
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)
| |
|
def _same_host_links(soup, base_url):
    """Return same-host link targets found in ``soup``, without fragments.

    Links are returned in document order with duplicates removed. Hrefs that
    cannot be parsed are skipped instead of aborting the whole page.
    """
    base_host = urlparse(base_url).netloc
    links = {}
    for anchor in soup.find_all("a", href=True):
        try:
            target = urlparse(urljoin(base_url, anchor["href"]))
        except ValueError:
            # Malformed href (e.g. a broken IPv6 literal): ignore this link.
            continue
        if target.netloc == base_host:
            # "page#a" and "page#b" are the same document; dropping the
            # fragment prevents fetching and indexing it more than once.
            links.setdefault(target._replace(fragment="").geturl(), None)
    return list(links)


def extract_links_and_text(base_url, max_depth=1, visited=None):
    """Crawl ``base_url`` and the same-host pages it links to.

    Args:
        base_url: URL of the page to fetch.
        max_depth: How many link hops below ``base_url`` may be followed.
            A negative value means the page itself is not fetched.
        visited: Set of URLs already crawled; shared across the recursion and
            mutated in place. A fresh set is created when omitted.

    Returns:
        A list of ``(page_text, url)`` tuples, one per page that yielded
        non-blank text from its ``p``/``h1``/``h2``/``h3`` elements. Returns
        an empty list when the page was already visited, the depth budget is
        exhausted, or fetching/parsing the page fails.
    """
    if visited is None:
        visited = set()
    if base_url in visited or max_depth < 0:
        return []

    visited.add(base_url)
    print(f"π Crawling: {base_url}")
    try:
        response = requests.get(base_url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        page_text = ' '.join(
            node.get_text() for node in soup.find_all(['p', 'h1', 'h2', 'h3'])
        )
        # No link can be followed at depth 0 (children would be rejected by
        # the guard above), so skip collecting them.
        links = _same_host_links(soup, base_url) if max_depth > 0 else []
    except Exception as e:
        # Best-effort crawl: a page that cannot be fetched or parsed is
        # reported and skipped.
        print(f"β Failed to fetch {base_url}: {e}")
        return []

    result = [(page_text, base_url)] if page_text.strip() else []
    for link in links:
        result.extend(
            extract_links_and_text(link, max_depth=max_depth - 1, visited=visited)
        )
    return result
| |
|
| | |
def _chunk_pages(page_data, chunk_size=500):
    """Split each page's text into ``chunk_size``-character pieces.

    Returns two parallel lists: the chunks and the URL each chunk came from.
    """
    chunks = []
    sources = []
    for text, source_url in page_data:
        for start in range(0, len(text), chunk_size):
            chunks.append(text[start:start + chunk_size])
            sources.append(source_url)
    return chunks, sources


def save_webpage(url, title):
    """Crawl ``url``, build a FAISS index of its text and store it as ``title``.

    The index and its chunk metadata are written to ``DATA_DIR/<title>`` and
    then uploaded to the Hub under ``data/<title>/``.

    Args:
        url: Page to crawl (links on the same host are followed one level).
        title: Name of the folder the index is stored under. Surrounding
            whitespace is ignored; path separators are not allowed.

    Returns:
        A human-readable status message.

    Raises:
        ValueError: If the embedder does not return a 2-D array.
    """
    name = (title or "").strip()
    # The title becomes a directory name and part of the Hub path, so it must
    # be a single, non-empty path component.
    if not name or name in {".", ".."} or "/" in name or "\\" in name:
        return "Please enter a non-empty title without path separators."

    folder = os.path.join(DATA_DIR, name)
    if os.path.exists(folder):
        return f"'{name}' already exists. Use a different title."

    page_data = extract_links_and_text(url, max_depth=1)
    if not page_data:
        # Nothing has been written yet, so the title stays free for a retry.
        return "β No text extracted from the webpage."

    chunks, sources = _chunk_pages(page_data)

    embeddings = embedder.encode(chunks)
    print("Embeddings shape:", embeddings.shape)
    if len(embeddings.shape) != 2:
        raise ValueError(f"Expected 2D embeddings, got shape {embeddings.shape}")

    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)

    # Create the folder only once there is something to store; creating it
    # up front left an empty, unusable title behind whenever the crawl failed.
    os.makedirs(folder, exist_ok=True)
    index_path = os.path.join(folder, "index.faiss")
    meta_path = os.path.join(folder, "meta.pkl")
    faiss.write_index(index, index_path)
    with open(meta_path, "wb") as f:
        pickle.dump(list(zip(chunks, sources)), f)

    # Use the same normalized name remotely as locally.
    upload_to_hub(index_path, f"data/{name}/index.faiss")
    upload_to_hub(meta_path, f"data/{name}/meta.pkl")

    return f"✅ Saved and indexed '{name}', and uploaded to Hub. Please reload (refresh) the page."
| |
|
| | |
def list_titles():
    """Return the names of the directories directly inside ``DATA_DIR``.

    The raw directory listing is printed first for debugging.
    """
    print(f"Listing in: {DATA_DIR} β {os.listdir(DATA_DIR)}")

    def is_title_dir(entry):
        return os.path.isdir(os.path.join(DATA_DIR, entry))

    return list(filter(is_title_dir, os.listdir(DATA_DIR)))
| |
|
| | |
def ask_question(message, history, selected_titles):
    """Answer ``message`` once per selected title using its stored index.

    For each title, the closest stored chunks are retrieved from the FAISS
    index and sent to the chat model as context together with the question.

    Args:
        message: The user's question.
        history: Chat history supplied by the caller; not used here.
        selected_titles: Names of folders under ``DATA_DIR`` to query.

    Returns:
        One Markdown section per title (answer plus source URLs), or an error
        line for a title that could not be processed. A prompt to select a
        page is returned when ``selected_titles`` is empty.
    """
    if not selected_titles:
        return "β Please select at least one webpage."

    top_k = 3
    q_embed = None  # Computed on first use and shared by all titles.
    sections = []
    for title in selected_titles:
        folder = os.path.join(DATA_DIR, title)
        try:
            index = faiss.read_index(os.path.join(folder, "index.faiss"))
            # NOTE(security): pickle.load executes arbitrary code from the
            # file; only safe while meta.pkl is produced by this application.
            with open(os.path.join(folder, "meta.pkl"), "rb") as f:
                chunk_data = pickle.load(f)

            # FAISS pads results with -1 when the index holds fewer than k
            # vectors; a negative index would silently pick the last chunk.
            k = min(top_k, index.ntotal, len(chunk_data))
            if k <= 0:
                raise ValueError("index is empty")

            if q_embed is None:
                q_embed = embedder.encode([message])
            _, neighbors = index.search(q_embed, k=k)
            hits = [int(i) for i in neighbors[0] if 0 <= i < len(chunk_data)]

            response_context = "".join(
                f"[{chunk_data[i][1]}]\n{chunk_data[i][0]}\n\n" for i in hits
            )
            # De-duplicate while keeping retrieval order, so the source list
            # is stable between runs.
            sources = list(dict.fromkeys(chunk_data[i][1] for i in hits))

            response = llm.chat_completion(
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Answer based only on the given context."},
                    {"role": "user", "content": f"Context:\n{response_context}\n\nQuestion: {message}"}
                ],
                model="deepseek-ai/DeepSeek-R1-0528",
                max_tokens=2048,
            )

            answer = response.choices[0].message["content"]
            sections.append(
                f"**{title}** (sources: {', '.join(sources)}):\n{answer.strip()}\n\n"
            )
        except Exception as e:
            # Best effort: a failure for one title must not hide the others.
            sections.append(f"β οΈ Error with {title}: {e}\n\n")

    return "".join(sections).strip()
| |
|
| | |
# Two-tab UI: one tab to crawl and index a page, one to chat over indexed pages.
with gr.Blocks() as demo:
    with gr.Tab("π Index Web Page"):
        url = gr.Textbox(label="Web Page URL")
        title = gr.Textbox(label="Title for Web Page")
        index_btn = gr.Button("Fetch and Index (with crawl)")
        index_status = gr.Textbox(label="Status")
        index_btn.click(fn=save_webpage, inputs=[url, title], outputs=index_status)

    with gr.Tab("π¬ Chat with Web Pages"):
        page_selector = gr.CheckboxGroup(label="Select Indexed Pages", choices=list_titles())
        refresh_btn = gr.Button("π Refresh List")
        # Returning a bare list to a CheckboxGroup sets its selected value;
        # the refresh must replace the available choices instead.
        refresh_btn.click(
            fn=lambda: gr.update(choices=list_titles()),
            outputs=page_selector,
        )
        chat = gr.ChatInterface(fn=ask_question, additional_inputs=[page_selector])

demo.launch()
| |
|