|
|
""" |
|
|
Python Dependency Compatibility Board |
|
|
A tool to parse, analyze, and resolve Python package dependencies. |
|
|
""" |
|
|
|
|
|
import re |
|
|
import json |
|
|
import tempfile |
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Tuple, Optional, Set |
|
|
from packaging.requirements import Requirement |
|
|
from packaging.specifiers import SpecifierSet |
|
|
from packaging.version import Version |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
class DependencyParser: |
|
|
"""Parse requirements.txt and library lists into structured dependencies.""" |
|
|
|
|
|
@staticmethod |
|
|
def parse_requirements_text(text: str) -> List[Dict]: |
|
|
"""Parse requirements.txt content into structured format.""" |
|
|
dependencies = [] |
|
|
seen_packages = {} |
|
|
|
|
|
for line in text.strip().split('\n'): |
|
|
line = line.strip() |
|
|
if not line or line.startswith('#'): |
|
|
continue |
|
|
|
|
|
|
|
|
if '#' in line: |
|
|
line = line[:line.index('#')].strip() |
|
|
|
|
|
try: |
|
|
req = Requirement(line) |
|
|
package_name = req.name.lower() |
|
|
|
|
|
|
|
|
if package_name in seen_packages: |
|
|
|
|
|
existing = seen_packages[package_name] |
|
|
if existing['specifier'] != str(req.specifier): |
|
|
dependencies.append({ |
|
|
'package': package_name, |
|
|
'specifier': str(req.specifier) if req.specifier else '', |
|
|
'extras': list(req.extras) if req.extras else [], |
|
|
'marker': str(req.marker) if req.marker else '', |
|
|
'original': line, |
|
|
'conflict': f"Duplicate: {existing['original']} vs {line}" |
|
|
}) |
|
|
continue |
|
|
|
|
|
dep = { |
|
|
'package': package_name, |
|
|
'specifier': str(req.specifier) if req.specifier else '', |
|
|
'extras': list(req.extras) if req.extras else [], |
|
|
'marker': str(req.marker) if req.marker else '', |
|
|
'original': line, |
|
|
'conflict': None |
|
|
} |
|
|
dependencies.append(dep) |
|
|
seen_packages[package_name] = dep |
|
|
except Exception as e: |
|
|
|
|
|
dependencies.append({ |
|
|
'package': line.split('==')[0].split('>=')[0].split('<=')[0].split('[')[0].strip(), |
|
|
'specifier': '', |
|
|
'extras': [], |
|
|
'marker': '', |
|
|
'original': line, |
|
|
'conflict': f"Parse error: {str(e)}" |
|
|
}) |
|
|
|
|
|
return dependencies |
|
|
|
|
|
@staticmethod |
|
|
def parse_library_list(text: str) -> List[Dict]: |
|
|
"""Parse a simple list of library names.""" |
|
|
dependencies = [] |
|
|
for line in text.strip().split('\n'): |
|
|
line = line.strip() |
|
|
if not line or line.startswith('#'): |
|
|
continue |
|
|
|
|
|
|
|
|
package_name = re.split(r'[<>=!]', line)[0].strip() |
|
|
package_name = re.split(r'\[', package_name)[0].strip() |
|
|
|
|
|
if package_name: |
|
|
dependencies.append({ |
|
|
'package': package_name.lower(), |
|
|
'specifier': '', |
|
|
'extras': [], |
|
|
'marker': '', |
|
|
'original': package_name, |
|
|
'conflict': None |
|
|
}) |
|
|
|
|
|
return dependencies |
|
|
|
|
|
|
|
|
class DependencyResolver: |
|
|
"""Resolve dependencies and check compatibility.""" |
|
|
|
|
|
def __init__(self, python_version: str = "3.10", platform: str = "any", device: str = "cpu"): |
|
|
self.python_version = python_version |
|
|
self.platform = platform |
|
|
self.device = device |
|
|
|
|
|
def build_dependency_graph(self, dependencies: List[Dict], deep_mode: bool = False) -> Dict: |
|
|
"""Build dependency graph (simplified - in production would query PyPI).""" |
|
|
graph = { |
|
|
'nodes': {}, |
|
|
'edges': [], |
|
|
'conflicts': [] |
|
|
} |
|
|
|
|
|
for dep in dependencies: |
|
|
package = dep['package'] |
|
|
graph['nodes'][package] = { |
|
|
'specifier': dep['specifier'], |
|
|
'extras': dep['extras'], |
|
|
'marker': dep['marker'], |
|
|
'conflict': dep.get('conflict') |
|
|
} |
|
|
|
|
|
if dep.get('conflict'): |
|
|
graph['conflicts'].append({ |
|
|
'package': package, |
|
|
'reason': dep['conflict'] |
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return graph |
|
|
|
|
|
def check_compatibility(self, graph: Dict) -> Tuple[bool, List[str]]: |
|
|
"""Check version compatibility across the graph.""" |
|
|
issues = [] |
|
|
|
|
|
|
|
|
for conflict in graph['conflicts']: |
|
|
issues.append(f"Conflict in {conflict['package']}: {conflict['reason']}") |
|
|
|
|
|
|
|
|
nodes = graph['nodes'] |
|
|
|
|
|
|
|
|
if 'pytorch-lightning' in nodes and 'torch' in nodes: |
|
|
pl_spec = nodes['pytorch-lightning']['specifier'] |
|
|
torch_spec = nodes['torch']['specifier'] |
|
|
|
|
|
|
|
|
if '==2.' in pl_spec or '>=2.' in pl_spec: |
|
|
if '==1.' in torch_spec or ('<2.' in torch_spec and '==1.' in torch_spec): |
|
|
issues.append("pytorch-lightning>=2.0 requires torch>=2.0, but torch<2.0 is specified") |
|
|
|
|
|
|
|
|
if 'fastapi' in nodes and 'pydantic' in nodes: |
|
|
fastapi_spec = nodes['fastapi']['specifier'] |
|
|
pydantic_spec = nodes['pydantic']['specifier'] |
|
|
|
|
|
if '==0.78' in fastapi_spec or '==0.7' in fastapi_spec: |
|
|
if '==2.' in pydantic_spec or '>=2.' in pydantic_spec: |
|
|
issues.append("fastapi==0.78.x requires pydantic v1, but pydantic v2 is specified") |
|
|
|
|
|
|
|
|
if 'tensorflow' in nodes and 'keras' in nodes: |
|
|
tf_spec = nodes['tensorflow']['specifier'] |
|
|
keras_spec = nodes['keras']['specifier'] |
|
|
|
|
|
if '==1.' in tf_spec: |
|
|
if '==3.' in keras_spec or '>=3.' in keras_spec: |
|
|
issues.append("keras>=3.0 requires TensorFlow 2.x, but TensorFlow 1.x is specified") |
|
|
|
|
|
return len(issues) == 0, issues |
|
|
|
|
|
def resolve_dependencies( |
|
|
self, |
|
|
dependencies: List[Dict], |
|
|
strategy: str = "latest_compatible" |
|
|
) -> Tuple[str, List[str]]: |
|
|
"""Resolve dependencies using specified strategy.""" |
|
|
|
|
|
seen_packages = {} |
|
|
clean_dependencies = [] |
|
|
|
|
|
for dep in dependencies: |
|
|
if dep.get('conflict'): |
|
|
continue |
|
|
|
|
|
package = dep['package'] |
|
|
if package in seen_packages: |
|
|
|
|
|
existing = seen_packages[package] |
|
|
if dep['specifier'] and not existing['specifier']: |
|
|
clean_dependencies.remove(existing) |
|
|
clean_dependencies.append(dep) |
|
|
seen_packages[package] = dep |
|
|
continue |
|
|
|
|
|
clean_dependencies.append(dep) |
|
|
seen_packages[package] = dep |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: |
|
|
req_lines = [] |
|
|
for dep in clean_dependencies: |
|
|
req_lines.append(dep['original']) |
|
|
f.write('\n'.join(req_lines)) |
|
|
temp_req_file = f.name |
|
|
|
|
|
warnings = [] |
|
|
|
|
|
try: |
|
|
|
|
|
result = subprocess.run( |
|
|
['pip', 'install', '--dry-run', '--report', '-', '-r', temp_req_file], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=60 |
|
|
) |
|
|
|
|
|
if result.returncode == 0 and result.stdout.strip(): |
|
|
|
|
|
try: |
|
|
report = json.loads(result.stdout) |
|
|
resolved = [] |
|
|
for package in report.get('install', []): |
|
|
name = package.get('metadata', {}).get('name', '') |
|
|
version = package.get('metadata', {}).get('version', '') |
|
|
if name and version: |
|
|
resolved.append(f"{name}=={version}") |
|
|
|
|
|
if resolved: |
|
|
return '\n'.join(sorted(resolved)), warnings |
|
|
except json.JSONDecodeError: |
|
|
warnings.append("Could not parse pip resolution report. Using original requirements.") |
|
|
except Exception as e: |
|
|
warnings.append(f"Error parsing resolution: {str(e)}") |
|
|
|
|
|
|
|
|
try: |
|
|
result = subprocess.run( |
|
|
['pip-compile', '--dry-run', '--output-file', '-', temp_req_file], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=60 |
|
|
) |
|
|
if result.returncode == 0: |
|
|
return result.stdout.strip(), warnings |
|
|
except FileNotFoundError: |
|
|
pass |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
resolved_lines = [] |
|
|
for dep in clean_dependencies: |
|
|
line = dep['original'] |
|
|
|
|
|
if strategy == "stable/pinned" and not dep['specifier']: |
|
|
|
|
|
line = f"{dep['package']} # Version not specified" |
|
|
elif strategy == "keep_existing_pins": |
|
|
|
|
|
pass |
|
|
resolved_lines.append(line) |
|
|
|
|
|
if not warnings: |
|
|
warnings.append("Using original requirements. For full resolution, ensure pip>=22.2 is installed.") |
|
|
|
|
|
return '\n'.join(resolved_lines), warnings |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
warnings.append("Resolution timed out. Showing original requirements.") |
|
|
return '\n'.join([d['original'] for d in clean_dependencies]), warnings |
|
|
except Exception as e: |
|
|
warnings.append(f"Resolution error: {str(e)}") |
|
|
return '\n'.join([d['original'] for d in clean_dependencies]), warnings |
|
|
finally: |
|
|
Path(temp_req_file).unlink(missing_ok=True) |
|
|
|
|
|
|
|
|
def process_dependencies( |
|
|
library_list: str, |
|
|
requirements_text: str, |
|
|
uploaded_file, |
|
|
python_version: str, |
|
|
device: str, |
|
|
os_type: str, |
|
|
mode: str, |
|
|
resolution_strategy: str |
|
|
) -> Tuple[str, str]: |
|
|
"""Main processing function for Gradio interface.""" |
|
|
|
|
|
|
|
|
all_dependencies = [] |
|
|
|
|
|
|
|
|
if library_list: |
|
|
parser = DependencyParser() |
|
|
deps = parser.parse_library_list(library_list) |
|
|
all_dependencies.extend(deps) |
|
|
|
|
|
|
|
|
if requirements_text: |
|
|
parser = DependencyParser() |
|
|
deps = parser.parse_requirements_text(requirements_text) |
|
|
all_dependencies.extend(deps) |
|
|
|
|
|
|
|
|
if uploaded_file: |
|
|
try: |
|
|
with open(uploaded_file, 'r') as f: |
|
|
content = f.read() |
|
|
parser = DependencyParser() |
|
|
deps = parser.parse_requirements_text(content) |
|
|
all_dependencies.extend(deps) |
|
|
except Exception as e: |
|
|
return f"Error reading file: {str(e)}", "" |
|
|
|
|
|
if not all_dependencies: |
|
|
return "Please provide at least one input: library list, requirements text, or uploaded file.", "" |
|
|
|
|
|
|
|
|
resolver = DependencyResolver(python_version=python_version, platform=os_type, device=device) |
|
|
deep_mode = (mode == "Deep (with transitive dependencies)") |
|
|
graph = resolver.build_dependency_graph(all_dependencies, deep_mode=deep_mode) |
|
|
|
|
|
|
|
|
is_compatible, issues = resolver.check_compatibility(graph) |
|
|
|
|
|
|
|
|
resolved_text, warnings = resolver.resolve_dependencies(all_dependencies, resolution_strategy) |
|
|
|
|
|
|
|
|
output_parts = [] |
|
|
output_parts.append("## Dependency Analysis Results\n\n") |
|
|
|
|
|
if issues: |
|
|
output_parts.append("### β οΈ Compatibility Issues Found:\n") |
|
|
for issue in issues: |
|
|
output_parts.append(f"- {issue}\n") |
|
|
output_parts.append("\n") |
|
|
|
|
|
if warnings: |
|
|
output_parts.append("### βΉοΈ Warnings:\n") |
|
|
for warning in warnings: |
|
|
output_parts.append(f"- {warning}\n") |
|
|
output_parts.append("\n") |
|
|
|
|
|
if is_compatible and not issues: |
|
|
output_parts.append("### β
No compatibility issues detected!\n\n") |
|
|
|
|
|
output_parts.append(f"### π¦ Resolved Requirements ({len(all_dependencies)} packages):\n") |
|
|
output_parts.append("```\n") |
|
|
output_parts.append(resolved_text) |
|
|
output_parts.append("\n```\n") |
|
|
|
|
|
return ''.join(output_parts), resolved_text |
|
|
|
|
|
|
|
|
|
|
|
def create_interface(): |
|
|
"""Create and return the Gradio interface.""" |
|
|
|
|
|
with gr.Blocks(title="Python Dependency Compatibility Board", theme=gr.themes.Soft()) as app: |
|
|
gr.Markdown(""" |
|
|
# π Python Dependency Compatibility Board |
|
|
|
|
|
Analyze and resolve Python package dependencies. Input your requirements in multiple ways: |
|
|
- List library names (one per line) |
|
|
- Paste requirements.txt content |
|
|
- Upload a requirements.txt file |
|
|
|
|
|
The tool will check for compatibility issues and generate a resolved requirements.txt file. |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### Input Methods") |
|
|
|
|
|
library_input = gr.Textbox( |
|
|
label="Library Names (one per line)", |
|
|
placeholder="pandas\ntorch\nlangchain\nfastapi", |
|
|
lines=5, |
|
|
info="Enter package names, one per line" |
|
|
) |
|
|
|
|
|
requirements_input = gr.Textbox( |
|
|
label="Requirements.txt Content", |
|
|
placeholder="pandas==2.0.3\ntorch>=2.0.0\nlangchain==0.1.0", |
|
|
lines=10, |
|
|
info="Paste your requirements.txt content here" |
|
|
) |
|
|
|
|
|
file_upload = gr.File( |
|
|
label="Upload requirements.txt", |
|
|
file_types=[".txt"] |
|
|
) |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### Environment Settings") |
|
|
|
|
|
python_version = gr.Dropdown( |
|
|
choices=["3.8", "3.9", "3.10", "3.11", "3.12"], |
|
|
value="3.10", |
|
|
label="Python Version", |
|
|
info="Target Python version" |
|
|
) |
|
|
|
|
|
device = gr.Dropdown( |
|
|
choices=["CPU only", "NVIDIA GPU (CUDA)", "Apple Silicon (MPS)", "Custom / other"], |
|
|
value="CPU only", |
|
|
label="Device", |
|
|
info="Target device/platform" |
|
|
) |
|
|
|
|
|
os_type = gr.Dropdown( |
|
|
choices=["Any / generic", "Linux (x86_64)", "Windows (x86_64)", "MacOS (Intel)", "MacOS (Apple Silicon)"], |
|
|
value="Any / generic", |
|
|
label="Operating System", |
|
|
info="Target operating system" |
|
|
) |
|
|
|
|
|
mode = gr.Radio( |
|
|
choices=["Quick (top-level only)", "Deep (with transitive dependencies)"], |
|
|
value="Quick (top-level only)", |
|
|
label="Analysis Mode", |
|
|
info="Quick mode is faster, Deep mode includes all dependencies" |
|
|
) |
|
|
|
|
|
resolution_strategy = gr.Dropdown( |
|
|
choices=["latest_compatible", "stable/pinned", "keep_existing_pins", "minimal_changes"], |
|
|
value="latest_compatible", |
|
|
label="Resolution Strategy", |
|
|
info="How to resolve version conflicts" |
|
|
) |
|
|
|
|
|
process_btn = gr.Button("Analyze & Resolve Dependencies", variant="primary", size="lg") |
|
|
|
|
|
with gr.Row(): |
|
|
output_display = gr.Markdown( |
|
|
label="Analysis Results", |
|
|
value="Results will appear here after processing..." |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
resolved_output = gr.Textbox( |
|
|
label="Resolved requirements.txt", |
|
|
lines=15, |
|
|
info="Copy this content to use as your requirements.txt file" |
|
|
) |
|
|
|
|
|
download_btn = gr.File( |
|
|
label="Download requirements.txt", |
|
|
value=None, |
|
|
visible=True |
|
|
) |
|
|
|
|
|
def process_and_download(*args): |
|
|
result_text, resolved_text = process_dependencies(*args) |
|
|
|
|
|
|
|
|
temp_file = None |
|
|
if resolved_text and resolved_text.strip(): |
|
|
try: |
|
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: |
|
|
f.write(resolved_text) |
|
|
temp_file = f.name |
|
|
except Exception as e: |
|
|
print(f"Error creating download file: {e}") |
|
|
|
|
|
return result_text, resolved_text, temp_file if temp_file else None |
|
|
|
|
|
process_btn.click( |
|
|
fn=process_and_download, |
|
|
inputs=[library_input, requirements_input, file_upload, python_version, device, os_type, mode, resolution_strategy], |
|
|
outputs=[output_display, resolved_output, download_btn] |
|
|
) |
|
|
|
|
|
gr.Markdown(""" |
|
|
--- |
|
|
### How to Use |
|
|
|
|
|
1. **Input your dependencies** using any of the three methods (or combine them) |
|
|
2. **Configure your environment** (Python version, device, OS) |
|
|
3. **Choose analysis mode**: Quick for fast results, Deep for complete dependency tree |
|
|
4. **Select resolution strategy**: How to handle version conflicts |
|
|
5. **Click "Analyze & Resolve Dependencies"** |
|
|
6. **Review the results** and download the resolved requirements.txt |
|
|
|
|
|
### Features |
|
|
|
|
|
- β
Parse multiple input formats |
|
|
- β
Detect version conflicts |
|
|
- β
Check compatibility across dependency graph |
|
|
- β
Resolve dependencies using pip |
|
|
- β
Generate clean, pip-compatible requirements.txt |
|
|
- β
Environment-aware (Python version, platform, device) |
|
|
""") |
|
|
|
|
|
return app |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
app = create_interface() |
|
|
|
|
|
|
|
|
app.launch() |
|
|
|
|
|
|