# data_loader — utilities for loading prompt/answer training pairs
# from local files (txt/json/csv/pdf/docx/xlsx) and remote URLs.
| import os | |
| import re | |
| import json | |
| import csv | |
| import codecs | |
| import requests | |
| import PyPDF2 | |
| from docx import Document | |
| import openpyxl | |
| from bs4 import BeautifulSoup | |
# Normalize utility
def normalize_prompt(text):
    """Canonicalize a user prompt for dictionary lookup.

    Lowercases, strips trailing punctuation, removes remaining
    punctuation, drops common filler/question words, and collapses
    whitespace.

    Args:
        text: Raw prompt string.

    Returns:
        The normalized prompt; may be empty if the prompt contained
        only filler words.
    """
    text = text.strip().lower()
    # Drop trailing sentence punctuation first so "...?" endings vanish.
    text = re.sub(r"[;:,.!?]+$", "", text)
    # Remove any remaining punctuation/symbols, keeping word chars and spaces.
    text = re.sub(r"[^\w\s]", "", text)
    # Strip filler/question words; (?!\w) is belt-and-braces against
    # partial-word matches (the trailing \b already prevents them).
    text = re.sub(
        r"\b(what|actually|please|tell|me|about|can|you|explain|is|the|do|does|give|show)\b(?!\w)",
        "",
        text,
    )
    # Collapse the whitespace runs left behind by the removals.
    return re.sub(r"\s+", " ", text).strip()
# Output cleanup for SQL responses
def clean_sql_output(raw_text):
    """Decode escape sequences in a raw SQL answer and tidy it up.

    Interprets backslash escapes (e.g. "\\n") via unicode_escape, then
    collapses doubled semicolons and triple newlines.  On any decoding
    failure the input is returned merely stripped.
    """
    stripped = raw_text.strip()
    try:
        decoded = codecs.decode(stripped, 'unicode_escape')
    except Exception as err:
        print("?? Cleaning error:", err)
        return stripped
    decoded = decoded.replace(";;", ";")
    decoded = decoded.replace("\n\n\n", "\n\n")
    return decoded.strip()
# Existing basic rule loader
def load_rules(file_path="data/train_data.txt"):
    """Read "key=value" lines from *file_path* into a lookup dict.

    Keys are stripped and lowercased; values are passed through
    clean_sql_output().  Returns an empty dict when the file is absent.
    """
    rules = {}
    if not os.path.exists(file_path):
        return rules
    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            if "=" not in raw_line:
                continue
            key, _, value = raw_line.strip().partition("=")
            rules[key.strip().lower()] = clean_sql_output(value)
    return rules
# Domain routing logic
def detect_domain(prompt):
    """Map a prompt to the rule file for its business domain.

    Keyword routes are checked in priority order (finance, HR, sales);
    returns the matching file path, or None when nothing matches.
    """
    lowered = prompt.lower()
    routes = (
        (("salary", "financial", "transaction", "ledger"), "data/finance.txt"),
        (("employee", "hr", "hiring"), "data/hr.txt"),
        (("sale", "customer", "order"), "data/sales.txt"),
    )
    for keywords, path in routes:
        if any(word in lowered for word in keywords):
            return path
    return None
def load_rules_by_domain(prompt):
    """Answer *prompt* from its domain-specific rule file, if any.

    Routes the prompt via detect_domain(), loads that file's rules,
    and looks the prompt up.  Returns the answer string or None.

    Fix: load_rules() stores keys stripped and lowercased, so the
    lookup now uses the stripped/lowercased prompt; the previous
    exact-case lookup silently missed any prompt with uppercase
    letters or surrounding whitespace.  The exact key is kept as a
    fallback for backward compatibility.
    """
    domain_file = detect_domain(prompt)
    if domain_file and os.path.exists(domain_file):
        domain_rules = load_rules(domain_file)
        normalized_key = prompt.strip().lower()
        return domain_rules.get(normalized_key, domain_rules.get(prompt))
    return None
# Extended loaders for structured files
def load_txt(path):
    """Parse "prompt=answer" lines from a UTF-8 text file.

    Returns a list of (normalized_prompt, answer) tuples; lines without
    an '=' separator are skipped.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        return [
            (normalize_prompt(left), right.strip())
            for left, sep, right in (line.partition('=') for line in handle)
            if sep
        ]
def load_json(path):
    """Load prompt/answer pairs from a JSON array of objects.

    Each entry must carry 'prompt' and 'answer' keys; returns a list of
    (normalized_prompt, stripped_answer) tuples.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        entries = json.load(handle)
    return [
        (normalize_prompt(item['prompt']), item['answer'].strip())
        for item in entries
    ]
def load_csv(path):
    """Load prompt/answer pairs from a CSV with 'prompt'/'answer' columns.

    Rows missing either column are skipped.
    """
    results = []
    with open(path, newline='', encoding='utf-8') as handle:
        for record in csv.DictReader(handle):
            if 'prompt' not in record or 'answer' not in record:
                continue
            results.append(
                (normalize_prompt(record['prompt']), record['answer'].strip())
            )
    return results
def load_pdf(path):
    """Extract "prompt=answer" lines from a PDF's text content.

    Returns a list of (normalized_prompt, answer) tuples.  Pages whose
    text extraction yields nothing are skipped.

    Fix: extract_text() was previously called twice per page (once in
    the filter, once for the join) — an expensive re-parse of each
    page; it is now called exactly once per page.
    """
    pairs = []
    with open(path, 'rb') as f:
        reader = PyPDF2.PdfReader(f)
        page_texts = []
        for page in reader.pages:
            text = page.extract_text()
            if text:
                page_texts.append(text)
    for line in "\n".join(page_texts).split("\n"):
        if '=' in line:
            prompt, answer = line.split('=', 1)
            pairs.append((normalize_prompt(prompt), answer.strip()))
    return pairs
def load_docx(path):
    """Pull "prompt=answer" pairs from a Word document's paragraphs.

    Paragraphs without an '=' are ignored.
    """
    document = Document(path)
    results = []
    for paragraph in document.paragraphs:
        content = paragraph.text
        if "=" not in content:
            continue
        left, right = content.split("=", 1)
        results.append((normalize_prompt(left), right.strip()))
    return results
def load_xlsx(path):
    """Collect prompt/answer pairs from every sheet of a workbook.

    Two row layouts are accepted:
      * a "prompt=answer" string in the first cell, or
      * two string cells (prompt, answer) when the prompt has no '='.
    All other rows are ignored.
    """
    workbook = openpyxl.load_workbook(path)
    results = []
    for worksheet in workbook.worksheets:
        for cells in worksheet.iter_rows(values_only=True):
            if not cells or len(cells) < 2:
                continue
            first, second = cells[0], cells[1]
            if not isinstance(first, str):
                continue
            if "=" in first:
                left, right = first.split("=", 1)
                results.append((normalize_prompt(left), right.strip()))
            elif isinstance(second, str):
                results.append((normalize_prompt(first), second.strip()))
    return results
# Load from GitHub/HuggingFace (TXT/JSON)
def fetch_text_from_url(url):
    """Download *url* and return its body as text.

    Best-effort: any network or HTTP error is logged and an empty
    string is returned instead of raising.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except Exception as e:
        print(f"?? Error reading remote file {url}: {e}")
        return ""
    return response.text
# Dispatcher for local files
def load_prompts_from_file(path):
    """Route *path* to the loader matching its file extension.

    Supported: .txt, .json, .csv, .pdf, .docx, .xlsx.  Unsupported
    extensions are reported and yield an empty list.
    """
    loaders = {
        '.txt': load_txt,
        '.json': load_json,
        '.csv': load_csv,
        '.pdf': load_pdf,
        '.docx': load_docx,
        '.xlsx': load_xlsx,
    }
    for suffix, loader in loaders.items():
        if path.endswith(suffix):
            return loader(path)
    print(f"? Unsupported format: {path}")
    return []
def load_prompts_from_url(url):
    """Fetch a remote .txt or .json rules file and parse prompt pairs.

    Returns a list of (normalized_prompt, answer) tuples; fetch
    failures and unsupported extensions yield an empty list.
    """
    body = fetch_text_from_url(url)
    if not body:
        return []
    results = []
    if url.endswith(".txt"):
        for raw_line in body.splitlines():
            if '=' not in raw_line:
                continue
            left, right = raw_line.split('=', 1)
            results.append((normalize_prompt(left), right.strip()))
    elif url.endswith(".json"):
        # The try covers the whole loop so malformed entries (missing
        # keys) are swallowed the same way parse errors are.
        try:
            for entry in json.loads(body):
                results.append(
                    (normalize_prompt(entry['prompt']), entry['answer'].strip())
                )
        except Exception as e:
            print(f"?? JSON parsing failed: {e}")
    return results
def load_prompt_pairs(path):
    """Load (normalized_prompt, answer) pairs from a local path or URL.

    Remote paths (anything starting with "http") support json, csv,
    txt and pdf payloads; any local file is read line-by-line as
    "prompt=answer" text regardless of extension.

    Raises:
        requests.HTTPError: when a remote fetch returns an error status.

    Fix: requests.get() now passes timeout=10 (matching
    fetch_text_from_url) so a stalled server cannot hang the loader
    indefinitely.
    """
    import json, csv
    import requests
    import io
    import PyPDF2

    def is_url(p):
        return p.startswith("http")

    ext = path.split(".")[-1].lower()
    data = []
    if is_url(path):
        # Bounded wait, consistent with fetch_text_from_url's timeout.
        response = requests.get(path, timeout=10)
        response.raise_for_status()
        content = response.content
        if ext == "json":
            parsed = json.loads(content.decode("utf-8"))
            for entry in parsed:
                data.append((normalize_prompt(entry['prompt']), entry['answer'].strip()))
        elif ext == "csv":
            reader = csv.DictReader(io.StringIO(content.decode("utf-8")))
            for row in reader:
                data.append((normalize_prompt(row['prompt']), row['answer'].strip()))
        elif ext == "txt":
            for line in content.decode("utf-8", errors="replace").splitlines():
                if "=" in line:
                    p, a = line.split("=", 1)
                    data.append((normalize_prompt(p), a.strip()))
        elif ext == "pdf":
            reader = PyPDF2.PdfReader(io.BytesIO(content))
            for page in reader.pages:
                text = page.extract_text()
                if text:
                    for line in text.splitlines():
                        if "=" in line:
                            p, a = line.split("=", 1)
                            data.append((normalize_prompt(p), a.strip()))
    else:
        # Local fallback: treat any file as "prompt=answer" lines.
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if "=" in line:
                    p, a = line.split("=", 1)
                    data.append((normalize_prompt(p), a.strip()))
    return data
def list_files_from_github_folder(github_folder_url):
    """Scrape a GitHub folder page and return raw URLs of data files.

    Best-effort: any error (network, parsing) is logged and an empty
    list is returned.

    NOTE(review): relies on the legacy "js-navigation-open" anchor class
    in GitHub's server-rendered HTML — modern GitHub renders the file
    list client-side, so this selector may match nothing; verify against
    a live page.
    """
    try:
        html = requests.get(github_folder_url).text
        # Requires the third-party 'lxml' parser to be installed for bs4.
        soup = BeautifulSoup(html, "lxml")
        links = soup.select("a.js-navigation-open")
        # Map the browsing URL onto its raw.githubusercontent.com form.
        raw_base = github_folder_url.replace("github.com", "raw.githubusercontent.com").replace("/blob", "")
        file_links = []
        for link in links:
            href = link.get("href", "")
            # Keep only extensions the local loaders can handle.
            if any(href.endswith(ext) for ext in [".txt", ".json", ".csv", ".pdf", ".docx", ".xlsx"]):
                # NOTE(review): this split chain appears to keep only the
                # host portion of raw_base and the tail of href — the
                # branch/path segments look lost; confirm the produced URL
                # actually resolves before trusting this output.
                file_links.append(f"https://{raw_base.split('/', 2)[-1].split('/')[0]}/{href.split('/', 2)[-1]}")
        return file_links
    except Exception as e:
        print("⚠️ GitHub scan error:", e)
        return []