SlimG commited on
Commit
80baa7f
·
1 Parent(s): e9e3777

improve logging

Browse files
dags/__init__.py CHANGED
@@ -0,0 +1,43 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import logging.config
3
+
4
+ LOGGING_CONFIG = {
5
+ 'version': 1,
6
+ 'disable_existing_loggers': False,
7
+ 'formatters': {
8
+ 'default': {
9
+ 'format': '[%(asctime)s] %(levelname)s in %(name)s: %(message)s',
10
+ },
11
+ 'verbose': {
12
+ 'format': '[%(asctime)s] %(levelname)s [%(name)s:%(lineno)d] %(message)s',
13
+ },
14
+ },
15
+ 'handlers': {
16
+ 'console': {
17
+ 'class': 'logging.StreamHandler',
18
+ 'formatter': 'default',
19
+ },
20
+ # Exemple fichier
21
+ 'file': {
22
+ 'class': 'logging.FileHandler',
23
+ 'filename': 'app.log',
24
+ 'formatter': 'verbose',
25
+ },
26
+ },
27
+ 'root': {
28
+ 'handlers': ['console', 'file'],
29
+ 'level': 'INFO',
30
+ },
31
+ 'loggers': {
32
+ 'myapp': {
33
+ 'handlers': ['console'],
34
+ 'level': 'DEBUG',
35
+ 'propagate': False,
36
+ },
37
+ }
38
+ }
39
+
40
+ def setup_logging():
41
+ logging.config.dictConfig(LOGGING_CONFIG)
42
+
43
+ setup_logging()
dags/common.py CHANGED
@@ -8,6 +8,8 @@ from datetime import datetime, timedelta
8
  from typing import Generator
9
  from contextlib import contextmanager
10
 
 
 
11
  # Load environment variables from .env file
12
  load_dotenv()
13
 
@@ -63,7 +65,7 @@ def check_db_connection():
63
  try:
64
  with get_session() as session:
65
  session.execute(text("SELECT 1"))
66
- logging.info("Database connection is successful.")
67
  except Exception as e:
68
- logging.error(f"Database connection failed: {e}")
69
  raise
 
8
  from typing import Generator
9
  from contextlib import contextmanager
10
 
11
+ logger = logging.getLogger(__name__)
12
+
13
  # Load environment variables from .env file
14
  load_dotenv()
15
 
 
65
  try:
66
  with get_session() as session:
67
  session.execute(text("SELECT 1"))
68
+ logger.info("Database connection is successful.")
69
  except Exception as e:
70
+ logger.error(f"Database connection failed: {e}")
71
  raise
dags/new_transaction.py CHANGED
@@ -16,6 +16,8 @@ from http.client import (
16
  )
17
  from pathlib import Path
18
 
 
 
19
  # Load environment variables from .env file
20
  load_dotenv(Path(__file__).parent.parent / ".env")
21
 
@@ -39,7 +41,7 @@ def _pull_transaction(ti, prefix: str = ''):
39
  # If status code is 429 or 500, wait for a few seconds and retry
40
  if response.status_code in [TOO_MANY_REQUESTS, INTERNAL_SERVER_ERROR]:
41
  waiting_time = 15
42
- logging.warning(f"Rate limit exceeded. Retrying in {waiting_time} seconds...")
43
  time.sleep(waiting_time)
44
  response = get_current_transaction()
45
 
@@ -55,7 +57,7 @@ def _pull_transaction(ti, prefix: str = ''):
55
  # Push the transaction dictionary to XCom
56
  ti.xcom_push(key=f"{prefix}_transaction_dict", value=transaction_dict)
57
 
58
- logging.info(f"Fetched data: {transaction_dict}")
59
 
60
  @task(task_id="push_transaction")
61
  def _push_transaction(ti, prefix: str = ''):
@@ -93,7 +95,7 @@ def _push_transaction(ti, prefix: str = ''):
93
 
94
  # Check if the transaction dictionary is empty
95
  if not transaction_dict:
96
- logging.error("No transaction data found.")
97
  return
98
 
99
  # Call the fraud detection pipeline with the transaction dictionary
@@ -116,7 +118,7 @@ def _push_transaction(ti, prefix: str = ''):
116
  # Load the JSON data
117
  data = api_response.json()
118
 
119
- logging.info(f"Fraud detection response: {data}")
120
 
121
  # Copy default_args from the original code
122
  dag_args = default_args.copy()
 
16
  )
17
  from pathlib import Path
18
 
19
+ logger = logging.getLogger(__name__)
20
+
21
  # Load environment variables from .env file
22
  load_dotenv(Path(__file__).parent.parent / ".env")
23
 
 
41
  # If status code is 429 or 500, wait for a few seconds and retry
42
  if response.status_code in [TOO_MANY_REQUESTS, INTERNAL_SERVER_ERROR]:
43
  waiting_time = 15
44
+ logger.warning(f"Rate limit exceeded. Retrying in {waiting_time} seconds...")
45
  time.sleep(waiting_time)
46
  response = get_current_transaction()
47
 
 
57
  # Push the transaction dictionary to XCom
58
  ti.xcom_push(key=f"{prefix}_transaction_dict", value=transaction_dict)
59
 
60
+ logger.info(f"Fetched data: {transaction_dict}")
61
 
62
  @task(task_id="push_transaction")
63
  def _push_transaction(ti, prefix: str = ''):
 
95
 
96
  # Check if the transaction dictionary is empty
97
  if not transaction_dict:
98
+ logger.error("No transaction data found.")
99
  return
100
 
101
  # Call the fraud detection pipeline with the transaction dictionary
 
118
  # Load the JSON data
119
  data = api_response.json()
120
 
121
+ logger.info(f"Fraud detection response: {data}")
122
 
123
  # Copy default_args from the original code
124
  dag_args = default_args.copy()
dags/refresh_views.py CHANGED
@@ -5,6 +5,8 @@ from datetime import datetime, timedelta
5
  import logging
6
  from sqlalchemy import text
7
 
 
 
8
  @task(task_id="refresh_views")
9
  def _refresh_views():
10
  """
@@ -17,7 +19,7 @@ def _refresh_views():
17
 
18
  for view in list_views:
19
  view_name = view[0]
20
- logging.info(f"Refreshing materialized view: {view_name}")
21
  session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY public.{view_name} WITH DATA;"))
22
 
23
  session.commit()
 
5
  import logging
6
  from sqlalchemy import text
7
 
8
+ logger = logging.getLogger(__name__)
9
+
10
  @task(task_id="refresh_views")
11
  def _refresh_views():
12
  """
 
19
 
20
  for view in list_views:
21
  view_name = view[0]
22
+ logger.info(f"Refreshing materialized view: {view_name}")
23
  session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY public.{view_name} WITH DATA;"))
24
 
25
  session.commit()