Spaces:
Sleeping
Sleeping
File size: 2,684 Bytes
"""
Quick test script to verify the Athena connection and basic functionality.
"""
import sys
from pathlib import Path
# Put the directory two levels above this file at the front of sys.path
# (unless it is already listed) so the ``src.datalake`` imports in this file
# can be resolved when the script is run directly.
_project_root = Path(__file__).resolve().parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))
from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
def main():
    """Run a three-step smoke test against the Athena data lake.

    Progress and results are printed to stdout. The steps are:

    1. Build a ``DataLakeConfig`` from fixed connection settings plus AWS
       credentials read from the ``AWS_ACCESS_KEY_ID`` and
       ``AWS_SECRET_ACCESS_KEY`` environment variables.
    2. Create an ``AthenaQuery`` client and run ``SHOW TABLES`` on the
       configured database.
    3. Create a ``DataLakeCatalog`` and list its tables and, when any
       tables exist, its devices.

    Returns:
        None in every case. If credentials are missing, or if the client,
        query, or catalog step raises, the error is printed (with a
        traceback for query and catalog failures) and the function returns
        early without running the remaining steps.
    """
    # Function-scope imports: both modules are standard library and are only
    # needed by this entry point.
    import os
    import traceback

    print("Testing Athena Connection...")
    print("=" * 60)

    # Credentials must never be committed to source control, so they are read
    # from the standard AWS environment variables instead.
    # NOTE(review): the key pair that used to be hard-coded here has been
    # exposed in version control and should be rotated.
    access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key_id or not secret_access_key:
        print(
            "✗ Missing AWS credentials: set AWS_ACCESS_KEY_ID and "
            "AWS_SECRET_ACCESS_KEY in the environment"
        )
        return

    # Load config with explicit credentials
    config = DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )
    print("✓ Configuration loaded")
    print(f" Database: {config.database_name}")
    print(f" Workgroup: {config.workgroup}")
    print(f" Region: {config.region}")
    print(f" S3 Output: {config.s3_output_location}")
    print()

    # Initialize Athena
    try:
        athena = AthenaQuery(config)
        print("✓ Athena client initialized")
    except Exception as e:
        print(f"✗ Failed to initialize Athena client: {e}")
        return

    # Test simple query
    try:
        print("Testing simple query...")
        test_query = f"SHOW TABLES IN {config.database_name}"
        df = athena.query_to_dataframe(test_query, timeout=60)
        print("✓ Query executed successfully")
        print(f" Found {len(df)} tables")
        if not df.empty:
            # Show at most five values from the first result column.
            print(f" Sample tables: {list(df.iloc[:, 0])[:5]}")
    except Exception as e:
        print(f"✗ Query failed: {e}")
        traceback.print_exc()
        return

    # Test catalog
    try:
        print("\nTesting catalog...")
        catalog = DataLakeCatalog(athena, config)
        tables = catalog.list_tables()
        print("✓ Catalog initialized")
        print(f" Total tables: {len(tables)}")
        if tables:
            devices = catalog.list_devices()
            print(f" Devices found: {len(devices)}")
            if devices:
                print(f" Sample devices: {devices[:3]}")
    except Exception as e:
        print(f"✗ Catalog test failed: {e}")
        traceback.print_exc()
        return

    print("\n" + "=" * 60)
    print("✓ All tests passed! Connection is working.")
    print("=" * 60)
# Run the smoke test only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|