| import os |
| import gradio as gr |
| import faiss |
| import pickle |
| import requests |
| from bs4 import BeautifulSoup |
| from urllib.parse import urljoin, urlparse |
| from sentence_transformers import SentenceTransformer |
| from huggingface_hub import InferenceClient, HfApi |
|
|
| |
# Repository on the Hugging Face Hub that receives uploaded index files.
HF_REPO_ID = "MoslemBot/kajiweb"

# Hub access token, read from the HF_TOKEN environment variable (None if unset).
HF_API_TOKEN = os.environ.get("HF_TOKEN")

# Shared Hub API client used for file uploads.
api = HfApi()
|
|
def upload_to_hub(local_path, remote_path):
    """Upload one local file to the Space repository named by ``HF_REPO_ID``.

    Args:
        local_path: Path of the file on disk to upload.
        remote_path: Destination path of the file inside the repository.

    Any exception raised by ``api.upload_file`` propagates to the caller;
    the confirmation line is printed only after the upload call returns.
    """
    api.upload_file(
        path_or_fileobj=local_path,
        path_in_repo=remote_path,
        repo_id=HF_REPO_ID,
        repo_type="space",
        token=HF_API_TOKEN,
    )
    print(f"✅ Uploaded to Hub: {remote_path}")
|
|
| |
# Sentence-embedding model shared by indexing and querying.
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Chat-completion client, authenticated with the HF_TOKEN environment variable.
llm = InferenceClient(token=os.environ.get("HF_TOKEN"))

# Root directory holding one sub-directory per indexed title; it is created
# at import time when it does not exist yet.
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)
|
|
def extract_links_and_text(base_url, max_depth=1, visited=None):
    """Crawl *base_url* and same-host pages linked from it, collecting text.

    The text of every ``<p>``, ``<h1>``, ``<h2>`` and ``<h3>`` element of a
    page is joined with single spaces. Links are followed only when their
    network location equals that of the page they were found on.

    Args:
        base_url: URL of the page to fetch. A ``#fragment`` suffix is
            ignored, so ``page#a`` and ``page#b`` count as the same page.
        max_depth: Number of link hops to follow from *base_url*; ``0``
            fetches only *base_url*, a negative value fetches nothing.
        visited: Set of already-crawled URLs. It is mutated in place and
            shared across the recursion; a fresh set is used when omitted.

    Returns:
        A list of ``(page_text, url)`` tuples, one per page that yielded
        non-blank text, in crawl order. Pages that cannot be fetched are
        logged and skipped.
    """
    # Local import: urldefrag is not part of the file-level imports.
    from urllib.parse import urldefrag

    if visited is None:
        visited = set()
    if max_depth < 0:
        return []

    try:
        # Key pages on the fragment-free URL so in-page anchors do not make
        # the same document look like several different pages.
        base_url = urldefrag(base_url).url
    except ValueError as e:
        print(f"Failed to fetch {base_url}: {e}")
        return []
    if base_url in visited:
        return []

    visited.add(base_url)
    print(f"Crawling: {base_url}")
    try:
        response = requests.get(base_url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Failed to fetch {base_url}: {e}")
        return []

    soup = BeautifulSoup(response.text, "html.parser")
    page_text = " ".join(
        tag.get_text() for tag in soup.find_all(["p", "h1", "h2", "h3"])
    )
    result = [(page_text, base_url)] if page_text.strip() else []

    # At depth 0 no link can be followed, so skip scanning for them.
    if max_depth == 0:
        return result

    host = urlparse(base_url).netloc
    # A dict keeps first-seen document order while de-duplicating, which
    # makes the crawl (and therefore the index) reproducible between runs.
    links = {}
    for anchor in soup.find_all("a", href=True):
        try:
            full_url = urldefrag(urljoin(base_url, anchor["href"])).url
            same_host = urlparse(full_url).netloc == host
        except ValueError:
            # One malformed href must not discard the rest of the page.
            continue
        if same_host and full_url not in visited:
            links[full_url] = None

    for link in links:
        result.extend(
            extract_links_and_text(link, max_depth=max_depth - 1, visited=visited)
        )
    return result
|
|
| |
def _chunk_pages(page_data, chunk_size=500):
    """Split each page's text into fixed-size character chunks with their source URL."""
    chunks = []
    sources = []
    for text, source_url in page_data:
        for start in range(0, len(text), chunk_size):
            piece = text[start:start + chunk_size]
            # Whitespace-only pieces carry no content worth embedding.
            if piece.strip():
                chunks.append(piece)
                sources.append(source_url)
    return chunks, sources


def save_webpage(url, title):
    """Crawl *url*, build a FAISS index of its text and store it under *title*.

    The page and its same-host links (one hop) are fetched, their text is
    cut into 500-character chunks, the chunks are embedded, and the
    resulting index plus chunk/source metadata are written to
    ``DATA_DIR/<title>/`` and uploaded to the Hub.

    Args:
        url: Address of the page to crawl.
        title: Name to store the index under. Surrounding whitespace is
            ignored; the name must be non-empty and must not contain path
            separators, because it is used as a directory name.

    Returns:
        A status message for display: success, or the reason nothing was
        saved.

    Raises:
        ValueError: If the embedder does not return a 2-D array.
        Exceptions from writing the files or uploading them propagate.
    """
    title = (title or "").strip()
    # The title becomes a directory name locally and a path segment in the
    # Hub repository, so it must not be able to point anywhere else.
    separators = ("/", "\\", os.sep, os.altsep)  # os.altsep may be None
    has_separator = any(sep and sep in title for sep in separators)
    if not title or title in (".", "..") or has_separator:
        return "Please provide a non-empty title without path separators."

    folder = os.path.join(DATA_DIR, title)
    if os.path.exists(folder):
        return f"'{title}' already exists. Use a different title."

    page_data = extract_links_and_text(url, max_depth=1)
    if not page_data:
        return "No text extracted from the webpage."

    chunks, sources = _chunk_pages(page_data)
    if not chunks:
        return "No text extracted from the webpage."

    embeddings = embedder.encode(chunks)
    print("Embeddings shape:", embeddings.shape)
    if len(embeddings.shape) != 2:
        raise ValueError(f"Expected 2D embeddings, got shape {embeddings.shape}")

    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)

    # Create the folder only once there is something to store, so a failed
    # crawl does not leave an empty directory that blocks reuse of the title.
    os.makedirs(folder, exist_ok=True)
    index_path = os.path.join(folder, "index.faiss")
    meta_path = os.path.join(folder, "meta.pkl")
    faiss.write_index(index, index_path)
    with open(meta_path, "wb") as f:
        pickle.dump(list(zip(chunks, sources)), f)

    # Use the same (stripped) title remotely as for the local folder.
    upload_to_hub(index_path, f"data/{title}/index.faiss")
    upload_to_hub(meta_path, f"data/{title}/meta.pkl")

    return f"✅ Saved and indexed '{title}', and uploaded to Hub. Please reload (refresh) the page."
|
|
| |
def list_titles():
    """Return the names of the sub-directories of ``DATA_DIR``, sorted.

    Each sub-directory corresponds to one stored title. Plain files inside
    ``DATA_DIR`` are ignored. The raw directory listing is also printed
    for debugging.
    """
    # List once so the logged entries and the returned titles come from the
    # same snapshot of the directory.
    entries = os.listdir(DATA_DIR)
    print(f"Listing in: {DATA_DIR} -> {entries}")
    return sorted(
        name for name in entries if os.path.isdir(os.path.join(DATA_DIR, name))
    )
|
|
| |
def _build_context(index, chunk_data, q_embed, top_k=3):
    """Return ``(context_text, source_urls)`` for the chunks nearest to *q_embed*."""
    # Never request more neighbours than are stored: FAISS pads missing
    # results with the label -1, which would silently select the last chunk.
    k = min(top_k, index.ntotal, len(chunk_data))
    if k <= 0:
        raise ValueError("index contains no chunks")

    _, neighbours = index.search(q_embed, k=k)
    parts = []
    sources = []
    for idx in neighbours[0]:
        if idx < 0 or idx >= len(chunk_data):
            continue
        entry = chunk_data[idx]
        chunk, url = entry[0], entry[1]
        parts.append(f"[{url}]\n{chunk}\n\n")
        sources.append(url)
    # dict.fromkeys de-duplicates while keeping retrieval order.
    return "".join(parts), list(dict.fromkeys(sources))


def ask_question(message, history, selected_titles):
    """Answer *message* from the stored index of every selected title.

    For each title, the three chunks nearest to the question are retrieved
    from its FAISS index and passed to the chat model as context. The
    per-title answers are concatenated.

    Args:
        message: The user's question.
        history: Chat history supplied by the chat interface; not used.
        selected_titles: Titles (sub-directories of ``DATA_DIR``) to query.

    Returns:
        A text block with one section per title: either the model's answer
        with its source URLs, or an error line when that title failed.
        When no title is selected, a prompt to select one is returned.
    """
    if not selected_titles:
        return "Please select at least one webpage."

    sections = []
    q_embed = None  # computed once, on first use, and reused for every title
    for title in selected_titles:
        folder = os.path.join(DATA_DIR, title)
        try:
            index = faiss.read_index(os.path.join(folder, "index.faiss"))
            # NOTE(security): pickle.load can execute arbitrary code. This is
            # only safe while meta.pkl is written exclusively by this app.
            with open(os.path.join(folder, "meta.pkl"), "rb") as f:
                chunk_data = pickle.load(f)

            if q_embed is None:
                q_embed = embedder.encode([message])
            response_context, sources = _build_context(index, chunk_data, q_embed)

            response = llm.chat_completion(
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Answer based only on the given context."},
                    {"role": "user", "content": f"Context:\n{response_context}\n\nQuestion: {message}"},
                ],
                model="deepseek-ai/DeepSeek-R1-0528",
                max_tokens=2048,
            )

            answer = response.choices[0].message["content"] or ""
            sections.append(
                f"**{title}** (sources: {', '.join(sources)}):\n{answer.strip()}\n\n"
            )
        except Exception as e:
            # Best effort: one failing title must not hide the other answers.
            sections.append(f"⚠️ Error with {title}: {str(e)}\n\n")

    return "".join(sections).strip()
|
|
| |
def _refresh_page_choices():
    """Re-scan the stored titles and return them as new selector choices."""
    # Returning a bare list would set the CheckboxGroup's *value* (the ticked
    # boxes); an update object is needed to replace its *choices*.
    return gr.update(choices=list_titles())


# Two-tab UI: one tab crawls and indexes a page, the other chats with the
# indexed pages selected in the checkbox group.
with gr.Blocks() as demo:
    with gr.Tab("Index Web Page"):
        url = gr.Textbox(label="Web Page URL")
        title = gr.Textbox(label="Title for Web Page")
        index_btn = gr.Button("Fetch and Index (with crawl)")
        index_status = gr.Textbox(label="Status")
        index_btn.click(fn=save_webpage, inputs=[url, title], outputs=index_status)

    with gr.Tab("💬 Chat with Web Pages"):
        page_selector = gr.CheckboxGroup(label="Select Indexed Pages", choices=list_titles())
        refresh_btn = gr.Button("Refresh List")
        refresh_btn.click(fn=_refresh_page_choices, outputs=page_selector)
        # The current selection is passed to ask_question as its third argument.
        chat = gr.ChatInterface(fn=ask_question, additional_inputs=[page_selector])

demo.launch()
|
|