Commit 804054e by IndraneelKumar (1 parent: dc59d80)

Added RSS Feeds for Medium Articles and Individual Publications

.env.example CHANGED
@@ -1,4 +1,4 @@
-SUPABASE_DB__TABLE_NAME=substack_articles
+SUPABASE_DB__TABLE_NAME=feed_articles
 SUPABASE_DB__HOST=your_supabase_db_host_here
 SUPABASE_DB__NAME=postgres
 SUPABASE_DB__USER=your_supabase_db_user_here
@@ -12,7 +12,7 @@ RSS__BATCH_SIZE=30
 # Qdrant configuration
 QDRANT__API_KEY=your_qdrant_api_key_here
 QDRANT__URL=your_qdrant_url_here
-QDRANT__COLLECTION_NAME=substack_collection
+QDRANT__COLLECTION_NAME=feed_collection
 QDRANT__DENSE_MODEL_NAME=BAAI/bge-base-en-v1.5 # BAAI/bge-large-en-v1.5 (1024), BAAI/bge-base-en-v1.5 (HF, 768), BAAI/bge-base-en (Fastembed, 768)
 QDRANT__SPARSE_MODEL_NAME=Qdrant/bm25 # prithivida/Splade_PP_en_v1, Qdrant/bm25
 QDRANT__VECTOR_DIM=768 # 768, 1024
@@ -49,7 +49,7 @@ OPENROUTER__API_URL=https://openrouter.ai/api/v1
 
 # OPIK OBSERVABILITY
 OPIK__API_KEY=your_opik_api_key_here
-OPIK__PROJECT_NAME=substack-pipeline
+OPIK__PROJECT_NAME=feed-pipeline
 
 # FastAPI Endpoint
 BACKEND_URL=your_fastapi_backend_url_here
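The double underscores in these names are not decoration: `src/config.py` reads them through pydantic-settings' nested delimiter, so `SUPABASE_DB__TABLE_NAME` lands on `settings.supabase_db.table_name`. A minimal sketch of that wiring, assuming the project configures it roughly like this:

```python
# Minimal sketch of the env-to-settings mapping, assuming pydantic-settings
# with env_nested_delimiter="__" (the real wiring lives in src/config.py).
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class SupabaseDBSettings(BaseModel):
    table_name: str = Field(default="feed_articles")
    host: str = Field(default="localhost")


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_nested_delimiter="__")

    supabase_db: SupabaseDBSettings = SupabaseDBSettings()


settings = Settings()
# SUPABASE_DB__TABLE_NAME=feed_articles in .env populates this field:
print(settings.supabase_db.table_name)
```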
README.md CHANGED
@@ -12,7 +12,7 @@ pinned: false
 
 # Articles Search Engine
 
-A compact, production-style RAG pipeline. It ingests Substack RSS articles, stores them in Postgres (Supabase), creates dense/sparse embeddings in Qdrant, and exposes search and answer endpoints via FastAPI with a simple Gradio UI.
+A compact, production-style RAG pipeline. It ingests RSS articles from Substack, Medium, and top publications, stores them in Postgres (Supabase), creates dense/sparse embeddings in Qdrant, and exposes search and answer endpoints via FastAPI with a simple Gradio UI.
 
 ## How it works (brief)
 - Ingest RSS → Supabase:
@@ -23,6 +23,7 @@ A compact, production-style RAG pipeline. It ingests Substack RSS articles, stor
 - Search + generate:
   - FastAPI (`src/api/main.py`) exposes search endpoints (keyword, semantic, hybrid) and assembles answers with citations.
   - LLM providers are pluggable with fallback (OpenRouter, OpenAI, Hugging Face).
+  - Opik is used for LLM evaluation and observability.
 - UI + deploy:
   - Gradio app for quick local search (`frontend/app.py`).
   - Containerization with Docker and optional deploy to Google Cloud Run.
@@ -31,7 +32,7 @@ A compact, production-style RAG pipeline. It ingests Substack RSS articles, stor
 - Python 3.12, FastAPI, Prefect, SQLAlchemy
 - Supabase (Postgres) for articles
 - Qdrant for vector search (dense + sparse/hybrid)
-- OpenRouter / OpenAI / Hugging Face for LLM completion
+- OpenRouter / OpenAI / Hugging Face for LLM completion, Opik for LLM evaluation
 - Gradio UI, Docker, Google Cloud Run
 - Config via Pydantic Settings, `uv` or `pip` for deps
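For orientation, a hedged sketch of calling the search API the README describes. The route name and response shape are assumptions for illustration; the actual endpoints are defined in `src/api/main.py`:

```python
# Hedged sketch: query a hybrid search endpoint like the one the README
# describes. The route "/search/hybrid" and the JSON shape are assumptions;
# check src/api/main.py for the real contract.
import requests

BACKEND_URL = "http://localhost:8000"  # see BACKEND_URL in .env.example

resp = requests.get(
    f"{BACKEND_URL}/search/hybrid",  # hypothetical route name
    params={"query": "vector databases", "limit": 5},
    timeout=30,
)
resp.raise_for_status()
for hit in resp.json():  # assumed: a list of scored article chunks
    print(hit)
```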
cloudbuild_fastapi.yaml CHANGED
@@ -7,6 +7,6 @@ steps:
       export DOCKER_BUILDKIT=1
       docker build -t gcr.io/${PROJECT_ID}/${_SERVICE_NAME} -f Dockerfile .
 substitutions:
-  _SERVICE_NAME: "substack-pipeline-fastapi"
+  _SERVICE_NAME: "feed-pipeline-fastapi"
 images:
   - "gcr.io/${PROJECT_ID}/${_SERVICE_NAME}"
deploy_fastapi.sh CHANGED
@@ -26,7 +26,7 @@ echo "✅ Environment variables loaded."
 # Configuration
 # -----------------------
 PROJECT_ID="personal-projects-477710"
-SERVICE_NAME="substack-pipeline-fastapi"
+SERVICE_NAME="feed-pipeline-fastapi"
 REGION="asia-south2" #europe-west1 "europe-west6"
 IMAGE_NAME="gcr.io/$PROJECT_ID/$SERVICE_NAME"
 
frontend/app.py CHANGED
@@ -551,12 +551,12 @@ def update_model_choices(provider):
 # -----------------------
 # Gradio UI
 # -----------------------
-with gr.Blocks(title="Substack Articles LLM Engine", theme=gr.themes.Soft(), css=CUSTOM_CSS) as demo:
+with gr.Blocks(title="AI Search Engine for Articles", theme=gr.themes.Soft(), css=CUSTOM_CSS) as demo:
     # Header
     gr.HTML(
         "<div id='app-header'>"
-        "  <h1>📰 Substack Articles LLM Engine</h1>"
-        "  <p>Search Substack content or ask an AI across your feeds — fast and delightful.</p>"
+        "  <h1>📰 AI Search Engine for Articles</h1>"
+        "  <p>Search content from Substack, Medium, and top publications, or ask an AI across your feeds — fast and delightful.</p>"
         "</div>"
     )
 
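For context, a stripped-down sketch of the retitled UI shell; `CUSTOM_CSS` and the search/ask widgets from `frontend/app.py` are omitted:

```python
# Stripped-down sketch of the retitled Gradio shell; the real app adds
# CUSTOM_CSS and the search/ask components.
import gradio as gr

with gr.Blocks(title="AI Search Engine for Articles", theme=gr.themes.Soft()) as demo:
    gr.HTML(
        "<div id='app-header'>"
        "  <h1>📰 AI Search Engine for Articles</h1>"
        "  <p>Search content from Substack, Medium, and top publications.</p>"
        "</div>"
    )

if __name__ == "__main__":
    demo.launch()
```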
prefect-cloud.yaml CHANGED
@@ -1,7 +1,7 @@
 pull:
   - prefect.deployments.steps.git_clone:
       id: clone-step
-      repository: https://github.com/Indraneel99/substack-newsletters-search-course
+      repository: https://github.com/Indraneel99/AISearchEngine
       credentials: "{{ prefect.blocks.github-credentials.my-gh-creds }}"
 
   - prefect.deployments.steps.run_shell_script:
prefect-local.yaml CHANGED
@@ -1,7 +1,7 @@
 pull:
   - prefect.deployments.steps.git_clone:
       id: clone-step
-      repository: https://github.com/Indraneel99/substack-newsletters-search-course
+      repository: https://github.com/Indraneel99/AISearchEngine
       credentials: "{{ prefect.blocks.github-credentials.my-gh-creds }}"
 
 # This function ensures pip is installed in the environment (Only needed for Prefect Server)
pyproject.toml CHANGED
@@ -1,7 +1,7 @@
1
  [project]
2
- name = "substack-newsletters-search-course"
3
  version = "1.0.0"
4
- description = "A pipeline to retrieve Newsletters from Substack"
5
  readme = "README.md"
6
  authors = [
7
  {name = "Benito Martin"}
 
1
  [project]
2
+ name = "AISearchEngine"
3
  version = "1.0.0"
4
+ description = "A pipeline to retrieve Newsletters from Substack, Medium and Top publications"
5
  readme = "README.md"
6
  authors = [
7
  {name = "Benito Martin"}
src/api/main.py CHANGED
@@ -73,9 +73,9 @@ async def lifespan(app: FastAPI):
 # -----------------------
 
 app = FastAPI(
-    title="Substack RAG API",
+    title="Search Engine RAG API",
     version="1.0",
-    description="API for Substack Retrieval-Augmented Generation (RAG) system",
+    description="API for the Articles Search Retrieval-Augmented Generation (RAG) system",
     lifespan=lifespan,
     # root_path=root_path,
 )
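The `lifespan=lifespan` argument follows FastAPI's lifespan pattern: startup work runs before the `yield`, teardown after. A minimal sketch, with illustrative hook bodies (the real hooks live in `src/api/main.py`):

```python
# Minimal sketch of the FastAPI lifespan pattern used above; the startup and
# shutdown bodies are illustrative.
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: e.g. open the Qdrant client and DB engine (illustrative)
    yield
    # shutdown: close clients and dispose of the engine


app = FastAPI(
    title="Search Engine RAG API",
    version="1.0",
    description="API for the Articles Search Retrieval-Augmented Generation (RAG) system",
    lifespan=lifespan,
)
```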
src/config.py CHANGED
@@ -12,13 +12,13 @@ from src.models.article_models import FeedItem
 # Supabase database settings
 # -----------------------------
 class SupabaseDBSettings(BaseModel):
-    table_name: str = Field(default="substack_articles", description="Supabase table name")
+    table_name: str = Field(default="feed_articles", description="Supabase table name")
     host: str = Field(default="localhost", description="Database host")
     name: str = Field(default="postgres", description="Database name")
     user: str = Field(default="postgres", description="Database user")
     password: SecretStr = Field(default=SecretStr("password"), description="Database password")
     port: int = Field(default=6543, description="Database port")
-    test_database: str = Field(default="substack_test", description="Test database name")
+    test_database: str = Field(default="feed_test", description="Test database name")
 
 
 # -----------------------------
@@ -43,7 +43,7 @@ class QdrantSettings(BaseModel):
     api_key: str = Field(default="", description="Qdrant API key")
     timeout: int = Field(default=30, description="Qdrant client timeout")
     collection_name: str = Field(
-        default="substack_collection", description="Qdrant collection name"
+        default="feed_collection", description="Qdrant collection name"
     )
     dense_model_name: str = Field(default="BAAI/bge-base-en", description="Dense model name")
     sparse_model_name: str = Field(
@@ -126,7 +126,7 @@ class OpenRouterSettings(BaseModel):
 # -----------------------------
 class OpikObservabilitySettings(BaseModel):
     api_key: str = Field(default="", description="Opik Observability API key")
-    project_name: str = Field(default="substack-pipeline", description="Opik project name")
+    project_name: str = Field(default="feed-pipeline", description="Opik project name")
 
 
 # -----------------------------
src/configs/feeds_rss.yaml CHANGED
@@ -89,3 +89,39 @@ feeds:
   - name: "slys.dev"
     author: "Anna & Jakub Slys"
     url: "https://iam.slys.dev/feed"
+  - name: "Technology Review"
+    author: "Technology Review"
+    url: "https://www.technologyreview.com/feed"
+  - name: "AI Trends"
+    author: "AI Trends"
+    url: "https://www.aitrends.com/feed"
+  - name: "Machine Learning Mastery"
+    author: "Machine Learning Mastery"
+    url: "https://machinelearningmastery.com/feed"
+  - name: "The Gradient"
+    author: "The Gradient"
+    url: "https://thegradient.pub/rss/"
+  - name: "Towards Data Science"
+    author: "Towards Data Science"
+    url: "https://towardsdatascience.com/feed/"
+  - name: "Microsoft AI"
+    author: "Microsoft AI"
+    url: "https://blogs.microsoft.com/ai/feed/"
+  - name: "Marktechpost"
+    author: "Marktechpost"
+    url: "https://www.marktechpost.com/feed/"
+  - name: "Daily AI"
+    author: "Daily AI"
+    url: "https://dailyai.com/feed/"
+  - name: "TopBots"
+    author: "TopBots"
+    url: "https://www.topbots.com/feed/"
+  - name: "Towards AI"
+    author: "Towards AI"
+    url: "https://pub.towardsai.net/feed/"
+  - name: "The Pycoach"
+    author: "Frank Andrade"
+    url: "https://medium.com/feed/@frank-andrade"
+  - name: "Anthony Alcaraz"
+    author: "Anthony Alcaraz"
+    url: "https://medium.com/feed/@alcarazanthony1"
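A quick way to sanity-check the new entries is to load the registry and parse one feed. A sketch, assuming the YAML path above and a FeedItem with exactly these three fields (the project's model in `src/models/article_models.py` may carry more):

```python
# Sketch: load the feed registry and parse one of the newly added feeds.
# FeedItem here mirrors the three YAML keys only.
import feedparser
import yaml
from pydantic import BaseModel


class FeedItem(BaseModel):
    name: str
    author: str
    url: str


with open("src/configs/feeds_rss.yaml") as f:
    feeds = [FeedItem(**item) for item in yaml.safe_load(f)["feeds"]]

gradient = next(feed for feed in feeds if feed.name == "The Gradient")
parsed = feedparser.parse(gradient.url)  # fetch and parse the RSS XML
for entry in parsed.entries[:3]:
    print(entry.get("title"), entry.get("link"))
```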
src/infrastructure/qdrant/qdrant_vectorstore.py CHANGED
@@ -26,7 +26,7 @@ from qdrant_client.models import Batch, Distance, SparseVector, models
 from sqlalchemy.orm import Session
 
 from src.config import settings
-from src.models.sql_models import SubstackArticle
+from src.models.sql_models import FeedArticle
 from src.models.vectorstore_models import ArticleChunkPayload
 from src.utils.logger_util import log_batch_status, setup_logging
 from src.utils.text_splitter import TextSplitter
@@ -522,7 +522,7 @@ class AsyncQdrantVectorStore:
 
     async def _article_batch_generator(
         self, session: Session, from_date: datetime | None = None
-    ) -> AsyncGenerator[list[SubstackArticle], None]:
+    ) -> AsyncGenerator[list[FeedArticle], None]:
         """Yield batches of articles from SQL database.
 
         Args:
@@ -530,7 +530,7 @@ class AsyncQdrantVectorStore:
             from_date (datetime, optional): Filter articles from this date.
 
         Yields:
-            list[SubstackArticle]: Batch of articles.
+            list[FeedArticle]: Batch of articles.
 
         Raises:
             Exception: If database query fails.
@@ -542,9 +542,9 @@ class AsyncQdrantVectorStore:
         try:
             offset = 0
             while True:
-                query = session.query(SubstackArticle).order_by(SubstackArticle.published_at)
+                query = session.query(FeedArticle).order_by(FeedArticle.published_at)
                 if from_date:
-                    query = query.filter(FeedArticle.published_at >= from_date)
+                    query = query.filter(FeedArticle.published_at >= from_date)
                 articles = query.offset(offset).limit(self.article_batch_size).all()
                 if not articles:
                     break
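The renamed model feeds a hybrid collection. For context, a hedged sketch of the dense-plus-sparse layout implied by the .env defaults (768-dim dense vectors, BM25-style sparse vectors); the vector names are assumptions, and the project's actual schema is built in this module:

```python
# Hedged sketch of a hybrid dense+sparse Qdrant collection matching the .env
# defaults. The vector names "dense" and "sparse" are assumptions.
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    PointStruct,
    SparseVector,
    SparseVectorParams,
    VectorParams,
)

client = QdrantClient(url="http://localhost:6333")

client.create_collection(  # fails if the collection already exists
    collection_name="feed_collection",
    vectors_config={"dense": VectorParams(size=768, distance=Distance.COSINE)},
    sparse_vectors_config={"sparse": SparseVectorParams()},
)

client.upsert(
    collection_name="feed_collection",
    points=[
        PointStruct(
            id=1,
            vector={
                "dense": [0.0] * 768,  # placeholder embedding
                "sparse": SparseVector(indices=[7, 42], values=[0.8, 0.3]),
            },
            payload={"title": "example", "published_at": "2025-01-01"},
        )
    ],
)
```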
src/infrastructure/supabase/create_db.py CHANGED
@@ -2,17 +2,17 @@ from sqlalchemy import inspect
 from sqlalchemy.exc import SQLAlchemyError
 
 from src.infrastructure.supabase.init_session import init_engine
-from src.models.sql_models import Base, SubstackArticle
+from src.models.sql_models import Base, FeedArticle
 from src.utils.logger_util import setup_logging
 
 logger = setup_logging()
 
 
 def create_table() -> None:
-    """Create the SubstackArticle table in the Supabase Postgres database if it does not exist.
+    """Create the FeedArticle table in the Supabase Postgres database if it does not exist.
 
     This function initializes a SQLAlchemy engine, checks if the table defined by
-    `SubstackArticle.__tablename__` exists in the database, and creates it if necessary.
+    `FeedArticle.__tablename__` exists in the database, and creates it if necessary.
     The engine is properly disposed of after the operation to prevent resource leaks.
     Errors during table creation are logged and handled gracefully.
 
@@ -33,14 +33,14 @@ def create_table() -> None:
         # Create an inspector to check existing tables
         inspector = inspect(engine)
         existing_tables = inspector.get_table_names()
-        table_name = SubstackArticle.__tablename__
+        table_name = FeedArticle.__tablename__
 
         # Check if the table already exists
         if table_name in existing_tables:
             logger.info(f"Table '{table_name}' already exists. No action needed.")
         else:
             logger.info(f"Table '{table_name}' does not exist. Creating...")
-            # Create all tables defined in Base.metadata (includes SubstackArticle)
+            # Create all tables defined in Base.metadata (includes FeedArticle)
             Base.metadata.create_all(bind=engine)
             logger.info(f"Table '{table_name}' created successfully.")
     except SQLAlchemyError as e:
src/models/sql_models.py CHANGED
@@ -12,7 +12,7 @@ class Base(DeclarativeBase):
     pass
 
 
-class SubstackArticle(Base):
+class FeedArticle(Base):
     __tablename__ = settings.supabase_db.table_name
 
     # Primary internal ID
src/pipelines/flows/rss_ingestion_flow.py CHANGED
@@ -3,7 +3,7 @@ from prefect import flow, unmapped
 from src.config import settings
 from src.infrastructure.supabase.init_session import init_engine
 from src.models.article_models import FeedItem
-from src.models.sql_models import SubstackArticle
+from src.models.sql_models import FeedArticle
 from src.pipelines.tasks.fetch_rss import fetch_rss_entries
 from src.pipelines.tasks.ingest_rss import ingest_from_rss
 from src.utils.logger_util import setup_logging
@@ -16,7 +16,7 @@ from src.utils.logger_util import setup_logging
     retries=2,
     retry_delay_seconds=120,
 )
-def rss_ingest_flow(article_model: type[SubstackArticle] = SubstackArticle) -> None:
+def rss_ingest_flow(article_model: type[FeedArticle] = FeedArticle) -> None:
     """Fetch and ingest articles from configured RSS feeds concurrently.
 
     Each feed is fetched in parallel and ingested into the database
@@ -24,7 +24,7 @@ def rss_ingest_flow(article_model: type[SubstackArticle] = SubstackArticle) -> N
     after completion.
 
     Args:
-        article_model (type[SubstackArticle]): SQLAlchemy model for storing articles.
+        article_model (type[FeedArticle]): SQLAlchemy model for storing articles.
 
     Returns:
         None
@@ -115,4 +115,4 @@ def rss_ingest_flow(article_model: type[SubstackArticle] = SubstackArticle) -> N
 
 
 if __name__ == "__main__":
-    rss_ingest_flow(article_model=SubstackArticle)
+    rss_ingest_flow(article_model=FeedArticle)
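The flow fans out with Prefect's `.map`, passing one shared engine via `unmapped` (both are imported above). A runnable sketch with stub tasks; the real task bodies live in `src/pipelines/tasks/`:

```python
# Sketch of the fan-out pattern: one task run per feed, all sharing a single
# engine via unmapped(). Task bodies are stubs.
from prefect import flow, task, unmapped


@task(retries=2, retry_delay_seconds=120)
def fetch_rss_entries(feed: str, engine: object) -> list[str]:
    return [f"article from {feed}"]  # stub


@flow
def rss_ingest_flow() -> None:
    engine = object()  # stands in for the SQLAlchemy engine from init_engine()
    feeds = ["feed-a", "feed-b"]
    futures = fetch_rss_entries.map(feeds, unmapped(engine))
    for future in futures:
        print(future.result())


if __name__ == "__main__":
    rss_ingest_flow()
```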
src/pipelines/tasks/fetch_rss.py CHANGED
@@ -8,13 +8,13 @@ from sqlalchemy.orm import Session
 
 from src.infrastructure.supabase.init_session import init_session
 from src.models.article_models import ArticleItem, FeedItem
-from src.models.sql_models import SubstackArticle
+from src.models.sql_models import FeedArticle
 from src.utils.logger_util import setup_logging
 
 
 @task(
     task_run_name="fetch_rss_entries-{feed.name}",
-    description="Fetch RSS entries from a Substack feed.",
+    description="Fetch RSS entries from a Substack, Medium, or publication feed.",
     retries=2,
     retry_delay_seconds=120,
     cache_policy=NO_CACHE,
@@ -22,9 +22,9 @@ from src.utils.logger_util import setup_logging
 def fetch_rss_entries(
     feed: FeedItem,
     engine: Engine,
-    article_model: type[SubstackArticle] = SubstackArticle,
+    article_model: type[FeedArticle] = FeedArticle,
 ) -> list[ArticleItem]:
-    """Fetch all RSS items from a Substack feed and convert them to ArticleItem objects.
+    """Fetch all RSS items from a Substack, Medium, or publication feed and convert them to ArticleItem objects.
 
     Each task uses its own SQLAlchemy session. Articles already stored in the database
     or with empty links/content are skipped. Errors during parsing individual items
@@ -33,8 +33,8 @@ def fetch_rss_entries(
     Args:
         feed (FeedItem): Metadata for the feed (name, author, URL).
         engine (Engine): SQLAlchemy engine for database connection.
-        article_model (type[SubstackArticle], optional): Model used to persist articles.
-            Defaults to SubstackArticle.
+        article_model (type[FeedArticle], optional): Model used to persist articles.
+            Defaults to FeedArticle.
 
     Returns:
         list[ArticleItem]: List of new ArticleItem objects ready for parsing/ingestion.
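The docstring's skip rules (already stored, empty link or content) boil down to a set lookup against persisted links. A sketch, assuming the article model has a `link` column:

```python
# Sketch of the skip logic described in the docstring: drop entries whose
# link is empty, whose content is empty, or which are already persisted.
# The `link` column on the model is an assumption.
import feedparser
from sqlalchemy.orm import Session


def new_entries(feed_url: str, session: Session, article_model) -> list[dict]:
    known_links = {row.link for row in session.query(article_model.link).all()}
    fresh = []
    for entry in feedparser.parse(feed_url).entries:
        link = entry.get("link", "")
        content = entry.get("summary", "")
        if not link or not content or link in known_links:
            continue  # unusable or already stored
        fresh.append({"link": link, "content": content})
    return fresh
```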
src/pipelines/tasks/ingest_rss.py CHANGED
@@ -6,7 +6,7 @@ from sqlalchemy.orm import Session
 from src.config import settings
 from src.infrastructure.supabase.init_session import init_session
 from src.models.article_models import ArticleItem, FeedItem
-from src.models.sql_models import SubstackArticle
+from src.models.sql_models import FeedArticle
 from src.utils.logger_util import setup_logging
 
 
@@ -20,7 +20,7 @@ from src.utils.logger_util import setup_logging
 def ingest_from_rss(
     fetched_articles: list[ArticleItem],
     feed: FeedItem,
-    article_model: type[SubstackArticle],
+    article_model: type[FeedArticle],
     engine: Engine,
 ) -> None:
     """Ingest articles fetched from RSS (already Markdownified).
@@ -93,7 +93,7 @@ def ingest_from_rss(
 def _persist_batch(
     session: Session,
     batch: list[ArticleItem],
-    article_model: type[SubstackArticle],
+    article_model: type[FeedArticle],
 ) -> None:
     """Helper to bulk insert a batch of ArticleItems."""
     rows = [
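For context, the `_persist_batch` idea in miniature: turn a batch of items into model rows and commit them in one unit of work (field names are illustrative):

```python
# Miniature of the _persist_batch pattern: one add_all + commit per batch.
from sqlalchemy.orm import Session


def persist_batch(session: Session, batch: list[dict], article_model) -> None:
    rows = [article_model(**item) for item in batch]  # field names must match
    session.add_all(rows)  # stage the whole batch
    session.commit()       # single commit per batch keeps round-trips low
```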
tests/integration/test_db_connection.py CHANGED
@@ -8,7 +8,7 @@ setup_logging()
 
 
 def test_connect_to_test_table(db_session: Connection) -> None:
-    """Test connectivity to the 'substack_test' table and fetch a single row.
+    """Test connectivity to the 'feed_test' table and fetch a single row.
 
     Args:
         db_session (Connection): SQLAlchemy Connection object.
@@ -18,12 +18,12 @@ def test_connect_to_test_table(db_session: Connection) -> None:
         Exception: If the table does not exist or query fails.
 
     """
-    logger.info("Testing connection to 'substack_test' table...")
+    logger.info("Testing connection to 'feed_test' table...")
 
     try:
-        result = db_session.execute(text("SELECT * FROM substack_test LIMIT 1")).fetchall()
+        result = db_session.execute(text("SELECT * FROM feed_test LIMIT 1")).fetchall()
         logger.info(f"Query result: {result}")
         assert isinstance(result, list), "Query result is not a list"
     except Exception as e:
-        logger.error(f"Failed to query 'substack_test' table: {e}")
+        logger.error(f"Failed to query 'feed_test' table: {e}")
         raise
tests/integration/test_rss_pipeline.py CHANGED
@@ -4,7 +4,7 @@ from loguru import logger
 from sqlalchemy import text
 from sqlalchemy.engine import Engine
 from sqlalchemy.orm import Session
-from test_models.test_sql_models import SubstackTestArticle  # Test-specific table model
+from test_models.test_sql_models import FeedTestArticle  # Test-specific table model
 
 from src.models.article_models import FeedItem
 from src.pipelines.tasks.fetch_rss import fetch_rss_entries
@@ -29,12 +29,12 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
     """
 
     # Clear test table
-    logger.info("Clearing test table 'substack_test'")
-    db_session.execute(text("DELETE FROM substack_test"))
+    logger.info("Clearing test table 'feed_test'")
+    db_session.execute(text("DELETE FROM feed_test"))
     db_session.commit()
 
     # Verify table is empty
-    initial_count = db_session.query(SubstackTestArticle).count()
+    initial_count = db_session.query(FeedTestArticle).count()
     logger.info(f"Initial article count in test table: {initial_count}")
     assert initial_count == 0, "Test table was not cleared"
 
@@ -88,7 +88,7 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
     fetched_articles = fetch_rss_entries(
         test_feed,
         engine=db_engine,
-        article_model=SubstackTestArticle,
+        article_model=FeedTestArticle,
     )
     logger.info(f"Fetched {len(fetched_articles)} articles for feed '{test_feed.name}'")
 
@@ -99,14 +99,14 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
     ingest_from_rss(
         fetched_articles,
         feed=test_feed,
-        article_model=SubstackTestArticle,
+        article_model=FeedTestArticle,
         engine=db_engine,
     )
 
     # Verify DB insertion
     articles_in_db = (
-        db_session.query(SubstackTestArticle)
-        .order_by(SubstackTestArticle.published_at.desc())
+        db_session.query(FeedTestArticle)
+        .order_by(FeedTestArticle.published_at.desc())
         .all()
     )
     logger.info(f"Inserted article titles: {[a.title for a in articles_in_db]}")
@@ -122,7 +122,7 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
 
 ################################################################################
 # The code below calls out to live URLs and is not suitable for CI,
-# as Substack may block requests from CI environments.
+# as Substack/Medium may block requests from CI environments.
 # It is left here for reference and can be run manually if desired.
 # Uncomment to enable live integration test
 
@@ -132,7 +132,7 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
 # from sqlalchemy import text
 # from sqlalchemy.engine import Engine
 # from sqlalchemy.orm import Session
-# from test_models.test_sql_models import SubstackTestArticle  # Test-specific table model
+# from test_models.test_sql_models import FeedTestArticle  # Test-specific table model
 
 # from src.models.article_models import FeedItem
 # from src.pipelines.tasks.batch_parse_ingest_articles import parse_and_ingest
@@ -153,12 +153,12 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
 
 #     """
 #     # Clear test table
-#     logger.info("Clearing test table 'substack_test'")
-#     db_session.execute(text("DELETE FROM substack_test"))
+#     logger.info("Clearing test table 'feed_test'")
+#     db_session.execute(text("DELETE FROM feed_test"))
 #     db_session.commit()
 
 #     # Verify table is empty
-#     initial_count = db_session.query(SubstackTestArticle).count()
+#     initial_count = db_session.query(FeedTestArticle).count()
 #     logger.info(f"Initial article count in test table: {initial_count}")
 #     assert initial_count == 0, "Test table was not cleared"
 
@@ -173,7 +173,7 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
 #     fetched_articles = fetch_rss_entries(
 #         test_feed,
 #         engine=db_engine,
-#         article_model=SubstackTestArticle,
+#         article_model=FeedTestArticle,
 #     )
 #     logger.info(f"Fetched {len(fetched_articles)} articles for feed '{test_feed.name}'")
 
@@ -185,14 +185,14 @@ def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine)
 #     parse_and_ingest(
 #         fetched_articles,
 #         feed=test_feed,
-#         article_model=SubstackTestArticle,
+#         article_model=FeedTestArticle,
 #         engine=db_engine,
 #     )
 
 #     # Verify DB insertion
 #     articles_in_db = (
-#         db_session.query(SubstackTestArticle)
-#         .order_by(SubstackTestArticle.published_at.desc())
+#         db_session.query(FeedTestArticle)
+#         .order_by(FeedTestArticle.published_at.desc())
 #         .all()
 #     )
 #     logger.info(f"Inserted article titles: {[a.title for a in articles_in_db]}")
tests/test_models/test_sql_models.py CHANGED
@@ -10,8 +10,8 @@ class Base(DeclarativeBase):
     pass
 
 
-class SubstackTestArticle(Base):
-    __tablename__ = "substack_test"
+class FeedTestArticle(Base):
+    __tablename__ = "feed_test"
 
     # Primary internal ID
     id: Mapped[int] = mapped_column(BigInteger, primary_key=True, index=True)
tests/unit/test_fetch_rss_entries.py CHANGED
@@ -3,7 +3,7 @@ import responses
 from loguru import logger
 from sqlalchemy import text
 from sqlalchemy.orm import Session
-from test_models.test_sql_models import SubstackTestArticle
+from test_models.test_sql_models import FeedTestArticle
 
 from src.infrastructure.supabase.init_session import init_engine
 from src.models.article_models import ArticleItem, FeedItem
@@ -49,8 +49,8 @@ def test_fetch_rss_mocked_feed() -> None:
     try:
         # Clear the test table before running
         session = Session(bind=engine)
-        logger.info("Clearing test table 'substack_test' before test")
-        session.execute(text("DELETE FROM substack_test"))
+        logger.info("Clearing test table 'feed_test' before test")
+        session.execute(text("DELETE FROM feed_test"))
         session.commit()
         logger.info("Test table cleared")
 
@@ -58,7 +58,7 @@ def test_fetch_rss_mocked_feed() -> None:
         articles = fetch_rss_entries(
             feed=test_feed,
             engine=engine,
-            article_model=SubstackTestArticle,
+            article_model=FeedTestArticle,
         )
         logger.info(f"Fetched {len(articles)} articles from {test_feed.url}")
 
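The test mocks the feed over HTTP with the `responses` library (imported at the top), so no live Substack/Medium request is made. A self-contained sketch of the approach; the feed URL and RSS body here are illustrative:

```python
# Sketch of the mocking approach: `responses` intercepts requests-based HTTP,
# so the canned RSS body below is served instead of a live feed.
import feedparser
import requests
import responses

RSS_XML = """<?xml version="1.0"?>
<rss version="2.0"><channel><title>Test Feed</title>
<item><title>Hello</title><link>https://example.com/a</link></item>
</channel></rss>"""


@responses.activate
def test_mocked_feed() -> None:
    responses.add(
        responses.GET,
        "https://example.com/feed",  # hypothetical feed URL
        body=RSS_XML,
        content_type="application/rss+xml",
    )
    resp = requests.get("https://example.com/feed", timeout=10)
    parsed = feedparser.parse(resp.text)  # feedparser accepts a raw string
    assert parsed.entries[0]["link"] == "https://example.com/a"
```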
uv.lock CHANGED
@@ -3746,7 +3746,7 @@ wheels = [
 ]
 
 [[package]]
-name = "substack-newsletters-search-course"
+name = "AISearchEngine"
 version = "1.0.0"
 source = { editable = "." }
 dependencies = [