File size: 12,554 Bytes
354441c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317

import os
import json
import asyncio
from typing import Dict, List, Tuple
import logging
from pathlib import Path
from vector_db import SimpleVectorDB

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EnhancedRAGPipeline:
    def __init__(self, groq_client=None):
        self.groq_client = groq_client
        self.vector_db = None
        self.knowledge_base_file = "atlan_knowledge_base.json"
        self.vector_db_file = "atlan_vector_db.pkl"
        self.initialize_vector_db()
    
    def initialize_vector_db(self):
        self.vector_db = SimpleVectorDB()
        
        # Try to load existing database
        if not self.vector_db.load_database():
            logger.info("No existing vector database found. Checking for knowledge base...")
            
            # Try to load from knowledge base
            if Path(self.knowledge_base_file).exists():
                logger.info("Found knowledge base. Building vector database...")
                if self.vector_db.load_knowledge_base(self.knowledge_base_file):
                    self.vector_db.create_embeddings()
                    self.vector_db.save_database()
                    logger.info("Vector database built and saved")
                else:
                    logger.error("Failed to load knowledge base")
            else:
                logger.warning("No knowledge base found. RAG will use fallback responses.")
    
    def is_rag_available(self) -> bool:
        """Check if RAG system is properly initialized"""
        return self.vector_db is not None and len(self.vector_db.documents) > 0
    
    def should_use_rag(self, topic_tags: List[str]) -> bool:
        """Determine if RAG should be used based on topic tags"""
        rag_topics = ["How-to", "Product", "Best practices", "API/SDK", "SSO"]
        return any(tag in rag_topics for tag in topic_tags)
    
    def get_relevant_context(self, question: str, max_chars: int = 3000) -> Tuple[str, List[str]]:
        """Get relevant context from the vector database"""
        if not self.is_rag_available():
            return self._get_fallback_context(question), self._get_fallback_sources()
        
        try:
            context, sources = self.vector_db.get_context_for_query(question, max_chars)
            
            if not context:
                return self._get_fallback_context(question), self._get_fallback_sources()
            
            return context, sources
        
        except Exception as e:
            logger.error(f"Error retrieving context: {str(e)}")
            return self._get_fallback_context(question), self._get_fallback_sources()
    
    def _get_fallback_context(self, question: str) -> str:
        """Provide fallback context when vector DB is not available"""
        question_lower = question.lower()
        
        if "snowflake" in question_lower and "connect" in question_lower:
            return """
            To connect Snowflake to Atlan:
            1. You need the following Snowflake permissions: USAGE on warehouse, database, and schema; SELECT on tables; MONITOR on warehouse
            2. Create a service account with these permissions
            3. In Atlan, go to Admin > Connectors > Add Snowflake
            4. Provide connection details: account URL, username, password, warehouse, database
            5. Test the connection and run the crawler
            
            Common issues:
            - Authentication failures: Check username/password and network access
            - Permission errors: Ensure service account has required privileges
            - Network issues: Verify Snowflake account URL and firewall settings
            """
        
        elif "api" in question_lower or "sdk" in question_lower:
            return """
            Atlan provides comprehensive APIs for programmatic access:
            
            REST API endpoints:
            - Assets API: Create, read, update assets
            - Search API: Search across the catalog
            - Lineage API: Retrieve lineage information
            - Glossary API: Manage business terms
            
            Authentication: Use API tokens (available in your profile settings)
            Base URL: https://your-tenant.atlan.com/api/meta
            
            Python SDK: pip install pyatlan
            Java SDK: Available via Maven Central
            
            Common operations:
            - Create assets: POST /entity/bulk
            - Search assets: POST /search/indexsearch
            - Get lineage: GET /lineage/{guid}
            """
        
        elif "sso" in question_lower or "saml" in question_lower:
            return """
            Setting up SSO with Atlan:
            
            SAML 2.0 Configuration:
            1. In Atlan Admin > Settings > Authentication
            2. Enable SAML SSO
            3. Configure Identity Provider details:
               - SSO URL, Entity ID, Certificate
            4. Map SAML attributes to Atlan user fields
            5. Test with a pilot user before full deployment
            
            Supported Identity Providers:
            - Okta, Azure AD, Google Workspace
            - Generic SAML 2.0 providers
            
            Troubleshooting:
            - Attribute mapping issues: Check SAML response format
            - Group assignment: Verify group claims in SAML assertions
            - Certificate errors: Ensure valid and properly formatted certificates
            """
        
        elif "lineage" in question_lower:
            return """
            Data Lineage in Atlan:
            
            Automatic lineage capture:
            - dbt: Connects via dbt Cloud or Core metadata
            - SQL-based tools: Snowflake, BigQuery, Redshift, etc.
            - ETL tools: Airflow, Fivetran, Matillion
            
            Manual lineage:
            - Use the lineage editor in the UI
            - API endpoints for programmatic lineage creation
            
            Lineage export:
            - Currently available through API calls
            - UI export features in development
            
            Troubleshooting missing lineage:
            - Check connector configuration
            - Verify SQL parsing is enabled
            - Review crawler logs for errors
            """
        
        else:
            return """
            Atlan is a modern data catalog that helps organizations:
            - Discover and understand their data assets
            - Implement data governance at scale
            - Enable self-service analytics
            - Ensure data quality and compliance
            
            Key features:
            - Automated metadata discovery
            - Data lineage visualization
            - Business glossary management
            - Data quality monitoring
            - Collaborative data stewardship
            """
    
    def _get_fallback_sources(self) -> List[str]:
        """Provide fallback sources when vector DB is not available"""
        return [
            "https://docs.atlan.com/",
            "https://developer.atlan.com/",
            "https://docs.atlan.com/connectors/",
            "https://docs.atlan.com/guide/"
        ]
    
    async def generate_answer(self, question: str, topic_tags: List[str]) -> Dict:
        """Generate an answer using RAG pipeline"""
        
        if not self.should_use_rag(topic_tags):
            return {
                "type": "routing",
                "message": f"This ticket has been classified as a '{topic_tags[0] if topic_tags else 'General'}' issue and routed to the appropriate team."
            }
        
        # Get relevant context
        context, sources = self.get_relevant_context(question)
        
        if not self.groq_client:
            # Fallback response without LLM
            return {
                "type": "direct_answer",
                "answer": f"Based on the documentation, here's information about your question: {context[:500]}...",
                "sources": sources
            }
        
        # Generate response using LLM
        try:
            response = await self._generate_llm_response(question, context, sources)
            return response
        
        except Exception as e:
            logger.error(f"Error generating LLM response: {str(e)}")
            # Fallback to context-based response
            return {
                "type": "direct_answer", 
                "answer": f"Based on the available documentation: {context[:800]}",
                "sources": sources
            }
    
    async def _generate_llm_response(self, question: str, context: str, sources: List[str]) -> Dict:
        """Generate response using the LLM with retrieved context"""
        
        prompt = f"""
You are an expert Atlan support agent. Use the provided documentation context to answer the user's question comprehensively and accurately.

User Question: {question}

Documentation Context:
{context}

Instructions:
- Provide a direct, helpful, and detailed answer
- Use the context to inform your response
- Be specific about steps, requirements, and configurations when applicable
- If the question is about troubleshooting, include common solutions
- If the question is about setup/configuration, provide step-by-step guidance
- Maintain a professional and helpful tone
- Only use information from the provided context
- If the context doesn't fully answer the question, acknowledge the limitation

Format your response as a comprehensive answer that directly addresses the user's question.
"""
        
        try:
            response = self.groq_client.chat.completions.create(
                model="openai/gpt-oss-120b", 
                messages=[
                    {"role": "system", "content": "You are an expert Atlan support agent. Provide helpful, accurate responses based on the documentation context."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.2,
                max_tokens=1000
            )
            
            answer = response.choices[0].message.content.strip()
            
            return {
                "type": "direct_answer",
                "answer": answer,
                "sources": sources
            }
            
        except Exception as e:
            logger.error(f"LLM generation failed: {str(e)}")
            raise

def setup_rag_system():
    """Setup the RAG system - run scraper if needed"""
    print("🤖 Setting up Enhanced RAG System...")
    print("=" * 45)
    
    # Check if knowledge base exists
    kb_file = Path("atlan_knowledge_base.json")
    db_file = Path("atlan_vector_db.pkl")
    
    if not kb_file.exists():
        print("📚 Knowledge base not found. Please run the scraper first:")
        print("   python scraper.py")
        return False
    
    if not db_file.exists():
        print("🔧 Vector database not found. Building from knowledge base...")
        from vector_db import build_vector_database
        vector_db = build_vector_database()
        if not vector_db:
            print("❌ Failed to build vector database")
            return False
    
    print("✅ RAG system ready!")
    return True

async def test_rag_pipeline():
    """Test the RAG pipeline"""
    print("\n🧪 Testing Enhanced RAG Pipeline...")
    print("=" * 40)
    
    # Initialize without Groq client for testing
    rag = EnhancedRAGPipeline()
    
    test_questions = [
        ("How do I connect Snowflake to Atlan?", ["How-to", "Connector"]),
        ("Show me API documentation for creating assets", ["API/SDK"]),
        ("Our lineage is not showing up", ["Lineage", "Troubleshooting"]),
        ("How to configure SAML SSO?", ["SSO", "How-to"])
    ]
    
    for question, topics in test_questions:
        print(f"\nQuestion: {question}")
        print(f"Topics: {topics}")
        
        result = await rag.generate_answer(question, topics)
        
        print(f"Response Type: {result['type']}")
        if result['type'] == 'direct_answer':
            print(f"Answer Length: {len(result['answer'])} characters")
            print(f"Sources: {len(result['sources'])}")
            print(f"Answer Preview: {result['answer'][:200]}...")
        else:
            print(f"Routing: {result['message']}")

if __name__ == "__main__":
    if setup_rag_system():
        asyncio.run(test_rag_pipeline())
    else:
        print("❌ RAG system setup failed")