#!/usr/bin/env python3
"""
Script to add District metadata to Qdrant chunks based on filename analysis.

Handles Uganda districts, ministry mappings, and LLM inference for ambiguous
cases.
"""

import logging
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import yaml
from qdrant_client import QdrantClient

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Any run of characters other than lowercase letters/digits counts as a
# separator when comparing filenames with district names.
_SEPARATOR_RUN = re.compile(r"[^a-z0-9]+")


def _normalize_for_matching(text: str) -> str:
    """Lowercase *text* and collapse every separator run to a single space."""
    return _SEPARATOR_RUN.sub(" ", text.lower()).strip()


@dataclass
class DistrictMapping:
    """Mapping for district-related entities"""
    name: str
    aliases: List[str]
    is_district: bool = True


class DistrictMetadataProcessor:
    """Infer a district from each chunk's filename and store it in Qdrant.

    The district is written to ``payload["metadata"]["district"]`` of every
    point that has a ``filename`` in its metadata.
    """

    def __init__(self, config_path: str = "src/config/settings.yaml"):
        """Load the YAML config and build the lookup tables.

        Args:
            config_path: Path to a YAML file that provides
                ``qdrant.collection_name``, ``qdrant.url`` and
                ``qdrant.api_key``.
        """
        # Load config manually
        with open(config_path, 'r', encoding="utf-8") as f:
            self.config = yaml.safe_load(f)

        # Clients are created lazily, on first use
        self.llm_client = None
        self.qdrant_client = None
        self.collection_name = self.config["qdrant"]["collection_name"]

        # Initialize district mappings
        self.district_mappings = self._initialize_district_mappings()
        self.ministry_mappings = self._initialize_ministry_mappings()

    def _initialize_district_mappings(self) -> Dict[str, DistrictMapping]:
        """Initialize Uganda districts and their aliases.

        Returns:
            Dict keyed by the lowercased district name and by every
            lowercased alias; each value is the owning ``DistrictMapping``.
            Insertion order is the match priority used by
            ``_extract_district_from_filename``.
        """

        def plain(*names: str) -> List[DistrictMapping]:
            # Districts whose only alias is their own name.
            return [DistrictMapping(name, [name]) for name in names]

        # NOTE: the region headings below are informal groupings kept for
        # readability only; no code depends on them.
        districts = [
            # Central Region
            DistrictMapping("Kampala", ["KCCA", "Kampala Capital City Authority"]),
            *plain(
                "Wakiso", "Mukono", "Luweero", "Nakaseke", "Nakasongola",
                "Kayunga", "Buikwe", "Buvuma",
            ),

            # Northern Region
            DistrictMapping("Gulu", ["Gulu", "Gulu DLG"]),
            *plain(
                "Kitgum", "Pader", "Agago", "Lamwo", "Nwoya", "Amuru",
                "Omoro", "Oyam", "Kole",
            ),
            DistrictMapping("Apac", ["Apac", "Apac District"]),
            *plain(
                "Lira", "Alebtong", "Amolatar", "Dokolo", "Otuke", "Kwania",
            ),

            # Eastern Region
            *plain(
                "Jinja", "Kamuli", "Iganga", "Bugiri", "Mayuge", "Namayingo",
                "Busia", "Tororo", "Pallisa", "Kumi", "Bukedea", "Soroti",
                "Serere", "Ngora", "Kaberamaido", "Kalaki", "Kapelebyong",
                "Amuria", "Katakwi", "Kotido", "Abim",
            ),
            DistrictMapping("Kaabong", ["Kaabong", "Kaabong District"]),
            *plain(
                "Karenga", "Moroto", "Napak", "Nabilatuk", "Amudat",
                "Nakapiripirit", "Bukwo", "Kween", "Kapchorwa", "Sironko",
                "Manafwa", "Bududa", "Mbale", "Butaleja", "Namisindwa",
                "Bulambuli",
            ),

            # Western Region
            *plain(
                "Masaka", "Kalungu", "Bukomansimbi", "Lwengo", "Sembabule",
                "Rakai", "Kyotera", "Mpigi", "Butambala", "Gomba", "Mityana",
                "Mubende", "Kassanda", "Kiboga", "Kyankwanzi", "Hoima",
                "Kikuube", "Kakumiro", "Kibaale", "Kagadi", "Buliisa",
                "Masindi", "Kiryandongo", "Pakwach", "Nebbi", "Zombo", "Arua",
                "Terego", "Madi-Okollo", "Obongi", "Moyo", "Yumbe", "Koboko",
                "Maracha", "Adjumani",
            ),

            # South Western Region
            *plain(
                "Mbarara", "Ibanda", "Isingiro", "Kiruhura", "Kazo",
                "Ntungamo", "Rwampara", "Rubanda", "Rukiga", "Kanungu",
                "Rukungiri", "Kisoro", "Bundibugyo", "Ntoroko", "Kasese",
                "Bunyangabu", "Fort Portal", "Kabarole", "Kyenjojo",
                "Kamwenge", "Kitagwenda", "Kyegegwa", "Mitooma", "Rubirizi",
                "Sheema", "Bushenyi",
            ),

            # Special cases
            DistrictMapping("Kalangala", ["Kalangala", "Kalangala DLG"]),
        ]

        # Create mapping dictionary
        mapping_dict: Dict[str, DistrictMapping] = {}
        for district in districts:
            mapping_dict[district.name.lower()] = district
            for alias in district.aliases:
                mapping_dict[alias.lower()] = district

        return mapping_dict

    def _initialize_ministry_mappings(self) -> Dict[str, str]:
        """Initialize ministry and organization mappings (acronym -> full name)."""
        return {
            "maaif": "Ministry of Agriculture, Animal Industry and Fisheries",
            "mwts": "Ministry of Works and Transport",
            "kcca": "Kampala Capital City Authority",
            "oag": "Office of the Auditor General",
            "arsdp": "Albertine Regional Sustainable Development Project",
            "avcdp": "Agriculture Value Chain Development Project",
            "ida": "International Development Association",
            "dlg": "District Local Government",
            "lg": "Local Government",
        }

    def _get_district_patterns(self) -> List[Tuple["re.Pattern[str]", str]]:
        """Return ``(compiled pattern, district name)`` pairs, built once.

        One pattern is compiled per key of ``self.district_mappings``, in the
        dict's insertion order. The lookarounds reject a key that is embedded
        in a longer run of letters, while still allowing adjacent digits.
        """
        patterns = getattr(self, "_district_patterns", None)
        if patterns is None:
            patterns = []
            for key, mapping in self.district_mappings.items():
                normalized_key = _normalize_for_matching(key)
                if not normalized_key:
                    continue
                compiled = re.compile(
                    rf"(?<![a-z]){re.escape(normalized_key)}(?![a-z])"
                )
                patterns.append((compiled, mapping.name))
            self._district_patterns = patterns
        return patterns

    def _extract_district_from_filename(self, filename: str) -> Optional[str]:
        """Extract district from filename using pattern matching.

        The filename and the district keys are lowercased and have every
        separator run (``_``, ``-``, ``.``, spaces, ...) collapsed to one
        space, so "Kampala_Capital_City_Authority.pdf" matches the alias
        "Kampala Capital City Authority". A key only matches when it is not
        part of a longer run of letters, so "capacity" no longer yields
        "Apac".

        Ministry/organisation acronyms need no special handling here: a
        filename that names no known district simply yields ``None``.

        Args:
            filename: File name (or path) of the source document.

        Returns:
            The canonical district name of the first mapping key (in mapping
            order) found in the filename, or ``None`` when nothing matches.
        """
        if not filename:
            return None

        normalized_filename = _normalize_for_matching(filename)
        for pattern, district_name in self._get_district_patterns():
            if pattern.search(normalized_filename):
                return district_name

        return None

    def _infer_district_with_llm(self, filename: str) -> Optional[str]:
        """Use LLM to infer district from filename when pattern matching fails.

        Currently a stub: it only logs the filename and returns ``None``.
        """
        # For now, return None - LLM integration can be added later
        logger.info(f"LLM inference needed for filename: {filename}")
        return None

    def infer_district(self, filename: str) -> Optional[str]:
        """Main method to infer district from filename.

        Tries pattern matching first and falls back to the LLM stub.
        """
        # First try pattern matching
        district = self._extract_district_from_filename(filename)
        if district:
            return district

        # If pattern matching fails, use LLM
        return self._infer_district_with_llm(filename)

    def _get_qdrant_client(self) -> QdrantClient:
        """Return the shared Qdrant client, creating it on first use."""
        if self.qdrant_client is None:
            self.qdrant_client = QdrantClient(
                url=self.config["qdrant"]["url"],
                api_key=self.config["qdrant"]["api_key"]
            )
        return self.qdrant_client

    def _scroll_page(self, batch_size: int, offset: Any) -> Tuple[List[Any], Any]:
        """Fetch one scroll page (payload only, no vectors).

        Returns:
            ``(points, next_offset)`` exactly as returned by
            ``QdrantClient.scroll``. Per the Qdrant client documentation,
            ``next_offset`` is ``None`` when there are no more points.
        """
        points, next_offset = self._get_qdrant_client().scroll(
            collection_name=self.collection_name,
            limit=batch_size,
            offset=offset,
            with_payload=True,
            with_vectors=False
        )
        return points, next_offset

    @staticmethod
    def _point_metadata(point: Any) -> Dict[str, Any]:
        """Return the point's ``payload["metadata"]`` dict, or ``{}`` if absent."""
        payload = getattr(point, "payload", None) or {}
        return payload.get("metadata") or {}

    def fetch_chunks_batch(self, batch_size: int = 100, offset: int = 0) -> List[Any]:
        """Fetch a batch of chunks from Qdrant (metadata only).

        Args:
            batch_size: Maximum number of points to return.
            offset: Forwarded to Qdrant ``scroll``. Qdrant documents this as
                the point ID to start from, not a count of records to skip;
                ``0`` is treated as "start from the beginning".

        Returns:
            The list of points, or ``[]`` if the request failed (the error is
            logged).
        """
        try:
            points, _ = self._scroll_page(batch_size, offset or None)
            return points
        except Exception as e:
            logger.error(f"Failed to fetch batch: {e}")
            return []

    def update_chunks_with_district(self, points: List[Any]) -> int:
        """Update chunks with district metadata.

        For every point that has a filename in its metadata, infers the
        district and rewrites ``payload["metadata"]`` with a ``district`` key
        added (``None`` when no district could be inferred). Failures are
        logged per point and do not stop the batch.

        Args:
            points: Points as returned by ``fetch_chunks_batch``.

        Returns:
            Number of points whose payload was updated.
        """
        updated_count = 0
        client = self._get_qdrant_client()

        for point in points:
            # Resolved before the try so the error log below never refers to
            # an unbound or stale id.
            point_id = getattr(point, "id", None)
            try:
                metadata = self._point_metadata(point)
                filename = metadata.get("filename", "")

                if not filename:
                    logger.warning(f"Point {point_id} has no filename")
                    continue

                # Infer district
                district = self.infer_district(filename)

                # Update metadata
                updated_metadata = metadata.copy()
                updated_metadata["district"] = district

                # Update point in Qdrant
                client.set_payload(
                    collection_name=self.collection_name,
                    payload={"metadata": updated_metadata},
                    points=[point_id]
                )

                updated_count += 1
                logger.info(f"Updated point {point_id}: {filename} -> {district}")

            except Exception as e:
                logger.error(f"Failed to update point {point_id}: {e}")

        return updated_count

    def process_all_chunks(self, batch_size: int = 100):
        """Process all chunks in batches.

        Pages through the collection by feeding each page's returned
        next-page offset into the following ``scroll`` call. Incrementing a
        numeric offset by ``batch_size`` is wrong here, because Qdrant
        interprets ``offset`` as a point ID.

        Args:
            batch_size: Number of points requested per page.

        Returns:
            Total number of points updated. A failed fetch is logged and ends
            the run early with the total reached so far.
        """
        total_updated = 0
        offset = None  # None = start from the first point

        logger.info(f"Starting to process chunks in batches of {batch_size}")

        while True:
            # Fetch batch
            try:
                points, next_offset = self._scroll_page(batch_size, offset)
            except Exception as e:
                logger.error(f"Failed to fetch batch: {e}")
                break

            if not points:
                break

            logger.info(f"Processing batch: {len(points)} points (offset: {offset})")

            # Update batch
            updated_count = self.update_chunks_with_district(points)
            total_updated += updated_count

            logger.info(f"Updated {updated_count} points in this batch")

            # Move to next batch; None means the collection is exhausted
            if next_offset is None:
                break
            offset = next_offset

        logger.info(f"Total updated: {total_updated} points")
        return total_updated


def main():
    """Main function to run the district metadata processor.

    Previews the inferred district for the first 10 chunks, then asks for
    confirmation on stdin before processing the whole collection.
    """
    try:
        processor = DistrictMetadataProcessor()

        # Test with a small batch first
        logger.info("Testing with first 10 chunks...")
        test_points = processor.fetch_chunks_batch(10, 0)

        if test_points:
            logger.info("Test batch fetched successfully. Processing...")
            for point in test_points:
                filename = processor._point_metadata(point).get("filename", "")
                district = processor.infer_district(filename)
                logger.info(f"Test: {filename} -> {district}")

        # Ask user if they want to proceed with full processing
        response = input("\nProceed with full processing? (y/n): ")
        if response.lower() == 'y':
            processor.process_all_chunks(batch_size=100)
        else:
            logger.info("Processing cancelled by user")

    except Exception as e:
        logger.error(f"Error in main: {e}")
        raise


if __name__ == "__main__":
    main()