#!/usr/bin/env python3
"""
create_mock_data_csv.py
Fetches random user data from randomuser.me (or a similar service) and creates
mock data in CSV format that imitates having multiple 'Profiles' and multiple
'Identity' rows. Each row in the CSV represents an Identity and includes:
- a parent Profile ID and Profile name,
- first_name, last_name, birth_year, etc.,
- and possibly random typos in selected fields (based on a user-defined percentage).
Usage example:
python create_mock_data_csv.py --num_profiles=100 --typo_percentage=10 --output_file="mock_data.csv"
"""
import requests
import random
import logging
import argparse
import csv
import uuid
import numpy as np
# Module-wide logging: INFO and above, via a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The `nicknames` package (pip install nicknames) is optional. When it cannot
# be imported we record that in NICKNAMES_AVAILABLE and warn, rather than fail.
try:
    from nicknames import NickNamer
except ImportError:
    NICKNAMES_AVAILABLE = False
    logger.warning("nicknames library is not installed. Nickname feature will be limited.")
else:
    NICKNAMES_AVAILABLE = True
def fetch_random_users(num_profiles, timeout=30):
    """
    Fetch random user data from the randomuser.me API.

    Args:
        num_profiles: Number of user records to request; sent as the
            ``results`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled connection
            would hang the whole script.

    Returns:
        The list stored under the "results" key of the JSON response, or an
        empty list when that key is absent.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    response = requests.get(
        "https://randomuser.me/api/",
        # Let requests build and encode the query string.
        params={"results": num_profiles, "nat": "us"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json().get("results", [])
class User:
    """
    Simple container for user data fetched from randomuser.me,
    plus logic for generating nicknames, emails, phone numbers,
    and introducing random typos.

    All address, phone and birth-year fields are stored as strings so that
    ``add_typo`` can corrupt any of them in place.
    """

    # Letters used when a typo inserts or replaces a character.
    _TYPO_ALPHABET = "abcdefghijklmnopqrstuvwxyz"

    # Shared NickNamer instance, created lazily on first use so that it is
    # built once instead of once per User.
    _nick_namer = None

    def __init__(self, user_data):
        """Populate the instance from one randomuser.me result dict."""
        self.user_data = user_data

        # Extract basic info
        self.name_data = user_data.get("name", {})
        self.first_name = self.name_data.get("first", "Unknown")
        self.last_name = self.name_data.get("last", "Unknown")
        self.nickname = self._choose_nickname()

        dob = user_data.get("dob", {})
        self.birth_year = str(dob.get("date", "")[:4])  # 'YYYY-MM-DD...' -> 'YYYY'

        # Email address: random combination of first, last, year, etc.
        # Must run after nickname and birth_year are set, since it reads both.
        self.email_address = self._generate_email()

        # Phone number: just digits from the API phone.
        phone_raw = user_data.get("phone", "")
        self.phone_number = "".join(filter(str.isdigit, phone_raw))

        # Address fields
        location = user_data.get("location", {})
        street = location.get("street", {})
        self.street_number = str(street.get("number", ""))
        self.street_name = street.get("name", "")
        self.city = location.get("city", "")
        self.state = location.get("state", "")
        self.country = location.get("country", "")
        self.zip_code = str(location.get("postcode", ""))

    @property
    def full_name(self):
        """First and last name joined by a single space."""
        return f"{self.first_name} {self.last_name}"

    @property
    def full_address(self):
        """Address formatted as '<number> <street>, <city>, <state> <zip>'."""
        return f"{self.street_number} {self.street_name}, {self.city}, {self.state} {self.zip_code}"

    def _choose_nickname(self):
        """
        Uses the nicknames library if available, otherwise falls back to the first name.
        """
        if NICKNAMES_AVAILABLE:
            if User._nick_namer is None:
                User._nick_namer = NickNamer()
            candidates = User._nick_namer.nicknames_of(self.first_name)
            if candidates:
                # Sort first: the candidates come back as an unordered
                # collection, and a fixed order keeps seeded runs reproducible.
                return random.choice(sorted(candidates))
        return self.first_name

    def _generate_email(self):
        """
        Build a lower-cased address from randomly chosen name parts, an
        optional numeric suffix and a random well-known mail domain.
        """
        domain = random.choice(["gmail", "yahoo", "hotmail", "outlook"])
        first_part = random.choice([self.first_name, self.nickname, self.first_name[:1]])
        last_part = random.choice([self.last_name, self.last_name[:1]])
        optional = random.choice(["", self.birth_year, self.birth_year[-2:], str(random.randint(1, 100))])
        return f"{first_part}{last_part}{optional}@{domain}.com".lower()

    def add_typo(self, property_name):
        """
        Introduce a random typo into the specified property (e.g. 'first_name').
        If property_name == 'full_address', we randomly pick an address field to modify.

        The attribute is overwritten in place; empty or non-string values are
        left untouched.
        """
        if property_name == "full_address":
            property_name = random.choice(
                ["street_number", "street_name", "city", "state", "zip_code"]
            )

        current_value = getattr(self, property_name, None)
        if not current_value or not isinstance(current_value, str):
            return  # If it's empty or not a string, skip

        new_value = self._apply_random_typo(current_value)
        setattr(self, property_name, new_value)
        logger.debug("Applying typo: %s: '%s' -> '%s'", property_name, current_value, new_value)

    def _apply_random_typo(self, text):
        """
        Introduce a random single-character error (delete, swap, insert, replace)
        or regenerate an email.

        For non-empty input without an '@', the result always differs from
        ``text``. Empty input is returned unchanged.
        """
        if not text:
            return text

        # If email, 1 in 3 chance we fully regenerate it instead of editing it.
        if "@" in text and random.random() < 0.33:
            return self._generate_email()

        if len(text) == 1:
            # Deleting would leave an empty value and there is nothing to
            # swap with, so only insert/replace make sense.
            option = random.choice(["insert", "replace"])
        else:
            option = random.choice(["delete", "swap", "insert", "replace"])

        if option == "insert":
            # len(text) is a valid insertion point: it appends at the end.
            index = random.randint(0, len(text))
            return text[:index] + random.choice(self._TYPO_ALPHABET) + text[index:]

        index = random.randrange(len(text))

        if option == "delete":
            return text[:index] + text[index + 1:]

        if option == "swap":
            has_next = index < len(text) - 1
            # Swapping two identical characters would be a no-op, so only
            # swap when the neighbours differ; otherwise fall through to replace.
            if has_next and text[index] != text[index + 1]:
                return text[:index] + text[index + 1] + text[index] + text[index + 2:]

        # "replace", and the fallback for an impossible or no-op swap.
        # Exclude the current character so the replacement is a real change.
        candidates = self._TYPO_ALPHABET.replace(text[index], "")
        return text[:index] + random.choice(candidates) + text[index + 1:]
# User attributes copied verbatim into every CSV row, in column order.
_USER_FIELDS = (
    "first_name",
    "last_name",
    "nickname",
    "birth_year",
    "email_address",
    "phone_number",
    "street_number",
    "street_name",
    "city",
    "state",
    "country",
    "zip_code",
)

# Full CSV header: the identifying columns followed by the user attributes.
_CSV_FIELDNAMES = ("profile_id", "profile_name", "identity_id") + _USER_FIELDS

# Fields a typo may be applied to ('full_address' picks one address part).
_TYPO_FIELDS = (
    "first_name",
    "last_name",
    "email_address",
    "phone_number",
    "full_address",
    "birth_year",
)


def _build_identity_rows(user, typo_percentage):
    """Return the Identity rows (list of dicts) generated for one User."""
    # Roughly normal around 8 (std 5). abs(int(...)) can yield 0, which would
    # silently drop the user from the output, so keep at least one Identity.
    num_ids = max(1, abs(int(np.random.normal(8, 5))))

    # Users with more than 4 Identities may be split over up to 3 Profiles.
    if num_ids > 4:
        num_profiles_for_user = random.choice([1, 1, 1, 2, 2, 3])
    else:
        num_profiles_for_user = 1

    profile_ids = [str(uuid.uuid4()) for _ in range(num_profiles_for_user)]
    # Captured before any typo is applied; shared by all of the user's Profiles.
    profile_name = user.full_name
    typo_chance = typo_percentage / 100.0

    rows = []
    profile_idx = 0
    for i in range(num_ids):
        # Advance to the next Profile once the fraction of Identities emitted
        # so far passes that Profile's share of the total.
        if num_profiles_for_user > 1:
            if i / num_ids > (profile_idx + 1) / num_profiles_for_user:
                profile_idx += 1

        # The first Identity is always clean. Later ones may get a typo, which
        # mutates the user and therefore carries over to subsequent rows too.
        if i > 0 and random.random() < typo_chance:
            user.add_typo(random.choice(_TYPO_FIELDS))

        row = {
            "profile_id": profile_ids[profile_idx],
            "profile_name": profile_name,
            "identity_id": str(uuid.uuid4()),
        }
        row.update((field, getattr(user, field)) for field in _USER_FIELDS)
        rows.append(row)
    return rows


def _write_rows_to_csv(rows, output_file):
    """Write the row dicts to ``output_file`` as UTF-8 CSV with a header line."""
    with open(output_file, mode="w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_FIELDNAMES)
        writer.writeheader()
        writer.writerows(rows)


def main(num_profiles, typo_percentage, output_file):
    """
    1) Fetch random user data from randomuser.me
    2) For each user, create 1..N 'Profile' nodes
    3) For each 'Profile', create 1..M 'Identities'
    4) Introduce random typos in selected fields
    5) Write all Identity rows to CSV, including their associated Profile info

    Args:
        num_profiles: Number of users to request from the API.
        typo_percentage: Chance (0..100) that each Identity row after a
            user's first receives a random typo.
        output_file: Path of the CSV file to create or overwrite.
    """
    logger.info("Generating mock data for %s profiles...", num_profiles)
    api_data = fetch_random_users(num_profiles)

    rows_to_write = []
    for data in api_data:
        rows_to_write.extend(_build_identity_rows(User(data), typo_percentage))

    logger.info("Writing %s rows to %s...", len(rows_to_write), output_file)
    _write_rows_to_csv(rows_to_write, output_file)
    logger.info("Finished writing CSV mock data.")
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    parser = argparse.ArgumentParser(
        description="Generate mock entity-resolution data in CSV format."
    )
    parser.add_argument("--num_profiles", type=int, default=10,
                        help="Number of random 'users' to fetch from randomuser.me (default 10).")
    # argparse %-formats help strings, so a literal percent sign must be
    # written as '%%'; a bare '%' makes --help fail with a format error.
    parser.add_argument("--typo_percentage", type=float, default=10.0,
                        help="Chance (0..100) that each new Identity row (beyond the first) has a random typo (default 10%%).")
    parser.add_argument("--output_file", type=str, default="mock_data.csv",
                        help="Output CSV filename (default 'mock_data.csv').")
    args = parser.parse_args()
    main(args.num_profiles, args.typo_percentage, args.output_file)