# audit_assistant / add_district_metadata.py
# akryldigital — "demo version (#5)", commit 40b7ffe (verified)
#!/usr/bin/env python3
"""
Script to add District metadata to Qdrant chunks based on filename analysis.
Handles Uganda districts, ministry mappings, and LLM inference for ambiguous cases.
"""
import re
import yaml
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional
from qdrant_client import QdrantClient
# Configure logging once for the whole script: INFO and above, each record
# prefixed with a timestamp and level name. basicConfig does nothing if the
# root logger already has handlers.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class DistrictMapping:
    """A canonical district name plus the spellings that should resolve to it.

    ``name`` is the value written into chunk metadata; each entry in
    ``aliases`` is an alternative spelling or abbreviation for the same
    district.
    """

    # Canonical district name, stored as the "district" metadata value.
    name: str
    # Alternative spellings / abbreviations (e.g. "KCCA") for ``name``.
    aliases: List[str]
    # Marker flag, True by default; not read by the code in this script.
    is_district: bool = True
class DistrictMetadataProcessor:
    """Tag Qdrant chunks with a ``district`` metadata field derived from filenames.

    A district is recognised by matching each chunk's ``metadata.filename``
    against a table of Uganda district names and aliases. Chunks are read from
    and written back to the collection named in the YAML config under
    ``qdrant.collection_name``.
    """

    def __init__(self, config_path: str = "src/config/settings.yaml"):
        """Load settings and build the lookup tables.

        Args:
            config_path: YAML file providing ``qdrant.collection_name``;
                ``qdrant.url`` and ``qdrant.api_key`` are read later, when the
                Qdrant client is first needed.

        Raises:
            OSError: If the config file cannot be opened.
            KeyError / TypeError: If ``qdrant.collection_name`` is missing.
        """
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)

        # Clients are created lazily so filename inference works without a
        # Qdrant connection. ``llm_client`` is reserved for the (not yet
        # implemented) LLM fallback.
        self.llm_client = None
        self.qdrant_client = None
        self.collection_name = self.config["qdrant"]["collection_name"]

        self.district_mappings = self._initialize_district_mappings()
        self.ministry_mappings = self._initialize_ministry_mappings()
        # Normalised match keys, built from district_mappings on first use.
        self._district_lookup_cache = None

    def _initialize_district_mappings(self) -> "Dict[str, DistrictMapping]":
        """Build a table from lower-cased district names/aliases to mappings.

        Dict insertion order (the order of the list below) is the match
        priority when a filename mentions more than one district.
        """
        # NOTE: region headings are informal groupings for readability only;
        # they are not used by the matching logic and are not an
        # authoritative regional classification.
        districts = [
            # Central Region
            DistrictMapping("Kampala", ["KCCA", "Kampala Capital City Authority"]),
            DistrictMapping("Wakiso", ["Wakiso"]),
            DistrictMapping("Mukono", ["Mukono"]),
            DistrictMapping("Luweero", ["Luweero"]),
            DistrictMapping("Nakaseke", ["Nakaseke"]),
            DistrictMapping("Nakasongola", ["Nakasongola"]),
            DistrictMapping("Kayunga", ["Kayunga"]),
            DistrictMapping("Buikwe", ["Buikwe"]),
            DistrictMapping("Buvuma", ["Buvuma"]),
            # Northern Region
            DistrictMapping("Gulu", ["Gulu", "Gulu DLG"]),
            DistrictMapping("Kitgum", ["Kitgum"]),
            DistrictMapping("Pader", ["Pader"]),
            DistrictMapping("Agago", ["Agago"]),
            DistrictMapping("Lamwo", ["Lamwo"]),
            DistrictMapping("Nwoya", ["Nwoya"]),
            DistrictMapping("Amuru", ["Amuru"]),
            DistrictMapping("Omoro", ["Omoro"]),
            DistrictMapping("Oyam", ["Oyam"]),
            DistrictMapping("Kole", ["Kole"]),
            DistrictMapping("Apac", ["Apac", "Apac District"]),
            DistrictMapping("Lira", ["Lira"]),
            DistrictMapping("Alebtong", ["Alebtong"]),
            DistrictMapping("Amolatar", ["Amolatar"]),
            DistrictMapping("Dokolo", ["Dokolo"]),
            DistrictMapping("Otuke", ["Otuke"]),
            DistrictMapping("Kwania", ["Kwania"]),
            # Eastern Region
            DistrictMapping("Jinja", ["Jinja"]),
            DistrictMapping("Kamuli", ["Kamuli"]),
            DistrictMapping("Iganga", ["Iganga"]),
            DistrictMapping("Bugiri", ["Bugiri"]),
            DistrictMapping("Mayuge", ["Mayuge"]),
            DistrictMapping("Namayingo", ["Namayingo"]),
            DistrictMapping("Busia", ["Busia"]),
            DistrictMapping("Tororo", ["Tororo"]),
            DistrictMapping("Pallisa", ["Pallisa"]),
            DistrictMapping("Kumi", ["Kumi"]),
            DistrictMapping("Bukedea", ["Bukedea"]),
            DistrictMapping("Soroti", ["Soroti"]),
            DistrictMapping("Serere", ["Serere"]),
            DistrictMapping("Ngora", ["Ngora"]),
            DistrictMapping("Kaberamaido", ["Kaberamaido"]),
            DistrictMapping("Kalaki", ["Kalaki"]),
            DistrictMapping("Kapelebyong", ["Kapelebyong"]),
            DistrictMapping("Amuria", ["Amuria"]),
            DistrictMapping("Katakwi", ["Katakwi"]),
            DistrictMapping("Kotido", ["Kotido"]),
            DistrictMapping("Abim", ["Abim"]),
            DistrictMapping("Kaabong", ["Kaabong", "Kaabong District"]),
            DistrictMapping("Karenga", ["Karenga"]),
            DistrictMapping("Moroto", ["Moroto"]),
            DistrictMapping("Napak", ["Napak"]),
            DistrictMapping("Nabilatuk", ["Nabilatuk"]),
            DistrictMapping("Amudat", ["Amudat"]),
            DistrictMapping("Nakapiripirit", ["Nakapiripirit"]),
            DistrictMapping("Bukwo", ["Bukwo"]),
            DistrictMapping("Kween", ["Kween"]),
            DistrictMapping("Kapchorwa", ["Kapchorwa"]),
            DistrictMapping("Sironko", ["Sironko"]),
            DistrictMapping("Manafwa", ["Manafwa"]),
            DistrictMapping("Bududa", ["Bududa"]),
            DistrictMapping("Mbale", ["Mbale"]),
            DistrictMapping("Butaleja", ["Butaleja"]),
            DistrictMapping("Namisindwa", ["Namisindwa"]),
            DistrictMapping("Bulambuli", ["Bulambuli"]),
            # Western Region
            DistrictMapping("Masaka", ["Masaka"]),
            DistrictMapping("Kalungu", ["Kalungu"]),
            DistrictMapping("Bukomansimbi", ["Bukomansimbi"]),
            DistrictMapping("Lwengo", ["Lwengo"]),
            DistrictMapping("Sembabule", ["Sembabule"]),
            DistrictMapping("Rakai", ["Rakai"]),
            DistrictMapping("Kyotera", ["Kyotera"]),
            DistrictMapping("Mpigi", ["Mpigi"]),
            DistrictMapping("Butambala", ["Butambala"]),
            DistrictMapping("Gomba", ["Gomba"]),
            DistrictMapping("Mityana", ["Mityana"]),
            DistrictMapping("Mubende", ["Mubende"]),
            DistrictMapping("Kassanda", ["Kassanda"]),
            DistrictMapping("Kiboga", ["Kiboga"]),
            DistrictMapping("Kyankwanzi", ["Kyankwanzi"]),
            DistrictMapping("Hoima", ["Hoima"]),
            DistrictMapping("Kikuube", ["Kikuube"]),
            DistrictMapping("Kakumiro", ["Kakumiro"]),
            DistrictMapping("Kibaale", ["Kibaale"]),
            DistrictMapping("Kagadi", ["Kagadi"]),
            DistrictMapping("Buliisa", ["Buliisa"]),
            DistrictMapping("Masindi", ["Masindi"]),
            DistrictMapping("Kiryandongo", ["Kiryandongo"]),
            DistrictMapping("Pakwach", ["Pakwach"]),
            DistrictMapping("Nebbi", ["Nebbi"]),
            DistrictMapping("Zombo", ["Zombo"]),
            DistrictMapping("Arua", ["Arua"]),
            DistrictMapping("Terego", ["Terego"]),
            DistrictMapping("Madi-Okollo", ["Madi-Okollo"]),
            DistrictMapping("Obongi", ["Obongi"]),
            DistrictMapping("Moyo", ["Moyo"]),
            DistrictMapping("Yumbe", ["Yumbe"]),
            DistrictMapping("Koboko", ["Koboko"]),
            DistrictMapping("Maracha", ["Maracha"]),
            DistrictMapping("Adjumani", ["Adjumani"]),
            # South Western Region
            DistrictMapping("Mbarara", ["Mbarara"]),
            DistrictMapping("Ibanda", ["Ibanda"]),
            DistrictMapping("Isingiro", ["Isingiro"]),
            DistrictMapping("Kiruhura", ["Kiruhura"]),
            DistrictMapping("Kazo", ["Kazo"]),
            DistrictMapping("Ntungamo", ["Ntungamo"]),
            DistrictMapping("Rwampara", ["Rwampara"]),
            DistrictMapping("Rubanda", ["Rubanda"]),
            DistrictMapping("Rukiga", ["Rukiga"]),
            DistrictMapping("Kanungu", ["Kanungu"]),
            DistrictMapping("Rukungiri", ["Rukungiri"]),
            DistrictMapping("Kisoro", ["Kisoro"]),
            DistrictMapping("Bundibugyo", ["Bundibugyo"]),
            DistrictMapping("Ntoroko", ["Ntoroko"]),
            DistrictMapping("Kasese", ["Kasese"]),
            DistrictMapping("Bunyangabu", ["Bunyangabu"]),
            DistrictMapping("Fort Portal", ["Fort Portal"]),
            DistrictMapping("Kabarole", ["Kabarole"]),
            DistrictMapping("Kyenjojo", ["Kyenjojo"]),
            DistrictMapping("Kamwenge", ["Kamwenge"]),
            DistrictMapping("Kitagwenda", ["Kitagwenda"]),
            DistrictMapping("Kyegegwa", ["Kyegegwa"]),
            DistrictMapping("Mitooma", ["Mitooma"]),
            DistrictMapping("Rubirizi", ["Rubirizi"]),
            DistrictMapping("Sheema", ["Sheema"]),
            DistrictMapping("Bushenyi", ["Bushenyi"]),
            # Special cases
            DistrictMapping("Kalangala", ["Kalangala", "Kalangala DLG"]),
        ]

        # Index every canonical name and alias, lower-cased, to its mapping.
        mapping_dict = {}
        for district in districts:
            mapping_dict[district.name.lower()] = district
            for alias in district.aliases:
                mapping_dict[alias.lower()] = district
        return mapping_dict

    def _initialize_ministry_mappings(self) -> Dict[str, str]:
        """Return abbreviations of ministries, projects and bodies -> full names.

        These are organisations, not districts. The table is kept for
        reference; district matching does not consult it.
        """
        return {
            "maaif": "Ministry of Agriculture, Animal Industry and Fisheries",
            "mwts": "Ministry of Works and Transport",
            "kcca": "Kampala Capital City Authority",
            "oag": "Office of the Auditor General",
            "arsdp": "Albertine Regional Sustainable Development Project",
            "avcdp": "Agriculture Value Chain Development Project",
            "ida": "International Development Association",
            "dlg": "District Local Government",
            "lg": "Local Government",
        }

    @staticmethod
    def _normalize_for_match(text: str) -> str:
        """Lower-case *text* and collapse each run of non [a-z0-9] chars to one space."""
        return re.sub(r"[^a-z0-9]+", " ", text.lower()).strip()

    def _district_lookup(self) -> List[tuple]:
        """Return cached ``(" key ", canonical_name)`` pairs in mapping order.

        Keys are normalised and space-padded so a plain ``in`` test on a
        normalised, space-padded filename is a whole-word match. If
        ``district_mappings`` is replaced after first use, reset
        ``_district_lookup_cache`` to None to rebuild.
        """
        lookup = getattr(self, "_district_lookup_cache", None)
        if lookup is None:
            lookup = []
            for key, mapping in self.district_mappings.items():
                normalized = self._normalize_for_match(key)
                if normalized:  # an empty key would match nothing meaningful
                    lookup.append((f" {normalized} ", mapping.name))
            self._district_lookup_cache = lookup
        return lookup

    def _extract_district_from_filename(self, filename: str) -> Optional[str]:
        """Return the canonical district named in *filename*, or None.

        Matching is case-insensitive and on whole words: ``_``, ``-``, ``.``,
        spaces and other non-alphanumerics all act as separators. So
        "Gulu_DLG_2021.pdf" yields "Gulu", "Fort_Portal" matches the
        "Fort Portal" alias, and "Capacity_Report.pdf" does NOT yield "Apac".
        Names glued to other letters (e.g. "GuluDLG") are not matched. When
        several districts appear, the one inserted first in
        ``district_mappings`` wins. Filenames naming only a ministry/project
        contain no district key and return None.
        """
        haystack = f" {self._normalize_for_match(filename)} "
        for needle, district_name in self._district_lookup():
            if needle in haystack:
                return district_name
        return None

    def _infer_district_with_llm(self, filename: str) -> Optional[str]:
        """Placeholder LLM fallback: logs the filename and returns None."""
        # LLM integration can be added later.
        logger.info(f"LLM inference needed for filename: {filename}")
        return None

    def infer_district(self, filename: str) -> Optional[str]:
        """Infer a district from *filename*: name matching first, then the LLM fallback."""
        district = self._extract_district_from_filename(filename)
        if district:
            return district
        return self._infer_district_with_llm(filename)

    def _get_qdrant_client(self):
        """Return the shared Qdrant client, creating it from config on first use."""
        if self.qdrant_client is None:
            qdrant_cfg = self.config["qdrant"]
            self.qdrant_client = QdrantClient(
                url=qdrant_cfg["url"],
                api_key=qdrant_cfg["api_key"],
            )
        return self.qdrant_client

    def _scroll_batch(self, batch_size: int, offset: Optional[object] = None) -> tuple:
        """Scroll one page of points (payload only, no vectors) from *offset*.

        Returns:
            ``(points, next_offset)`` where ``next_offset`` is the point ID to
            pass for the following page, or None when no pages remain.
            Client exceptions propagate to the caller.
        """
        points, next_offset = self._get_qdrant_client().scroll(
            collection_name=self.collection_name,
            limit=batch_size,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        return points, next_offset

    def fetch_chunks_batch(self, batch_size: int = 100, offset: Optional[object] = None) -> List:
        """Fetch one page of chunks from Qdrant (metadata only).

        Args:
            batch_size: Maximum number of points to return.
            offset: Point ID to start from, as returned by a previous scroll.
                Qdrant scroll offsets are point IDs, not positional skip
                counts. None starts at the beginning of the collection.

        Returns:
            The fetched points (records exposing ``.id`` and ``.payload``),
            or an empty list if the request failed (the error is logged).
        """
        try:
            points, _ = self._scroll_batch(batch_size, offset)
        except Exception as e:
            logger.error(f"Failed to fetch batch: {e}")
            return []
        return points

    def update_chunks_with_district(self, points: List) -> int:
        """Write an inferred ``district`` into each point's ``metadata`` payload.

        Points without a filename are skipped with a warning; a per-point
        failure is logged and does not stop the batch. The district may be
        None when nothing could be inferred, and is written as such.

        Returns:
            Number of points successfully updated.
        """
        client = self._get_qdrant_client()
        updated_count = 0
        for point in points:
            # Bound before the try so the except clause can always report it.
            point_id = getattr(point, "id", None)
            try:
                payload = point.payload or {}
                metadata = payload.get("metadata") or {}
                filename = metadata.get("filename", "")
                if not filename:
                    logger.warning(f"Point {point_id} has no filename")
                    continue

                district = self.infer_district(filename)

                updated_metadata = metadata.copy()
                updated_metadata["district"] = district
                # Replaces the whole "metadata" payload key with the updated copy.
                client.set_payload(
                    collection_name=self.collection_name,
                    payload={"metadata": updated_metadata},
                    points=[point_id],
                )
                updated_count += 1
                logger.info(f"Updated point {point_id}: {filename} -> {district}")
            except Exception as e:
                logger.error(f"Failed to update point {point_id}: {e}")
        return updated_count

    def process_all_chunks(self, batch_size: int = 100):
        """Tag every chunk in the collection, one scroll page at a time.

        Pages are chained through the ``next_offset`` each scroll returns,
        so every point is visited exactly once whether IDs are integers or
        UUIDs. A fetch failure is logged and ends processing.

        Returns:
            Total number of points successfully updated.
        """
        total_updated = 0
        offset = None
        logger.info(f"Starting to process chunks in batches of {batch_size}")
        while True:
            try:
                points, next_offset = self._scroll_batch(batch_size, offset)
            except Exception as e:
                logger.error(f"Failed to fetch batch: {e}")
                break
            if not points:
                break

            logger.info(f"Processing batch: {len(points)} points (offset: {offset})")
            updated_count = self.update_chunks_with_district(points)
            total_updated += updated_count
            logger.info(f"Updated {updated_count} points in this batch")

            if next_offset is None:
                break
            offset = next_offset

        logger.info(f"Total updated: {total_updated} points")
        return total_updated
def main():
    """Preview district inference on a few chunks, then optionally tag all.

    Builds a processor from the default config, logs the inferred district
    for the first 10 chunks, and asks on stdin whether to run
    ``process_all_chunks`` over the whole collection. Any error is logged
    and re-raised.
    """
    try:
        processor = DistrictMetadataProcessor()

        # Dry run on a small sample so the mapping can be checked by eye first.
        logger.info("Testing with first 10 chunks...")
        test_points = processor.fetch_chunks_batch(10)
        if not test_points:
            # fetch_chunks_batch returns [] both for an empty collection and
            # for a failed request; either way there is nothing to process.
            logger.warning("No chunks fetched; nothing to process")
            return

        logger.info("Test batch fetched successfully. Processing...")
        for point in test_points:
            metadata = (point.payload or {}).get("metadata") or {}
            filename = metadata.get("filename", "")
            district = processor.infer_district(filename)
            logger.info(f"Test: {filename} -> {district}")

        # Ask user if they want to proceed with full processing.
        response = input("\nProceed with full processing? (y/n): ")
        if response.strip().lower() in ("y", "yes"):
            processor.process_all_chunks(batch_size=100)
        else:
            logger.info("Processing cancelled by user")
    except Exception as e:
        logger.error(f"Error in main: {e}")
        raise


if __name__ == "__main__":
    main()