# data_loader — "prompt=answer" rule loading, normalization and domain routing.
#
# Stdlib-only at module scope: format-specific third-party libraries
# (requests, PyPDF2, python-docx, openpyxl, bs4) are imported inside the
# functions that need them — the same convention load_prompt_pairs already
# uses — so importing this module never fails just because an optional
# format dependency is missing.
import codecs
import csv
import json
import os
import re

# Patterns used by normalize_prompt, compiled once at import time.
_TRAILING_PUNCT = re.compile(r"[;:,.!?]+$")
_NON_WORD = re.compile(r"[^\w\s]")
_FILLER_WORDS = re.compile(
    r"\b(what|actually|please|tell|me|about|can|you|explain|is|the|do|does|give|show)\b(?!\w)"
)
_WHITESPACE = re.compile(r"\s+")


def normalize_prompt(text):
    """Canonicalize a user prompt for rule lookup.

    Lowercases, strips trailing punctuation, removes remaining punctuation,
    drops common filler words and collapses runs of whitespace.
    """
    text = text.strip().lower()
    text = _TRAILING_PUNCT.sub("", text)
    text = _NON_WORD.sub("", text)
    text = _FILLER_WORDS.sub("", text)
    return _WHITESPACE.sub(" ", text).strip()


def clean_sql_output(raw_text):
    """Best-effort cleanup of a stored SQL answer.

    Un-escapes literal escape sequences (e.g. a two-character "\\n") and
    collapses doubled semicolons and triple blank lines.  Falls back to
    the stripped input if decoding fails — cleanup must never break a
    lookup.
    """
    try:
        decoded = codecs.decode(raw_text.strip(), "unicode_escape")
        return decoded.replace(";;", ";").replace("\n\n\n", "\n\n").strip()
    except Exception as e:
        print("?? Cleaning error:", e)
        return raw_text.strip()


def load_rules(file_path="data/train_data.txt"):
    """Load "prompt=answer" rules from *file_path*.

    Keys are stored stripped and lowercased; values are passed through
    clean_sql_output().  Returns an empty dict when the file is missing.
    """
    data = {}
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as file:
            for line in file:
                if "=" in line:
                    key, value = line.strip().split("=", 1)
                    data[key.strip().lower()] = clean_sql_output(value)
    return data


def detect_domain(prompt):
    """Map a prompt to its domain rule file, or None when no domain matches."""
    prompt = prompt.lower()
    if any(word in prompt for word in ["salary", "financial", "transaction", "ledger"]):
        return "data/finance.txt"
    elif any(word in prompt for word in ["employee", "hr", "hiring"]):
        return "data/hr.txt"
    elif any(word in prompt for word in ["sale", "customer", "order"]):
        return "data/sales.txt"
    else:
        return None


def load_rules_by_domain(prompt):
    """Answer *prompt* from its domain's rule file, or None when unknown.

    BUGFIX: load_rules() stores keys stripped and lowercased, so the
    lookup key must be normalized the same way — previously the raw
    prompt was used, and anything with uppercase letters or surrounding
    whitespace could never match.
    """
    domain_file = detect_domain(prompt)
    if domain_file and os.path.exists(domain_file):
        domain_rules = load_rules(domain_file)
        key = prompt.strip().lower()
        if key in domain_rules:
            return domain_rules[key]
    return None
# Extended loaders for structured files.
#
# Each loader returns a list of (normalized_prompt, answer) tuples.
# Format-specific third-party libraries are imported inside the loader
# that needs them (the same convention load_prompt_pairs uses), so the
# module stays importable when an optional dependency is missing.


def load_txt(path):
    """Load "prompt=answer" pairs from a plain-text file, one per line."""
    pairs = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if "=" in line:
                prompt, answer = line.split("=", 1)
                pairs.append((normalize_prompt(prompt), answer.strip()))
    return pairs


def load_json(path):
    """Load pairs from a JSON array of {"prompt": ..., "answer": ...} objects."""
    with open(path, "r", encoding="utf-8") as f:
        return [
            (normalize_prompt(entry["prompt"]), entry["answer"].strip())
            for entry in json.load(f)
        ]


def load_csv(path):
    """Load pairs from a CSV file with "prompt" and "answer" columns."""
    pairs = []
    with open(path, newline="", encoding="utf-8") as csvfile:
        for row in csv.DictReader(csvfile):
            # row.get() guards against missing columns AND against None
            # values from short rows, which would crash .strip() below.
            if row.get("prompt") and row.get("answer"):
                pairs.append((normalize_prompt(row["prompt"]), row["answer"].strip()))
    return pairs


def load_pdf(path):
    """Extract "prompt=answer" lines from a PDF's text content."""
    import PyPDF2  # optional dependency, only needed for PDFs

    pairs = []
    with open(path, "rb") as f:
        reader = PyPDF2.PdfReader(f)
        # extract_text() is expensive — call it once per page instead of
        # once to test for emptiness and again to join.
        page_texts = [page.extract_text() for page in reader.pages]
    text = "\n".join(t for t in page_texts if t)
    for line in text.split("\n"):
        if "=" in line:
            prompt, answer = line.split("=", 1)
            pairs.append((normalize_prompt(prompt), answer.strip()))
    return pairs


def load_docx(path):
    """Extract "prompt=answer" paragraphs from a Word document."""
    from docx import Document  # optional dependency, only needed for .docx

    pairs = []
    for para in Document(path).paragraphs:
        if "=" in para.text:
            prompt, answer = para.text.split("=", 1)
            pairs.append((normalize_prompt(prompt), answer.strip()))
    return pairs


def load_xlsx(path):
    """Load pairs from an Excel workbook (all worksheets).

    Accepts either two-column rows (prompt, answer) or rows whose first
    cell holds a "prompt=answer" string.
    """
    import openpyxl  # optional dependency, only needed for .xlsx

    pairs = []
    wb = openpyxl.load_workbook(path)
    for sheet in wb.worksheets:
        for row in sheet.iter_rows(values_only=True):
            if not row or len(row) < 2:
                continue
            prompt, answer = row[0], row[1]
            if isinstance(prompt, str) and isinstance(answer, str) and "=" not in prompt:
                pairs.append((normalize_prompt(prompt), answer.strip()))
            elif isinstance(prompt, str) and "=" in prompt:
                p, a = prompt.split("=", 1)
                pairs.append((normalize_prompt(p), a.strip()))
    return pairs


# Load from GitHub/HuggingFace (TXT/JSON)
def fetch_text_from_url(url):
    """Fetch a remote file's text; returns "" (and logs) on any failure."""
    import requests  # optional dependency, only needed for remote files

    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.text
    except Exception as e:
        print(f"?? Error reading remote file {url}: {e}")
        return ""
# Dispatcher for local files


def load_prompts_from_file(path):
    """Route *path* to the loader for its extension; [] if unsupported."""
    loaders = {
        ".txt": load_txt,
        ".json": load_json,
        ".csv": load_csv,
        ".pdf": load_pdf,
        ".docx": load_docx,
        ".xlsx": load_xlsx,
    }
    for ext, loader in loaders.items():
        if path.endswith(ext):
            return loader(path)
    print(f"? Unsupported format: {path}")
    return []


def load_prompts_from_url(url):
    """Load "prompt=answer" pairs from a remote .txt or .json file."""
    pairs = []
    text = fetch_text_from_url(url)
    if not text:
        return []
    if url.endswith(".txt"):
        for line in text.splitlines():
            if "=" in line:
                prompt, answer = line.split("=", 1)
                pairs.append((normalize_prompt(prompt), answer.strip()))
    elif url.endswith(".json"):
        try:
            for entry in json.loads(text):
                pairs.append((normalize_prompt(entry["prompt"]), entry["answer"].strip()))
        except Exception as e:
            print(f"?? JSON parsing failed: {e}")
    return pairs


def load_prompt_pairs(path):
    """Load (normalized_prompt, answer) pairs from a local path or URL.

    URLs support json/csv/txt/pdf; any local path falls back to
    line-based "prompt=answer" parsing (format-specific local files go
    through load_prompts_from_file instead).
    """
    import io  # for in-memory decoding of fetched remote content

    ext = path.split(".")[-1].lower()
    data = []

    if path.startswith("http"):
        import requests  # optional dependency, only needed for remote files

        response = requests.get(path)
        response.raise_for_status()
        content = response.content
        if ext == "json":
            for entry in json.loads(content.decode("utf-8")):
                data.append((normalize_prompt(entry["prompt"]), entry["answer"].strip()))
        elif ext == "csv":
            for row in csv.DictReader(io.StringIO(content.decode("utf-8"))):
                data.append((normalize_prompt(row["prompt"]), row["answer"].strip()))
        elif ext == "txt":
            for line in content.decode("utf-8", errors="replace").splitlines():
                if "=" in line:
                    p, a = line.split("=", 1)
                    data.append((normalize_prompt(p), a.strip()))
        elif ext == "pdf":
            import PyPDF2  # optional dependency, only needed for PDFs

            reader = PyPDF2.PdfReader(io.BytesIO(content))
            for page in reader.pages:
                text = page.extract_text()
                if text:
                    for line in text.splitlines():
                        if "=" in line:
                            p, a = line.split("=", 1)
                            data.append((normalize_prompt(p), a.strip()))
    else:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if "=" in line:
                    p, a = line.split("=", 1)
                    data.append((normalize_prompt(p), a.strip()))
    return data


def list_files_from_github_folder(github_folder_url):
    """Scrape a GitHub folder page and return raw-content URLs for data files.

    BUGFIX: the previous raw-URL assembly dropped the repository owner and
    kept the "/blob" path segment, producing URLs that
    raw.githubusercontent.com rejects.  A GitHub file link of the form
    "/owner/repo/blob/branch/path" maps to
    "https://raw.githubusercontent.com/owner/repo/branch/path".
    """
    import requests  # optional dependencies, only needed for GitHub scanning
    from bs4 import BeautifulSoup

    try:
        html = requests.get(github_folder_url).text
        # html.parser ships with the stdlib; the previous "lxml" choice
        # was an undeclared extra dependency.
        soup = BeautifulSoup(html, "html.parser")
        file_links = []
        for link in soup.select("a.js-navigation-open"):
            href = link.get("href", "")
            if href.endswith((".txt", ".json", ".csv", ".pdf", ".docx", ".xlsx")):
                raw_path = href.replace("/blob/", "/", 1)
                file_links.append(f"https://raw.githubusercontent.com{raw_path}")
        return file_links
    except Exception as e:
        print("⚠️ GitHub scan error:", e)
        return []