import os
import pandas as pd
import gspread
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
import json
import gradio as gr
import time
from datetime import datetime, timedelta
import threading
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Scopes (read-only)
SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly"]

def authorize():
    creds = None

    # Get JSON content from environment variables
    token_json_content = os.getenv('TOKEN_JSON')
    credentials_json_content = os.getenv('CREDENTIALS_JSON')

    # Load token from environment variable if it exists
    if token_json_content:
        try:
            token_info = json.loads(token_json_content)
            creds = Credentials.from_authorized_user_info(token_info, SCOPES)
        except json.JSONDecodeError:
            print("Warning: Invalid TOKEN_JSON format in environment variable")

    # If there are no valid credentials, refresh them or start the OAuth flow
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not credentials_json_content:
                raise ValueError("CREDENTIALS_JSON environment variable is required for OAuth flow")
            try:
                credentials_info = json.loads(credentials_json_content)
                flow = InstalledAppFlow.from_client_config(credentials_info, SCOPES)
                creds = flow.run_local_server(port=0)
            except json.JSONDecodeError:
                raise ValueError("Invalid CREDENTIALS_JSON format in environment variable")

        # Save token back to the environment (for this session only)
        # Note: you may want to update your .env file manually with the new token
        print("New token generated. Consider updating TOKEN_JSON in your .env file with:")
        print(f"TOKEN_JSON={creds.to_json()}")

    return gspread.authorize(creds)
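
# Illustrative .env layout this script expects (placeholder values, not real
# credentials): CREDENTIALS_JSON is the OAuth client config downloaded from the
# Google Cloud console, TOKEN_JSON is the authorized-user token printed above,
# and GOOGLE_SHEET_ID is the key from the spreadsheet URL. A minimal sketch:
#
#   GOOGLE_SHEET_ID=1AbC...xyz
#   CREDENTIALS_JSON={"installed": {"client_id": "...", "client_secret": "...", "redirect_uris": ["http://localhost"]}}
#   TOKEN_JSON={"token": "...", "refresh_token": "...", "scopes": ["https://www.googleapis.com/auth/spreadsheets.readonly"]}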

# Function to get data from a specific sheet
def get_sheet_data(spreadsheet, gid, sheet_name):
    try:
        sheet = spreadsheet.get_worksheet_by_id(gid)
        all_values = sheet.get_all_values()

        if not all_values or len(all_values) <= 4:
            print(f"Warning: {sheet_name} sheet doesn't have enough data")
            return pd.DataFrame()

        # Use the fifth spreadsheet row (index 4) as the actual column headers
        headers = all_values[4]
        clean_headers = []
        seen_headers = {}
        for i, header in enumerate(headers):
            if header.strip():  # Non-empty header
                base_header = header.strip()
                # Handle duplicate headers by adding a counter
                if base_header in seen_headers:
                    seen_headers[base_header] += 1
                    clean_header = f"{base_header}_{seen_headers[base_header]}"
                else:
                    seen_headers[base_header] = 0
                    clean_header = base_header
                clean_headers.append(clean_header)
            else:  # Empty header
                clean_headers.append(f"Empty_Col_{i}")

        # Create the DataFrame from the rows after the header row
        data_rows = all_values[5:]
        if data_rows:
            df = pd.DataFrame(data_rows, columns=clean_headers)
            # Remove completely empty columns
            df = df.loc[:, (df != '').any(axis=0)]
            print(f"Loaded {len(df)} rows from {sheet_name}")
            return df
        else:
            print(f"Warning: No data rows found in {sheet_name}")
            return pd.DataFrame()

    except Exception as e:
        print(f"Error loading {sheet_name}: {str(e)}")
        return pd.DataFrame()
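
# Quick illustration of the header de-duplication above (hypothetical header row,
# shown only as a comment): ["NAME", "POINTS", "POINTS", ""] comes out as
# ["NAME", "POINTS", "POINTS_1", "Empty_Col_3"], so repeated or blank spreadsheet
# columns still map to unique DataFrame column names.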

# Function to get details-sheet information
def get_details_info(spreadsheet):
    try:
        details_sheet = spreadsheet.get_worksheet_by_id(847680829)
        all_values = details_sheet.get_all_values()

        if not all_values:
            return None

        # Use the fifth row (index 4) as headers
        headers = all_values[4]
        clean_headers = []
        for i, header in enumerate(headers):
            if header.strip():
                clean_headers.append(header.strip())
            else:
                clean_headers.append(f"Empty_Col_{i}")

        # Get the data rows after the header row
        data_rows = all_values[5:]
        if data_rows:
            df = pd.DataFrame(data_rows, columns=clean_headers)
            df = df.loc[:, (df != '').any(axis=0)]

            details_info = {}

            # Extract specific information
            for idx in range(len(df)):
                student_data = df.iloc[idx]
                year_value = str(student_data.get('YEAR', '')).strip()

                # Get average reward points
                if 'AVERAGE REWARD POINT' in year_value:
                    details_info['average_points'] = {
                        'I': student_data.get('I', ''),
                        'II': student_data.get('II', ''),
                        'II L': student_data.get('II L', ''),
                        'III': student_data.get('III', ''),
                        'IV': student_data.get('IV', '')
                    }
                # Get IP 2 redemption dates
                elif 'Last Day for IP 2 Redemption Duration' in str(student_data.get('Redemption Dates', '')):
                    details_info['ip2_redemption'] = {
                        'S1': student_data.get('S1', ''),
                        'S2': student_data.get('S2', ''),
                        'S3': student_data.get('S3', ''),
                        'S4': student_data.get('S4', ''),
                        'S5': student_data.get('S5', ''),
                        'S6': student_data.get('S6', ''),
                        'S7': student_data.get('S7', ''),
                        'S8': student_data.get('S8', '')
                    }
                # Get IP 1 redemption dates
                elif 'Last Day for IP 1 Redemption Duration' in str(student_data.get('Redemption Dates', '')):
                    details_info['ip1_redemption'] = {
                        'S1': student_data.get('S1', ''),
                        'S2': student_data.get('S2', ''),
                        'S3': student_data.get('S3', ''),
                        'S4': student_data.get('S4', ''),
                        'S5': student_data.get('S5', ''),
                        'S6': student_data.get('S6', ''),
                        'S7': student_data.get('S7', ''),
                        'S8': student_data.get('S8', '')
                    }
                # Get last-updated information
                elif 'POINTS LAST UPDATED' in year_value:
                    details_info['last_updated'] = year_value

            return details_info

    except Exception as e:
        print(f"Error loading Details Sheet: {str(e)}")
        return None
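
# Assumed layout of the Details sheet (worksheet gid 847680829), inferred from the
# parsing above rather than from any documentation: headers on the fifth row, a
# 'YEAR' column whose cells may carry marker text such as 'AVERAGE REWARD POINT'
# or 'POINTS LAST UPDATED', a 'Redemption Dates' column naming the IP 1 / IP 2
# deadlines, and per-semester columns S1 through S8 holding the actual dates.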

# Initialize global variables
print("Initializing application...")
client = authorize()
SHEET_ID = os.getenv('GOOGLE_SHEET_ID')
spreadsheet = client.open_by_key(SHEET_ID)

# Load data from all sheets (original 3 + new 17 = 20 sheets total)
sheet_configs = [
    # Original sheets
    {"gid": 688907204, "name": "AIML"},
    {"gid": 451167295, "name": "AIDS"},
    {"gid": 1955995189, "name": "Sheet_3"},
    {"gid": 821473193, "name": "Sheet_4"},
    {"gid": 1798819643, "name": "Sheet_5"},
    {"gid": 1057532042, "name": "Sheet_6"},
    {"gid": 1848020834, "name": "Sheet_7"},
    {"gid": 48570283, "name": "Sheet_8"},
    {"gid": 559332743, "name": "Sheet_9"},
    {"gid": 1481375682, "name": "Sheet_10"},
    {"gid": 1136877763, "name": "Sheet_11"},
    {"gid": 510521423, "name": "Sheet_12"},
    {"gid": 1936618, "name": "Sheet_13"},
    {"gid": 91989289, "name": "Sheet_14"},
    {"gid": 30073516, "name": "Sheet_15"},
    {"gid": 857542309, "name": "Sheet_16"},
    {"gid": 790318539, "name": "Sheet_17"},
    {"gid": 587090068, "name": "Sheet_18"},
    {"gid": 260192612, "name": "Sheet_19"},
    {"gid": 400900059, "name": "Sheet_20"}
]

# Global data cache with 12-hour auto-refresh
data_cache = {
    "combined_df": None,
    "details_info": None,
    "last_update": None,
    "cache_duration_hours": 12,  # 12-hour cache
    "is_loading": False
}
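
# Field-by-field reading of the cache dict above, for reference:
#   combined_df          -> DataFrame of every sheet concatenated, None before the first load
#   details_info         -> dict returned by get_details_info(), or None
#   last_update          -> datetime of the last successful load
#   cache_duration_hours -> TTL used by get_cached_data() below
#   is_loading           -> guard flag so overlapping refreshes don't stack up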

def load_all_data():
    """Load and cache all data from Google Sheets."""
    global data_cache

    if data_cache["is_loading"]:
        print("Data loading already in progress...")
        return data_cache["combined_df"], data_cache["details_info"]

    data_cache["is_loading"] = True
    print(f"Loading fresh data from {len(sheet_configs)} Google Sheets...")
    start_time = time.time()

    try:
        # Load all sheet data
        all_dataframes = []
        for config in sheet_configs:
            df = get_sheet_data(spreadsheet, config["gid"], config["name"])
            if not df.empty:
                df['Source_Sheet'] = config["name"]
                all_dataframes.append(df)
                print(f"  {config['name']}: {len(df)} rows")

        # Combine the dataframes
        if all_dataframes:
            try:
                combined_df = pd.concat(all_dataframes, ignore_index=True, sort=False)
                print(f"Successfully combined {len(combined_df)} records from {len(all_dataframes)} sheets")
            except Exception as e:
                print(f"Error combining dataframes: {str(e)}")
                print("Trying alternative approach...")

                # Alternative approach: standardize columns first
                standard_columns = ['SL. NO.', 'YEAR', 'ROLL NO.', 'STUDENT NAME', 'COURSE CODE',
                                    'DEPARTMENT', 'MENTOR NAME', 'CUMULATIVE REWARD POINTS',
                                    'REEDEMED POINTS', 'BALANCE POINTS', 'Source_Sheet']
                standardized_dfs = []
                for df in all_dataframes:
                    new_df = pd.DataFrame()
                    for col in standard_columns:
                        if col == 'Source_Sheet':
                            new_df[col] = df.get('Source_Sheet', '')
                        else:
                            # Try to find a matching column
                            found_col = None
                            for df_col in df.columns:
                                if col.upper() in df_col.upper() or df_col.upper() in col.upper():
                                    found_col = df_col
                                    break
                            if found_col:
                                new_df[col] = df[found_col]
                            else:
                                new_df[col] = ''
                    standardized_dfs.append(new_df)

                combined_df = pd.concat(standardized_dfs, ignore_index=True, sort=False)
                print(f"Alternative approach successful: {len(combined_df)} records combined")
        else:
            combined_df = pd.DataFrame()
            print("No data found in any sheets")

        # Load details info
        details_info = get_details_info(spreadsheet)

        # Update the cache
        data_cache["combined_df"] = combined_df
        data_cache["details_info"] = details_info
        data_cache["last_update"] = datetime.now()

        load_time = time.time() - start_time
        print(f"Data loaded and cached in {load_time:.2f} seconds")
        print(f"Next auto-refresh in {data_cache['cache_duration_hours']} hours")

        return combined_df, details_info

    except Exception as e:
        print(f"Error loading data: {str(e)}")
        return data_cache.get("combined_df", pd.DataFrame()), data_cache.get("details_info", None)
    finally:
        data_cache["is_loading"] = False

def get_cached_data():
    """Get data from the cache, or refresh it if 12 hours have passed."""
    now = datetime.now()

    # Check whether the cache is empty or expired (12 hours)
    if (data_cache["last_update"] is None or
            data_cache["combined_df"] is None or
            (now - data_cache["last_update"]).total_seconds() > (data_cache["cache_duration_hours"] * 3600)):
        print("Cache expired or empty, loading fresh data...")
        return load_all_data()
    else:
        cache_age_hours = (now - data_cache["last_update"]).total_seconds() / 3600
        print(f"Using cached data (age: {cache_age_hours:.1f} hours)")
        return data_cache["combined_df"], data_cache["details_info"]
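
# Worked example of the expiry check above (illustrative times): with
# cache_duration_hours = 12, data loaded at 08:00 keeps being served from the
# cache until 20:00; the first lookup after that sees
# (now - last_update).total_seconds() > 43200 and calls load_all_data() again.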

def auto_refresh_worker():
    """Background worker that auto-refreshes the data every 12 hours."""
    while True:
        try:
            # Sleep for 12 hours (43,200 seconds)
            time.sleep(43200)
            print("12-hour auto-refresh triggered...")
            load_all_data()
        except Exception as e:
            print(f"Auto-refresh error: {str(e)}")
            # On error, wait 1 hour before trying again
            time.sleep(3600)
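
# Note on the worker: it is started below as a daemon thread, so it never blocks
# process shutdown, and because get_cached_data() also reloads lazily on expiry,
# a failed background refresh only delays updates instead of breaking searches.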

# Load initial data
print("Loading initial data...")
load_all_data()

# Start the background auto-refresh thread
refresh_thread = threading.Thread(target=auto_refresh_worker, daemon=True)
refresh_thread.start()
print("Auto-refresh thread started (updates every 12 hours)")

# Function to search for a student using the cached data
def search_student(roll_no):
    if not roll_no.strip():
        return "Please enter a roll number"

    # Normalize the roll number to uppercase for consistent searching
    roll_no = roll_no.strip().upper()

    # Get cached data (fast response, auto-refreshes every 12 hours)
    combined_df, details_info = get_cached_data()

    if combined_df.empty:
        return "No data available from Google Sheets"

    # Look for the 'ROLL NO.' column
    roll_column = None
    for col in combined_df.columns:
        if 'roll' in col.lower() and 'no' in col.lower():
            roll_column = col
            break

    if roll_column is None:
        return f"Roll number column not found. Available columns: {list(combined_df.columns)}"

    # Compare against the uppercased roll numbers in the DataFrame
    student = combined_df[combined_df[roll_column].astype(str).str.strip().str.upper() == roll_no]

    if student.empty:
        return f"Roll No '{roll_no}' not found in any sheet"

    # Log which roll number was searched
    print("Roll No Searched:", roll_no)

    record = student.iloc[0].to_dict()
    student_year = str(record.get('YEAR', '')).strip()

    # Format the output (simplified version)
    output = []
    output.append("=" * 80)
    output.append("STUDENT DETAILS")
    output.append("=" * 80)

    # Main student details
    main_fields = ['ROLL NO.', 'STUDENT NAME', 'YEAR', 'DEPARTMENT', 'MENTOR NAME',
                   'CUMULATIVE REWARD POINTS', 'REEDEMED POINTS', 'BALANCE POINTS']
    for field in main_fields:
        value = record.get(field, '')
        if str(value).strip():
            output.append(f"{field:<25}: {value}")

    # Get the student's current points as a clean numeric value
    try:
        student_points_str = str(record.get('BALANCE POINTS', '')).replace(',', '').strip()
        student_points = float(student_points_str) if student_points_str else 0
    except (ValueError, TypeError):
        student_points = 0

    # Add year-specific average points and analysis
    if details_info and 'average_points' in details_info:
        output.append("\n" + "=" * 80)
        output.append(f"AVERAGE REWARD POINTS FOR YEAR {student_year}")
        output.append("=" * 80)

        if student_year in details_info['average_points']:
            avg_points_str = details_info['average_points'][student_year]
            try:
                avg_points = float(avg_points_str) if avg_points_str else 0
            except (ValueError, TypeError):
                avg_points = 0

            if avg_points > 0:
                output.append(f"Average Points for Year {student_year:<8}: {avg_points_str}")

                # Calculate the difference and provide guidance
                points_difference = avg_points - student_points
                if points_difference > 0:
                    # Student is below average
                    output.append(f"\nPOINTS NEEDED TO REACH AVERAGE: {points_difference:.0f} points")
                    output.append("\nWAYS TO EARN POINTS:")
                    output.append("  • PS Activities")
                    output.append("  • TAC")
                    output.append("  • Hackathons / Technical Events")
                    output.append("  • Project Competitions")
                else:
                    # Student is at or above average
                    output.append(f"\nEXCELLENT! You are {abs(points_difference):.0f} points ABOVE the average!")
                    output.append("  Keep up the great work!")

    # Add last-updated info
    if details_info and 'last_updated' in details_info:
        output.append("\n" + "-" * 60)
        output.append("LAST UPDATE INFO")
        output.append("-" * 60)
        output.append(details_info['last_updated'])

    output.append("\n" + "=" * 80)
    return "\n".join(output)
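
# Example call outside the UI, using the same illustrative roll number that the
# textbox placeholder below shows (not necessarily a real record):
#   print(search_student("7376222AL181"))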

# Function to get system information
def get_system_info():
    combined_df, details_info = get_cached_data()

    if not details_info:
        return "No system information available"

    output = []
    output.append("=" * 80)
    output.append("SYSTEM INFORMATION")
    output.append("=" * 80)

    # Average points
    if 'average_points' in details_info:
        output.append("\nAVERAGE REWARD POINTS BY YEAR:")
        output.append("-" * 40)
        for year, points in details_info['average_points'].items():
            if points:
                output.append(f"Year {year:<10}: {points}")

    # Redemption dates
    if 'ip1_redemption' in details_info:
        output.append("\nIP 1 REDEMPTION DATES:")
        output.append("-" * 40)
        for semester, date in details_info['ip1_redemption'].items():
            if date and date != '-':
                output.append(f"{semester:<10}: {date}")

    if 'ip2_redemption' in details_info:
        output.append("\nIP 2 REDEMPTION DATES:")
        output.append("-" * 40)
        for semester, date in details_info['ip2_redemption'].items():
            if date and date != '-':
                output.append(f"{semester:<10}: {date}")

    if 'last_updated' in details_info:
        output.append("\nLAST UPDATED:")
        output.append("-" * 40)
        output.append(details_info['last_updated'])

    # Cache info
    if data_cache["last_update"]:
        cache_age = datetime.now() - data_cache["last_update"]
        hours = cache_age.total_seconds() / 3600
        next_refresh_hours = 12 - hours
        output.append(f"\nData age: {hours:.1f} hours")
        if next_refresh_hours > 0:
            output.append(f"Next auto-refresh in: {next_refresh_hours:.1f} hours")
        else:
            output.append("Auto-refresh due now")

    output.append("\n" + "=" * 80)
    return "\n".join(output)

# Create the Gradio interface
with gr.Blocks(title="Student Reward Points Check", theme=gr.themes.Soft()) as app:
    gr.Markdown("# Student Reward Points Check")
    gr.Markdown("### Search for student details including reward points and redemption dates")
    gr.Markdown("**Auto-Updates**: Data automatically refreshes every 12 hours")

    with gr.Tabs():
        with gr.TabItem("Student Search"):
            with gr.Row():
                with gr.Column(scale=3):
                    roll_input = gr.Textbox(
                        label="Enter Roll Number",
                        placeholder="e.g., 7376222AL181",
                        value=""
                    )
                with gr.Column(scale=1):
                    search_btn = gr.Button("Search Student", variant="primary")

            result_output = gr.Textbox(
                label="Student Details",
                lines=25,
                max_lines=30,
                show_copy_button=True,
                autoscroll=False
            )

        with gr.TabItem("System Information"):
            system_btn = gr.Button("Get System Information", variant="secondary")
            system_output = gr.Textbox(
                label="System Information",
                lines=30,
                max_lines=50,
                show_copy_button=True,
                autoscroll=False,
                interactive=False,
                show_label=True
            )

    # Event handlers
    search_btn.click(fn=search_student, inputs=roll_input, outputs=result_output)
    roll_input.submit(fn=search_student, inputs=roll_input, outputs=result_output)
    system_btn.click(fn=get_system_info, outputs=system_output)

    # Load system info on startup
    app.load(fn=get_system_info, outputs=system_output)

    # Footer section
    gr.Markdown("---")
    with gr.Row():
        with gr.Column():
            gr.Markdown(
                """
                <div style="text-align: center; margin-top: 20px; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 10px; color: white;">
                    <h3 style="margin: 0; color: white;">Developed with ❤️ by</h3>
                    <h2 style="margin: 5px 0; color: #ffd700;">PRANESH S</h2>
                    <div style="margin: 15px 0;">
                        <a href="https://github.com/Pranesh-2005" target="_blank" style="color: #ffd700; text-decoration: none; margin: 0 10px; font-size: 16px;">
                            GitHub
                        </a>
                        <span style="color: #ffd700;">|</span>
                        <a href="https://www.linkedin.com/in/pranesh5264/" target="_blank" style="color: #ffd700; text-decoration: none; margin: 0 10px; font-size: 16px;">
                            LinkedIn
                        </a>
                    </div>
                    <p style="margin: 10px 0; font-style: italic; color: #e0e0e0;">Made with Love and Support</p>
                    <p style="margin: 5px 0; font-size: 14px; color: #b0b0b0;">Empowering students with instant reward points tracking</p>
                </div>
                """,
                elem_id="footer"
            )
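
# Deployment note (an assumption about the hosting setup, not part of the original
# comments): server_name="0.0.0.0" and port 7860 below are the conventional
# settings for a Hugging Face Space, where the platform exposes the public URL, so
# share=False is sufficient there; for a purely local run, share=True would instead
# ask Gradio for a temporary public link.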
# Launch the app
if __name__ == "__main__":
    print("Launching Gradio interface...")
    app.launch(share=False, debug=True, server_name="0.0.0.0", server_port=7860, pwa=True)