GeoAccess / corelate.py
OKRN's picture
Upload 6 files
8c710bc verified
import csv
import math
import time
import sys
import os
# Input: ZIP code coordinate table handed to _load_zip_locations(). A ".csv"
# extension selects the header-based reader (columns "zip", "lat", "lng");
# any other extension is read as pipe-delimited "zip|x|y" lines.
statedatafile = "20260106_uszips.csv"
# Input: tab-separated customer list; first field is a ZIP code, second a
# numeric count that is summed per ZIP.
ziplist = "813_customer_ziplist.txt"
# Input: tab-separated service locations; rows need at least five fields and
# the fifth is the ZIP code.
servicecenters = "813_service_locations.txt"
# Output: one line per customer ZIP with its count and three nearest distances.
outputfile1 = "report1.txt"
# Output: nearest-distance value followed by the summed count at that distance.
outputfile2 = "report2.txt"
# Output: load statistics plus every ZIP code that could not be used.
errorfile = "errorfile.txt"
# Sphere radius used by the haversine branch of _distance_between(), so those
# results come out in miles.
EARTH_RADIUS_MILES = 3958.7613
def _normalize_zip(zip_code: str) -> str:
if zip_code is None:
return ""
z = str(zip_code).strip().strip('"').strip("'")
if not z:
return ""
if "-" in z:
z = z.split("-", 1)[0]
digits = "".join(ch for ch in z if ch.isdigit())
if len(digits) == 4:
digits = "0" + digits
if len(digits) >= 5:
digits = digits[:5]
return digits
def _load_zip_locations(data_path: str):
    """Load ZIP code coordinates from *data_path*.

    Two layouts are supported, selected by file extension:

    * ``.csv`` - a header row with ``zip``, ``lat`` and ``lng`` columns.
      Each entry is stored as ``(lat_radians, lng_radians, cos(lat_radians))``.
    * anything else - pipe-delimited ``zip|x|y`` lines. Each entry is stored
      as ``(x, y, None)``; the ``None`` marks the coordinates as not being
      latitude/longitude.

    Rows whose ZIP normalizes to an empty string or whose coordinates are
    not numeric are skipped. When a ZIP appears more than once the last row
    wins.

    Args:
        data_path: Path of the data file to read.

    Returns:
        ``(locations, loaded)`` where ``locations`` maps normalized ZIP to
        its coordinate tuple and ``loaded`` counts the rows accepted
        (repeated ZIPs are counted each time).

    Raises:
        ValueError: If a ``.csv`` file has no header row.
        OSError: If the file cannot be opened.
    """
    keep = {}
    loaded = 0
    ext = os.path.splitext(data_path)[1].lower()

    if ext == ".csv":
        # "utf-8-sig" drops a leading byte-order mark if one is present and
        # otherwise behaves like "utf-8". Without it the BOM becomes part of
        # the first header name, the "zip" column is never found, and every
        # row is silently skipped.
        with open(data_path, newline="", encoding="utf-8-sig") as f:
            reader = csv.DictReader(f)
            if not reader.fieldnames:
                raise ValueError("CSV file appears to be missing a header row")
            for row in reader:
                z = _normalize_zip(row.get("zip", ""))
                if not z:
                    continue
                try:
                    lat = float(row.get("lat", ""))
                    lng = float(row.get("lng", ""))
                except (TypeError, ValueError):
                    # Missing column (None) or non-numeric text.
                    continue
                lat_rad = math.radians(lat)
                lng_rad = math.radians(lng)
                # cos(lat) is cached here so the distance routine does not
                # recompute it for every pair.
                keep[z] = (lat_rad, lng_rad, math.cos(lat_rad))
                loaded += 1
        return keep, loaded

    with open(data_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split("|")
            if len(parts) < 3:
                continue
            z = _normalize_zip(parts[0])
            if not z:
                continue
            try:
                x = float(parts[1])
                y = float(parts[2])
            except ValueError:
                continue
            keep[z] = (x, y, None)
            loaded += 1
    return keep, loaded
def _distance_between(a, b) -> int:
if a[2] is None or b[2] is None:
difx = abs(a[0] - b[0])
dify = abs(a[1] - b[1])
return int((math.sqrt((difx * difx) + (dify * dify))) / 2)
dlat = b[0] - a[0]
dlon = b[1] - a[1]
sin_dlat = math.sin(dlat / 2.0)
sin_dlon = math.sin(dlon / 2.0)
h = (sin_dlat * sin_dlat) + (a[2] * b[2] * (sin_dlon * sin_dlon))
c = 2.0 * math.asin(min(1.0, math.sqrt(h)))
return int(EARTH_RADIUS_MILES * c)
def _append_to_error_file(message: str) -> None:
    """Append *message* verbatim to the error report file."""
    with open(errorfile, 'a') as error_file:
        error_file.write(message)


def process_data():
    """Build the distance reports for every usable customer ZIP code.

    Steps, in order:

    1. Load ZIP coordinates from ``statedatafile`` and start a fresh
       ``errorfile`` with the number of locations read.
    2. Read ``ziplist`` (tab-separated: ZIP, count) and total the counts per
       normalized ZIP. ZIPs that are empty or missing from the coordinate
       table are tallied as errors.
    3. Read ``servicecenters`` (tab-separated, ZIP in the fifth field) and
       collect the ZIPs that have coordinates.
    4. Write ``outputfile1``: one line per customer ZIP, sorted by ZIP, as
       ``ZIP<TAB>count<TAB>d1,d2,d3`` where ``d1..d3`` are the three smallest
       service-center distances, each reported as the truncated distance
       plus one. Missing entries are padded with 99999.
    5. Append the rejected ZIPs to ``errorfile`` (best effort: a failure here
       only prints a warning).
    6. Write ``outputfile2``: nearest distance and the summed customer count
       at that distance, sorted by distance.

    Returns:
        True when the reports have been written.

    Raises:
        Exception: If an input cannot be read or a report cannot be written.
            A note is appended to ``errorfile`` first and the original error
            is chained as the cause.
    """
    no_center = 99999  # Placeholder distance when fewer than 3 centers exist.

    # --- 1. ZIP coordinate table -----------------------------------------
    try:
        keep, loaded_locations = _load_zip_locations(statedatafile)
        with open(errorfile, 'w') as error_file:
            error_file.write(f"ERROR REPORT\n{loaded_locations} Zip code locations successfully read from data file\n")
    except Exception as e:
        with open(errorfile, 'w') as error_file:
            error_file.write(f"Unable to open {statedatafile}: {str(e)}\n")
        raise Exception(f"Failed to read zipcode data: {str(e)}") from e

    # --- 2. Customer list -------------------------------------------------
    addup = {}               # normalized ZIP -> summed count
    zipfileerrorscount = {}  # rejected ZIP as reported -> occurrences
    try:
        with open(ziplist, 'r') as file:
            ziplisting = file.readlines()
        for line in ziplisting:
            # Remove only the line ending: stripping all whitespace would
            # also eat a leading tab and shift the columns when the ZIP
            # field is blank.
            line = line.rstrip("\r\n")
            if not line.strip():
                continue
            parts = line.split('\t')
            if len(parts) < 2:
                continue
            # Index instead of unpacking so extra trailing columns do not
            # abort the whole run.
            raw_clientzip, number = parts[0], parts[1]
            clientzip = _normalize_zip(raw_clientzip)
            if not clientzip or clientzip not in keep:
                key = clientzip or raw_clientzip.strip() or "[blank]"
                zipfileerrorscount[key] = zipfileerrorscount.get(key, 0) + 1
                continue
            try:
                number = int(number)
            except ValueError:
                number = 0  # A non-numeric count contributes nothing.
            addup[clientzip] = addup.get(clientzip, 0) + number
        _append_to_error_file(f"{len(ziplisting)} records with {len(addup)} unique Zip codes read from customer file\n")
    except Exception as e:
        _append_to_error_file(f"Unable to open {ziplist}: {str(e)}\n")
        raise Exception(f"Failed to read customer zip data: {str(e)}") from e

    # --- 3. Service centers -----------------------------------------------
    serviceloc = {}          # usable service-center ZIPs (insertion-ordered)
    servicecentererror = {}  # rejected service-center ZIPs as reported
    try:
        with open(servicecenters, 'r') as file:
            servicec = file.readlines()
        _append_to_error_file(f"{len(servicec)} service centers records successfully read\n\n")
        for line2 in servicec:
            # As above: keep empty leading/trailing fields in place so a
            # blank name or ZIP does not change the column count.
            line2 = line2.rstrip("\r\n")
            if not line2.strip():
                continue
            parts = line2.split('\t')
            if len(parts) < 5:
                continue
            raw_zip_code = parts[4]
            zip_code = _normalize_zip(raw_zip_code)
            if not zip_code or zip_code not in keep:
                key = zip_code or raw_zip_code.strip() or "[blank]"
                servicecentererror[key] = 1
                continue
            serviceloc[zip_code] = 1
    except Exception as e:
        _append_to_error_file(f"Unable to open {servicecenters}: {str(e)}\n")
        raise Exception(f"Failed to read service center data: {str(e)}") from e

    # --- 4. Per-ZIP nearest-center report -----------------------------------
    recordstokeep = {}  # customer ZIP -> formatted report line
    closest = {}        # nearest distance -> summed customer count
    try:
        # Every key in serviceloc was checked against keep above, so the
        # coordinates can be looked up once instead of once per customer.
        center_coords = [keep[zip_code] for zip_code in serviceloc]
        with open(outputfile1, 'w') as output_file:
            for element, customers in addup.items():
                start_coords = keep[element]
                distances = sorted(
                    _distance_between(start_coords, end_coords)
                    for end_coords in center_coords
                )
                nearest = [d + 1 for d in distances[:3]]
                if nearest:
                    # Only a real distance feeds the distribution report;
                    # the padding added below never does.
                    closest[nearest[0]] = closest.get(nearest[0], 0) + customers
                nearest.extend([no_center] * (3 - len(nearest)))
                recordstokeep[element] = f"{element}\t{customers}\t{nearest[0]},{nearest[1]},{nearest[2]}"
            for key in sorted(recordstokeep):
                output_file.write(f"{recordstokeep[key]}\n")
    except Exception as e:
        _append_to_error_file(f"Error processing distances: {str(e)}\n")
        raise Exception(f"Failed to process distances: {str(e)}") from e

    # --- 5. Rejected ZIPs (best effort) -------------------------------------
    try:
        with open(errorfile, 'a') as error_file:
            for key in sorted(zipfileerrorscount):
                error_file.write(f"Customer List File Error: The zip code {key} is not valid. It occurs {zipfileerrorscount[key]} times.\n")
            for key in servicecentererror:
                error_file.write(f"Service Center File Error: The zip code {key} not valid.\n")
    except Exception as e:
        print(f"Warning: Unable to write to error file: {str(e)}")

    # --- 6. Customer distribution by nearest distance -----------------------
    try:
        with open(outputfile2, 'w') as output_file:
            for key in sorted(closest):
                output_file.write(f"{key}\t{closest[key]}\n")
    except Exception as e:
        _append_to_error_file(f"Unable to write to {outputfile2}: {str(e)}\n")
        raise Exception(f"Failed to write customer distribution report: {str(e)}") from e

    return True
if __name__ == "__main__":
    # Script entry point: run the full report and exit with status 1 on any
    # failure. The figure printed on success comes from time.process_time().
    run_failed = False
    try:
        process_data()
        print(f"Processing complete. Execution time: {time.process_time()}")
    except Exception as failure:
        print(f"Error: {str(failure)}")
        run_failed = True
    if run_failed:
        sys.exit(1)