Spaces:
Running
Running
| """ | |
| Google Sheets Data Service | |
| Fetches and syncs data from Gapura irregularity reports | |
| """ | |
| import os | |
| import logging | |
| import hashlib | |
| import re | |
| from io import StringIO | |
| from typing import List, Dict, Any, Optional, TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| from data.cache_service import CacheService | |
| from google.oauth2.service_account import Credentials | |
| from googleapiclient.discovery import build | |
| from datetime import datetime | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Matches a (possibly multi-line) ``GOOGLE_SHEETS_CREDENTIALS_JSON={...}``
# assignment so it can be removed before the file is handed to python-dotenv.
# This module itself builds credentials from GOOGLE_PRIVATE_KEY & co. and
# does not read that variable.
_CREDENTIALS_JSON_RE = re.compile(
    r"(?ms)^\s*GOOGLE_SHEETS_CREDENTIALS_JSON\s*=\s*\{.*?\}\s*$"
)


def _load_env_file(path: str) -> None:
    """Load the ``.env`` file at *path* into the process environment.

    The credentials-JSON assignment is stripped first (see
    ``_CREDENTIALS_JSON_RE``). The file text lives only in this function's
    locals, so the secrets it contains are not kept around as module globals.
    If reading or sanitizing fails, the file is loaded unmodified as a
    best-effort fallback.
    """
    try:
        with open(path, "r", encoding="utf-8") as fh:
            sanitized = _CREDENTIALS_JSON_RE.sub("", fh.read())
        load_dotenv(stream=StringIO(sanitized))
        logger.info("Loaded .env from %s (sanitized)", path)
    except Exception as exc:
        # Deliberate best-effort: fall back to the raw file, but say why.
        logger.warning("Could not sanitize %s (%s); loading it unmodified", path, exc)
        load_dotenv(path)
        logger.info("Loaded .env from %s", path)


# The .env file is expected one directory above this module.
env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
if os.path.exists(env_path):
    _load_env_file(env_path)

# Time-to-live (seconds) for every cache entry written by GoogleSheetsService.
CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", 300))
class GoogleSheetsService:
    """
    Service for fetching data from Google Sheets with Redis caching.

    Every fetch method returns rows as dicts keyed by the sheet's header
    cells, normalized by ``_clean_header`` (stripped; spaces and ``/``
    replaced by ``_``). Each row also gets two bookkeeping keys:

    * ``_row_id``: ``"<sheet>_<row number>"``, assuming the fetched range
      starts at row 1 with the header.
    * ``_sheet_name``.

    Cache keys have the form
    ``sheets:<md5(spreadsheet_id)>:<md5(sheet_name)>:<md5(variant)>``.
    ``invalidate_cache`` can therefore select one spreadsheet, or one sheet
    within it, with a pattern made only of hex digits, ``:`` and a trailing
    ``*``.
    """

    def __init__(self, cache: Optional["CacheService"] = None):
        """Create the service and authenticate immediately.

        Args:
            cache: Optional cache backend used via ``get(key)``,
                ``set(key, value, ttl)`` and ``delete_pattern(pattern)``.
                ``None`` (or a falsy backend) disables caching.
        """
        self.scopes = ["https://www.googleapis.com/auth/spreadsheets.readonly"]
        self.service = None
        self.cache = cache
        self._authenticate()

    def _authenticate(self):
        """Build ``self.service`` from service-account settings in the environment.

        Reads ``GOOGLE_PRIVATE_KEY`` (required), ``GOOGLE_PRIVATE_KEY_ID``,
        ``GOOGLE_SERVICE_ACCOUNT_EMAIL`` and ``GOOGLE_PROJECT_ID``. The
        project id defaults to the previously hard-coded ``"elementum-ebook"``.

        Raises:
            ValueError: if ``GOOGLE_PRIVATE_KEY`` is missing or empty.
            Exception: any google-auth / discovery error is logged and re-raised.
        """
        try:
            private_key = os.getenv("GOOGLE_PRIVATE_KEY", "")
            if not private_key:
                raise ValueError("GOOGLE_PRIVATE_KEY not found in environment")
            # The key is stored with literal "\n" sequences; the signer needs
            # real newlines.
            if "\\n" in private_key:
                private_key = private_key.replace("\\n", "\n")
            # Log only the key's shape, never its contents.
            logger.info(
                "Private key loaded: %d chars, %d newlines",
                len(private_key),
                private_key.count("\n"),
            )
            credentials_info = {
                "type": "service_account",
                "project_id": os.getenv("GOOGLE_PROJECT_ID", "elementum-ebook"),
                "private_key_id": os.getenv("GOOGLE_PRIVATE_KEY_ID", ""),
                "private_key": private_key,
                "client_email": os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL"),
                "client_id": "",
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
            }
            credentials = Credentials.from_service_account_info(
                credentials_info, scopes=self.scopes
            )
            self.service = build("sheets", "v4", credentials=credentials)
            logger.info("Successfully authenticated with Google Sheets API")
        except Exception as e:
            logger.error("Authentication failed: %s", e)
            raise

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _clean_header(header: Any) -> str:
        """Normalize a header cell into a dict key.

        With UNFORMATTED_VALUE a header cell may be a number or bool, so the
        value is passed through ``str()`` before string methods are applied.
        """
        return str(header).strip().replace(" ", "_").replace("/", "_")

    @classmethod
    def _normalize_header(cls, header: Any) -> str:
        """Return a header match key: cleaned, ``_`` runs collapsed, lower-cased.

        This lets 'Irregularity / Complain Category' match
        'Irregularity/Complain Category'.
        """
        return re.sub("_+", "_", cls._clean_header(header)).lower()

    @staticmethod
    def _quote_sheet(sheet_name: str) -> str:
        """Quote a sheet name for A1 notation, doubling embedded apostrophes.

        Names that already arrive wrapped in single quotes are returned as-is.
        """
        if len(sheet_name) >= 2 and sheet_name.startswith("'") and sheet_name.endswith("'"):
            return sheet_name
        return "'" + sheet_name.replace("'", "''") + "'"

    @staticmethod
    def _col_letter(index: int) -> str:
        """Convert a 1-based column index to its A1 letters (1 -> A, 27 -> AA)."""
        letters = ""
        while index > 0:
            index, rem = divmod(index - 1, 26)
            letters = chr(65 + rem) + letters
        return letters

    @classmethod
    def _rows_to_dicts(
        cls, values: List[List[Any]], sheet_name: str
    ) -> List[Dict[str, Any]]:
        """Turn a ``values`` grid (header row first) into row dicts.

        Short rows are padded with ``""``. Cells beyond the last header are
        dropped. Returns ``[]`` for an empty grid.
        """
        if not values:
            return []
        keys = [cls._clean_header(h) for h in values[0]]
        data: List[Dict[str, Any]] = []
        for offset, row in enumerate(values[1:]):
            record: Dict[str, Any] = {
                key: (row[j] if j < len(row) else "") for j, key in enumerate(keys)
            }
            record["_row_id"] = f"{sheet_name}_{offset + 2}"  # +2: 1-based, after header
            record["_sheet_name"] = sheet_name
            data.append(record)
        return data

    @classmethod
    def _columns_to_rows(
        cls, sheet_name: str, cols: List[str], column_values: List[List[Any]]
    ) -> List[Dict[str, Any]]:
        """Zip per-column cell lists back into row dicts.

        ``column_values[i]`` holds the cells for ``cols[i]`` with the header
        cell already removed. Each cell is ``[value]``, or ``[]`` when blank,
        as returned for a single-column range. A missing or short column
        yields ``""``.
        """
        keys = [cls._clean_header(col) for col in cols]
        n_rows = max((len(values) for values in column_values), default=0)
        rows: List[Dict[str, Any]] = []
        for r in range(n_rows):
            row: Dict[str, Any] = {}
            for ci, key in enumerate(keys):
                values = column_values[ci] if ci < len(column_values) else []
                cell = values[r] if r < len(values) else None
                row[key] = cell[0] if cell else ""
            row["_row_id"] = f"{sheet_name}_{r + 2}"
            row["_sheet_name"] = sheet_name
            rows.append(row)
        return rows

    # ------------------------------------------------------------------
    # Cache keys
    # ------------------------------------------------------------------

    @staticmethod
    def _digest(text: str) -> str:
        """Return the md5 hex digest of *text* (used only as a cache-key component)."""
        return hashlib.md5(text.encode("utf-8")).hexdigest()

    def _cache_prefix(self, spreadsheet_id: str, sheet_name: Optional[str] = None) -> str:
        """Key prefix shared by all entries of a spreadsheet (or of one of its sheets)."""
        prefix = f"sheets:{self._digest(str(spreadsheet_id))}:"
        if sheet_name is not None:
            prefix += f"{self._digest(str(sheet_name))}:"
        return prefix

    def _generate_cache_key(
        self, spreadsheet_id: str, sheet_name: str, range_str: str
    ) -> str:
        """Generate the cache key for a (spreadsheet, sheet, range) fetch."""
        return self._cache_prefix(spreadsheet_id, sheet_name) + self._digest(
            f"range:{range_str}"
        )

    def _columns_cache_key(
        self, spreadsheet_id: str, sheet_name: str, cols: List[str], limit: int
    ) -> str:
        """Generate the cache key for a selected-columns fetch (column order ignored)."""
        cols_key = ",".join(sorted(c.strip() for c in cols))
        return self._cache_prefix(spreadsheet_id, sheet_name) + self._digest(
            f"cols:{cols_key}:limit:{limit}"
        )

    def invalidate_cache(self, spreadsheet_id: str, sheet_name: Optional[str] = None) -> int:
        """Drop cached entries for a spreadsheet, or only for one of its sheets.

        Args:
            spreadsheet_id: Spreadsheet whose entries should be dropped.
            sheet_name: If non-empty, restrict invalidation to this sheet.

        Returns:
            The value returned by ``cache.delete_pattern`` (presumably the
            number of deleted keys), or 0 when caching is disabled.
        """
        if not self.cache:
            return 0
        # The pattern is built from the same digests as the keys, so the only
        # glob metacharacter it contains is the trailing "*". Assumes
        # delete_pattern takes a Redis-style glob; confirm in CacheService.
        pattern = self._cache_prefix(spreadsheet_id, sheet_name or None) + "*"
        return self.cache.delete_pattern(pattern)

    # ------------------------------------------------------------------
    # Fetching
    # ------------------------------------------------------------------

    def _batch_get(self, spreadsheet_id: str, ranges: List[str]) -> List[Dict[str, Any]]:
        """Run one ``values.batchGet`` with UNFORMATTED_VALUE and return ``valueRanges``.

        The API returns ``valueRanges`` in the same order as *ranges*.
        Callers pair results with requests by position rather than parsing
        the echoed A1 range, which may come back quoted or normalized.
        """
        response = (
            self.service.spreadsheets()
            .values()
            .batchGet(
                spreadsheetId=spreadsheet_id,
                ranges=ranges,
                valueRenderOption="UNFORMATTED_VALUE",
            )
            .execute()
        )
        return response.get("valueRanges", [])

    def fetch_sheet_data(
        self,
        spreadsheet_id: str,
        sheet_name: str,
        range_str: str = "A1:Z1000",
        bypass_cache: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Fetch data from a specific sheet with caching.

        Args:
            spreadsheet_id: Google Sheet ID.
            sheet_name: Sheet tab name (e.g., "NON CARGO", "CGO").
            range_str: Cell range to fetch; its first row is used as the header.
            bypass_cache: If True, skip the cache read and fetch fresh data.
                The fresh data is still written back to the cache.

        Returns:
            List of dictionaries with row data (formatted cell values), or
            ``[]`` when the range is empty.

        Raises:
            Exception: API errors are logged and re-raised.
        """
        cache_key = self._generate_cache_key(spreadsheet_id, sheet_name, range_str)
        if self.cache and not bypass_cache:
            cached_data = self.cache.get(cache_key)
            if cached_data is not None:
                logger.info("Cache hit for %s", sheet_name)
                return cached_data
        try:
            range_notation = f"{self._quote_sheet(sheet_name)}!{range_str}"
            logger.info("Fetching data from %s", range_notation)
            result = (
                self.service.spreadsheets()
                .values()
                .get(spreadsheetId=spreadsheet_id, range=range_notation)
                .execute()
            )
            values = result.get("values", [])
            if not values:
                logger.warning("No data found in %s", range_notation)
                return []
            data = self._rows_to_dicts(values, sheet_name)
            logger.info("Successfully fetched %d rows from %s", len(data), sheet_name)
            if self.cache:
                self.cache.set(cache_key, data, CACHE_TTL)
                logger.info("Cached data for %s (TTL: %ss)", sheet_name, CACHE_TTL)
            return data
        except Exception as e:
            logger.error("Error fetching sheet data: %s", e)
            raise

    def fetch_all_sheets(self, spreadsheet_id: str) -> Dict[str, List[Dict]]:
        """
        Fetch data from every grid sheet of a spreadsheet.

        Sheets without grid properties (e.g. chart sheets) and sheets whose
        grid has at most one row are skipped.

        Returns:
            Dictionary with sheet names as keys and ``fetch_sheet_data`` rows as values.
        """
        try:
            # Only sheet properties are needed; skip the rest of the metadata.
            spreadsheet = (
                self.service.spreadsheets()
                .get(spreadsheetId=spreadsheet_id, fields="sheets.properties")
                .execute()
            )
            all_data: Dict[str, List[Dict]] = {}
            for sheet in spreadsheet.get("sheets", []):
                props = sheet.get("properties", {})
                sheet_name = props.get("title", "")
                grid = props.get("gridProperties")
                if not grid:
                    logger.info("Skipping non-grid sheet: %s", sheet_name)
                    continue
                if grid.get("rowCount", 0) <= 1:
                    logger.info("Skipping empty sheet: %s", sheet_name)
                    continue
                all_data[sheet_name] = self.fetch_sheet_data(spreadsheet_id, sheet_name)
            return all_data
        except Exception as e:
            logger.error("Error fetching all sheets: %s", e)
            raise

    def fetch_sheets_batch_data(
        self,
        spreadsheet_id: str,
        sheet_ranges: List[Dict[str, str]],
        bypass_cache: bool = False,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """Fetch several ``{"name": sheet, "range": A1-range}`` items in one batch call.

        Values are UNFORMATTED, so numbers stay numeric and dates arrive as
        serial numbers. Cache reads and writes happen only when caching is
        enabled and ``bypass_cache`` is False. Empty ranges are not cached.

        Returns:
            ``{sheet_name: rows}`` containing every requested name
            (``[]`` when nothing came back).
        """
        result: Dict[str, List[Dict[str, Any]]] = {}
        use_cache = bool(self.cache) and not bypass_cache
        to_fetch: List[Dict[str, str]] = []
        for item in sheet_ranges:
            if use_cache:
                cached = self.cache.get(
                    self._generate_cache_key(spreadsheet_id, item["name"], item["range"])
                )
                if cached is not None:
                    result[item["name"]] = cached
                    continue
            to_fetch.append(item)
        if to_fetch:
            ranges = [f'{self._quote_sheet(i["name"])}!{i["range"]}' for i in to_fetch]
            for item, vr in zip(to_fetch, self._batch_get(spreadsheet_id, ranges)):
                values = vr.get("values", [])
                data = self._rows_to_dicts(values, item["name"])
                result[item["name"]] = data
                if values and use_cache:
                    self.cache.set(
                        self._generate_cache_key(spreadsheet_id, item["name"], item["range"]),
                        data,
                        CACHE_TTL,
                    )
        for item in sheet_ranges:
            result.setdefault(item["name"], [])
        return result

    def fetch_sheets_selected_columns(
        self,
        spreadsheet_id: str,
        requests: List[Dict[str, Any]],
        bypass_cache: bool = False,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """Fetch only the named columns from several sheets.

        Each request is ``{"name": sheet, "required_headers": [header, ...],
        "max_rows": int (default 10000)}``. Headers are matched against the
        sheet's first row via ``_normalize_header``. A requested header
        absent from the sheet yields ``""`` in every row; it never shifts the
        data of the other columns.

        At most two ``values.batchGet`` calls are made: one for all header
        rows and one for all matched columns.

        Returns:
            ``{sheet_name: rows}`` containing every requested sheet name.
        """
        result: Dict[str, List[Dict[str, Any]]] = {}
        use_cache = bool(self.cache) and not bypass_cache

        # 1. Serve what the cache already has.
        pending: List[tuple] = []
        for req in requests:
            sheet = req["name"]
            cols = req.get("required_headers", [])
            limit = int(req.get("max_rows", 10000))
            if use_cache:
                cached = self.cache.get(
                    self._columns_cache_key(spreadsheet_id, sheet, cols, limit)
                )
                if cached is not None:
                    result[sheet] = cached
                    continue
            pending.append((sheet, cols, limit))
        # A sheet already answered from cache is not refetched for another request.
        pending = [p for p in pending if p[0] not in result]

        if pending:
            quoted = [self._quote_sheet(sheet) for sheet, _, _ in pending]
            # 2. Fetch every header row in one call, then map each requested
            #    column to a range slot (by position in cols, not by match order).
            header_ranges = self._batch_get(spreadsheet_id, [f"{q}!1:1" for q in quoted])
            data_ranges: List[str] = []
            plan = []
            for pos, (sheet, cols, limit) in enumerate(pending):
                header_values = (
                    header_ranges[pos].get("values") if pos < len(header_ranges) else None
                )
                headers = header_values[0] if header_values else []
                header_pos = {self._normalize_header(h): i + 1 for i, h in enumerate(headers)}
                slots: Dict[int, int] = {}  # index in cols -> index in data_ranges
                for ci, col in enumerate(cols):
                    idx = header_pos.get(self._normalize_header(col))
                    if idx:
                        letter = self._col_letter(idx)
                        slots[ci] = len(data_ranges)
                        data_ranges.append(f"{quoted[pos]}!{letter}1:{letter}{limit + 1}")
                plan.append((sheet, cols, limit, slots))

            # 3. Fetch every matched column in one call and reassemble rows.
            column_ranges = self._batch_get(spreadsheet_id, data_ranges) if data_ranges else []
            for sheet, cols, limit, slots in plan:
                column_values: List[List[Any]] = []
                for ci in range(len(cols)):
                    slot = slots.get(ci)
                    values = (
                        column_ranges[slot].get("values", [])
                        if slot is not None and slot < len(column_ranges)
                        else []
                    )
                    column_values.append(values[1:])  # drop the header cell
                rows = self._columns_to_rows(sheet, cols, column_values)
                result[sheet] = rows
                if use_cache:
                    self.cache.set(
                        self._columns_cache_key(spreadsheet_id, sheet, cols, limit),
                        rows,
                        CACHE_TTL,
                    )

        for req in requests:
            result.setdefault(req["name"], [])
        return result

    def to_dataframe(self, data: List[Dict]) -> pd.DataFrame:
        """Convert row dicts to a DataFrame, parsing known date columns (bad values -> NaT)."""
        df = pd.DataFrame(data)
        for col in ("Date_of_Event", "dateOfEvent", "Date"):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors="coerce")
        return df

    def sync_to_database(self, data: List[Dict], db_connection: Any):
        """
        Sync fetched data to database.

        TODO: Implement database sync. Currently this only logs the record count.
        """
        logger.info("Syncing %d records to database", len(data))
class DataPreprocessor:
    """
    Preprocess irregularity report data for ML models.

    ``extract_features`` turns one report row (a dict keyed by cleaned sheet
    headers such as ``Date_of_Event`` or ``Report``) into a flat feature dict.
    """

    # Score added per matching keyword, by severity tier.
    _SEVERITY_WEIGHTS = {"high": 3, "medium": 2, "low": 1}
    # Google Sheets serial dates (UNFORMATTED_VALUE) count days from this epoch.
    _SHEETS_EPOCH = pd.Timestamp("1899-12-30")

    def __init__(self):
        self.severity_keywords = {
            "high": [
                "damage",
                "torn",
                "broken",
                "emergency",
                "critical",
                "urgent",
                "severe",
            ],
            "medium": ["delay", "late", "wrong", "incorrect", "missing"],
            "low": ["minor", "small", "slight"],
        }

    def clean_text(self, text: Any) -> str:
        """Lower-case *text* and collapse whitespace runs to single spaces.

        Falsy input (``None``, ``""``, ``0``) yields ``""``. Other non-string
        values are converted with ``str`` first.
        """
        if not text:
            return ""
        return " ".join(str(text).lower().split())

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return *value* as a string, mapping ``None`` to ``""``."""
        return "" if value is None else str(value)

    @classmethod
    def _parse_date(cls, value: Any) -> Optional[pd.Timestamp]:
        """Parse a date cell, or return ``None`` if it is missing or unparseable.

        Numeric cells are treated as Google Sheets serial dates. String cells
        go through ``pd.to_datetime``, which yields NaT (not an exception)
        for ``""``. That case is mapped to ``None`` here.
        """
        try:
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                parsed = cls._SHEETS_EPOCH + pd.to_timedelta(value, unit="D")
            else:
                # NOTE(review): ambiguous strings like "01/06/2024" are parsed
                # month-first by pandas; confirm the sheet's date format.
                parsed = pd.to_datetime(value)
            if pd.isna(parsed):
                return None
        except Exception:
            # Best-effort: any unparseable value falls back to defaults.
            return None
        return parsed

    def extract_features(self, report: Dict) -> Dict[str, Any]:
        """Extract model features from a single report row.

        A missing or unparseable date gives ``day_of_week=0``, ``month=1`` and
        ``is_weekend=False``. Text cells are coerced to ``str`` so numeric or
        ``None`` cells do not raise.
        """
        date_value = report.get("Date_of_Event", "") or report.get("dateOfEvent", "")
        date_obj = self._parse_date(date_value)
        if date_obj is None:
            day_of_week, month, is_weekend = 0, 1, False
        else:
            day_of_week = date_obj.dayofweek
            month = date_obj.month
            is_weekend = day_of_week in (5, 6)  # Saturday, Sunday

        report_text = self._as_text(report.get("Report", "") or report.get("report", ""))
        root_cause = self._as_text(report.get("Root_Caused", "") or report.get("rootCause", ""))
        action_taken = self._as_text(report.get("Action_Taken", "") or report.get("actionTaken", ""))
        cleaned_report = self.clean_text(report_text)
        cleaned_root = self.clean_text(root_cause)
        cleaned_action = self.clean_text(action_taken)

        # Each keyword present (plain substring test, counted once) adds its
        # tier's weight. NOTE(review): substring matching means "late" also
        # hits "related"/"translate".
        severity_score = 0
        for severity, keywords in self.severity_keywords.items():
            severity_score += sum(
                self._SEVERITY_WEIGHTS[severity] for kw in keywords if kw in cleaned_report
            )

        report_category = self._as_text(report.get("Report_Category", ""))
        return {
            # Time features
            "day_of_week": day_of_week,
            "month": month,
            "is_weekend": is_weekend,
            # Categorical
            "airline": report.get("Airlines", "Unknown"),
            "airline_type": report.get("Jenis_Maskapai", "Unknown"),
            "hub": report.get("HUB", "Unknown"),
            "branch": report.get("Branch", "Unknown"),
            "category": report.get("Irregularity_Complain_Category", "Unknown"),
            "area": report.get("Area", "Unknown"),
            # Text features
            "report_length": len(report_text),
            "report_word_count": len(report_text.split()) if report_text else 0,
            "root_cause_length": len(root_cause),
            "action_taken_length": len(action_taken),
            "severity_keyword_count": severity_score,
            # Binary
            "has_photos": bool(report.get("Upload_Irregularity_Photo", "")),
            "is_cargo": "Cargo" in report_category,
            "is_complaint": report.get("Report_Category", "") == "Complaint",
            "is_closed": self._as_text(report.get("Status", "")).lower() == "closed",
            # Raw text for NLP
            "report_text": cleaned_report,
            "root_cause_text": cleaned_root,
            "action_taken_text": cleaned_action,
            "combined_text": f"{cleaned_report} {cleaned_root} {cleaned_action}",
        }

    def preprocess_batch(self, reports: List[Dict]) -> List[Dict]:
        """Apply ``extract_features`` to each report, preserving order."""
        return [self.extract_features(report) for report in reports]
# ============== Usage Example ==============
if __name__ == "__main__":
    # Fail fast with a clear message instead of sending spreadsheetId=None to the API.
    spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
    if not spreadsheet_id:
        raise SystemExit("GOOGLE_SHEET_ID is not set; cannot run the usage example")

    # Initialize service (authenticates immediately) and preprocessor
    sheets_service = GoogleSheetsService()
    preprocessor = DataPreprocessor()

    # Fetch NON CARGO sheet
    non_cargo_data = sheets_service.fetch_sheet_data(
        spreadsheet_id=spreadsheet_id, sheet_name="NON CARGO"
    )
    print(f"Fetched {len(non_cargo_data)} rows from NON CARGO")

    # Preprocess first 5 rows
    if non_cargo_data:
        sample_features = preprocessor.preprocess_batch(non_cargo_data[:5])
        print("\nSample features:")
        for i, features in enumerate(sample_features):
            print(f"\nRow {i + 1}:")
            print(f"  Airline: {features['airline']}")
            print(f"  Category: {features['category']}")
            print(f"  Report length: {features['report_length']}")
            print(f"  Severity keywords: {features['severity_keyword_count']}")