SlimG commited on
Commit
1a077cc
·
1 Parent(s): 87132b2

add refresh of materialized views

Browse files
Files changed (2) hide show
  1. src/jobs/views.py +40 -0
  2. src/repository/views.py +26 -0
src/jobs/views.py ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from rq_scheduler import Scheduler
3
+
4
+ from src.jobs.scheduler import get_redis_connection, is_job_scheduled_queued_or_started
5
+ from src.repository.common import get_session
6
+ from src.repository import views
7
+
8
+ logging.basicConfig(level=logging.INFO,
9
+ handlers=[logging.StreamHandler()])
10
+ logger = logging.getLogger(__name__)
11
+
12
+ def refresh_all_materialized_views():
13
+ try:
14
+ with get_session() as session:
15
+ views.refresh_all_materialized_views(db=session)
16
+ except Exception as e:
17
+ logger.error(f"Error: {e}")
18
+
19
+ def schedule_refresh():
20
+ """
21
+ Schedule the job to refresh the materialized view
22
+ """
23
+ job_id = 'refresh_materialized_views'
24
+
25
+ if job := is_job_scheduled_queued_or_started(job_id=job_id):
26
+ return job
27
+
28
+ with get_redis_connection() as r:
29
+ scheduler = Scheduler(connection=r)
30
+
31
+ # Schedule the job to run immediately
32
+ logger.info("Scheduling job for refreshing views")
33
+ job = scheduler.cron(
34
+ cron_string="0 3 * * *", # Every day at 3AM
35
+ func=refresh_all_materialized_views,
36
+ repeat=None,
37
+ id=job_id,
38
+ )
39
+
40
+ return job
src/repository/views.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import text
2
+ from sqlalchemy.orm import Session
3
+
4
+ def refresh_materialized_view(db: Session, view_name: str, schema_name: str) -> None:
5
+ """
6
+ Refresh concurrently the materialized view in Postgres
7
+ """
8
+ query = f"REFRESH MATERIALIZED VIEW {schema_name}.{view_name} WITH DATA;"
9
+
10
+ db.execute(text(query))
11
+ db.commit()
12
+
13
+ def refresh_all_materialized_views(db: Session) -> None:
14
+ """
15
+ Refresh all materialized views in Postgres
16
+ """
17
+ query = """
18
+ SELECT schemaname, matviewname
19
+ FROM pg_matviews;
20
+ """
21
+
22
+ result = db.execute(text(query)).fetchall()
23
+
24
+ for row in result:
25
+ refresh_materialized_view(db, view_name=row[1], schema_name=row[0])
26
+ print(f"Refreshed materialized view: {row[0]}.{row[1]}")