CSS_EDA_Dashboard / src /test_connection.py
arash7920's picture
Upload 35 files
29854ee verified
"""
Quick test script to verify Athena connection and basic functionality.
"""
import sys
from pathlib import Path
_project_root = Path(__file__).resolve().parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
def main():
"""Test basic connection and functionality."""
print("Testing Athena Connection...")
print("=" * 60)
# Load config with explicit credentials
config = DataLakeConfig.from_credentials(
database_name="dbparquetdatalake05",
workgroup="athenaworkgroup-datalake05",
s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
region="eu-north-1",
access_key_id="AKIARJQJFFVASPMSGNNY",
secret_access_key="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",
)
print(f"βœ“ Configuration loaded")
print(f" Database: {config.database_name}")
print(f" Workgroup: {config.workgroup}")
print(f" Region: {config.region}")
print(f" S3 Output: {config.s3_output_location}")
print()
# Initialize Athena
try:
athena = AthenaQuery(config)
print("βœ“ Athena client initialized")
except Exception as e:
print(f"βœ— Failed to initialize Athena client: {e}")
return
# Test simple query
try:
print("Testing simple query...")
test_query = f"SHOW TABLES IN {config.database_name}"
df = athena.query_to_dataframe(test_query, timeout=60)
print(f"βœ“ Query executed successfully")
print(f" Found {len(df)} tables")
if not df.empty:
print(f" Sample tables: {list(df.iloc[:, 0])[:5]}")
except Exception as e:
print(f"βœ— Query failed: {e}")
import traceback
traceback.print_exc()
return
# Test catalog
try:
print("\nTesting catalog...")
catalog = DataLakeCatalog(athena, config)
tables = catalog.list_tables()
print(f"βœ“ Catalog initialized")
print(f" Total tables: {len(tables)}")
if tables:
devices = catalog.list_devices()
print(f" Devices found: {len(devices)}")
if devices:
print(f" Sample devices: {devices[:3]}")
except Exception as e:
print(f"βœ— Catalog test failed: {e}")
import traceback
traceback.print_exc()
return
print("\n" + "=" * 60)
print("βœ“ All tests passed! Connection is working.")
print("=" * 60)
if __name__ == "__main__":
main()