Spaces:
Build error
Build error
| """ | |
| Google Sheets Data Service | |
| Fetches and syncs data from Gapura irregularity reports | |
| """ | |
| import os | |
| import logging | |
| import hashlib | |
| import re | |
| from io import StringIO | |
| from typing import List, Dict, Any, Optional, TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| from data.cache_service import CacheService | |
| from google.oauth2.service_account import Credentials | |
| from googleapiclient.discovery import build | |
| from datetime import datetime | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
# Configure root logging at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Optional .env file located one directory above this module.
env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
if os.path.exists(env_path):
    try:
        with open(env_path, "r", encoding="utf-8") as f:
            content = f.read()
        # Remove a `GOOGLE_SHEETS_CREDENTIALS_JSON={...}` assignment (it may span
        # several lines) before handing the text to load_dotenv.
        # NOTE(review): presumably because the multi-line JSON value is not
        # parseable as a dotenv entry - confirm.
        content_clean = re.sub(
            r"(?ms)^\s*GOOGLE_SHEETS_CREDENTIALS_JSON\s*=\s*\{.*?\}\s*$", "", content
        )
        load_dotenv(stream=StringIO(content_clean))
        logger.info(f"Loaded .env from {env_path} (sanitized)")
    except Exception:
        # Best effort: if reading/sanitizing fails, let load_dotenv read the
        # file directly.
        load_dotenv(env_path)
        logger.info(f"Loaded .env from {env_path}")

# Cache time-to-live in seconds; overridable through CACHE_TTL_SECONDS.
_DEFAULT_CACHE_TTL = 300
try:
    CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", _DEFAULT_CACHE_TTL))
except ValueError:
    # A non-numeric value must not make the whole module fail to import.
    logger.warning(
        "Invalid CACHE_TTL_SECONDS=%r; falling back to %d seconds",
        os.getenv("CACHE_TTL_SECONDS"),
        _DEFAULT_CACHE_TTL,
    )
    CACHE_TTL = _DEFAULT_CACHE_TTL
class GoogleSheetsService:
    """
    Service for fetching data from Google Sheets with optional caching.

    When a cache object is supplied, this class calls its ``get(key)``,
    ``set(key, value, ttl)`` and ``delete_pattern(pattern)`` methods. Without
    a cache every fetch goes to the Sheets API.
    """

    def __init__(self, cache: Optional["CacheService"] = None):
        """Store the optional cache and authenticate immediately.

        Args:
            cache: Optional cache backend; ``None`` disables caching.

        Raises:
            Exception: whatever ``_authenticate`` raises (it logs and re-raises).
        """
        self.scopes = ["https://www.googleapis.com/auth/spreadsheets.readonly"]
        self.service = None
        self.cache = cache
        self._authenticate()

    def _authenticate(self):
        """Authenticate with the Google Sheets API using environment variables.

        Reads ``GOOGLE_PRIVATE_KEY``, ``GOOGLE_PRIVATE_KEY_ID`` and
        ``GOOGLE_SERVICE_ACCOUNT_EMAIL`` and stores the built API client in
        ``self.service``.

        Raises:
            ValueError: if ``GOOGLE_PRIVATE_KEY`` or
                ``GOOGLE_SERVICE_ACCOUNT_EMAIL`` is missing or empty.
            Exception: any error from building the credentials or the client
                is logged and re-raised unchanged.
        """
        try:
            private_key = os.getenv("GOOGLE_PRIVATE_KEY", "")
            if not private_key:
                raise ValueError("GOOGLE_PRIVATE_KEY not found in environment")

            # Fail early with a clear message instead of handing a None
            # e-mail to the credentials builder.
            client_email = os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL", "")
            if not client_email:
                raise ValueError(
                    "GOOGLE_SERVICE_ACCOUNT_EMAIL not found in environment"
                )

            # The key is stored with literal "\n" sequences; turn them into
            # real newlines.
            private_key = private_key.replace("\\n", "\n")

            # Log only the key's shape, never its content.
            logger.info(
                "Private key loaded: %d chars, %d newlines",
                len(private_key),
                private_key.count("\n"),
            )

            credentials_info = {
                "type": "service_account",
                # NOTE(review): project id is hard-coded here.
                "project_id": "elementum-ebook",
                "private_key_id": os.getenv("GOOGLE_PRIVATE_KEY_ID", ""),
                "private_key": private_key,
                "client_email": client_email,
                "client_id": "",
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
            }
            credentials = Credentials.from_service_account_info(
                credentials_info, scopes=self.scopes
            )
            self.service = build("sheets", "v4", credentials=credentials)
            logger.info("Successfully authenticated with Google Sheets API")
        except Exception as e:
            logger.error("Authentication failed: %s", e)
            raise

    def fetch_sheet_data(
        self,
        spreadsheet_id: str,
        sheet_name: str,
        range_str: str = "A1:Z1000",
        bypass_cache: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Fetch data from a specific sheet with caching.

        The first returned row is treated as the header row. Header names are
        stripped and have spaces and ``/`` replaced by ``_``. Every data row
        becomes a dict keyed by those names, plus ``_row_id`` and
        ``_sheet_name``. Cells beyond ``range_str`` are not fetched.

        Args:
            spreadsheet_id: Google Sheet ID
            sheet_name: Sheet tab name (e.g., "NON CARGO", "CGO")
            range_str: Cell range to fetch
            bypass_cache: If True, skip the cache lookup and fetch fresh data
                (the fresh result is still written to the cache)

        Returns:
            List of dictionaries with row data; an empty list when the range
            holds no values (that empty result is not cached).

        Raises:
            Exception: any error from the API call or row conversion is
                logged and re-raised.
        """
        cache_key = self._generate_cache_key(spreadsheet_id, sheet_name, range_str)

        if self.cache and not bypass_cache:
            cached_data = self.cache.get(cache_key)
            if cached_data is not None:
                logger.info("Cache hit for %s", sheet_name)
                return cached_data

        try:
            range_notation = f"{sheet_name}!{range_str}"
            logger.info("Fetching data from %s", range_notation)
            result = (
                self.service.spreadsheets()
                .values()
                .get(spreadsheetId=spreadsheet_id, range=range_notation)
                .execute()
            )
            values = result.get("values", [])
            if not values:
                logger.warning("No data found in %s", range_notation)
                return []

            # Normalise the header names once instead of once per row.
            clean_headers = [
                header.strip().replace(" ", "_").replace("/", "_")
                for header in values[0]
            ]

            data = []
            for offset, row in enumerate(values[1:]):
                # Pad short rows so every header gets a value; cells beyond
                # the header count are dropped by zip().
                padded = row + [""] * (len(clean_headers) - len(row))
                row_dict = dict(zip(clean_headers, padded))
                # +2: skip the header row and switch to 1-based numbering
                # (matches the sheet row only when the range starts at row 1).
                row_dict["_row_id"] = f"{sheet_name}_{offset + 2}"
                row_dict["_sheet_name"] = sheet_name
                data.append(row_dict)

            logger.info("Successfully fetched %d rows from %s", len(data), sheet_name)

            if self.cache:
                self.cache.set(cache_key, data, CACHE_TTL)
                logger.info("Cached data for %s (TTL: %ss)", sheet_name, CACHE_TTL)
            return data
        except Exception as e:
            logger.error("Error fetching sheet data: %s", e)
            raise

    @staticmethod
    def _digest(value: str) -> str:
        """Return the MD5 hex digest of ``value`` (used only to build cache keys)."""
        return hashlib.md5(value.encode()).hexdigest()

    def _generate_cache_key(
        self, spreadsheet_id: str, sheet_name: str, range_str: str
    ) -> str:
        """Generate a unique cache key.

        The key has the form ``sheets:<id-hash>:<sheet-hash>:<range-hash>``.
        Each component is hashed separately so ``invalidate_cache`` can
        address a spreadsheet or a single sheet by prefix. Hex digests also
        contain no wildcard characters, whatever the sheet is called.
        """
        return "sheets:" + ":".join(
            self._digest(part) for part in (spreadsheet_id, sheet_name, range_str)
        )

    def invalidate_cache(
        self, spreadsheet_id: str, sheet_name: Optional[str] = None
    ) -> int:
        """Invalidate cache for a spreadsheet or specific sheet.

        Args:
            spreadsheet_id: Spreadsheet whose cached entries are removed.
            sheet_name: When given, only that sheet's entries are removed.

        Returns:
            Whatever ``cache.delete_pattern`` returns, or 0 when no cache is
            configured.
        """
        if not self.cache:
            return 0

        # Patterns mirror the key layout produced by _generate_cache_key.
        # NOTE(review): assumes delete_pattern understands "*" wildcards.
        prefix = f"sheets:{self._digest(spreadsheet_id)}"
        if sheet_name:
            pattern = f"{prefix}:{self._digest(sheet_name)}:*"
        else:
            pattern = f"{prefix}:*"
        return self.cache.delete_pattern(pattern)

    def fetch_all_sheets(self, spreadsheet_id: str) -> Dict[str, List[Dict]]:
        """
        Fetch data from all sheets of a spreadsheet.

        Sheets without grid properties or with at most one row are skipped.
        Each remaining sheet is read with ``fetch_sheet_data`` using its
        default range.

        Returns:
            Dictionary with sheet names as keys

        Raises:
            Exception: any error is logged and re-raised.
        """
        try:
            spreadsheet = (
                self.service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
            )

            all_data = {}
            for sheet in spreadsheet.get("sheets", []):
                properties = sheet["properties"]
                sheet_name = properties["title"]

                # A sheet without gridProperties has no cell grid to read.
                grid = properties.get("gridProperties")
                if grid is None:
                    logger.info(f"Skipping non-grid sheet: {sheet_name}")
                    continue
                if grid.get("rowCount", 0) <= 1:
                    logger.info(f"Skipping empty sheet: {sheet_name}")
                    continue

                all_data[sheet_name] = self.fetch_sheet_data(
                    spreadsheet_id, sheet_name
                )
            return all_data
        except Exception as e:
            logger.error("Error fetching all sheets: %s", e)
            raise

    def to_dataframe(self, data: List[Dict]) -> pd.DataFrame:
        """Convert data to a pandas DataFrame.

        The columns ``Date_of_Event``, ``dateOfEvent`` and ``Date`` are
        converted to datetimes when present; unparseable values become NaT.
        """
        df = pd.DataFrame(data)
        for col in ("Date_of_Event", "dateOfEvent", "Date"):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors="coerce")
        return df

    def sync_to_database(self, data: List[Dict], db_connection: Any):
        """
        Sync fetched data to database.

        TODO: Implement database sync. Currently this only logs the record
        count; ``db_connection`` is unused.
        """
        logger.info("Syncing %d records to database", len(data))
class DataPreprocessor:
    """
    Preprocess irregularity report data for ML models.

    Turns a raw report row (a dict keyed by column name) into a flat feature
    dict: date parts, categorical fields, text lengths, a keyword-based
    severity score, boolean flags and cleaned text.
    """

    # Points added to the severity score per matched keyword, by tier.
    _SEVERITY_WEIGHTS = {"high": 3, "medium": 2, "low": 1}

    def __init__(self):
        """Set up the keyword lists used for severity scoring."""
        self.severity_keywords = {
            "high": [
                "damage",
                "torn",
                "broken",
                "emergency",
                "critical",
                "urgent",
                "severe",
            ],
            "medium": ["delay", "late", "wrong", "incorrect", "missing"],
            "low": ["minor", "small", "slight"],
        }

    def clean_text(self, text: str) -> str:
        """Lower-case ``text`` and collapse whitespace runs to single spaces.

        Returns an empty string for empty or ``None`` input.
        """
        if not text:
            return ""
        return " ".join(text.lower().split())

    @staticmethod
    def _date_features(date_value: Any) -> tuple:
        """Return ``(day_of_week, month, is_weekend)`` for a date value.

        Falls back to ``(0, 1, False)`` when the value is missing, empty or
        cannot be parsed.
        """
        fallback = (0, 1, False)
        try:
            date_obj = pd.to_datetime(date_value)
            # Empty strings and None do not raise; they come back as NaT/None,
            # whose dayofweek/month would be NaN rather than the fallback.
            if bool(pd.isna(date_obj)):
                return fallback
            day_of_week = date_obj.dayofweek
            return day_of_week, date_obj.month, day_of_week in [5, 6]
        except Exception:
            # Best effort: an unparseable date must not abort feature extraction.
            return fallback

    def extract_features(self, report: Dict) -> Dict[str, Any]:
        """Extract features from a single report.

        Args:
            report: One row; both the sheet-style keys (e.g. ``Date_of_Event``,
                ``Report``) and the camelCase alternatives (``dateOfEvent``,
                ``report``) are accepted for date and text fields.

        Returns:
            Feature dict with time, categorical, text-length, severity,
            boolean and cleaned-text entries.
        """
        date_value = report.get("Date_of_Event", "") or report.get("dateOfEvent", "")
        day_of_week, month, is_weekend = self._date_features(date_value)

        # The trailing `or ""` keeps len() below safe when a cell holds None.
        report_text = report.get("Report", "") or report.get("report", "") or ""
        root_cause = (
            report.get("Root_Caused", "") or report.get("rootCause", "") or ""
        )
        action_taken = (
            report.get("Action_Taken", "") or report.get("actionTaken", "") or ""
        )

        cleaned_report = self.clean_text(report_text)
        cleaned_root_cause = self.clean_text(root_cause)
        cleaned_action = self.clean_text(action_taken)

        # Each keyword found as a substring of the cleaned report adds its
        # tier's weight once.
        severity_score = sum(
            self._SEVERITY_WEIGHTS[severity]
            for severity, keywords in self.severity_keywords.items()
            for keyword in keywords
            if keyword in cleaned_report
        )

        return {
            # Time features
            "day_of_week": day_of_week,
            "month": month,
            "is_weekend": is_weekend,
            # Categorical
            "airline": report.get("Airlines", "Unknown"),
            "airline_type": report.get("Jenis_Maskapai", "Unknown"),
            "hub": report.get("HUB", "Unknown"),
            "branch": report.get("Branch", "Unknown"),
            "category": report.get("Irregularity_Complain_Category", "Unknown"),
            "area": report.get("Area", "Unknown"),
            # Text features (lengths are measured on the raw, uncleaned text)
            "report_length": len(report_text),
            "report_word_count": len(report_text.split()) if report_text else 0,
            "root_cause_length": len(root_cause),
            "action_taken_length": len(action_taken),
            # Weighted score despite the "count" name; key kept for callers.
            "severity_keyword_count": severity_score,
            # Binary
            "has_photos": bool(report.get("Upload_Irregularity_Photo", "")),
            "is_cargo": "Cargo" in report.get("Report_Category", ""),
            "is_complaint": report.get("Report_Category", "") == "Complaint",
            "is_closed": report.get("Status", "").lower() == "closed",
            # Raw text for NLP
            "report_text": cleaned_report,
            "root_cause_text": cleaned_root_cause,
            "action_taken_text": cleaned_action,
            "combined_text": f"{cleaned_report} {cleaned_root_cause} {cleaned_action}",
        }

    def preprocess_batch(self, reports: List[Dict]) -> List[Dict]:
        """Preprocess a batch of reports, one feature dict per report, in order."""
        return [self.extract_features(report) for report in reports]
# ============== Usage Example ==============
def main() -> None:
    """Fetch the NON CARGO sheet and print features for its first five rows.

    Reads the spreadsheet ID from ``GOOGLE_SHEET_ID``.

    Raises:
        SystemExit: if ``GOOGLE_SHEET_ID`` is not set.
    """
    spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
    if not spreadsheet_id:
        # Stop with a clear message instead of sending a None ID to the API.
        raise SystemExit("GOOGLE_SHEET_ID not found in environment")

    # Initialize service
    sheets_service = GoogleSheetsService()
    preprocessor = DataPreprocessor()

    # Fetch NON CARGO sheet
    non_cargo_data = sheets_service.fetch_sheet_data(
        spreadsheet_id=spreadsheet_id, sheet_name="NON CARGO"
    )
    print(f"Fetched {len(non_cargo_data)} rows from NON CARGO")

    # Preprocess first 5 rows
    if non_cargo_data:
        sample_features = preprocessor.preprocess_batch(non_cargo_data[:5])
        print("\nSample features:")
        for i, features in enumerate(sample_features):
            print(f"\nRow {i + 1}:")
            print(f" Airline: {features['airline']}")
            print(f" Category: {features['category']}")
            print(f" Report length: {features['report_length']}")
            print(f" Severity keywords: {features['severity_keyword_count']}")


if __name__ == "__main__":
    main()