package bg.bas.dcl.LLMs.IfGPTDataset; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Scanner; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import bg.bas.dcl.LLMs.BiasAnalyser; import bg.bas.dcl.LLMs.BiasLexicon; import bg.bas.dcl.LLMs.BulgarianSentenceSplitter; import bg.bas.dcl.LLMs.PIIDetector; import bg.bas.dcl.LLMs.SentenceBiasScore; import bg.bas.dcl.general.FileHandler; import bg.bas.dcl.general.JSONProcessor; /** * IfGPTPipeline * * Pipeline for the ifGPT Bulgarian language dataset. * * ----------------------------------------------------------------------- ----------------------------------------------------------------------- * PIPELINE STAGES (executed in order by {@link #run()}) * * 1. EXTRACT * 2. SPLIT * 3. CLEAN * 4. DEDUPLICATE * 5. PII * 6. BIAS * 7. COUNTS — word / sentence / token counts are recomputed on the cleaned, deduplicated text * FULL_DATA_DIR / FULL_META_DIR * * ----------------------------------------------------------------------- */ @SuppressWarnings("unchecked") public class IfGPTPipeline { // ----------------------------------------------------------------------- // Fixed paths // ----------------------------------------------------------------------- public static final String FULL_DATA_DIR = "/home/ivelina/WORK-DCL/IfGPT/IFGPT-DATASET-DATA/"; public static final String FULL_META_DIR = "/home/ivelina/WORK-DCL/IfGPT/IFGPT-DATASET-METADATA/"; // ----------------------------------------------------------------------- // Configurable paths and options // ----------------------------------------------------------------------- private SourceProcessor sourceProcessor; // mandatory private String newDataDir; // mandatory: incoming texts private String sampleDir; // mandatory: boilerplate sample private String newMetaDir; // mandatory: output metadata private String blocklistFile; // boilerplate blocklist file private String dedupReport; // dedup TSV report path private String biasDictPath; // bias dictionary TSV private String openNlpModelPath = null; // null = bundled JAR model private double boilerplateThreshold = 0.50; // FileCleanProcessor threshold private double dedupThreshold = 0.90; // DeduplicationProcessor threshold private int dedupShingleSize = 5; private int dedupNumHashes = 200; private boolean removeDuplicates = false; // whether to strip dup sentences private boolean keepBackups = true; // keep .bak on file modification private boolean skipClean = false; // skip boilerplate cleaning private boolean skipDedup = false; // skip deduplication private boolean skipPii = false; // skip PII scoring private boolean skipBias = false; // skip bias scoring // ----------------------------------------------------------------------- // // ----------------------------------------------------------------------- public IfGPTPipeline setSourceProcessor(SourceProcessor p) { sourceProcessor = p; return this; } public IfGPTPipeline setNewDataDir(String p) { newDataDir = p; return this; } public IfGPTPipeline setSampleDir(String p) { sampleDir = p; return this; } public IfGPTPipeline setNewMetaDir(String p) { newMetaDir = p; return this; } public IfGPTPipeline setBlocklistFile(String p) { blocklistFile = p; return this; } public IfGPTPipeline setDedupReport(String p) { dedupReport = p; return this; } public IfGPTPipeline setBiasDictPath(String p) { biasDictPath = p; return this; } public IfGPTPipeline setOpenNlpModelPath(String p) { openNlpModelPath = p; return this; } public IfGPTPipeline setBoilerplateThreshold(double t) { boilerplateThreshold = t; return this; } public IfGPTPipeline setDedupThreshold(double t) { dedupThreshold = t; return this; } public IfGPTPipeline setDedupShingleSize(int n) { dedupShingleSize = n; return this; } public IfGPTPipeline setDedupNumHashes(int n) { dedupNumHashes = n; return this; } public IfGPTPipeline setRemoveDuplicates(boolean b) { removeDuplicates = b; return this; } public IfGPTPipeline setKeepBackups(boolean b) { keepBackups = b; return this; } public IfGPTPipeline setSkipClean(boolean b) { skipClean = b; return this; } public IfGPTPipeline setSkipDedup(boolean b) { skipDedup = b; return this; } public IfGPTPipeline setSkipPii(boolean b) { skipPii = b; return this; } public IfGPTPipeline setSkipBias(boolean b) { skipBias = b; return this; } // ----------------------------------------------------------------------- // ----------------------------------------------------------------------- /** * Executes all stages in order. * Throws {@link IllegalStateException} if mandatory configuration is missing. */ public void run() { validateConfig(); ensureDirs(newMetaDir, FULL_DATA_DIR, FULL_META_DIR); banner("STAGE 1 — SOURCE EXTRACTION"); runExtraction(); // Shared NLP components (initialised once, reused across stages) BulgarianSentenceSplitter splitter = new BulgarianSentenceSplitter(openNlpModelPath); banner("STAGE 2 — SENTENCE SPLITTING & INITIAL METADATA"); runSentenceSplitting(splitter); if (!skipClean) { banner("STAGE 3 — BOILERPLATE CLEANING"); runCleaning(); } else { log("STAGE 3 skipped (skipClean=true)"); } if (!skipDedup) { banner("STAGE 4 — DEDUPLICATION"); runDeduplication(); } else { log("STAGE 4 skipped (skipDedup=true)"); } PIIDetector piiDetector = skipPii ? null : new PIIDetector(splitter); BiasAnalyser biasAnalyser = skipBias ? null : buildBiasAnalyser(splitter); banner("STAGES 5-7 — PII, BIAS & FINAL COUNTS"); runAnnotationAndCounts(splitter, piiDetector, biasAnalyser); banner("STAGE 8 — PERSIST TO FULL CORPUS"); runPersist(); banner("PIPELINE COMPLETE"); } // ----------------------------------------------------------------------- // Stage 1 — Extraction // ----------------------------------------------------------------------- private void runExtraction() { // The source processor writes plain-text files to newDataDir and // seed metadata JSON to newMetaDir. sourceProcessor.process(newDataDir, newMetaDir); log("Extraction complete → " + newDataDir); } // ----------------------------------------------------------------------- // Stage 2 — Sentence splitting // ----------------------------------------------------------------------- /** * Reads each metadata JSON produced by the source processor, then for * each document text file counts sentences properly using the OpenNLP * splitter and writes the sentence list to a parallel .sentences file * (one sentence per line) used by later stages. */ private void runSentenceSplitting(BulgarianSentenceSplitter splitter) { try { FileHandler fh = new FileHandler(); int docs = 0; for (File txtFile : fh.getFileListing(new File(newDataDir))) { if (!txtFile.isFile() || !txtFile.getName().endsWith(".txt")) continue; // Read document text StringBuilder sb = new StringBuilder(); try (Scanner sc = new Scanner(txtFile, StandardCharsets.UTF_8)) { while (sc.hasNextLine()) sb.append(sc.nextLine()).append('\n'); } String text = sb.toString().trim(); // Split into sentences and persist to .sentences sidecar file String[] sentences = splitter.split(text); File sentFile = new File(newDataDir, txtFile.getName() .replace(".txt", ".sentences")); try (Writer w = new OutputStreamWriter( new FileOutputStream(sentFile), StandardCharsets.UTF_8)) { for (String sent : sentences) { if (!sent.isBlank()) { w.write(sent.trim()); w.write('\n'); } } } docs++; } log("Sentence splitting complete. Documents: " + docs); } catch (Exception e) { e.printStackTrace(); } } // ----------------------------------------------------------------------- // Stage 3 — Boilerplate cleaning // ----------------------------------------------------------------------- private void runCleaning() { FileCleanProcessor fcp = new FileCleanProcessor(boilerplateThreshold); // Learn from sample fcp.learnFromSample(sampleDir); fcp.printTopCommonLines(20); // Save blocklist for audit / reproducibility if (blocklistFile != null && !blocklistFile.isBlank()) { fcp.saveBlocklist(blocklistFile); } // Clean the new data directory fcp.cleanDirectory(newDataDir, keepBackups); log("Boilerplate cleaning complete → " + newDataDir); } // ----------------------------------------------------------------------- // Stage 4 — Deduplication // ----------------------------------------------------------------------- private void runDeduplication() { DeduplicationProcessor dp = new DeduplicationProcessor( dedupThreshold, dedupShingleSize, dedupNumHashes); // Index the full existing corpus log("Indexing full corpus for deduplication…"); dp.indexCorpus(FULL_DATA_DIR); log("Corpus indexed. Sentences: " + dp.getCorpusSize()); // Detect near-duplicates in new data String report = dedupReport != null ? dedupReport : newMetaDir + "dedup_report.tsv"; dp.detectDuplicates(newDataDir, report); if (removeDuplicates) { dp.removeDuplicatesFromNewFolder(newDataDir, keepBackups); } } // ----------------------------------------------------------------------- // Stages 5-7 — PII, Bias annotation + final counts // ----------------------------------------------------------------------- /** * For each document: * a) reads the (cleaned, deduplicated) .sentences sidecar file, * b) runs PII and/or Bias scoring per sentence, * c) recomputes word/sentence/token counts on the surviving text, * d) merges all computed values into a DocumentMetadata and writes * the final metadata JSON to newMetaDir. */ private void runAnnotationAndCounts(BulgarianSentenceSplitter splitter, PIIDetector piiDetector, BiasAnalyser biasAnalyser) { try { FileHandler fh = new FileHandler(); JSONProcessor jp = new JSONProcessor(); int docs = 0, errors = 0; for (File sentFile : fh.getFileListing(new File(newDataDir))) { if (!sentFile.isFile() || !sentFile.getName().endsWith(".sentences")) continue; String stem = sentFile.getName().replace(".sentences", ""); // --- Load sentences --- List sentences = new ArrayList<>(); try (Scanner sc = new Scanner(sentFile, StandardCharsets.UTF_8)) { while (sc.hasNextLine()) { String s = sc.nextLine().trim(); if (!s.isBlank()) sentences.add(s); } } if (sentences.isEmpty()) { log("[WARN] No sentences for: " + stem); errors++; continue; } // --- Load or create DocumentMetadata --- DocumentMetadata meta = loadOrCreateMetadata(jp, stem); // --- PII per sentence --- List piiVec = new ArrayList<>(); if (piiDetector != null) { int sentIdx = 0; for (String sent : sentences) { PIIDetector.SentencePIIScore score = piiDetector.analyseSentence(sent, stem + "-" + sentIdx++); piiVec.add(score.getPiiCoverage()); } } meta.setPiiVector(piiVec); // --- Bias per sentence --- List biasVec = new ArrayList<>(); if (biasAnalyser != null) { for (String sent : sentences) { SentenceBiasScore score = biasAnalyser.analyseSentence(sent); biasVec.add(score.totalCoverage()); } } meta.setBiasVector(biasVec); // --- Recompute counts from surviving sentences --- int nSentences = sentences.size(); int nWords = 0; int nTokens = 0; for (String sent : sentences) { String[] toks = sent.split("\\s+"); nWords += toks.length; // estimate tokens: words + punctuation characters nTokens += toks.length + sent.length() - sent.replaceAll("[.,;:!?()\\-]", "").length(); } // Paragraphs: count blank-line groups in the original text file int nParagraphs = countParagraphs(new File(newDataDir, stem + ".txt")); meta.setNumberSentences(nSentences) .setNumberWords(nWords) .setNumberTokens(nTokens) .setNumberParagraphs(nParagraphs); // --- Persist metadata JSON --- writeMetadata(meta, newMetaDir + stem + "_meta.json"); docs++; } log("Annotation & counts complete. Documents: " + docs + " Errors: " + errors); } catch (Exception e) { e.printStackTrace(); } } // ----------------------------------------------------------------------- // Stage 8 // ----------------------------------------------------------------------- /** */ private void runPersist() { try { FileHandler fh = new FileHandler(); int dataCopied = 0, metaCopied = 0; // Copy text files for (File f : fh.getFileListing(new File(newDataDir))) { if (!f.isFile() || !f.getName().endsWith(".txt")) continue; File dest = new File(FULL_DATA_DIR, f.getName()); if (!dest.exists()) { Files.copy(f.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING); dataCopied++; } } // Copy metadata JSON files for (File f : fh.getFileListing(new File(newMetaDir))) { if (!f.isFile() || !f.getName().endsWith("_meta.json")) continue; File dest = new File(FULL_META_DIR, f.getName()); if (!dest.exists()) { Files.copy(f.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING); metaCopied++; } } log("Persist complete. Text files copied: " + dataCopied + " Metadata files copied: " + metaCopied); log("FULL_DATA_DIR : " + FULL_DATA_DIR); log("FULL_META_DIR : " + FULL_META_DIR); } catch (Exception e) { e.printStackTrace(); } } // ----------------------------------------------------------------------- // Helpers // ----------------------------------------------------------------------- private DocumentMetadata loadOrCreateMetadata(JSONProcessor jp, String stem) { // Try to find a seed metadata JSON written by the source processor // Filename conventions: stem + ".json" or stem + "_meta.json" String[] candidates = { newMetaDir + stem + "_meta.json", newMetaDir + stem + ".json" }; for (String path : candidates) { File f = new File(path); if (f.exists()) { try { JSONObject raw = jp.readJSON(f); // First try full schema, then legacy format if (raw.containsKey("Identifier")) { return DocumentMetadata.fromJson(raw); } else { DocumentMetadata m = new DocumentMetadata(stem); m.mergeLegacy(raw); return m; } } catch (Exception e) { log("[WARN] Could not parse metadata JSON for " + stem + ": " + e.getMessage()); } } } // Fall back to empty skeleton return new DocumentMetadata(stem); } private void writeMetadata(DocumentMetadata meta, String outPath) throws Exception { JSONObject json = meta.toJson(); try (Writer w = new OutputStreamWriter( new FileOutputStream(outPath), StandardCharsets.UTF_8)) { json.writeJSONString(w); } } private int countParagraphs(File txtFile) { if (!txtFile.exists()) return 0; int count = 0; boolean inPara = false; try (Scanner sc = new Scanner(txtFile, StandardCharsets.UTF_8)) { while (sc.hasNextLine()) { String line = sc.nextLine(); if (line.isBlank()) { inPara = false; } else { if (!inPara) { count++; inPara = true; } } } } catch (Exception e) { /* ignored */ } return Math.max(count, 1); } private BiasAnalyser buildBiasAnalyser(BulgarianSentenceSplitter splitter) { if (biasDictPath == null || biasDictPath.isBlank()) { log("[WARN] No bias dictionary path set — bias scoring disabled."); return null; } BiasLexicon lexicon = new BiasLexicon(biasDictPath); return new BiasAnalyser(lexicon, splitter); } private void validateConfig() { List missing = new ArrayList<>(); if (sourceProcessor == null) missing.add("sourceProcessor"); if (newDataDir == null || newDataDir.isBlank()) missing.add("newDataDir"); if (sampleDir == null || sampleDir.isBlank()) missing.add("sampleDir"); if (newMetaDir == null || newMetaDir.isBlank()) missing.add("newMetaDir"); if (!missing.isEmpty()) throw new IllegalStateException( "Pipeline configuration missing: " + missing); } private void ensureDirs(String... paths) { for (String p : paths) { if (p != null) new File(p).mkdirs(); } } private void banner(String msg) { System.out.println("\n" + "=".repeat(60)); System.out.println(" " + msg); System.out.println("=".repeat(60)); } private void log(String msg) { System.out.println("[Pipeline] " + msg); } }