"""Report how far each customer ZIP code is from the nearest service centers.

Inputs (paths are the module-level names below, resolved against the current
working directory):

* ``statedatafile``  - ZIP code locations (CSV with ``zip``/``lat``/``lng``
  columns, or a pipe-delimited ``zip|x|y`` file for any other extension).
* ``ziplist``        - tab-separated ``zip<TAB>quantity`` customer records.
* ``servicecenters`` - tab-separated records whose fifth field is a ZIP code.

Outputs:

* ``outputfile1`` - one row per customer ZIP: ZIP, summed quantity, and the
  three smallest distance buckets.
* ``outputfile2`` - summed quantity per nearest-distance bucket.
* ``errorfile``   - load statistics followed by every ZIP that was rejected.
"""
import csv
import heapq
import math
import os
import sys
import time

statedatafile = "20260106_uszips.csv"
ziplist = "813_customer_ziplist.txt"
servicecenters = "813_service_locations.txt"
outputfile1 = "report1.txt"
outputfile2 = "report2.txt"
errorfile = "errorfile.txt"

# Sphere radius, in miles, used by the haversine branch of _distance_between.
EARTH_RADIUS_MILES = 3958.7613

# Placeholder written to report 1 when fewer than three service centers exist.
_NO_CENTER_DISTANCE = 99999


def _normalize_zip(zip_code: str) -> str:
    """Reduce a raw ZIP field to its leading digits (at most five).

    Surrounding whitespace and quote characters are removed, anything after
    the first ``-`` (a ZIP+4 suffix) is dropped, and non-digit characters are
    discarded. A four-digit result gets one leading zero restored.

    Returns:
        The normalized digits, or ``""`` when ``zip_code`` is ``None`` or
        blank. Results shorter than four digits are returned as-is.
    """
    if zip_code is None:
        return ""
    text = str(zip_code).strip().strip('"').strip("'")
    if not text:
        return ""
    # partition() yields the whole string when there is no "-".
    text = text.partition("-")[0]
    digits = "".join(ch for ch in text if ch.isdigit())
    if len(digits) == 4:
        digits = "0" + digits
    return digits[:5]


def _load_zip_locations(data_path: str):
    """Load ZIP code locations from ``data_path``.

    A ``.csv`` file must have a header row; rows are read through the ``zip``,
    ``lat`` and ``lng`` columns and stored as
    ``(lat_radians, lng_radians, cos(lat_radians))``. Any other extension is
    read as pipe-delimited ``zip|x|y`` lines and stored as ``(x, y, None)``.
    Rows with an empty ZIP or non-numeric coordinates are skipped.

    Returns:
        ``(locations, loaded)`` where ``locations`` maps normalized ZIP to its
        coordinate tuple and ``loaded`` counts accepted rows (a repeated ZIP
        is counted each time; the last row wins in ``locations``).

    Raises:
        ValueError: the CSV file has no header row.
        OSError: the file cannot be opened.
    """
    keep = {}
    loaded = 0
    ext = os.path.splitext(data_path)[1].lower()

    if ext == ".csv":
        # utf-8-sig drops a leading byte-order mark if present; with plain
        # utf-8 the BOM would be glued onto the first header name and the
        # "zip" column would never be found.
        with open(data_path, newline="", encoding="utf-8-sig") as f:
            reader = csv.DictReader(f)
            if not reader.fieldnames:
                raise ValueError("CSV file appears to be missing a header row")
            for row in reader:
                z = _normalize_zip(row.get("zip", ""))
                if not z:
                    continue
                try:
                    lat = float(row.get("lat", ""))
                    lng = float(row.get("lng", ""))
                except (TypeError, ValueError):
                    # TypeError covers a short row, where DictReader fills
                    # the missing columns with None.
                    continue
                lat_rad = math.radians(lat)
                lng_rad = math.radians(lng)
                keep[z] = (lat_rad, lng_rad, math.cos(lat_rad))
                loaded += 1
        return keep, loaded

    with open(data_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split("|")
            if len(parts) < 3:
                continue
            z = _normalize_zip(parts[0])
            if not z:
                continue
            try:
                x = float(parts[1])
                y = float(parts[2])
            except ValueError:
                continue
            keep[z] = (x, y, None)
            loaded += 1
    return keep, loaded


def _distance_between(a, b) -> int:
    """Return the truncated integer distance between two location tuples.

    If either tuple has ``None`` as its third element (a location loaded from
    the pipe-delimited format), the result is half the Euclidean distance
    between the first two elements. Otherwise the tuples are treated as
    ``(lat_radians, lng_radians, cos(lat_radians))`` and the haversine
    great-circle distance on a sphere of radius ``EARTH_RADIUS_MILES`` is
    returned.
    """
    if a[2] is None or b[2] is None:
        difx = a[0] - b[0]
        dify = a[1] - b[1]
        return int(math.sqrt((difx * difx) + (dify * dify)) / 2)

    sin_dlat = math.sin((b[0] - a[0]) / 2.0)
    sin_dlon = math.sin((b[1] - a[1]) / 2.0)
    h = (sin_dlat * sin_dlat) + (a[2] * b[2] * (sin_dlon * sin_dlon))
    # Clamp to 1.0 so rounding error cannot push asin() out of its domain.
    c = 2.0 * math.asin(min(1.0, math.sqrt(h)))
    return int(EARTH_RADIUS_MILES * c)


def _tally_customer_lines(lines, keep):
    """Sum customer quantities per known ZIP and count rejected ZIPs.

    Returns ``(totals, invalid)``: ``totals`` maps each ZIP found in ``keep``
    to its summed quantity (a non-integer quantity counts as 0); ``invalid``
    maps each rejected ZIP to the number of rows it appeared on.
    """
    totals = {}
    invalid = {}
    for line in lines:
        # NOTE: strip() also removes leading/trailing tabs, so a row whose
        # first or last column is empty collapses to one field and is skipped.
        parts = line.strip().split("\t")
        if len(parts) < 2:
            continue
        # Only the first two fields are used; extra columns are ignored.
        raw_zip, raw_quantity = parts[0], parts[1]
        customer_zip = _normalize_zip(raw_zip)
        if customer_zip not in keep:
            key = customer_zip or raw_zip.strip() or "[blank]"
            invalid[key] = invalid.get(key, 0) + 1
            continue
        try:
            quantity = int(raw_quantity)
        except ValueError:
            quantity = 0
        totals[customer_zip] = totals.get(customer_zip, 0) + quantity
    return totals, invalid


def _classify_service_center_lines(lines, keep):
    """Split service-center ZIPs (fifth tab field) into known and rejected.

    Returns ``(centers, invalid)``, two dicts used as insertion-ordered sets.
    Rows with fewer than five fields are skipped.
    """
    centers = {}
    invalid = {}
    for line in lines:
        parts = line.strip().split("\t")
        if len(parts) < 5:
            continue
        raw_zip = parts[4]
        center_zip = _normalize_zip(raw_zip)
        if center_zip in keep:
            centers[center_zip] = 1
        else:
            invalid[center_zip or raw_zip.strip() or "[blank]"] = 1
    return centers, invalid


def process_data():
    """Run the full report: load inputs, compute distances, write outputs.

    Returns:
        ``True`` on success.

    Raises:
        Exception: any stage failed. The failure is also written to
            ``errorfile`` and the original exception is chained as the cause.
    """
    # Stage 1: ZIP code locations. This stage (re)creates the error file.
    try:
        keep, loaded_locations = _load_zip_locations(statedatafile)
        with open(errorfile, "w") as error_file:
            error_file.write(f"ERROR REPORT\n{loaded_locations} Zip code locations successfully read from data file\n")
    except Exception as e:
        with open(errorfile, "w") as error_file:
            error_file.write(f"Unable to open {statedatafile}: {str(e)}\n")
        raise Exception(f"Failed to read zipcode data: {str(e)}") from e

    # Stage 2: customer records.
    try:
        with open(ziplist, "r") as file:
            customer_lines = file.readlines()
        totals, invalid_customers = _tally_customer_lines(customer_lines, keep)
        with open(errorfile, "a") as error_file:
            error_file.write(f"{len(customer_lines)} records with {len(totals)} unique Zip codes read from customer file\n")
    except Exception as e:
        with open(errorfile, "a") as error_file:
            error_file.write(f"Unable to open {ziplist}: {str(e)}\n")
        raise Exception(f"Failed to read customer zip data: {str(e)}") from e

    # Stage 3: service centers.
    try:
        with open(servicecenters, "r") as file:
            center_lines = file.readlines()
        with open(errorfile, "a") as error_file:
            error_file.write(f"{len(center_lines)} service centers records successfully read\n\n")
        center_zips, invalid_centers = _classify_service_center_lines(center_lines, keep)
    except Exception as e:
        with open(errorfile, "a") as error_file:
            error_file.write(f"Unable to open {servicecenters}: {str(e)}\n")
        raise Exception(f"Failed to read service center data: {str(e)}") from e

    # Stage 4: distance report. Every center ZIP was validated against
    # ``keep`` above, so the coordinates can be resolved once, up front.
    center_coords = [keep[z] for z in center_zips]
    closest = {}
    try:
        with open(outputfile1, "w") as output_file:
            rows = []
            for customer_zip in sorted(totals):
                quantity = totals[customer_zip]
                origin = keep[customer_zip]
                nearest = heapq.nsmallest(
                    3, (_distance_between(origin, c) for c in center_coords)
                )
                # Reported values are the truncated distance plus one.
                buckets = [d + 1 for d in nearest]
                if buckets:
                    closest[buckets[0]] = closest.get(buckets[0], 0) + quantity
                buckets += [_NO_CENTER_DISTANCE] * (3 - len(buckets))
                rows.append(f"{customer_zip}\t{quantity}\t{buckets[0]},{buckets[1]},{buckets[2]}\n")
            output_file.writelines(rows)
    except Exception as e:
        with open(errorfile, "a") as error_file:
            error_file.write(f"Error processing distances: {str(e)}\n")
        raise Exception(f"Failed to process distances: {str(e)}") from e

    # Stage 5: rejected ZIPs. Best effort: a failure here only warns.
    try:
        with open(errorfile, "a") as error_file:
            for key in sorted(invalid_customers):
                error_file.write(f"Customer List File Error: The zip code {key} is not valid. It occurs {invalid_customers[key]} times.\n")
            for key in invalid_centers:
                error_file.write(f"Service Center File Error: The zip code {key} not valid.\n")
    except Exception as e:
        print(f"Warning: Unable to write to error file: {str(e)}")

    # Stage 6: quantity per nearest-distance bucket.
    try:
        with open(outputfile2, "w") as output_file:
            for key in sorted(closest):
                output_file.write(f"{key}\t{closest[key]}\n")
    except Exception as e:
        with open(errorfile, "a") as error_file:
            error_file.write(f"Unable to write to {outputfile2}: {str(e)}\n")
        raise Exception(f"Failed to write customer distribution report: {str(e)}") from e

    return True


if __name__ == "__main__":
    try:
        process_data()
        print(f"Processing complete. Execution time: {time.process_time()}")
    except Exception as e:
        print(f"Error: {str(e)}")
        sys.exit(1)