ilacChatBot / app /ingest.py
emrecn's picture
HF Spaces deploy: temiz tek-commit history, chroma_db Git LFS üzerinden
efc4680
import argparse
import os
import re
import hashlib
import json
from pathlib import Path
from typing import Optional
from langchain_core.documents import Document
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import JinaEmbeddings
from langchain_community.vectorstores import Chroma
from dotenv import load_dotenv
from app.logger import get_logger
load_dotenv()
logger = get_logger("ingest")
CHROMA_DB_DIR = "./chroma_db"
MANIFEST_PATH = "./manifest.json"
def _loose(phrase: str) -> str:
"""Bir ifadeyi, PDF çıkarımında kelime içine serpilmiş rastgele
boşluklara ('Ku llanm a') toleranslı bir regex'e çevirir.
Boşluklar \\s+, diğer tokenlar arasına \\s* eklenir. '.' -> \\.?,
':' -> :?, '?' önceki tokenı opsiyonel yapmak yerine literal
bırakmaz (opsiyonel karakterler nokta/:).
Karakter sınıfları ([ıi] gibi) tek token sayılır.
"""
out: list[str] = []
i, n = 0, len(phrase)
while i < n:
c = phrase[i]
if c.isspace():
out.append(r"\s+")
while i < n and phrase[i].isspace():
i += 1
continue
if c == "[":
j = phrase.index("]", i)
token = phrase[i : j + 1]
i = j + 1
elif c == ".":
if out and out[-1] == r"\s*":
out.pop()
out.append(r"\.?")
i += 1
continue
elif c == ":":
if out and out[-1] == r"\s*":
out.pop()
out.append(r":?")
i += 1
continue
elif c == "?":
i += 1
continue
else:
token = c
i += 1
if out and not out[-1].endswith(r"\s+"):
out.append(r"\s*")
out.append(token)
return "".join(out)
def get_file_hash(filepath: Path) -> str:
hasher = hashlib.md5()
with open(filepath, 'rb') as f:
buf = f.read()
hasher.update(buf)
return hasher.hexdigest()
def _extract_drug_id_from_filename(filepath: Path) -> Optional[str]:
"""Dosya adından ilaç adını çeker. Format: {İLAÇ ADI}__{random}.pdf"""
stem = filepath.stem
if '__' in stem:
part = stem.split('__')[0].strip()
part = re.sub(r'\s+', ' ', part)
if part:
return part
return None
def extract_drug_id(doc_path: Path, first_page_text: str) -> str:
# 1. Ana yöntem: Dosya adından çek (ilaç adı __ ayracından önceki kısım)
drug_name = _extract_drug_id_from_filename(doc_path)
if drug_name:
logger.info(f"Dosya adından tespit edildi: {doc_path.name}{drug_name}")
return drug_name
# 2. KULLANMA TALİMATI başlığından sonra ilaç adını topla
lines = first_page_text.split('\n')
start_collecting = False
drug_name_lines = []
stop_prefixes = [
"ağız", "oral", "deri", "kas", "damar",
"etkin madde", "yardımcı madde",
"ağızdan", "kas içine", "damar içine",
"cilt üzerine", "deri altına",
"bu kullanma talimatında", "kullanmadan önce"
]
for line in lines:
clean_line = line.strip()
if not start_collecting:
if "KULLANMA TALİMATI" in clean_line.upper():
start_collecting = True
continue
if not clean_line:
continue
lower_line = clean_line.lower().lstrip("•.-* ")
if any(lower_line.startswith(prefix) for prefix in stop_prefixes):
break
drug_name_lines.append(clean_line)
if drug_name_lines:
result = " ".join(drug_name_lines).replace("®", "").strip()
logger.warning(f"KULLANMA TALİMATI yöntemi kullanıldı: {doc_path.name}{result}")
return result
# 3. Regex: İlk sayfada "mg", "tablet", "kapsül" vb. içeren satırları ara
drug_pattern = re.compile(
r'^(.+(?:mg|mcg|mikrogram|ml|IU).+(?:tablet|kapsül|kapsul|film|şurup|surup|jel|krem|damla|flakon|süspansiyon|suspansiyon|sprey|ampul|enjektabl).*?)$',
re.IGNORECASE | re.MULTILINE
)
match = drug_pattern.search(first_page_text[:1000])
if match:
result = match.group(1).replace("®", "").strip()
logger.warning(f"Regex yöntemi kullanıldı: {doc_path.name}{result}")
return result
# 4. Tespit edilemedi — None döndür, process_pdfs atlayacak
logger.warning(f"İlaç adı tespit edilemedi, atlanıyor: {doc_path.name}")
return None
def split_kt_by_sections(text: str, drug_id: str, file_hash: str) -> "list[Document]":
# Başlıkları yakalayacak esnek regex desenleri
patterns = {
"1. İlaç nedir ve ne için kullanılır?": r"(?m)^\s*1\.\s+(?!\").*nedir\s+ve\s+ne\s+için\s+kullanılır[^)\"]*$",
"2. Kullanmadan önce dikkat edilmesi gerekenler": r"(?m)^\s*2\.\s+(?!\").*kullanmadan\s+önce\s+dikkat\s+edilmesi\s+gerekenler[^)\"]*$",
"3. Nasıl kullanılır?": r"(?m)^\s*3\.\s+(?!\").*nasıl\s+kullanılır[^)\"]*$",
"4. Olası yan etkiler nelerdir?": r"(?m)^\s*4\.\s+(?!\").*olası\s+yan\s+etkiler[^)\"]*$",
"5. Saklama koşulları": r"(?m)^\s*5\.\s+(?!\").*saklanması[^)\"]*$"
}
matches = []
for section_name, pattern in patterns.items():
# İlk eşleşmeyi bul
match = re.search(pattern, text, re.IGNORECASE)
if match:
matches.append({"name": section_name, "start": match.start()})
# Başlangıç indeksine göre sırala
matches.sort(key=lambda x: x["start"])
sections = []
if not matches:
# Hiç başlık bulunamazsa tüm metni tek bir genel bölüm olarak al
sections.append({"name": "Genel Bilgiler", "content": text.strip()})
else:
# Bulunan bölümleri ayır
for i in range(len(matches)):
# İlk başlıktan önceki metni (prelude) giriş bölümü yapmak yerine ilk bölümün başına dahil ediyoruz
start_index = 0 if i == 0 else matches[i]["start"]
end_index = matches[i+1]["start"] if i + 1 < len(matches) else len(text)
sections.append({
"name": matches[i]["name"],
"content": text[start_index:end_index].strip()
})
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1800,
chunk_overlap=300,
separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""]
)
docs = []
for sec in sections:
chunks = text_splitter.split_text(sec["content"])
for chunk in chunks:
# RAG performansını artırmak için ilaç adını bölüm başlığının başına ekliyoruz
chunk_text = f"[{drug_id} - {sec['name']}]\n\n{chunk}"
docs.append(Document(
page_content=chunk_text,
metadata={
"drug_id": drug_id,
"section": sec["name"],
"file_hash": file_hash
}
))
return docs
def _generate_drugs_list(db):
"""ChromaDB'den unique drug_id'leri çekip drugs_list.txt'ye yazar."""
try:
collection = db._collection
results = collection.get(include=["metadatas"])
drug_ids = set()
for meta in results["metadatas"]:
did = meta.get("drug_id", "")
if did and did != "SKIP":
drug_ids.add(did)
sorted_drugs = sorted(drug_ids, key=lambda x: x.lower())
with open("drugs_list.txt", "w", encoding="utf-8") as f:
for drug in sorted_drugs:
f.write(f"{drug}\n")
logger.info(f"drugs_list.txt güncellendi: {len(sorted_drugs)} ilaç")
except Exception as e:
logger.error(f"drugs_list.txt oluşturma hatası: {e}")
def process_pdfs(pdf_dir: str, mode: str):
import time
pdf_dir_path = Path(pdf_dir)
manifest = {}
if os.path.exists(MANIFEST_PATH):
with open(MANIFEST_PATH, "r") as f:
manifest = json.load(f)
db = Chroma(persist_directory=CHROMA_DB_DIR, embedding_function=JinaEmbeddings(jina_api_key=os.environ.get("JINA_API_KEY"), model_name="jina-embeddings-v3"))
for filepath in pdf_dir_path.glob("*.pdf"):
file_hash = get_file_hash(filepath)
old_hash = manifest.get(str(filepath))
if mode == "incremental" and old_hash == file_hash:
logger.debug(f"Atlanıyor (değişiklik yok): {filepath.name}")
continue
logger.info(f"İşleniyor: {filepath.name}")
loader = PyPDFLoader(str(filepath))
docs = loader.load()
if not docs:
logger.warning(f"Boş PDF: {filepath.name}")
continue
# Tüm sayfalarda temizleme — _loose sayesinde PDF'den gelen
# 'Ku llanm a Talim atında' gibi kelime-içi boşluklara toleranslı.
p1 = re.compile(
r"(?:" + _loose("bu ilac[ıi] kullanmaya ba[şs]lamadan [öo]nce") + r"\s+)?" +
_loose("bu kullanma tal[iı]mat[ıi]n[ıi]") +
r".*?" +
_loose("y[üu]ksek veya d[üu][şs][üu]k doz kullanmay[ıi]n[ıi]z."),
re.IGNORECASE | re.DOTALL
)
p2 = re.compile(
_loose("bu kullanma tal[iı]mat[ıi]nda:") +
r".*?" +
_loose("ba[şs]l[ıi]klar[ıi] yer almaktad[ıi]r."),
re.IGNORECASE | re.DOTALL
)
try:
for doc in docs:
cleaned = p1.sub("", doc.page_content)
cleaned = p2.sub("", cleaned)
cleaned = re.sub(r'^\s*\d+\s*$', '', cleaned, flags=re.MULTILINE)
cleaned = re.sub(r'^\s*\d+\s*/\s*\d+\s*$', '', cleaned, flags=re.MULTILINE)
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
doc.page_content = cleaned.strip()
except Exception as e:
logger.error(f"İçerik temizleme hatası ({filepath.name}): {e}")
drug_id = extract_drug_id(filepath, docs[0].page_content)
if drug_id is None:
continue
logger.info(f"İlaç tespit edildi: {drug_id}")
full_text = "\n".join(doc.page_content for doc in docs)
chunks = split_kt_by_sections(full_text, drug_id, file_hash)
logger.info(f"Chunk sayısı: {len(chunks)}")
# Eski chunk'ları sil (stale data önleme)
if old_hash:
try:
db._collection.delete(where={"file_hash": old_hash})
logger.info(f"Eski chunk'lar silindi (hash: {old_hash[:12]}...)")
except Exception as e:
logger.warning(f"Eski chunk silme hatası: {e}")
batch_size = 50
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i+batch_size]
try:
db.add_documents(batch)
time.sleep(2)
except Exception as e:
logger.error(f"Embedding hatası (bekleniyor...): {e}")
time.sleep(10)
try:
db.add_documents(batch)
except Exception as inner_e:
logger.error(f"Retry başarısız, atlanıyor: {inner_e}")
manifest[str(filepath)] = file_hash
with open(MANIFEST_PATH, "w") as f:
json.dump(manifest, f)
# İlaç listesini ChromaDB'den otomatik oluştur
_generate_drugs_list(db)
logger.info("Ingestion tamamlandı.")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--pdf-dir", type=str, required=True)
parser.add_argument("--mode", type=str, choices=["incremental", "full"], default="full", help="Ingestion mode: 'incremental' to only process changed files, 'full' to reprocess all files")
args = parser.parse_args()
process_pdfs(args.pdf_dir, args.mode)