IsmatS committed on
Commit
bcf20be
·
1 Parent(s): c9b6ede
Files changed (1) hide show
  1. scripts/ingest_hackathon_data.py +57 -17
scripts/ingest_hackathon_data.py CHANGED
@@ -1,6 +1,6 @@
1
  """
2
  Ingest ONLY PDFs from hackathon_data folder
3
- Parallel processing with 4 workers
4
  """
5
 
6
  import os
@@ -8,33 +8,40 @@ import sys
8
  import time
9
  import json
10
  from pathlib import Path
11
- from concurrent.futures import ProcessPoolExecutor, as_completed
12
  from dotenv import load_dotenv
13
 
14
- # Add parent directory to path
15
- sys.path.insert(0, str(Path(__file__).parent))
16
-
17
- # Load environment
18
  load_dotenv()
19
 
20
- # Import from the main ingestion script
21
  PROJECT_ROOT = Path(__file__).parent.parent
22
  PDFS_DIR = PROJECT_ROOT / "data" / "hackathon_data" # Changed to hackathon_data
23
  OUTPUT_DIR = PROJECT_ROOT / "output" / "ingestion"
24
 
25
- # Import the ingestion function
26
- import ingest_pdfs
 
27
 
28
  def worker_ingest(pdf_path: str):
29
- """Worker function to ingest a single PDF"""
 
 
 
30
  try:
 
 
 
 
31
  result = ingest_pdfs.ingest_pdf(str(pdf_path))
32
  return result
33
  except Exception as e:
 
34
  return {
35
  "pdf_name": Path(pdf_path).name,
36
  "status": "error",
37
- "error": str(e)
 
38
  }
39
 
40
 
@@ -45,15 +52,38 @@ def main():
45
  print("="*70)
46
  print(f"📂 PDF Directory: {PDFS_DIR}")
47
  print(f"⚡ Workers: 4 PDFs at once")
48
- print(f"🎯 Vector Database: Pinecone ({os.getenv('PINECONE_INDEX_NAME')})")
49
  print("="*70)
50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  # Get all PDFs
52
  all_pdfs = sorted(PDFS_DIR.glob("*.pdf"))
53
  print(f"\n📚 Found {len(all_pdfs)} PDFs in hackathon_data folder")
54
 
55
  if not all_pdfs:
56
  print("\n❌ No PDFs found in hackathon_data folder!")
 
57
  return
58
 
59
  for pdf in all_pdfs:
@@ -62,12 +92,13 @@ def main():
62
  print(f"\n⚡ Starting parallel processing with 4 workers...")
63
  print(f"⏱️ Estimated time: ~{len(all_pdfs) * 80 / 4 / 60:.1f} minutes\n")
64
 
65
- # Process in parallel
 
66
  results = []
67
  completed = 0
68
  start_time = time.time()
69
 
70
- with ProcessPoolExecutor(max_workers=4) as executor:
71
  # Submit all jobs
72
  future_to_pdf = {
73
  executor.submit(worker_ingest, str(pdf)): pdf
@@ -148,10 +179,19 @@ def main():
148
  stats = index.describe_index_stats()
149
 
150
  print(f"\n📊 Final Pinecone Stats:")
151
- print(f" Total Vectors: {stats.get('total_vector_count', 0)}")
152
- print(f" Dimensions: {stats.get('dimension', 0)}")
 
 
 
 
 
 
 
 
153
  except Exception as e:
154
- print(f"\nCould not fetch Pinecone stats: {e}")
 
155
 
156
  print("\n" + "="*70)
157
  print("🎉 HACKATHON DATA INGESTION COMPLETE!")
 
1
  """
2
  Ingest ONLY PDFs from hackathon_data folder
3
+ Parallel processing with 4 workers using ThreadPoolExecutor (better for I/O-bound tasks)
4
  """
5
 
6
  import os
 
8
  import time
9
  import json
10
  from pathlib import Path
11
+ from concurrent.futures import ThreadPoolExecutor, as_completed
12
  from dotenv import load_dotenv
13
 
14
+ # Load environment first (before any imports that need env vars)
 
 
 
15
  load_dotenv()
16
 
17
+ # Project paths
18
  PROJECT_ROOT = Path(__file__).parent.parent
19
  PDFS_DIR = PROJECT_ROOT / "data" / "hackathon_data" # Changed to hackathon_data
20
  OUTPUT_DIR = PROJECT_ROOT / "output" / "ingestion"
21
 
22
+ # Add parent directory to path for imports
23
+ sys.path.insert(0, str(Path(__file__).parent))
24
+
25
 
26
def worker_ingest(pdf_path: str):
    """
    Ingest one PDF file and return the result dict from ingest_pdfs.

    Never raises: any failure is converted into an error record holding
    the PDF's file name, an "error" status, the exception message, and a
    formatted traceback, so a single bad PDF cannot kill the worker pool.
    """
    try:
        # Lazy import keeps module import side effects out of the
        # parallel workers (avoids global-state issues during execution).
        import ingest_pdfs

        return ingest_pdfs.ingest_pdf(str(pdf_path))
    except Exception as exc:
        import traceback
        return {
            "pdf_name": Path(pdf_path).name,
            "status": "error",
            "error": str(exc),
            "traceback": traceback.format_exc(),
        }
46
 
47
 
 
52
  print("="*70)
53
  print(f"📂 PDF Directory: {PDFS_DIR}")
54
  print(f"⚡ Workers: 4 PDFs at once")
55
+ print(f"🎯 Vector Database: Pinecone ({os.getenv('PINECONE_INDEX_NAME', 'hackathon')})")
56
  print("="*70)
57
 
58
+ # Validate required environment variables
59
+ required_env_vars = [
60
+ "AZURE_OPENAI_API_KEY",
61
+ "AZURE_OPENAI_ENDPOINT",
62
+ "PINECONE_API_KEY",
63
+ "PINECONE_INDEX_NAME"
64
+ ]
65
+
66
+ missing_vars = [var for var in required_env_vars if not os.getenv(var)]
67
+ if missing_vars:
68
+ print(f"\n❌ Missing required environment variables:")
69
+ for var in missing_vars:
70
+ print(f" - {var}")
71
+ print("\nPlease set these in your .env file.")
72
+ return
73
+
74
+ # Check if directory exists
75
+ if not PDFS_DIR.exists():
76
+ print(f"\n❌ Directory not found: {PDFS_DIR}")
77
+ print(f" Please create the directory and add PDFs to it.")
78
+ return
79
+
80
  # Get all PDFs
81
  all_pdfs = sorted(PDFS_DIR.glob("*.pdf"))
82
  print(f"\n📚 Found {len(all_pdfs)} PDFs in hackathon_data folder")
83
 
84
  if not all_pdfs:
85
  print("\n❌ No PDFs found in hackathon_data folder!")
86
+ print(f" Please add PDF files to: {PDFS_DIR}")
87
  return
88
 
89
  for pdf in all_pdfs:
 
92
  print(f"\n⚡ Starting parallel processing with 4 workers...")
93
  print(f"⏱️ Estimated time: ~{len(all_pdfs) * 80 / 4 / 60:.1f} minutes\n")
94
 
95
+ # Process in parallel using ThreadPoolExecutor
96
+ # (Better for I/O-bound tasks like API calls to Azure and Pinecone)
97
  results = []
98
  completed = 0
99
  start_time = time.time()
100
 
101
+ with ThreadPoolExecutor(max_workers=4) as executor:
102
  # Submit all jobs
103
  future_to_pdf = {
104
  executor.submit(worker_ingest, str(pdf)): pdf
 
179
  stats = index.describe_index_stats()
180
 
181
  print(f"\n📊 Final Pinecone Stats:")
182
+ # Handle both dict-like and object attribute access
183
+ total_vectors = getattr(stats, 'total_vector_count', None) or stats.get('total_vector_count', 0)
184
+ dimension = getattr(stats, 'dimension', None) or stats.get('dimension', 0)
185
+ print(f" Total Vectors: {total_vectors}")
186
+ print(f" Dimensions: {dimension}")
187
+
188
+ # Show namespaces if available
189
+ namespaces = getattr(stats, 'namespaces', None) or stats.get('namespaces', {})
190
+ if namespaces:
191
+ print(f" Namespaces: {len(namespaces)}")
192
  except Exception as e:
193
+ print(f"\n⚠️ Could not fetch Pinecone stats: {e}")
194
+ print(f" (This is non-fatal - ingestion was still successful)")
195
 
196
  print("\n" + "="*70)
197
  print("🎉 HACKATHON DATA INGESTION COMPLETE!")