import csv
import heapq
import math
import os
import sys
import time
# Input: ZIP code location data (parsed as CSV when the extension is ".csv",
# otherwise as pipe-delimited "zip|x|y" lines).
statedatafile = "20260106_uszips.csv"
# Input: tab-delimited customer list; first field is a ZIP code, second a count.
ziplist = "813_customer_ziplist.txt"
# Input: tab-delimited service centers; the fifth field is the ZIP code.
servicecenters = "813_service_locations.txt"
# Output: one line per customer ZIP with its count and three nearest distances.
outputfile1 = "report1.txt"
# Output: customer totals grouped by nearest-service-center distance.
outputfile2 = "report2.txt"
# Output: run summary plus invalid ZIP codes found in the input files.
errorfile = "errorfile.txt"

# Earth radius in miles, used by the great-circle distance calculation.
EARTH_RADIUS_MILES = 3958.7613
| def _normalize_zip(zip_code: str) -> str: | |
| if zip_code is None: | |
| return "" | |
| z = str(zip_code).strip().strip('"').strip("'") | |
| if not z: | |
| return "" | |
| if "-" in z: | |
| z = z.split("-", 1)[0] | |
| digits = "".join(ch for ch in z if ch.isdigit()) | |
| if len(digits) == 4: | |
| digits = "0" + digits | |
| if len(digits) >= 5: | |
| digits = digits[:5] | |
| return digits | |
def _load_zip_locations(data_path: str):
    """Load ZIP code coordinates from *data_path*.

    Two formats are supported, chosen by file extension:

    * ``.csv`` - a header row with ``zip``, ``lat`` and ``lng`` columns
      (header names are matched case-insensitively, ignoring surrounding
      whitespace).  Each entry is stored as
      ``(lat_radians, lng_radians, cos(lat_radians))``.
    * anything else - pipe-delimited ``zip|x|y`` lines.  Each entry is stored
      as ``(x, y, None)``; the ``None`` marks the coordinates as planar.

    Rows with a missing ZIP or with missing, non-numeric or non-finite
    coordinates are skipped.  When a ZIP appears more than once the last row
    wins.

    Returns:
        ``(locations, loaded)`` where ``locations`` maps normalized ZIP to its
        coordinate tuple and ``loaded`` is the number of rows accepted
        (duplicates included).

    Raises:
        ValueError: if a CSV file has no header row.
        OSError: if the file cannot be opened.
    """
    keep = {}
    loaded = 0

    ext = os.path.splitext(data_path)[1].lower()
    if ext == ".csv":
        # "utf-8-sig" strips a leading byte-order mark if present; with plain
        # "utf-8" the BOM becomes part of the first header name and every
        # row's "zip" lookup silently misses.
        with open(data_path, newline="", encoding="utf-8-sig") as f:
            reader = csv.DictReader(f)
            if not reader.fieldnames:
                raise ValueError("CSV file appears to be missing a header row")
            # Tolerate "ZIP", " Lat " and similar header spellings.
            reader.fieldnames = [name.strip().lower() for name in reader.fieldnames]

            for row in reader:
                z = _normalize_zip(row.get("zip", ""))
                if not z:
                    continue
                try:
                    # Short rows yield None for missing columns -> TypeError.
                    lat = float(row.get("lat", ""))
                    lng = float(row.get("lng", ""))
                except (TypeError, ValueError):
                    continue
                # float() accepts "nan"/"inf"; those would corrupt distances.
                if not (math.isfinite(lat) and math.isfinite(lng)):
                    continue

                lat_rad = math.radians(lat)
                lng_rad = math.radians(lng)
                # cos(lat) is cached because the distance formula needs it
                # for every pair involving this ZIP.
                keep[z] = (lat_rad, lng_rad, math.cos(lat_rad))
                loaded += 1
        return keep, loaded

    with open(data_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split("|")
            if len(parts) < 3:
                continue
            z = _normalize_zip(parts[0])
            if not z:
                continue
            try:
                x = float(parts[1])
                y = float(parts[2])
            except ValueError:
                continue
            # Non-finite values would make the later int() conversion of the
            # distance raise.
            if not (math.isfinite(x) and math.isfinite(y)):
                continue
            keep[z] = (x, y, None)
            loaded += 1
    return keep, loaded
| def _distance_between(a, b) -> int: | |
| if a[2] is None or b[2] is None: | |
| difx = abs(a[0] - b[0]) | |
| dify = abs(a[1] - b[1]) | |
| return int((math.sqrt((difx * difx) + (dify * dify))) / 2) | |
| dlat = b[0] - a[0] | |
| dlon = b[1] - a[1] | |
| sin_dlat = math.sin(dlat / 2.0) | |
| sin_dlon = math.sin(dlon / 2.0) | |
| h = (sin_dlat * sin_dlat) + (a[2] * b[2] * (sin_dlon * sin_dlon)) | |
| c = 2.0 * math.asin(min(1.0, math.sqrt(h))) | |
| return int(EARTH_RADIUS_MILES * c) | |
# Placeholder written to report 1 when fewer than three service centers exist.
_MISSING_DISTANCE = 99999


def _append_error_line(message: str) -> None:
    """Append *message* verbatim to the error report file."""
    with open(errorfile, "a") as error_file:
        error_file.write(message)


def _split_record(line: str):
    """Split one tab-delimited record; return None for a blank line.

    Only the line terminator is removed before splitting, so an empty leading
    or trailing field (for example a blank ZIP) is preserved instead of being
    stripped away together with its tab.
    """
    if not line.strip():
        return None
    return line.rstrip("\r\n").split("\t")


def _tally_customer_lines(lines, keep):
    """Return ``(counts_by_zip, invalid_zip_counts)`` for the customer lines.

    ``counts_by_zip`` sums the second field per known ZIP (an unparseable
    count contributes 0).  ``invalid_zip_counts`` counts occurrences of ZIPs
    that are blank, unusable, or absent from *keep*.
    """
    counts = {}
    invalid = {}
    for line in lines:
        fields = _split_record(line)
        if fields is None or len(fields) < 2:
            continue
        # Index rather than unpack: extra columns must not abort the run.
        raw_zip, raw_count = fields[0], fields[1]
        zip_code = _normalize_zip(raw_zip)
        if not zip_code or zip_code not in keep:
            label = zip_code or raw_zip.strip() or "[blank]"
            invalid[label] = invalid.get(label, 0) + 1
            continue
        try:
            count = int(raw_count)
        except ValueError:
            count = 0
        counts[zip_code] = counts.get(zip_code, 0) + count
    return counts, invalid


def _collect_service_zips(lines, keep):
    """Return ``(valid_zips, invalid_zips)`` taken from field 5 of each line.

    Both lists are de-duplicated and keep first-seen order.
    """
    valid = {}
    invalid = {}
    for line in lines:
        fields = _split_record(line)
        if fields is None or len(fields) < 5:
            continue
        raw_zip = fields[4]
        zip_code = _normalize_zip(raw_zip)
        if zip_code and zip_code in keep:
            valid[zip_code] = 1
        else:
            invalid[zip_code or raw_zip.strip() or "[blank]"] = 1
    return list(valid), list(invalid)


def _nearest_distances(origin, targets, limit=3):
    """Return the *limit* smallest distances from *origin*, each plus one.

    The list is padded with ``_MISSING_DISTANCE`` when there are fewer than
    *limit* targets.
    """
    nearest = heapq.nsmallest(
        limit, (_distance_between(origin, target) for target in targets)
    )
    # NOTE(review): the +1 offset is kept from the original report format;
    # presumably so a center in the same ZIP reports 1 rather than 0 - confirm.
    padded = [distance + 1 for distance in nearest]
    padded.extend([_MISSING_DISTANCE] * (limit - len(padded)))
    return padded


def process_data():
    """Build the customer/service-center distance reports.

    Steps:
      1. Load ZIP locations from ``statedatafile`` and start ``errorfile``.
      2. Sum customer counts per valid ZIP from ``ziplist``.
      3. Collect valid service-center ZIPs from ``servicecenters``.
      4. Write ``outputfile1``: ``zip<TAB>count<TAB>d1,d2,d3`` per customer
         ZIP (sorted by ZIP), where d1..d3 are the three nearest
         service-center distances.
      5. Append invalid ZIP codes from both input files to ``errorfile``.
      6. Write ``outputfile2``: ``distance<TAB>customers`` for each distinct
         nearest distance (sorted numerically).

    Returns:
        True on success.

    Raises:
        Exception: if an input file cannot be read or a report cannot be
            written; the original error is chained as ``__cause__``.  A
            failure while appending the invalid-ZIP list only prints a
            warning.
    """
    try:
        keep, loaded_locations = _load_zip_locations(statedatafile)
        with open(errorfile, "w") as error_file:
            error_file.write(
                f"ERROR REPORT\n{loaded_locations} Zip code locations successfully read from data file\n"
            )
    except Exception as e:
        with open(errorfile, "w") as error_file:
            error_file.write(f"Unable to open {statedatafile}: {e}\n")
        raise Exception(f"Failed to read zipcode data: {e}") from e

    try:
        with open(ziplist, "r") as file:
            customer_lines = file.readlines()
        addup, zipfileerrorscount = _tally_customer_lines(customer_lines, keep)
        _append_error_line(
            f"{len(customer_lines)} records with {len(addup)} unique Zip codes read from customer file\n"
        )
    except Exception as e:
        _append_error_line(f"Unable to open {ziplist}: {e}\n")
        raise Exception(f"Failed to read customer zip data: {e}") from e

    try:
        with open(servicecenters, "r") as file:
            service_lines = file.readlines()
        _append_error_line(
            f"{len(service_lines)} service centers records successfully read\n\n"
        )
        service_zips, servicecentererror = _collect_service_zips(service_lines, keep)
    except Exception as e:
        _append_error_line(f"Unable to open {servicecenters}: {e}\n")
        raise Exception(f"Failed to read service center data: {e}") from e

    closest = {}
    try:
        with open(outputfile1, "w") as output_file:
            # Resolve service-center coordinates once, not once per customer.
            center_coords = [keep[zip_code] for zip_code in service_zips]
            recordstokeep = {}
            for zip_code, customers in addup.items():
                nearest = _nearest_distances(keep[zip_code], center_coords)
                recordstokeep[zip_code] = (
                    f"{zip_code}\t{customers}\t{nearest[0]},{nearest[1]},{nearest[2]}"
                )
                # Only a real distance feeds report 2; with no service
                # centers nearest[0] is just the placeholder.
                if center_coords:
                    closest[nearest[0]] = closest.get(nearest[0], 0) + customers
            for key in sorted(recordstokeep):
                output_file.write(f"{recordstokeep[key]}\n")
    except Exception as e:
        _append_error_line(f"Error processing distances: {e}\n")
        raise Exception(f"Failed to process distances: {e}") from e

    try:
        with open(errorfile, "a") as error_file:
            for key in sorted(zipfileerrorscount):
                error_file.write(
                    f"Customer List File Error: The zip code {key} is not valid. It occurs {zipfileerrorscount[key]} times.\n"
                )
            for key in servicecentererror:
                error_file.write(
                    f"Service Center File Error: The zip code {key} not valid.\n"
                )
    except Exception as e:
        # Best effort: the reports themselves are still valid.
        print(f"Warning: Unable to write to error file: {e}")

    try:
        with open(outputfile2, "w") as output_file:
            for key in sorted(closest):
                output_file.write(f"{key}\t{closest[key]}\n")
    except Exception as e:
        _append_error_line(f"Unable to write to {outputfile2}: {e}\n")
        raise Exception(f"Failed to write customer distribution report: {e}") from e

    return True
if __name__ == "__main__":
    # Script entry point: run the full report pipeline and exit non-zero on
    # any failure so calling shells or schedulers can detect it.
    try:
        process_data()
        # process_time() is CPU time consumed by this process, not wall-clock
        # time, so time spent waiting on file I/O is not included.
        print(f"Processing complete. Execution time: {time.process_time()}")
    except Exception as e:
        print(f"Error: {str(e)}")
        sys.exit(1)