Spaces:
Running
Running
| # FILE: web/backend/python/dna_analyzer.py (FINAL VERSION - Shows ID and Name) | |
| import sys | |
| import json | |
| import subprocess | |
| import os | |
| def get_species_details(blast_id): | |
| """ | |
| Looks up the ugly BLAST ID and returns a rich data structure | |
| containing the pretty name and other info. | |
| """ | |
| species_info_db = { | |
| "KY045437.1": { | |
| "species": "Salmo trutta", | |
| "commonName": "Brown Trout", | |
| "kingdom": "Animalia", | |
| "phylum": "Chordata", | |
| "ecologicalRole": "Top predator", | |
| "conservationStatus": "Least Concern", | |
| "indicators": ["Healthy fish population", "Good water quality"] | |
| }, | |
| "LC143821.1": { | |
| "species": "Escherichia coli", | |
| "commonName": "E. coli", | |
| "kingdom": "Bacteria", | |
| "phylum": "Proteobacteria", | |
| "ecologicalRole": "Decomposer", | |
| "conservationStatus": "Pathogen Indicator", | |
| "indicators": ["Fecal contamination", "Health risk"] | |
| } | |
| } | |
| for known_id, data in species_info_db.items(): | |
| if known_id in blast_id: | |
| # --- THIS IS THE CHANGE --- | |
| # We add the raw blast_id to the data we return. | |
| data['blastId'] = blast_id | |
| return data | |
| # If we don't find a match, we still return the ID. | |
| return {"species": blast_id, "commonName": "Unknown Species", "blastId": blast_id} | |
| def run_real_dna_analysis(file_path): | |
| try: | |
| script_dir = os.path.dirname(__file__) | |
| blast_db_dir = os.path.join(script_dir, 'blast_db') | |
| db_name = 'biostream_db' | |
| output_path = os.path.join(script_dir, '..', 'temp', f"{os.path.basename(file_path)}.csv") | |
| absolute_input_path = os.path.abspath(file_path) | |
| blast_command = [ | |
| 'blastn', '-query', absolute_input_path, '-db', db_name, | |
| '-out', output_path, '-outfmt', "10 sseqid pident", '-subject_besthit' | |
| ] | |
| process = subprocess.run( | |
| blast_command, check=True, capture_output=True, text=True, cwd=blast_db_dir | |
| ) | |
| species_hits = {} | |
| if not os.path.exists(output_path): | |
| return {"error": "BLAST did not produce an output file."} | |
| with open(output_path, 'r') as f: | |
| for line in f: | |
| parts = line.strip().split(',') | |
| if len(parts) < 2: continue | |
| species_id, identity = parts[0], float(parts[1]) | |
| if species_id not in species_hits: | |
| species_hits[species_id] = {'count': 0, 'total_identity': 0} | |
| species_hits[species_id]['count'] += 1 | |
| species_hits[species_id]['total_identity'] += identity | |
| if not species_hits: | |
| return { "detectedSpecies": [], "biodiversityMetrics": { "biodiversityScore": 0, "ecosystemHealth": "No Match Found" }} | |
| detected_species_list = [] | |
| for species_id, data in species_hits.items(): | |
| details = get_species_details(species_id) | |
| avg_identity = data['total_identity'] / data['count'] | |
| details['confidence'] = round(avg_identity, 2) | |
| details['abundance'] = "Medium" | |
| details['dnaFragments'] = data['count'] | |
| detected_species_list.append(details) | |
| os.remove(output_path) | |
| return { | |
| "detectedSpecies": detected_species_list, | |
| "biodiversityMetrics": { "speciesRichness": len(detected_species_list), "biodiversityScore": 85, "ecosystemHealth": "Good"}, | |
| "waterQualityAssessment": { "overallQuality": "Good", "recommendations": ["Analysis complete."] } | |
| } | |
| except subprocess.CalledProcessError as e: | |
| return {"error": f"BLAST analysis failed. Details: {e.stderr}"} | |
| except Exception as e: | |
| return {"error": f"An error occurred in Python: {str(e)}"} | |
| if __name__ == "__main__": | |
| try: | |
| dna_file_path = sys.argv[1] | |
| analysis_report = run_real_dna_analysis(dna_file_path) | |
| print(json.dumps(analysis_report)) | |
| except Exception as e: | |
| print(json.dumps({"error": str(e)}), file=sys.stderr) | |
| sys.exit(1) |