# data_loader.py — oracle-llm: loaders for prompt/answer training pairs
# from local files (txt/json/csv/pdf/docx/xlsx) and remote URLs.
import os
import re
import json
import csv
import codecs
import requests
import PyPDF2
from docx import Document
import openpyxl
from bs4 import BeautifulSoup
# Prompt normalization utility
def normalize_prompt(text):
    """Canonicalize a user prompt for dictionary lookup.

    Lowercases, strips trailing punctuation, removes remaining
    punctuation, drops common filler words, and collapses whitespace.
    Removed a stale commented-out earlier version of the filler regex.
    """
    text = text.strip().lower()
    # Drop trailing punctuation first so "...?" and "..." normalize the same.
    text = re.sub(r"[;:,.!?]+$", "", text)
    # Remove any remaining non-word, non-space characters.
    text = re.sub(r"[^\w\s]", "", text)
    # Strip filler words so e.g. "what is the salary" and "salary" map to
    # the same lookup key.
    text = re.sub(
        r"\b(what|actually|please|tell|me|about|can|you|explain|is|the|do|does|give|show)\b(?!\w)",
        "",
        text,
    )
    return re.sub(r"\s+", " ", text).strip()
# Output cleanup for SQL responses
def clean_sql_output(raw_text):
    """Decode escape sequences in *raw_text* and tidy SQL formatting.

    Collapses doubled semicolons and triple blank lines. On any decoding
    failure the stripped input is returned unchanged.
    """
    stripped = raw_text.strip()
    try:
        decoded = codecs.decode(stripped, 'unicode_escape')
    except Exception as e:
        print("?? Cleaning error:", e)
        return stripped
    decoded = decoded.replace(";;", ";")
    decoded = decoded.replace("\n\n\n", "\n\n")
    return decoded.strip()
# Basic key=value rule loader
def load_rules(file_path="data/train_data.txt"):
    """Parse a key=value rules file into a dict.

    Keys are stripped and lowercased; values are passed through
    clean_sql_output. A missing file yields an empty dict.
    """
    rules = {}
    if not os.path.exists(file_path):
        return rules
    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            if "=" not in raw_line:
                continue
            key, value = raw_line.strip().split("=", 1)
            rules[key.strip().lower()] = clean_sql_output(value)
    return rules
# Keyword-based domain routing
def detect_domain(prompt):
    """Map a prompt to its domain rules file via keyword matching.

    Returns the path of the first matching domain file, or None when
    no keyword category applies. Matching is case-insensitive substring
    search, checked in finance -> hr -> sales order.
    """
    lowered = prompt.lower()
    routing = (
        (("salary", "financial", "transaction", "ledger"), "data/finance.txt"),
        (("employee", "hr", "hiring"), "data/hr.txt"),
        (("sale", "customer", "order"), "data/sales.txt"),
    )
    for keywords, path in routing:
        if any(word in lowered for word in keywords):
            return path
    return None
def load_rules_by_domain(prompt):
    """Look up *prompt* in the rules file for its detected domain.

    Returns the matching answer string, or None when no domain matches,
    the domain file does not exist, or the prompt has no entry.
    """
    domain_file = detect_domain(prompt)
    if not (domain_file and os.path.exists(domain_file)):
        return None
    domain_rules = load_rules(domain_file)
    # BUG FIX: load_rules stores keys stripped and lowercased, but the
    # original looked up the raw prompt, so any capitalized or padded
    # prompt could never match. Normalize the same way before lookup.
    return domain_rules.get(prompt.strip().lower())
# Extended loaders for structured local files
def load_txt(path):
    """Load (normalized_prompt, answer) pairs from key=value text lines."""
    pairs = []
    with open(path, 'r', encoding='utf-8') as handle:
        for line in handle:
            if '=' not in line:
                continue
            prompt, answer = line.split('=', 1)
            pairs.append((normalize_prompt(prompt), answer.strip()))
    return pairs
def load_json(path):
    """Load (normalized_prompt, answer) pairs from a JSON list of objects.

    Each entry must carry 'prompt' and 'answer' keys.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        entries = json.load(handle)
    return [
        (normalize_prompt(entry['prompt']), entry['answer'].strip())
        for entry in entries
    ]
def load_csv(path):
    """Load pairs from a CSV file with 'prompt' and 'answer' columns.

    Rows missing either column are skipped silently.
    """
    with open(path, newline='', encoding='utf-8') as csvfile:
        return [
            (normalize_prompt(row['prompt']), row['answer'].strip())
            for row in csv.DictReader(csvfile)
            if 'prompt' in row and 'answer' in row
        ]
def load_pdf(path):
    """Load (normalized_prompt, answer) pairs from key=value lines in a PDF.

    Pages with no extractable text are skipped.
    """
    pairs = []
    with open(path, 'rb') as f:
        reader = PyPDF2.PdfReader(f)
        # Call extract_text() once per page — the original called it twice
        # (once in the condition, once in the list), doubling parse work.
        page_texts = []
        for page in reader.pages:
            page_text = page.extract_text()
            if page_text:
                page_texts.append(page_text)
        text = "\n".join(page_texts)
    for line in text.split("\n"):
        if '=' in line:
            prompt, answer = line.split('=', 1)
            pairs.append((normalize_prompt(prompt), answer.strip()))
    return pairs
def load_docx(path):
    """Load (normalized_prompt, answer) pairs from key=value paragraphs."""
    pairs = []
    for paragraph in Document(path).paragraphs:
        text = paragraph.text
        if "=" not in text:
            continue
        prompt, answer = text.split("=", 1)
        pairs.append((normalize_prompt(prompt), answer.strip()))
    return pairs
def load_xlsx(path):
    """Load pairs from every worksheet of an Excel workbook.

    Two accepted row layouts: prompt string in column A with answer
    string in column B, or a single 'prompt=answer' string in column A.
    """
    pairs = []
    workbook = openpyxl.load_workbook(path)
    for worksheet in workbook.worksheets:
        for row in worksheet.iter_rows(values_only=True):
            if not row or len(row) < 2:
                continue
            first, second = row[0], row[1]
            if not isinstance(first, str):
                continue
            if "=" in first:
                prompt, answer = first.split("=", 1)
                pairs.append((normalize_prompt(prompt), answer.strip()))
            elif isinstance(second, str):
                pairs.append((normalize_prompt(first), second.strip()))
    return pairs
# Remote fetching for GitHub/HuggingFace raw files (TXT/JSON)
def fetch_text_from_url(url):
    """Fetch the body of *url* as text; return "" on any failure.

    Uses a 10-second timeout and treats non-2xx responses as failures.
    """
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
    except Exception as e:
        print(f"?? Error reading remote file {url}: {e}")
        return ""
    return resp.text
# Dispatcher: route a local file to the loader for its extension
def load_prompts_from_file(path):
    """Dispatch a local file path to the loader matching its extension.

    Unsupported extensions are reported and yield an empty list.
    """
    loaders = {
        '.txt': load_txt,
        '.json': load_json,
        '.csv': load_csv,
        '.pdf': load_pdf,
        '.docx': load_docx,
        '.xlsx': load_xlsx,
    }
    for extension, loader in loaders.items():
        if path.endswith(extension):
            return loader(path)
    print(f"? Unsupported format: {path}")
    return []
def load_prompts_from_url(url):
    """Load (normalized_prompt, answer) pairs from a remote .txt or .json URL.

    Unreachable URLs and other extensions yield an empty list; a JSON
    parse failure is reported and returns the pairs collected so far.
    """
    text = fetch_text_from_url(url)
    if not text:
        return []
    pairs = []
    if url.endswith(".txt"):
        for line in text.splitlines():
            if '=' not in line:
                continue
            prompt, answer = line.split('=', 1)
            pairs.append((normalize_prompt(prompt), answer.strip()))
    elif url.endswith(".json"):
        try:
            for entry in json.loads(text):
                pairs.append((normalize_prompt(entry['prompt']), entry['answer'].strip()))
        except Exception as e:
            print(f"?? JSON parsing failed: {e}")
    return pairs
def load_prompt_pairs(path):
    """Load (normalized_prompt, answer) pairs from a local path or URL.

    URLs (anything starting with "http") support .json, .csv, .txt and
    .pdf content; any other local file is parsed line-by-line as
    key=value text regardless of extension.

    Raises requests.HTTPError for non-2xx remote responses.
    """
    import io  # only needed for the remote csv/pdf branches

    data = []
    if path.startswith("http"):
        # FIX: the original requests.get had no timeout and could hang
        # the whole loader on an unresponsive host.
        # (json/csv/requests/PyPDF2 re-imports removed: already imported
        # at module level.)
        response = requests.get(path, timeout=30)
        response.raise_for_status()
        content = response.content
        ext = path.split(".")[-1].lower()
        if ext == "json":
            for entry in json.loads(content.decode("utf-8")):
                data.append((normalize_prompt(entry['prompt']), entry['answer'].strip()))
        elif ext == "csv":
            for row in csv.DictReader(io.StringIO(content.decode("utf-8"))):
                data.append((normalize_prompt(row['prompt']), row['answer'].strip()))
        elif ext == "txt":
            for line in content.decode("utf-8", errors="replace").splitlines():
                if "=" in line:
                    p, a = line.split("=", 1)
                    data.append((normalize_prompt(p), a.strip()))
        elif ext == "pdf":
            reader = PyPDF2.PdfReader(io.BytesIO(content))
            for page in reader.pages:
                text = page.extract_text()
                if not text:
                    continue
                for line in text.splitlines():
                    if "=" in line:
                        p, a = line.split("=", 1)
                        data.append((normalize_prompt(p), a.strip()))
    else:
        # Local fallback: stream the file instead of readlines() so large
        # files are not materialized in memory.
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if "=" in line:
                    p, a = line.split("=", 1)
                    data.append((normalize_prompt(p), a.strip()))
    return data
def list_files_from_github_folder(github_folder_url):
    """Scrape a GitHub folder page and return raw-content URLs for data files.

    Only hrefs ending in .txt/.json/.csv/.pdf/.docx/.xlsx are kept.
    Returns [] on any error.

    NOTE(review): relies on the 'a.js-navigation-open' selector from
    GitHub's classic server-rendered file listing; newer GitHub UIs render
    the list client-side, so this may silently return [] — confirm against
    the actual pages targeted.
    """
    try:
        html = requests.get(github_folder_url).text
        soup = BeautifulSoup(html, "lxml")
        links = soup.select("a.js-navigation-open")
        # Rewrite the browse URL toward its raw.githubusercontent.com form.
        raw_base = github_folder_url.replace("github.com", "raw.githubusercontent.com").replace("/blob", "")
        file_links = []
        for link in links:
            href = link.get("href", "")
            if any(href.endswith(ext) for ext in [".txt", ".json", ".csv", ".pdf", ".docx", ".xlsx"]):
                # Rebuild an absolute URL from the host portion of raw_base
                # and the path portion of href.
                # NOTE(review): these split('/', 2) index chains look fragile —
                # verify the produced URLs actually resolve for the folder
                # layouts in use before relying on this.
                file_links.append(f"https://{raw_base.split('/', 2)[-1].split('/')[0]}/{href.split('/', 2)[-1]}")
        return file_links
    except Exception as e:
        print("⚠️ GitHub scan error:", e)
        return []