File size: 8,184 Bytes
329b91e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""
ML Model Loader and Utilities
Handles loading and using the conflict prediction model and package embeddings.
"""

import json
import pickle
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import numpy as np
from packaging.requirements import Requirement


class ConflictPredictor:
    """Load a pickled conflict-prediction model and score requirements text.

    The model is loaded once at construction time. If the file is missing or
    cannot be unpickled, ``self.model`` stays ``None`` and :meth:`predict`
    falls back to ``(False, 0.0)``.
    """

    # Packages that each contribute one presence indicator to the feature
    # vector. The order fixes the column layout, so it must not be changed
    # without retraining the model.
    _COMMON_PACKAGES = (
        'torch', 'pytorch-lightning', 'tensorflow', 'keras', 'fastapi', 'pydantic',
        'numpy', 'pandas', 'scipy', 'scikit-learn', 'matplotlib', 'seaborn',
        'requests', 'httpx', 'sqlalchemy', 'alembic', 'uvicorn', 'starlette',
        'langchain', 'openai', 'chromadb', 'redis', 'celery', 'gunicorn',
        'pillow', 'opencv-python', 'beautifulsoup4', 'scrapy', 'plotly', 'jax',
    )

    def __init__(self, model_path: Optional[Path] = None):
        """Initialize the conflict predictor.

        Args:
            model_path: Location of the pickled model. Defaults to
                ``models/conflict_predictor.pkl`` next to this file.
        """
        if model_path is None:
            model_path = Path(__file__).parent / "models" / "conflict_predictor.pkl"
        # Accept plain strings as well as Path objects.
        model_path = Path(model_path)

        self.model = None
        self.model_path = model_path

        if model_path.exists():
            try:
                # SECURITY: pickle.load can execute arbitrary code embedded in
                # the file. Only point model_path at files from a trusted source.
                with open(model_path, 'rb') as f:
                    self.model = pickle.load(f)
                print(f"✅ Loaded conflict prediction model from {model_path}")
            except Exception as e:
                # Best effort: a broken model file disables prediction
                # instead of failing construction.
                print(f"⚠️ Could not load conflict prediction model: {e}")
        else:
            print(f"⚠️ Conflict prediction model not found at {model_path}")

    @staticmethod
    def _specifier_score(specifier: str) -> int:
        """Rate how tightly a version specifier string constrains a package.

        Returns 0 for no specifier, 3 when it contains ``==``, 2 when it
        contains ``>=`` or ``<=``, and 1 for any other specifier.
        """
        if not specifier:
            return 0
        if '==' in specifier:
            return 3
        if '>=' in specifier or '<=' in specifier:
            return 2
        return 1

    def extract_features(self, requirements_text: str) -> np.ndarray:
        """Turn requirements text into the model's numeric feature vector.

        Blank lines, ``#`` comment lines and lines that do not parse as a
        requirement are ignored. When a package name appears more than once,
        only its first occurrence contributes a specifier.

        Layout of the returned 1-D array (37 values):
            0      number of distinct packages / 20, capped at 1.0
            1      fraction of packages that carry a version specifier
            2      mean specifier score / 3 (see ``_specifier_score``)
            3      1 if any package name is listed more than once
            4-33   presence flag for each entry of ``_COMMON_PACKAGES``
            34-36  pair flags: torch + pytorch-lightning,
                   tensorflow + keras, fastapi + pydantic

        Args:
            requirements_text: Contents of a requirements file.

        Returns:
            The feature vector as a NumPy array.
        """
        packages = {}
        specificity = []
        has_duplicate = False

        for raw_line in requirements_text.strip().split('\n'):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue

            try:
                req = Requirement(line)
            except ValueError:
                # packaging's InvalidRequirement / InvalidSpecifier both derive
                # from ValueError; unparseable lines (pip options, URLs, ...)
                # are skipped.
                continue

            pkg_name = req.name.lower()
            if pkg_name in packages:
                # NOTE(review): the previous code stored this flag in an unused
                # list, so feature 3 was always 0. Confirm the training
                # pipeline computes the duplicate flag the same way.
                has_duplicate = True
                continue

            specifier = str(req.specifier) if req.specifier else ''
            packages[pkg_name] = specifier
            specificity.append(self._specifier_score(specifier))

        num_packages = len(packages)
        num_with_specifier = sum(1 for spec in packages.values() if spec)

        feature_vec = [
            min(num_packages / 20.0, 1.0),
            num_with_specifier / max(num_packages, 1),
            np.mean(specificity) / 3.0 if specificity else 0,
            1 if has_duplicate else 0,
        ]

        feature_vec.extend(
            1 if pkg in packages else 0 for pkg in self._COMMON_PACKAGES
        )

        # Pair indicators for specific package combinations.
        for first, second in (
            ('torch', 'pytorch-lightning'),
            ('tensorflow', 'keras'),
            ('fastapi', 'pydantic'),
        ):
            feature_vec.append(1 if (first in packages and second in packages) else 0)

        return np.array(feature_vec)

    def predict(self, requirements_text: str) -> Tuple[bool, float]:
        """
        Predict if requirements have conflicts.

        Args:
            requirements_text: Contents of a requirements file.

        Returns:
            (has_conflict, confidence_score). The confidence is the model's
            probability for the predicted class; this assumes
            ``predict_proba`` columns are ordered [no conflict, conflict].
            Returns ``(False, 0.0)`` when no model is loaded or when
            prediction raises.
        """
        if self.model is None:
            return False, 0.0

        try:
            # The model expects a 2-D batch; wrap the single sample.
            features = self.extract_features(requirements_text).reshape(1, -1)

            prediction = self.model.predict(features)[0]
            probability = self.model.predict_proba(features)[0]

            has_conflict = bool(prediction)
            confidence = float(probability[1] if has_conflict else probability[0])

            return has_conflict, confidence
        except Exception as e:
            # Best effort: a failing model must not break the caller.
            print(f"Error in conflict prediction: {e}")
            return False, 0.0


class PackageEmbeddings:
    """Load package embeddings and use them for similarity matching.

    Embeddings are read from a JSON file mapping package name to a vector.
    Names missing from the file can be embedded on demand with a lazily
    loaded sentence-transformers model, when that library is installed.
    """

    def __init__(self, embeddings_path: Optional[Path] = None):
        """Initialize package embeddings.

        Args:
            embeddings_path: Location of the JSON embeddings file. Defaults to
                ``models/package_embeddings.json`` next to this file.
        """
        if embeddings_path is None:
            embeddings_path = Path(__file__).parent / "models" / "package_embeddings.json"
        # Accept plain strings as well as Path objects.
        embeddings_path = Path(embeddings_path)

        self.embeddings = {}
        self.embeddings_path = embeddings_path
        self.model = None
        # Lower-cased names whose vectors were computed on demand. They stay
        # cached in self.embeddings but must never be offered as matches,
        # because they are arbitrary query strings, not known packages.
        self._on_the_fly = set()

        if embeddings_path.exists():
            try:
                with open(embeddings_path, 'r', encoding='utf-8') as f:
                    self.embeddings = json.load(f)
                print(f"✅ Loaded {len(self.embeddings)} package embeddings from {embeddings_path}")
            except Exception as e:
                # Best effort: an unreadable file leaves the table empty.
                print(f"⚠️ Could not load embeddings: {e}")
        else:
            print(f"⚠️ Embeddings not found at {embeddings_path}")

    def _load_model(self):
        """Lazy load the sentence transformer model.

        Returns:
            The loaded model, or ``None`` when sentence-transformers cannot
            be imported.
        """
        if self.model is None:
            try:
                # Imported here so the dependency stays optional.
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
            except ImportError:
                print("⚠️ sentence-transformers not available, embedding similarity disabled")
                return None
        return self.model

    def get_embedding(self, package_name: str) -> Optional[np.ndarray]:
        """Get embedding for a package (from cache or compute on-the-fly).

        Lookups use the lower-cased name. A vector computed on demand is
        cached in ``self.embeddings`` for later calls.

        Args:
            package_name: Package name to embed.

        Returns:
            The embedding as a NumPy array, or ``None`` when the name is not
            cached and no model is available.
        """
        package_lower = package_name.lower()

        # Check cache first
        if package_lower in self.embeddings:
            return np.array(self.embeddings[package_lower])

        # Compute on-the-fly if model available
        model = self._load_model()
        if model is None:
            return None

        embedding = model.encode([package_name])[0]
        # Cache it, and remember that it came from a query so that
        # find_similar does not suggest it as a known package.
        self.embeddings[package_lower] = embedding.tolist()
        self._on_the_fly.add(package_lower)
        return embedding

    def find_similar(self, package_name: str, top_k: int = 5, threshold: float = 0.6) -> List[Tuple[str, float]]:
        """
        Find similar packages using cosine similarity.

        The query itself and names that were only embedded on demand by
        earlier queries are excluded from the candidates. Candidates with a
        zero-length vector, or a dimension different from the query's, are
        skipped.

        Args:
            package_name: Package name to look up.
            top_k: Maximum number of results to return.
            threshold: Minimum cosine similarity for a candidate to qualify.

        Returns:
            List of (package_name, similarity_score) tuples, best match first.
            Empty when no embedding is available for the query.
        """
        query_emb = self.get_embedding(package_name)
        if query_emb is None:
            return []

        # Loop-invariant: compute the query norm once.
        query_norm = np.linalg.norm(query_emb)
        if query_norm == 0:
            # Cosine similarity is undefined for a zero vector.
            return []

        query_key = package_name.lower()
        similarities = []

        for pkg, emb in self.embeddings.items():
            if pkg == query_key or pkg in self._on_the_fly:
                continue

            emb_array = np.array(emb)
            if emb_array.shape != query_emb.shape:
                continue

            denominator = query_norm * np.linalg.norm(emb_array)
            if denominator == 0:
                continue

            # Cosine similarity
            similarity = float(np.dot(query_emb, emb_array) / denominator)

            if similarity >= threshold:
                similarities.append((pkg, similarity))

        # Sort by similarity and return top_k
        similarities.sort(key=lambda item: item[1], reverse=True)
        return similarities[:top_k]

    def get_best_match(self, package_name: str, threshold: float = 0.7) -> Optional[str]:
        """Get the best matching package name.

        Args:
            package_name: Package name to look up.
            threshold: Minimum cosine similarity required for a match.

        Returns:
            The most similar known package name, or ``None`` when nothing
            reaches the threshold.
        """
        similar = self.find_similar(package_name, top_k=1, threshold=threshold)
        if similar:
            return similar[0][0]
        return None