Spaces:
Sleeping
Sleeping
| import argparse | |
| import os | |
| import re | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| from typing import Optional | |
| from langchain_core.documents import Document | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.embeddings import JinaEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from dotenv import load_dotenv | |
| from app.logger import get_logger | |
| load_dotenv() | |
| logger = get_logger("ingest") | |
| CHROMA_DB_DIR = "./chroma_db" | |
| MANIFEST_PATH = "./manifest.json" | |
| def _loose(phrase: str) -> str: | |
| """Bir ifadeyi, PDF çıkarımında kelime içine serpilmiş rastgele | |
| boşluklara ('Ku llanm a') toleranslı bir regex'e çevirir. | |
| Boşluklar \\s+, diğer tokenlar arasına \\s* eklenir. '.' -> \\.?, | |
| ':' -> :?, '?' önceki tokenı opsiyonel yapmak yerine literal | |
| bırakmaz (opsiyonel karakterler nokta/:). | |
| Karakter sınıfları ([ıi] gibi) tek token sayılır. | |
| """ | |
| out: list[str] = [] | |
| i, n = 0, len(phrase) | |
| while i < n: | |
| c = phrase[i] | |
| if c.isspace(): | |
| out.append(r"\s+") | |
| while i < n and phrase[i].isspace(): | |
| i += 1 | |
| continue | |
| if c == "[": | |
| j = phrase.index("]", i) | |
| token = phrase[i : j + 1] | |
| i = j + 1 | |
| elif c == ".": | |
| if out and out[-1] == r"\s*": | |
| out.pop() | |
| out.append(r"\.?") | |
| i += 1 | |
| continue | |
| elif c == ":": | |
| if out and out[-1] == r"\s*": | |
| out.pop() | |
| out.append(r":?") | |
| i += 1 | |
| continue | |
| elif c == "?": | |
| i += 1 | |
| continue | |
| else: | |
| token = c | |
| i += 1 | |
| if out and not out[-1].endswith(r"\s+"): | |
| out.append(r"\s*") | |
| out.append(token) | |
| return "".join(out) | |
| def get_file_hash(filepath: Path) -> str: | |
| hasher = hashlib.md5() | |
| with open(filepath, 'rb') as f: | |
| buf = f.read() | |
| hasher.update(buf) | |
| return hasher.hexdigest() | |
| def _extract_drug_id_from_filename(filepath: Path) -> Optional[str]: | |
| """Dosya adından ilaç adını çeker. Format: {İLAÇ ADI}__{random}.pdf""" | |
| stem = filepath.stem | |
| if '__' in stem: | |
| part = stem.split('__')[0].strip() | |
| part = re.sub(r'\s+', ' ', part) | |
| if part: | |
| return part | |
| return None | |
| def extract_drug_id(doc_path: Path, first_page_text: str) -> str: | |
| # 1. Ana yöntem: Dosya adından çek (ilaç adı __ ayracından önceki kısım) | |
| drug_name = _extract_drug_id_from_filename(doc_path) | |
| if drug_name: | |
| logger.info(f"Dosya adından tespit edildi: {doc_path.name} → {drug_name}") | |
| return drug_name | |
| # 2. KULLANMA TALİMATI başlığından sonra ilaç adını topla | |
| lines = first_page_text.split('\n') | |
| start_collecting = False | |
| drug_name_lines = [] | |
| stop_prefixes = [ | |
| "ağız", "oral", "deri", "kas", "damar", | |
| "etkin madde", "yardımcı madde", | |
| "ağızdan", "kas içine", "damar içine", | |
| "cilt üzerine", "deri altına", | |
| "bu kullanma talimatında", "kullanmadan önce" | |
| ] | |
| for line in lines: | |
| clean_line = line.strip() | |
| if not start_collecting: | |
| if "KULLANMA TALİMATI" in clean_line.upper(): | |
| start_collecting = True | |
| continue | |
| if not clean_line: | |
| continue | |
| lower_line = clean_line.lower().lstrip("•.-* ") | |
| if any(lower_line.startswith(prefix) for prefix in stop_prefixes): | |
| break | |
| drug_name_lines.append(clean_line) | |
| if drug_name_lines: | |
| result = " ".join(drug_name_lines).replace("®", "").strip() | |
| logger.warning(f"KULLANMA TALİMATI yöntemi kullanıldı: {doc_path.name} → {result}") | |
| return result | |
| # 3. Regex: İlk sayfada "mg", "tablet", "kapsül" vb. içeren satırları ara | |
| drug_pattern = re.compile( | |
| r'^(.+(?:mg|mcg|mikrogram|ml|IU).+(?:tablet|kapsül|kapsul|film|şurup|surup|jel|krem|damla|flakon|süspansiyon|suspansiyon|sprey|ampul|enjektabl).*?)$', | |
| re.IGNORECASE | re.MULTILINE | |
| ) | |
| match = drug_pattern.search(first_page_text[:1000]) | |
| if match: | |
| result = match.group(1).replace("®", "").strip() | |
| logger.warning(f"Regex yöntemi kullanıldı: {doc_path.name} → {result}") | |
| return result | |
| # 4. Tespit edilemedi — None döndür, process_pdfs atlayacak | |
| logger.warning(f"İlaç adı tespit edilemedi, atlanıyor: {doc_path.name}") | |
| return None | |
| def split_kt_by_sections(text: str, drug_id: str, file_hash: str) -> "list[Document]": | |
| # Başlıkları yakalayacak esnek regex desenleri | |
| patterns = { | |
| "1. İlaç nedir ve ne için kullanılır?": r"(?m)^\s*1\.\s+(?!\").*nedir\s+ve\s+ne\s+için\s+kullanılır[^)\"]*$", | |
| "2. Kullanmadan önce dikkat edilmesi gerekenler": r"(?m)^\s*2\.\s+(?!\").*kullanmadan\s+önce\s+dikkat\s+edilmesi\s+gerekenler[^)\"]*$", | |
| "3. Nasıl kullanılır?": r"(?m)^\s*3\.\s+(?!\").*nasıl\s+kullanılır[^)\"]*$", | |
| "4. Olası yan etkiler nelerdir?": r"(?m)^\s*4\.\s+(?!\").*olası\s+yan\s+etkiler[^)\"]*$", | |
| "5. Saklama koşulları": r"(?m)^\s*5\.\s+(?!\").*saklanması[^)\"]*$" | |
| } | |
| matches = [] | |
| for section_name, pattern in patterns.items(): | |
| # İlk eşleşmeyi bul | |
| match = re.search(pattern, text, re.IGNORECASE) | |
| if match: | |
| matches.append({"name": section_name, "start": match.start()}) | |
| # Başlangıç indeksine göre sırala | |
| matches.sort(key=lambda x: x["start"]) | |
| sections = [] | |
| if not matches: | |
| # Hiç başlık bulunamazsa tüm metni tek bir genel bölüm olarak al | |
| sections.append({"name": "Genel Bilgiler", "content": text.strip()}) | |
| else: | |
| # Bulunan bölümleri ayır | |
| for i in range(len(matches)): | |
| # İlk başlıktan önceki metni (prelude) giriş bölümü yapmak yerine ilk bölümün başına dahil ediyoruz | |
| start_index = 0 if i == 0 else matches[i]["start"] | |
| end_index = matches[i+1]["start"] if i + 1 < len(matches) else len(text) | |
| sections.append({ | |
| "name": matches[i]["name"], | |
| "content": text[start_index:end_index].strip() | |
| }) | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=1800, | |
| chunk_overlap=300, | |
| separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""] | |
| ) | |
| docs = [] | |
| for sec in sections: | |
| chunks = text_splitter.split_text(sec["content"]) | |
| for chunk in chunks: | |
| # RAG performansını artırmak için ilaç adını bölüm başlığının başına ekliyoruz | |
| chunk_text = f"[{drug_id} - {sec['name']}]\n\n{chunk}" | |
| docs.append(Document( | |
| page_content=chunk_text, | |
| metadata={ | |
| "drug_id": drug_id, | |
| "section": sec["name"], | |
| "file_hash": file_hash | |
| } | |
| )) | |
| return docs | |
| def _generate_drugs_list(db): | |
| """ChromaDB'den unique drug_id'leri çekip drugs_list.txt'ye yazar.""" | |
| try: | |
| collection = db._collection | |
| results = collection.get(include=["metadatas"]) | |
| drug_ids = set() | |
| for meta in results["metadatas"]: | |
| did = meta.get("drug_id", "") | |
| if did and did != "SKIP": | |
| drug_ids.add(did) | |
| sorted_drugs = sorted(drug_ids, key=lambda x: x.lower()) | |
| with open("drugs_list.txt", "w", encoding="utf-8") as f: | |
| for drug in sorted_drugs: | |
| f.write(f"{drug}\n") | |
| logger.info(f"drugs_list.txt güncellendi: {len(sorted_drugs)} ilaç") | |
| except Exception as e: | |
| logger.error(f"drugs_list.txt oluşturma hatası: {e}") | |
| def process_pdfs(pdf_dir: str, mode: str): | |
| import time | |
| pdf_dir_path = Path(pdf_dir) | |
| manifest = {} | |
| if os.path.exists(MANIFEST_PATH): | |
| with open(MANIFEST_PATH, "r") as f: | |
| manifest = json.load(f) | |
| db = Chroma(persist_directory=CHROMA_DB_DIR, embedding_function=JinaEmbeddings(jina_api_key=os.environ.get("JINA_API_KEY"), model_name="jina-embeddings-v3")) | |
| for filepath in pdf_dir_path.glob("*.pdf"): | |
| file_hash = get_file_hash(filepath) | |
| old_hash = manifest.get(str(filepath)) | |
| if mode == "incremental" and old_hash == file_hash: | |
| logger.debug(f"Atlanıyor (değişiklik yok): {filepath.name}") | |
| continue | |
| logger.info(f"İşleniyor: {filepath.name}") | |
| loader = PyPDFLoader(str(filepath)) | |
| docs = loader.load() | |
| if not docs: | |
| logger.warning(f"Boş PDF: {filepath.name}") | |
| continue | |
| # Tüm sayfalarda temizleme — _loose sayesinde PDF'den gelen | |
| # 'Ku llanm a Talim atında' gibi kelime-içi boşluklara toleranslı. | |
| p1 = re.compile( | |
| r"(?:" + _loose("bu ilac[ıi] kullanmaya ba[şs]lamadan [öo]nce") + r"\s+)?" + | |
| _loose("bu kullanma tal[iı]mat[ıi]n[ıi]") + | |
| r".*?" + | |
| _loose("y[üu]ksek veya d[üu][şs][üu]k doz kullanmay[ıi]n[ıi]z."), | |
| re.IGNORECASE | re.DOTALL | |
| ) | |
| p2 = re.compile( | |
| _loose("bu kullanma tal[iı]mat[ıi]nda:") + | |
| r".*?" + | |
| _loose("ba[şs]l[ıi]klar[ıi] yer almaktad[ıi]r."), | |
| re.IGNORECASE | re.DOTALL | |
| ) | |
| try: | |
| for doc in docs: | |
| cleaned = p1.sub("", doc.page_content) | |
| cleaned = p2.sub("", cleaned) | |
| cleaned = re.sub(r'^\s*\d+\s*$', '', cleaned, flags=re.MULTILINE) | |
| cleaned = re.sub(r'^\s*\d+\s*/\s*\d+\s*$', '', cleaned, flags=re.MULTILINE) | |
| cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) | |
| doc.page_content = cleaned.strip() | |
| except Exception as e: | |
| logger.error(f"İçerik temizleme hatası ({filepath.name}): {e}") | |
| drug_id = extract_drug_id(filepath, docs[0].page_content) | |
| if drug_id is None: | |
| continue | |
| logger.info(f"İlaç tespit edildi: {drug_id}") | |
| full_text = "\n".join(doc.page_content for doc in docs) | |
| chunks = split_kt_by_sections(full_text, drug_id, file_hash) | |
| logger.info(f"Chunk sayısı: {len(chunks)}") | |
| # Eski chunk'ları sil (stale data önleme) | |
| if old_hash: | |
| try: | |
| db._collection.delete(where={"file_hash": old_hash}) | |
| logger.info(f"Eski chunk'lar silindi (hash: {old_hash[:12]}...)") | |
| except Exception as e: | |
| logger.warning(f"Eski chunk silme hatası: {e}") | |
| batch_size = 50 | |
| for i in range(0, len(chunks), batch_size): | |
| batch = chunks[i:i+batch_size] | |
| try: | |
| db.add_documents(batch) | |
| time.sleep(2) | |
| except Exception as e: | |
| logger.error(f"Embedding hatası (bekleniyor...): {e}") | |
| time.sleep(10) | |
| try: | |
| db.add_documents(batch) | |
| except Exception as inner_e: | |
| logger.error(f"Retry başarısız, atlanıyor: {inner_e}") | |
| manifest[str(filepath)] = file_hash | |
| with open(MANIFEST_PATH, "w") as f: | |
| json.dump(manifest, f) | |
| # İlaç listesini ChromaDB'den otomatik oluştur | |
| _generate_drugs_list(db) | |
| logger.info("Ingestion tamamlandı.") | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--pdf-dir", type=str, required=True) | |
| parser.add_argument("--mode", type=str, choices=["incremental", "full"], default="full", help="Ingestion mode: 'incremental' to only process changed files, 'full' to reprocess all files") | |
| args = parser.parse_args() | |
| process_pdfs(args.pdf_dir, args.mode) |