Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Script to add District metadata to Qdrant chunks based on filename analysis. | |
| Handles Uganda districts, ministry mappings, and LLM inference for ambiguous cases. | |
| """ | |
| import re | |
| import yaml | |
| import logging | |
| from dataclasses import dataclass | |
| from typing import Dict, List, Optional | |
| from qdrant_client import QdrantClient | |
# Root logging setup: INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by everything in this script.
logger = logging.getLogger(__name__)
@dataclass
class DistrictMapping:
    """A canonical district name plus the alternative strings that refer to it.

    The ``@dataclass`` decorator generates the ``__init__`` that the
    positional construction used in this file
    (``DistrictMapping("Gulu", ["Gulu", "Gulu DLG"])``) relies on; without it
    the class accepts no constructor arguments.
    """
    # Canonical district name; this is the value callers receive.
    name: str
    # Alternative spellings/abbreviations that should resolve to ``name``.
    aliases: List[str]
    # Defaults to True. NOTE(review): not read by any code visible in this
    # file; presumably reserved for non-district entities - confirm.
    is_district: bool = True
class DistrictMetadataProcessor:
    """Add a ``district`` field to the metadata of chunks stored in Qdrant.

    The district is inferred from each chunk's ``metadata["filename"]`` by
    matching it against a table of Uganda district names and aliases.  An LLM
    fallback hook exists but currently always returns ``None``.
    """

    def __init__(self, config_path: str = "src/config/settings.yaml"):
        """Load settings and build the lookup tables.

        Args:
            config_path: Path to a YAML file.  ``qdrant.collection_name`` is
                read here; ``qdrant.url`` and ``qdrant.api_key`` are read when
                the Qdrant client is first needed.

        Raises:
            OSError: If the config file cannot be opened.
            KeyError: If ``qdrant.collection_name`` is missing.
        """
        # Explicit encoding so the result does not depend on the platform locale.
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)

        # Both clients start unset; the Qdrant one is created lazily by
        # _get_qdrant_client(), the LLM one is not used yet.
        self.llm_client = None
        self.qdrant_client = None
        self.collection_name = self.config["qdrant"]["collection_name"]

        self.district_mappings = self._initialize_district_mappings()
        self.ministry_mappings = self._initialize_ministry_mappings()

        # (compiled pattern, normalised-key lookup); built on first use.
        self._district_matcher = None

    def _initialize_district_mappings(self) -> "Dict[str, DistrictMapping]":
        """Build the lookup from lower-cased name/alias to its DistrictMapping.

        Returns:
            A dict whose keys are the lower-cased canonical name and every
            lower-cased alias of each district listed below.
        """
        districts = [
            # Central Region
            DistrictMapping("Kampala", ["KCCA", "Kampala Capital City Authority"]),
            DistrictMapping("Wakiso", ["Wakiso"]),
            DistrictMapping("Mukono", ["Mukono"]),
            DistrictMapping("Luweero", ["Luweero"]),
            DistrictMapping("Nakaseke", ["Nakaseke"]),
            DistrictMapping("Nakasongola", ["Nakasongola"]),
            DistrictMapping("Kayunga", ["Kayunga"]),
            DistrictMapping("Buikwe", ["Buikwe"]),
            DistrictMapping("Buvuma", ["Buvuma"]),
            # Northern Region
            DistrictMapping("Gulu", ["Gulu", "Gulu DLG"]),
            DistrictMapping("Kitgum", ["Kitgum"]),
            DistrictMapping("Pader", ["Pader"]),
            DistrictMapping("Agago", ["Agago"]),
            DistrictMapping("Lamwo", ["Lamwo"]),
            DistrictMapping("Nwoya", ["Nwoya"]),
            DistrictMapping("Amuru", ["Amuru"]),
            DistrictMapping("Omoro", ["Omoro"]),
            DistrictMapping("Oyam", ["Oyam"]),
            DistrictMapping("Kole", ["Kole"]),
            DistrictMapping("Apac", ["Apac", "Apac District"]),
            DistrictMapping("Lira", ["Lira"]),
            DistrictMapping("Alebtong", ["Alebtong"]),
            DistrictMapping("Amolatar", ["Amolatar"]),
            DistrictMapping("Dokolo", ["Dokolo"]),
            DistrictMapping("Otuke", ["Otuke"]),
            DistrictMapping("Kwania", ["Kwania"]),
            # Eastern Region
            DistrictMapping("Jinja", ["Jinja"]),
            DistrictMapping("Kamuli", ["Kamuli"]),
            DistrictMapping("Iganga", ["Iganga"]),
            DistrictMapping("Bugiri", ["Bugiri"]),
            DistrictMapping("Mayuge", ["Mayuge"]),
            DistrictMapping("Namayingo", ["Namayingo"]),
            DistrictMapping("Busia", ["Busia"]),
            DistrictMapping("Tororo", ["Tororo"]),
            DistrictMapping("Pallisa", ["Pallisa"]),
            DistrictMapping("Kumi", ["Kumi"]),
            DistrictMapping("Bukedea", ["Bukedea"]),
            DistrictMapping("Soroti", ["Soroti"]),
            DistrictMapping("Serere", ["Serere"]),
            DistrictMapping("Ngora", ["Ngora"]),
            DistrictMapping("Kaberamaido", ["Kaberamaido"]),
            DistrictMapping("Kalaki", ["Kalaki"]),
            DistrictMapping("Kapelebyong", ["Kapelebyong"]),
            DistrictMapping("Amuria", ["Amuria"]),
            DistrictMapping("Katakwi", ["Katakwi"]),
            DistrictMapping("Kotido", ["Kotido"]),
            DistrictMapping("Abim", ["Abim"]),
            DistrictMapping("Kaabong", ["Kaabong", "Kaabong District"]),
            DistrictMapping("Karenga", ["Karenga"]),
            DistrictMapping("Moroto", ["Moroto"]),
            DistrictMapping("Napak", ["Napak"]),
            DistrictMapping("Nabilatuk", ["Nabilatuk"]),
            DistrictMapping("Amudat", ["Amudat"]),
            DistrictMapping("Nakapiripirit", ["Nakapiripirit"]),
            DistrictMapping("Bukwo", ["Bukwo"]),
            DistrictMapping("Kween", ["Kween"]),
            DistrictMapping("Kapchorwa", ["Kapchorwa"]),
            DistrictMapping("Sironko", ["Sironko"]),
            DistrictMapping("Manafwa", ["Manafwa"]),
            DistrictMapping("Bududa", ["Bududa"]),
            DistrictMapping("Mbale", ["Mbale"]),
            DistrictMapping("Butaleja", ["Butaleja"]),
            DistrictMapping("Namisindwa", ["Namisindwa"]),
            DistrictMapping("Bulambuli", ["Bulambuli"]),
            # Western Region
            DistrictMapping("Masaka", ["Masaka"]),
            DistrictMapping("Kalungu", ["Kalungu"]),
            DistrictMapping("Bukomansimbi", ["Bukomansimbi"]),
            DistrictMapping("Lwengo", ["Lwengo"]),
            DistrictMapping("Sembabule", ["Sembabule"]),
            DistrictMapping("Rakai", ["Rakai"]),
            DistrictMapping("Kyotera", ["Kyotera"]),
            DistrictMapping("Mpigi", ["Mpigi"]),
            DistrictMapping("Butambala", ["Butambala"]),
            DistrictMapping("Gomba", ["Gomba"]),
            DistrictMapping("Mityana", ["Mityana"]),
            DistrictMapping("Mubende", ["Mubende"]),
            DistrictMapping("Kassanda", ["Kassanda"]),
            DistrictMapping("Kiboga", ["Kiboga"]),
            DistrictMapping("Kyankwanzi", ["Kyankwanzi"]),
            DistrictMapping("Hoima", ["Hoima"]),
            DistrictMapping("Kikuube", ["Kikuube"]),
            DistrictMapping("Kakumiro", ["Kakumiro"]),
            DistrictMapping("Kibaale", ["Kibaale"]),
            DistrictMapping("Kagadi", ["Kagadi"]),
            DistrictMapping("Buliisa", ["Buliisa"]),
            DistrictMapping("Masindi", ["Masindi"]),
            DistrictMapping("Kiryandongo", ["Kiryandongo"]),
            DistrictMapping("Pakwach", ["Pakwach"]),
            DistrictMapping("Nebbi", ["Nebbi"]),
            DistrictMapping("Zombo", ["Zombo"]),
            DistrictMapping("Arua", ["Arua"]),
            DistrictMapping("Terego", ["Terego"]),
            DistrictMapping("Madi-Okollo", ["Madi-Okollo"]),
            DistrictMapping("Obongi", ["Obongi"]),
            DistrictMapping("Moyo", ["Moyo"]),
            DistrictMapping("Yumbe", ["Yumbe"]),
            DistrictMapping("Koboko", ["Koboko"]),
            DistrictMapping("Maracha", ["Maracha"]),
            DistrictMapping("Adjumani", ["Adjumani"]),
            # South Western Region
            DistrictMapping("Mbarara", ["Mbarara"]),
            DistrictMapping("Ibanda", ["Ibanda"]),
            DistrictMapping("Isingiro", ["Isingiro"]),
            DistrictMapping("Kiruhura", ["Kiruhura"]),
            DistrictMapping("Kazo", ["Kazo"]),
            DistrictMapping("Ntungamo", ["Ntungamo"]),
            DistrictMapping("Rwampara", ["Rwampara"]),
            DistrictMapping("Rubanda", ["Rubanda"]),
            DistrictMapping("Rukiga", ["Rukiga"]),
            DistrictMapping("Kanungu", ["Kanungu"]),
            DistrictMapping("Rukungiri", ["Rukungiri"]),
            DistrictMapping("Kisoro", ["Kisoro"]),
            DistrictMapping("Bundibugyo", ["Bundibugyo"]),
            DistrictMapping("Ntoroko", ["Ntoroko"]),
            DistrictMapping("Kasese", ["Kasese"]),
            DistrictMapping("Bunyangabu", ["Bunyangabu"]),
            DistrictMapping("Fort Portal", ["Fort Portal"]),
            DistrictMapping("Kabarole", ["Kabarole"]),
            DistrictMapping("Kyenjojo", ["Kyenjojo"]),
            DistrictMapping("Kamwenge", ["Kamwenge"]),
            DistrictMapping("Kitagwenda", ["Kitagwenda"]),
            DistrictMapping("Kyegegwa", ["Kyegegwa"]),
            DistrictMapping("Mitooma", ["Mitooma"]),
            DistrictMapping("Rubirizi", ["Rubirizi"]),
            DistrictMapping("Sheema", ["Sheema"]),
            DistrictMapping("Bushenyi", ["Bushenyi"]),
            # Special cases
            DistrictMapping("Kalangala", ["Kalangala", "Kalangala DLG"]),
        ]

        # Index every district under its own name and under each alias.
        mapping_dict = {}
        for district in districts:
            mapping_dict[district.name.lower()] = district
            for alias in district.aliases:
                mapping_dict[alias.lower()] = district
        return mapping_dict

    def _initialize_ministry_mappings(self) -> Dict[str, str]:
        """Return the abbreviation -> full-name table of non-district bodies."""
        return {
            "maaif": "Ministry of Agriculture, Animal Industry and Fisheries",
            "mwts": "Ministry of Works and Transport",
            "kcca": "Kampala Capital City Authority",
            "oag": "Office of the Auditor General",
            "arsdp": "Albertine Regional Sustainable Development Project",
            "avcdp": "Agriculture Value Chain Development Project",
            "ida": "International Development Association",
            "dlg": "District Local Government",
            "lg": "Local Government",
        }

    @staticmethod
    def _normalize_for_matching(text: str) -> str:
        """Reduce *text* to lower-case words separated by single spaces.

        A space is inserted at lower->upper case transitions ("GuluDLG" ->
        "Gulu DLG") and every run of characters other than ``a-z``/``0-9`` is
        collapsed to one space, so "Madi_Okollo", "madi-okollo" and
        "Madi Okollo" all normalise to "madi okollo".
        """
        spaced = re.sub(r'(?<=[a-z])(?=[A-Z])', ' ', text)
        return re.sub(r'[^a-z0-9]+', ' ', spaced.lower()).strip()

    def _get_district_matcher(self):
        """Return ``(pattern, lookup)`` for alias matching, building it once.

        ``lookup`` maps each normalised key of ``self.district_mappings`` to
        its mapping object.  ``pattern`` is a compiled alternation of those
        keys, longest first, that only matches when the key is not directly
        adjacent to another letter; it is ``None`` when there are no keys.
        The result is cached, so later changes to ``self.district_mappings``
        are not picked up.
        """
        cached = getattr(self, "_district_matcher", None)
        if cached is not None:
            return cached

        lookup = {}
        for key, mapping in self.district_mappings.items():
            normalized_key = self._normalize_for_matching(key)
            if normalized_key:
                lookup.setdefault(normalized_key, mapping)

        pattern = None
        if lookup:
            # Longest first so "gulu dlg" is preferred over "gulu" at the
            # same position.
            alternatives = "|".join(
                re.escape(key) for key in sorted(lookup, key=len, reverse=True)
            )
            pattern = re.compile(rf"(?<![a-z])(?:{alternatives})(?![a-z])")

        self._district_matcher = (pattern, lookup)
        return self._district_matcher

    def _extract_district_from_filename(self, filename: str) -> Optional[str]:
        """Return the canonical district named in *filename*, if any.

        Matching is done on whole words rather than raw substrings, so
        "Kakumiro" is not reported as "Kumi" and "capacity" is not reported
        as "Apac".  When several districts appear, the one occurring first in
        the filename wins.

        Args:
            filename: File name (or path) to inspect.

        Returns:
            The canonical district name, or ``None`` when no known district
            alias occurs in the filename.
        """
        normalized = self._normalize_for_matching(filename)

        pattern, lookup = self._get_district_matcher()
        if pattern is not None:
            match = pattern.search(normalized)
            if match:
                return lookup[match.group(0)].name

        # Ministry/organisation files are explicitly "no district".  Kept as
        # its own step so heuristics added below it never run for such files.
        for ministry_key in self.ministry_mappings:
            ministry_pattern = rf"(?<![a-z]){re.escape(ministry_key)}(?![a-z])"
            if re.search(ministry_pattern, normalized):
                return None

        return None

    def _infer_district_with_llm(self, filename: str) -> Optional[str]:
        """Placeholder for LLM-based inference; logs the filename, returns None."""
        # For now, return None - LLM integration can be added later
        logger.info(f"LLM inference needed for filename: {filename}")
        return None

    def infer_district(self, filename: str) -> Optional[str]:
        """Infer the district for *filename*.

        Tries alias matching first and falls back to the LLM hook when that
        finds nothing.

        Returns:
            The canonical district name, or ``None`` if it cannot be inferred.
        """
        district = self._extract_district_from_filename(filename)
        if district:
            return district
        return self._infer_district_with_llm(filename)

    def _get_qdrant_client(self):
        """Return the shared Qdrant client, creating it on first use."""
        if self.qdrant_client is None:
            self.qdrant_client = QdrantClient(
                url=self.config["qdrant"]["url"],
                api_key=self.config["qdrant"]["api_key"]
            )
        return self.qdrant_client

    def _scroll_page(self, batch_size, offset=None):
        """Fetch one page of points and the cursor for the following page.

        Args:
            batch_size: Maximum number of points to return.
            offset: Point-ID cursor from the previous page, or ``None`` to
                start from the beginning.

        Returns:
            ``(points, next_offset)`` exactly as returned by
            ``QdrantClient.scroll``; ``next_offset`` is ``None`` on the last
            page.

        Raises:
            Exception: Whatever the Qdrant client raises; not caught here.
        """
        client = self._get_qdrant_client()
        points, next_offset = client.scroll(
            collection_name=self.collection_name,
            limit=batch_size,
            offset=offset,
            with_payload=True,
            with_vectors=False  # payload only; vectors are not needed
        )
        return points, next_offset

    def fetch_chunks_batch(self, batch_size: int = 100, offset: int = 0) -> List[Dict]:
        """Fetch a single batch of points (payload only) from Qdrant.

        Args:
            batch_size: Maximum number of points to return.
            offset: Qdrant scroll cursor, i.e. the point ID to start from -
                NOT a row count.  ``0`` (the default) starts at the beginning.

        Returns:
            The list of points, or ``[]`` if the request failed (the error is
            logged).
        """
        try:
            points, _ = self._scroll_page(batch_size, offset or None)
            return points
        except Exception as e:
            logger.error(f"Failed to fetch batch: {e}")
            return []

    def update_chunks_with_district(self, points: List[Dict]) -> int:
        """Write the inferred district into each point's ``metadata`` payload.

        Points without a filename are skipped with a warning.  A failure on
        one point is logged and does not stop the rest of the batch.

        Args:
            points: Points as returned by ``fetch_chunks_batch``; each must
                expose ``.id`` and ``.payload``.

        Returns:
            Number of points whose payload was updated.
        """
        updated_count = 0
        client = self._get_qdrant_client()

        for point in points:
            # Resolved before the try so the error message can always name it.
            point_id = getattr(point, "id", None)
            try:
                metadata = (point.payload or {}).get("metadata", {})
                filename = metadata.get("filename", "")
                if not filename:
                    logger.warning(f"Point {point_id} has no filename")
                    continue

                district = self.infer_district(filename)

                # Work on a copy so the fetched record is left untouched.
                updated_metadata = metadata.copy()
                updated_metadata["district"] = district

                client.set_payload(
                    collection_name=self.collection_name,
                    payload={"metadata": updated_metadata},
                    points=[point_id]
                )
                updated_count += 1
                logger.info(f"Updated point {point_id}: {filename} -> {district}")
            except Exception as e:
                logger.error(f"Failed to update point {point_id}: {e}")
        return updated_count

    def process_all_chunks(self, batch_size: int = 100):
        """Walk the whole collection in batches and tag every chunk.

        Pagination follows the cursor returned by each scroll call rather than
        a running row count, because Qdrant's scroll ``offset`` is a point ID.
        If fetching a page fails, the error is logged and processing stops
        with whatever has been updated so far.

        Args:
            batch_size: Number of points requested per page.

        Returns:
            Total number of points updated.
        """
        total_updated = 0
        offset = None  # None = start of the collection
        logger.info(f"Starting to process chunks in batches of {batch_size}")

        while True:
            try:
                points, next_offset = self._scroll_page(batch_size, offset)
            except Exception as e:
                logger.error(f"Failed to fetch batch: {e}")
                break

            if points:
                logger.info(f"Processing batch: {len(points)} points (offset: {offset})")
                updated_count = self.update_chunks_with_district(points)
                total_updated += updated_count
                logger.info(f"Updated {updated_count} points in this batch")

            # A missing cursor means the last page has been consumed.
            if next_offset is None:
                break
            offset = next_offset

        logger.info(f"Total updated: {total_updated} points")
        return total_updated
def main():
    """Preview district inference on ten chunks, then optionally run in full.

    Fetches a sample of ten points and logs the district inferred for each
    filename.  If the sample is non-empty the user is asked for confirmation;
    answering ``y`` (any case) processes the whole collection in batches of
    100, anything else logs a cancellation.  Any exception is logged and
    re-raised.
    """
    try:
        processor = DistrictMetadataProcessor()

        logger.info("Testing with first 10 chunks...")
        sample = processor.fetch_chunks_batch(10, 0)
        if not sample:
            # Nothing to preview, so there is nothing to confirm either.
            return

        logger.info("Test batch fetched successfully. Processing...")
        for record in sample:
            record_meta = record.payload.get("metadata", {})
            filename = record_meta.get("filename", "")
            district = processor.infer_district(filename)
            logger.info(f"Test: {filename} -> {district}")

        # Full processing writes to Qdrant, so require explicit consent.
        answer = input("\nProceed with full processing? (y/n): ")
        if answer.lower() != 'y':
            logger.info("Processing cancelled by user")
            return
        processor.process_all_chunks(batch_size=100)
    except Exception as e:
        logger.error(f"Error in main: {e}")
        raise


if __name__ == "__main__":
    main()