|
|
import requests |
|
|
import json |
|
|
import re |
|
|
from bs4 import BeautifulSoup |
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
from utils import clean_time |
|
|
|
|
|
def scrape_workshops_from_squarespace(url: str) -> List[Dict[str, str]]:
    """
    Extract workshops using our robust Squarespace JSON + HTML parsing system

    Strategy: request the Squarespace ``?format=json`` endpoint first and hand
    the payload to extract_workshops_from_json(); if that yields nothing (or
    the response is not JSON / not 200), fall back to fetching the page HTML
    and running parse_workshops_from_html() on it.

    Args:
        url: Page URL to scrape.

    Returns:
        List of workshop dicts (see parse_pattern_match for the schema);
        an empty list when nothing is found or any error occurs.
    """
    headers = {
        # Browser-like UA: some sites reject the default python-requests agent.
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    try:
        json_url = f"{url}?format=json"
        print(f"π Trying Squarespace JSON API: {json_url}")

        response = requests.get(json_url, headers=headers, timeout=10)

        if response.status_code == 200:
            try:
                json_data = response.json()
                workshops = extract_workshops_from_json(json_data, json_url)
                if workshops:
                    # BUG FIX: this f-string was split across two physical
                    # source lines (a SyntaxError for a single-quoted string);
                    # rejoined onto one line. NOTE(review): the leading marker
                    # characters look like mojibake'd emoji — confirm intended
                    # glyphs against the original file encoding.
                    print(f"β Extracted {len(workshops)} workshops from JSON API")
                    return workshops
                else:
                    print("β No workshops found in JSON, falling back to HTML")
            except json.JSONDecodeError:
                print("β Invalid JSON response, falling back to HTML")

        # Fallback path: scrape the rendered HTML page directly.
        print(f"π Falling back to HTML scraping for {url}")
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        workshops = parse_workshops_from_html(soup, url)

        if workshops:
            # BUG FIX: same line-split SyntaxError as above, rejoined.
            print(f"β Extracted {len(workshops)} workshops from HTML parsing")
            return workshops
        else:
            print("β No workshops found in HTML")
            return []

    except Exception as e:
        # Broad catch is deliberate: scraping is best-effort and callers
        # expect an empty list rather than a propagated exception.
        print(f"β Error scraping workshops from {url}: {e}")
        return []
|
|
|
|
|
def extract_workshops_from_json(data: Any, source_url: str) -> List[Dict[str, str]]:
    """Extract workshop information from Squarespace JSON data

    Squarespace's ``?format=json`` payload carries the rendered page HTML
    under the 'mainContent' key; that HTML is re-parsed with the shared
    HTML workshop parser. Returns [] when the payload has no usable HTML.
    """
    # Guard: anything other than a dict with a string 'mainContent' yields [].
    if not isinstance(data, dict):
        return []

    page_html = data.get('mainContent')
    if not isinstance(page_html, str):
        return []

    print(f"π― Found mainContent HTML! Length: {len(page_html)} characters")

    # Delegate to the same parser used for live-page scraping.
    fragment = BeautifulSoup(page_html, 'html.parser')
    return parse_workshops_from_html(fragment, source_url)
|
|
|
|
|
def parse_workshops_from_html(soup, source_url: str) -> List[Dict[str, str]]:
    """Enhanced HTML parsing specifically for workshop content

    Two passes over the parsed page:
      1. container pass — elements whose class names look like listing
         cards are text-extracted and run through the single-block parser;
      2. regex pass — the whole page text is swept with workshop patterns.
    Duplicates (same instructor + date, overlapping titles) are dropped.
    """
    found: List[Dict[str, str]] = []
    seen_texts = set()

    print(f"π ENHANCED HTML PARSING:")

    # Pass 1: structural containers whose class hints at a listing card.
    containers = soup.find_all(
        ['div', 'section', 'article'],
        attrs={'class': re.compile(r'(item|card|product|workshop|class)', re.I)})

    print(f"   Found {len(containers)} potential workshop containers")

    signal_words = ['with', 'casting', 'director', 'agent', 'perfect submission', 'crush the callback', 'get scene']
    for node in containers:
        block_text = node.get_text(strip=True)

        # Skip tiny fragments and text blocks we've already consumed.
        if len(block_text) < 30 or block_text in seen_texts:
            continue

        lowered = block_text.lower()
        if not any(word in lowered for word in signal_words):
            continue

        candidate = extract_single_workshop_from_text(block_text, source_url)
        if candidate and not is_duplicate_workshop(candidate, found):
            found.append(candidate)
            seen_texts.add(block_text)

    # Pass 2: regex sweep over the flattened page text.
    page_text = soup.get_text()

    workshop_patterns = [
        r'((?:The\s+)?(?:Perfect\s+Submission|Crush\s+the\s+Callback|Get\s+Scene\s+360?))\s+with\s+((?:Casting\s+Director|DDO\s+Agent|Manager|Director|Producer|Agent|Acting\s+Coach|Talent\s+Agent|Executive\s+Casting\s+Producer)\s+[A-Za-z\s]+?)\s+on\s+(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?',

        r'((?:Atlanta\s+Models\s+&\s+Talent\s+President|Talent\s+Agent|Casting\s+Director|Manager|Director|Producer|Agent)\s+[A-Za-z\s]+?),\s+((?:The\s+)?(?:Perfect\s+Submission|Crush\s+the\s+Callback|Get\s+Scene\s+360?))\s+on\s+(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?',

        r'(Casting\s+Director)\s+([A-Za-z\s\-]+?),\s+(\w+\s+\d+(?:st|nd|rd|th)?)\s*(?:at\s+)?([0-9:]+\s*(?:AM|PM))?',
    ]

    # Pattern numbers are 1-based for parse_refined_workshop_match.
    for pattern_no, pattern in enumerate(workshop_patterns, start=1):
        for groups in re.findall(pattern, page_text, re.IGNORECASE):
            candidate = parse_refined_workshop_match(groups, pattern_no, source_url)
            if candidate and not is_duplicate_workshop(candidate, found):
                found.append(candidate)

    print(f"π― TOTAL UNIQUE WORKSHOPS FOUND: {len(found)}")
    return found
|
|
|
|
|
def extract_single_workshop_from_text(text: str, source_url: str) -> Optional[Dict[str, str]]:
    """Extract workshop info from a single text block.

    Cleans pricing/badge noise out of the text, then tries a sequence of
    increasingly generic regex patterns; the first match is decoded by
    parse_pattern_match.

    Args:
        text: Raw text of one listing block.
        source_url: URL the text came from (propagated into the result).

    Returns:
        A workshop dict, or None when no pattern matches.
        (BUG FIX: annotation was ``Dict[str, str]`` but the function
        returns None on no match — now ``Optional[Dict[str, str]]``.)
    """
    # Strip price tags and status badges, then collapse ALL whitespace
    # (including newlines) to single spaces.
    text = re.sub(r'\$[0-9,]+\.00', '', text)
    text = re.sub(r'Featured|Sold Out', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\s+', ' ', text).strip()
    # BUG FIX: removed the former re.sub(r'\n+', ' ', text) — the \s+ collapse
    # above has already consumed every newline, so it was dead code.

    # Ordered most-specific → most-generic; index i maps to the branch
    # layouts in parse_pattern_match.
    patterns = [
        r'((?:The\s+)?(?:Perfect\s+Submission|Crush\s+the\s+Callback|Get\s+Scene\s+360?))\s+with\s+((?:Casting\s+Director|CD|DDO\s+Agent|Manager|Director|Producer|Agent|Acting\s+Coach|Talent\s+Agent|Executive\s+Casting\s+Producer|Atlanta\s+Models\s+&\s+Talent\s+President)\s+[A-Za-z\s\-]+?)\s+on\s+(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?',

        r'((?:Atlanta\s+Models\s+&\s+Talent\s+President|Talent\s+Agent|Casting\s+Director|Casting\s+Associate|Manager|Director|Producer|Agent|Executive\s+Casting\s+Producer)\s+[A-Za-z\s\-]+?),\s+((?:The\s+)?(?:Perfect\s+Submission|Crush\s+the\s+Callback|Get\s+Scene\s+360?))\s+on\s+(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?',

        r'(Casting\s+Director|Casting\s+Associate)\s+([A-Za-z\s\-]+?),\s+(\w+\s+\d+(?:st|nd|rd|th)?)\s*(?:at\s+)?([0-9:]+\s*(?:AM|PM))?',

        r"([A-Za-z']+\s+(?:Executive\s+Casting\s+Producer|Studios\s+Casting\s+Associate))\s+([A-Za-z\s]+?)\s+(?:on\s+)?(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?",

        r'([A-Za-z\s]+)\s+(Agent|Talent)\s+([A-Za-z\s]+?)\s+(?:on\s+)?(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?',

        r'([A-Za-z\s]+\s+Talent),\s+([A-Za-z\s\.]+?),\s+((?:The\s+)?(?:Perfect\s+Submission|Crush\s+the\s+Callback|Get\s+Scene\s+360?))\s+on\s+(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?',

        r'^([A-Za-z\s&\']{3,25}(?:Director|Agent|Manager|Producer|President|Coach))\s+([A-Za-z\s\-]{3,30}?)\s+(?:on\s+)?(\w+\s+\d+(?:st|nd|rd|th)?)\s*[@\s]*([0-9:]+\s*(?:AM|PM))?$'
    ]

    for i, pattern in enumerate(patterns):
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return parse_pattern_match(match, i, source_url)

    return None
|
|
|
|
|
def parse_pattern_match(match, pattern_index: int, source_url: str) -> Dict[str, str]:
    """Parse a regex match or tuple based on pattern type

    `match` is either an ``re.Match`` (from re.search) or a plain tuple of
    group strings (from re.findall); the inner ``get_grp`` helper hides the
    difference. ``pattern_index`` selects the group layout and corresponds
    to the pattern lists in extract_single_workshop_from_text /
    parse_workshops_from_html (0-based).

    Returns a workshop dict with keys title / instructor_name /
    instructor_title / date / time / full_text / source_url, or None when
    the match lacks an instructor name or date, or any parsing step fails.
    """

    def get_grp(m, idx):
        # 1-based group accessor that works for both Match objects and the
        # bare tuples produced by re.findall (group N sits at tuple idx N-1).
        val = ""
        if hasattr(m, 'group'):
            try:
                val = m.group(idx)
            except IndexError:
                val = ""

        elif isinstance(m, (tuple, list)):
            if 0 <= idx-1 < len(m):
                val = m[idx-1]

        # Non-participating optional groups come back as None; normalise to "".
        return val if val is not None else ""

    workshop_title = ""
    instructor_title = ""
    instructor_name = ""
    date_str = ""
    time_str = ""

    try:
        # Layout: (workshop title, "Title Name" blob, date, time).
        if pattern_index == 0:
            workshop_title = get_grp(match, 1).strip()
            professional_full = get_grp(match, 2).strip()
            date_str = get_grp(match, 3).strip()
            time_str = get_grp(match, 4).strip()

            # Expand the 'CD' abbreviation before splitting title from name.
            if professional_full.startswith('CD '):
                professional_full = 'Casting Director ' + professional_full[3:]

            instructor_title, instructor_name = parse_professional_info(professional_full)

        # Layout: ("Title Name" blob, workshop title, date, time).
        elif pattern_index == 1:
            professional_full = get_grp(match, 1).strip()
            workshop_title = get_grp(match, 2).strip()
            date_str = get_grp(match, 3).strip()
            time_str = get_grp(match, 4).strip()

            instructor_title, instructor_name = parse_professional_info(professional_full)

        # Layout: (title, name, date, time) — no explicit workshop name.
        elif pattern_index == 2:
            instructor_title = get_grp(match, 1).strip()
            instructor_name = get_grp(match, 2).strip()
            date_str = get_grp(match, 3).strip()
            time_str = get_grp(match, 4).strip()
            workshop_title = "Casting Workshop"

        # Layout: (company+role title, name, date, time).
        elif pattern_index == 3:
            instructor_title = get_grp(match, 1).strip()
            instructor_name = get_grp(match, 2).strip()
            date_str = get_grp(match, 3).strip()
            time_str = get_grp(match, 4).strip()
            workshop_title = "Industry Workshop"

        # Layout: (company, agent type, name, date, time).
        elif pattern_index == 4:
            company_name = get_grp(match, 1).strip()
            agent_type = get_grp(match, 2).strip()
            instructor_name = get_grp(match, 3).strip()
            date_str = get_grp(match, 4).strip()
            time_str = get_grp(match, 5).strip()

            instructor_title = f"{company_name} {agent_type}"
            workshop_title = "Industry Workshop"

        # Layout: (company, name, workshop title, date, time).
        elif pattern_index == 5:
            company_name = get_grp(match, 1).strip()
            instructor_name = get_grp(match, 2).strip()
            workshop_title = get_grp(match, 3).strip()
            date_str = get_grp(match, 4).strip()
            time_str = get_grp(match, 5).strip()

            instructor_title = company_name

        # Fallback layout (most generic pattern): groups 1+2 form the
        # "title + name" blob; over-long/multi-line blobs are rejected as
        # likely false positives.
        else:
            professional_full = get_grp(match, 1).strip() + " " + get_grp(match, 2).strip()
            date_str = get_grp(match, 3).strip()
            time_str = get_grp(match, 4).strip()
            workshop_title = "Industry Workshop"

            if len(professional_full) > 50 or '\n' in professional_full:
                return None

            instructor_title, instructor_name = parse_professional_info(professional_full)

        # Instructor name and date are the minimum for a usable record.
        if instructor_name and date_str:

            full_text = f"{workshop_title} with {instructor_title} {instructor_name}"
            if date_str:
                full_text += f" on {date_str}"
            if time_str:
                full_text += f" at {clean_time(time_str)}"

            return {
                'title': workshop_title,
                'instructor_name': instructor_name,
                'instructor_title': instructor_title,
                'date': date_str,
                'time': clean_time(time_str),
                'full_text': full_text,
                'source_url': source_url
            }

    except Exception as e:
        print(f"Error parsing pattern match: {e}")

    return None
|
|
|
|
|
def parse_professional_info(professional_full: str) -> tuple:
    """Split a combined "title + name" string into (title, name).

    Resolution order:
      1. known multi-word titles (matched anywhere in the string, so both
         "Title Name" and "Name, Title" layouts are handled);
      2. generic single-word titles, optionally merged with a preceding
         qualifier word (e.g. "Casting" + "Director");
      3. fallback: first word is the title, the rest is the name.
    Returns ('', <input>) when nothing can be split off.
    """
    normalized = re.sub(r'\s+', ' ', professional_full).strip()

    # Multi-word titles go first so e.g. "Casting Director" never falls
    # through to the bare "Director" branch below.
    known_titles = (
        'Atlanta Models & Talent President',
        'Executive Casting Producer',
        'Casting Director',
        'Casting Associate',
        'DDO Agent',
        'Talent Agent',
        'Acting Coach',
    )

    for known in known_titles:
        pos = normalized.find(known)
        if pos < 0:
            continue
        if pos == 0:
            # "Title Name": everything after the title is the name.
            return known, normalized[len(known):].strip()
        # "Name, Title": everything before the title is the name.
        return known, normalized[:pos].strip().rstrip(',')

    generic_titles = ('Manager', 'Director', 'Producer', 'Agent', 'Coach', 'President')
    qualifiers = ('Casting', 'Talent', 'Executive', 'DDO', 'Acting')

    tokens = normalized.split()
    for idx, token in enumerate(tokens):
        if token not in generic_titles:
            continue
        if idx > 0 and tokens[idx - 1] in qualifiers:
            # Merge qualifier + role into a two-word title.
            role = f"{tokens[idx - 1]} {token}"
            leftover = tokens[:idx - 1] + tokens[idx + 1:]
        else:
            role = token
            leftover = tokens[:idx] + tokens[idx + 1:]
        return role, ' '.join(leftover).strip()

    # Last resort: treat the first word as a title.
    if len(tokens) >= 2:
        return tokens[0], ' '.join(tokens[1:])

    return '', normalized
|
|
|
|
|
def parse_refined_workshop_match(match, pattern_num: int, source_url: str) -> Dict[str, str]:
    """Parse a regex match into a clean workshop dictionary"""
    # Callers number patterns from 1; the shared parser indexes from 0.
    zero_based_index = pattern_num - 1
    return parse_pattern_match(match, zero_based_index, source_url)
|
|
|
|
|
def is_duplicate_workshop(new_workshop: Dict, existing_workshops: List[Dict]) -> bool:
    """Enhanced duplicate detection

    Two workshops are duplicates when instructor name and date match
    (case/whitespace-insensitive) AND their titles are equal, both contain
    'workshop', or one title contains the other.
    """
    # Normalize the candidate's keys once, outside the loop.
    new_name = new_workshop.get('instructor_name', '').strip().lower()
    new_date = new_workshop.get('date', '').strip().lower()
    new_title = new_workshop.get('title', '').strip().lower()

    for prior in existing_workshops:
        if prior.get('instructor_name', '').strip().lower() != new_name:
            continue
        if prior.get('date', '').strip().lower() != new_date:
            continue

        prior_title = prior.get('title', '').strip().lower()

        exact = prior_title == new_title
        both_generic = 'workshop' in prior_title and 'workshop' in new_title
        contained = prior_title in new_title or new_title in prior_title

        if exact or both_generic or contained:
            return True

    return False
|
|
|
|
|
def calculate_workshop_confidence(w: Dict) -> float:
    """Calculate confidence score of retrieved workshop data

    Each present (truthy) field contributes a fixed weight; a fully
    populated record scores 1.0.
    """
    field_weights = (
        ('title', 0.3),
        ('instructor_name', 0.3),
        ('date', 0.2),
        ('time', 0.1),
        ('source_url', 0.1),
    )
    total = sum((weight for field, weight in field_weights if w.get(field)), 0.0)
    return round(total, 2)