# File size: 11,262 Bytes
# 68356aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#!/usr/bin/env python3

"""
create_mock_data_csv.py

Fetches random user data from randomuser.me (or a similar service) and creates
mock data in CSV format that imitates having multiple 'Profiles' and multiple
'Identity' rows. Each row in the CSV represents an Identity and includes:
- a parent Profile ID and Profile name,
- first_name, last_name, birth_year, etc.,
- and possibly random typos in selected fields (based on a user-defined percentage).

Usage example:
    python create_mock_data_csv.py --num_profiles=100 --typo_percentage=10 --output_file="mock_data.csv"
"""

import requests
import random
import logging
import argparse
import csv
import uuid
import numpy as np
# Optional dependency: the `nicknames` package (pip install nicknames).
# The import is attempted below; when it fails the script keeps running and
# only records that nickname lookups are unavailable.

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    from nicknames import NickNamer
except ImportError:
    # Flag the missing package so later code can skip nickname lookups.
    NICKNAMES_AVAILABLE = False
    logger.warning("nicknames library is not installed. Nickname feature will be limited.")
else:
    NICKNAMES_AVAILABLE = True


def fetch_random_users(num_profiles, timeout=30):
    """
    Fetch random user data from the randomuser.me API.

    Args:
        num_profiles: Number of user records to request (sent as the
            ``results`` query parameter; ``nat`` is fixed to ``us``).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on a stalled connection.

    Returns:
        The list found under the "results" key of the JSON response, or an
        empty list when that key is missing or null.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: For connection failures and timeouts.
    """
    response = requests.get(
        "https://randomuser.me/api/",
        params={"results": num_profiles, "nat": "us"},
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()
    # `or []` also covers a payload that carries "results": null.
    return data.get("results") or []


class User:
    """
    Simple container for user data fetched from randomuser.me,
    plus logic for generating nicknames, emails, phone numbers,
    and introducing random typos.

    Every field is stored as a plain string attribute so that ``add_typo``
    can mutate it in place; a typo therefore stays on the object until it is
    overwritten by a later typo.
    """

    # Character pools used when a typo needs a new character. Digits-only
    # values (phone number, birth year, zip code, street number) draw from
    # _DIGITS so they stay numeric; everything else draws from _LETTERS.
    _LETTERS = "abcdefghijklmnopqrstuvwxyz"
    _DIGITS = "0123456789"

    # Single NickNamer shared by all users, created lazily on first use
    # instead of being constructed again for every User.
    _nicknamer = None

    def __init__(self, user_data):
        """
        Build a User from one randomuser.me result dict.

        Missing or null entries fall back to empty strings (or "Unknown" for
        the name parts) instead of raising.
        """
        self.user_data = user_data

        # Extract basic info
        self.name_data = user_data.get("name") or {}
        self.first_name = self.name_data.get("first", "Unknown")
        self.last_name = self.name_data.get("last", "Unknown")
        self.nickname = self._choose_nickname()

        dob = user_data.get("dob") or {}
        # 'YYYY-MM-DD...' -> 'YYYY'
        self.birth_year = str(dob.get("date") or "")[:4]

        # Email address: random combination of first, last, year, etc.
        self.email_address = self._generate_email()

        # Phone number: just digits from the API phone.
        phone_raw = str(user_data.get("phone") or "")
        self.phone_number = "".join(filter(str.isdigit, phone_raw))

        # Address fields
        location = user_data.get("location") or {}
        street = location.get("street") or {}
        self.street_number = str(street.get("number", ""))
        self.street_name = street.get("name", "")
        self.city = location.get("city", "")
        self.state = location.get("state", "")
        self.country = location.get("country", "")
        self.zip_code = str(location.get("postcode", ""))

    @property
    def full_name(self):
        """Return "<first_name> <last_name>" from the current field values."""
        return f"{self.first_name} {self.last_name}"

    @property
    def full_address(self):
        """Return the address fields joined into a single line."""
        return f"{self.street_number} {self.street_name}, {self.city}, {self.state} {self.zip_code}"

    def _choose_nickname(self):
        """
        Uses the nicknames library if available, otherwise falls back to the first name.
        """
        if NICKNAMES_AVAILABLE:
            if User._nicknamer is None:
                User._nicknamer = NickNamer()
            possible_nicknames = User._nicknamer.nicknames_of(self.first_name)
            if possible_nicknames:
                return random.choice(list(possible_nicknames))
        return self.first_name

    def _generate_email(self):
        """
        Build a lower-cased "<first><last><suffix>@<domain>.com" address.

        Each part is picked at random: first name / nickname / first initial,
        last name / last initial, and an optional suffix (nothing, the birth
        year, its last two digits, or a number from 1 to 100).
        """
        domain = random.choice(["gmail", "yahoo", "hotmail", "outlook"])
        first_part = random.choice([self.first_name, self.nickname, self.first_name[:1]])
        last_part = random.choice([self.last_name, self.last_name[:1]])
        optional = random.choice(["", self.birth_year, self.birth_year[-2:], str(random.randint(1, 100))])
        return f"{first_part}{last_part}{optional}@{domain}.com".lower()

    def add_typo(self, property_name):
        """
        Introduce a random typo into the specified property (e.g. 'first_name').
        If property_name == 'full_address', we randomly pick an address field to modify.

        The attribute is overwritten in place. Empty or non-string attributes
        are left untouched.
        """
        if property_name == "full_address":
            property_name = random.choice(
                ["street_number", "street_name", "city", "state", "zip_code"]
            )

        current_value = getattr(self, property_name, None)
        if not current_value or not isinstance(current_value, str):
            return  # If it's empty or not a string, skip

        new_value = self._apply_random_typo(current_value)
        setattr(self, property_name, new_value)
        logger.debug("Applying typo: %s: '%s' -> '%s'", property_name, current_value, new_value)

    def _apply_random_typo(self, text):
        """
        Introduce a random single-character error (delete, swap, insert, replace)
        or regenerate an email.

        For non-empty input the result always differs from ``text``: swapping
        two identical neighbours or replacing a character with itself would
        be a silent no-op, so those cases fall back to a replacement with a
        different character. Digits-only input stays digits-only.

        Args:
            text: The current field value.

        Returns:
            The mutated string, or ``text`` unchanged when it is empty.
        """
        if not text:
            return text

        # If email, 1 in 3 chance we fully regenerate the address. Should the
        # new address equal the old one, fall through to a character typo.
        if "@" in text and random.random() < 0.33:
            regenerated = self._generate_email()
            if regenerated != text:
                return regenerated

        if len(text) == 1:
            # A single character can only be replaced or have one inserted.
            option = random.choice(["insert", "replace"])
        else:
            option = random.choice(["delete", "swap", "insert", "replace"])

        pool = self._DIGITS if set(text) <= set(self._DIGITS) else self._LETTERS
        index = random.randint(0, len(text) - 1)

        if option == "delete":
            return text[:index] + text[index + 1:]

        if option == "insert":
            return text[:index] + random.choice(pool) + text[index:]

        if option == "swap" and index < len(text) - 1 and text[index] != text[index + 1]:
            chars = list(text)
            chars[index], chars[index + 1] = chars[index + 1], chars[index]
            return "".join(chars)

        # "replace", and the fallback for a swap that is impossible (last
        # position) or would change nothing (identical neighbours).
        replacement = random.choice([c for c in pool if c != text[index]])
        return text[:index] + replacement + text[index + 1:]


# User attributes copied verbatim into every Identity row, in CSV column order.
_USER_FIELDS = (
    "first_name",
    "last_name",
    "nickname",
    "birth_year",
    "email_address",
    "phone_number",
    "street_number",
    "street_name",
    "city",
    "state",
    "country",
    "zip_code",
)

# Full CSV header: the three ID/name columns followed by the user fields.
_CSV_FIELDNAMES = ("profile_id", "profile_name", "identity_id") + _USER_FIELDS

# Fields a typo may be applied to ('full_address' picks one address field).
_TYPO_FIELDS = (
    "first_name",
    "last_name",
    "email_address",
    "phone_number",
    "full_address",
    "birth_year",
)


def _build_identity_rows(user, typo_percentage):
    """Return the list of Identity row dicts generated for a single user."""
    # Identity count: |N(8, 5)| truncated to an int. The draw can land on 0,
    # which would drop the user from the output entirely, so enforce at
    # least one Identity per fetched user.
    num_ids = max(1, abs(int(np.random.normal(8, 5))))

    # Users with more than four Identities may be split over up to three
    # Profiles (weighted towards one); everyone else gets a single Profile.
    if num_ids > 4:
        num_profiles_for_user = random.choice([1, 1, 1, 2, 2, 3])
    else:
        num_profiles_for_user = 1

    profile_ids = [str(uuid.uuid4()) for _ in range(num_profiles_for_user)]
    # Captured before any typo is applied, so all rows share the clean name.
    profile_name = user.full_name
    typo_chance = typo_percentage / 100.0

    rows = []
    profile_idx = 0
    for i in range(num_ids):
        # Advance to the next Profile once the fraction of Identities emitted
        # passes that Profile's share of the total.
        if num_profiles_for_user > 1 and i / num_ids > (profile_idx + 1) / num_profiles_for_user:
            profile_idx += 1

        # The first Identity is always clean; later ones may receive a typo.
        # Typos mutate the shared User object, so they persist in every
        # subsequent row for this user.
        if i > 0 and random.random() < typo_chance:
            user.add_typo(random.choice(_TYPO_FIELDS))

        row = {
            "profile_id": profile_ids[profile_idx],
            "profile_name": profile_name,
            "identity_id": str(uuid.uuid4()),
        }
        row.update((field, getattr(user, field)) for field in _USER_FIELDS)
        rows.append(row)

    return rows


def main(num_profiles, typo_percentage, output_file):
    """
    Generate mock Profile/Identity data and write it to a CSV file.

    1) Fetch random user data from randomuser.me
    2) For each user, create 1..3 'Profile' IDs
    3) Spread 1..M 'Identity' rows over those Profiles
    4) Introduce random typos in selected fields
    5) Write all Identity rows to CSV, including their associated Profile info

    Args:
        num_profiles: Number of users to request from the API.
        typo_percentage: Chance (0..100) that an Identity row after a user's
            first one receives a new typo.
        output_file: Path of the CSV file to create or overwrite.
    """
    logger.info("Generating mock data for %s profiles...", num_profiles)
    api_data = fetch_random_users(num_profiles)

    rows_to_write = []
    for data in api_data:
        rows_to_write.extend(_build_identity_rows(User(data), typo_percentage))

    logger.info("Writing %s rows to %s...", len(rows_to_write), output_file)

    # newline="" lets the csv module control line endings itself.
    with open(output_file, mode="w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_FIELDNAMES)
        writer.writeheader()
        writer.writerows(rows_to_write)

    logger.info("Finished writing CSV mock data.")


if __name__ == "__main__":
    # Command-line entry point: parse the options, validate them, run main().
    parser = argparse.ArgumentParser(
        description="Generate mock entity-resolution data in CSV format."
    )
    parser.add_argument("--num_profiles", type=int, default=10,
                        help="Number of random 'users' to fetch from randomuser.me (default 10).")
    # argparse %-formats help strings, so a literal percent sign must be
    # written as "%%"; a bare "%" makes --help fail with a ValueError.
    parser.add_argument("--typo_percentage", type=float, default=10.0,
                        help="Chance (0..100) that each new Identity row (beyond the first) has a random typo (default 10%%).")
    parser.add_argument("--output_file", type=str, default="mock_data.csv",
                        help="Output CSV filename (default 'mock_data.csv').")

    args = parser.parse_args()

    # Reject values outside the documented ranges with a usage error.
    if args.num_profiles < 1:
        parser.error("--num_profiles must be at least 1")
    if not 0 <= args.typo_percentage <= 100:
        parser.error("--typo_percentage must be between 0 and 100")

    main(args.num_profiles, args.typo_percentage, args.output_file)