Spaces:
Paused
Paused
| ##Sets up the flask server for viewing locally at {ip_address}:3001 | |
| #/* DARNA.HI | |
| # * Copyright (c) 2023 Seapoe1809 <https://github.com/seapoe1809> | |
| # * Copyright (c) 2023 pnmeka <https://github.com/pnmeka> | |
| # * | |
| # * | |
| # * This program is free software: you can redistribute it and/or modify | |
| # * it under the terms of the GNU General Public License as published by | |
| # * the Free Software Foundation, either version 3 of the License, or | |
| # * (at your option) any later version. | |
| # * | |
| # * This program is distributed in the hope that it will be useful, | |
| # * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # * GNU General Public License for more details. | |
| # * | |
| # * You should have received a copy of the GNU General Public License | |
| # * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| # */ | |
| from flask import Flask, render_template, send_file, send_from_directory, session, request, redirect, jsonify, url_for, Response, flash | |
| from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user | |
| #from flask_bcrypt import Bcrypt | |
| #from flask_sqlalchemy import SQLAlchemy | |
| # Standard library | |
| import os | |
| import sqlite3 | |
| import json | |
| import subprocess | |
| from subprocess import run, CalledProcessError | |
| import getpass | |
| import webbrowser | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from functools import wraps | |
| from urllib.parse import quote, unquote | |
| import io | |
| # Third-party packages | |
| import requests | |
| import qrcode | |
| import pyzipper | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from pdf2image import convert_from_path | |
| from cryptography.fernet import Fernet | |
| # DICOM handling | |
| import pydicom | |
| from pydicom.pixel_data_handlers.util import apply_voi_lut | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import variables.variables as variables | |
##UPDATE ZIP PASSWORD HERE
# Password protecting the exported summary ZIP. Can now be overridden via the
# ZIP_PASSWORD environment variable; defaults to the original value "2023".
create_zip_password = os.environ.get('ZIP_PASSWORD', '2023')

app = Flask(__name__)

# Absolute path of the directory the server was started from; used to build
# paths to static assets, install modules, and per-user health folders.
HS_path = os.getcwd()
| """ | |
| #app.config.update( | |
| #SESSION_COOKIE_SECURE=True, | |
| #SESSION_COOKIE_SAMESITE='None', | |
| #) | |
| # Initialize Flask extensions and Flask_login | |
| bcrypt = Bcrypt(app) | |
| login_manager = LoginManager() | |
| login_manager.init_app(app) | |
| login_manager.login_view = 'login' | |
| #generates a app.secret_key that is variable and encrypted | |
| key = Fernet.generate_key() | |
| cipher_suite = Fernet(key) | |
| app.secret_key = cipher_suite.encrypt(os.getcwd().encode()) | |
| # Configure the SQLAlchemy part to use SQLite database | |
| from flask_sqlalchemy import SQLAlchemy | |
| # Replace 'your_password' and 'your_database_name' with actual values | |
| app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://seapoe1809:darnausers@seapoe1809.mysql.pythonanywhere-services.com/seapoe1809$users' | |
| app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # To suppress a warning | |
| #app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db' | |
| db = SQLAlchemy(app) | |
| # Define User model, access | |
| class User(db.Model, UserMixin): | |
| id = db.Column(db.Integer, primary_key=True) | |
| username = db.Column(db.String(80), unique=True) | |
| password = db.Column(db.String(120)) | |
| def get_paths(self): | |
| base_path = os.getcwd() | |
| if self.username == 'ADMIN': | |
| folder_name = 'Health_files' | |
| elif self.username == 'USER1': | |
| folder_name = 'Health_files2' | |
| else: | |
| folder_name = f'Health_files_{self.username}' # fallback | |
| folderpath = os.path.join(base_path, folder_name) | |
| APP_dir = os.path.join(base_path, 'install_module') | |
| ocr_files = os.path.join(folderpath, 'ocr_files') | |
| upload_dir = os.path.join(folderpath, 'upload') | |
| summary_dir = os.path.join(folderpath, 'summary') | |
| return { | |
| 'HS_path': base_path, | |
| 'folderpath': folderpath, | |
| 'APP_dir': APP_dir, | |
| 'ocr_files': ocr_files, | |
| 'upload_dir': upload_dir, | |
| 'summary_dir': summary_dir | |
| } | |
| @login_manager.user_loader | |
| def load_user(user_id): | |
| return db.session.get(User, int(user_id)) | |
| """ | |
#importing variables from variables.py
# Label current dir and parent dir
HS_path = os.getcwd()
ip_address = variables.ip_address
APP_dir = f"{HS_path}/install_module"

# Configure static folder path
app.static_folder = 'static'

########### TEST
# SECURITY NOTE: a hard-coded secret key is acceptable only for local testing —
# load it from the environment or a secrets store before any real deployment.
app.secret_key = 'your_secret_key'

# Simulating users in memory for testing
from werkzeug.security import generate_password_hash, check_password_hash

# Initialize Flask extensions and Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'

# In-memory user store, defined once. (The original built this identical dict
# twice, hashing each password twice for no effect.)
users = {
    'ADMIN': generate_password_hash('health'),
    'USER1': generate_password_hash('wellness')
}
| # User class for Flask-Login | |
| class User(UserMixin): | |
| def __init__(self, username): | |
| self.id = username # Use the username as the user ID | |
| def get_paths(self): | |
| base_path= os.getcwd() | |
| HS_path= os.getcwd() | |
| # Assign folder name based on the username | |
| if self.id == 'ADMIN': # Use self.id since it's assigned in the constructor | |
| folder_name = 'Health_files' | |
| elif self.id == 'USER1': | |
| folder_name = 'Health_files2' | |
| else: | |
| folder_name = f'Health_files_{self.id}' # fallback for other usernames | |
| # Define paths | |
| folderpath = os.path.join(base_path, folder_name) | |
| APP_dir = os.path.join(base_path, 'install_module') | |
| ocr_files = os.path.join(folderpath, 'ocr_files') | |
| upload_dir = os.path.join(folderpath, 'upload') | |
| summary_dir = os.path.join(folderpath, 'summary') | |
| # Return paths as a dictionary | |
| return { | |
| 'HS_path': base_path, | |
| 'folderpath': folderpath, | |
| 'APP_dir': APP_dir, | |
| 'ocr_files': ocr_files, | |
| 'upload_dir': upload_dir, | |
| 'summary_dir': summary_dir | |
| } | |
def load_user(username):
    """Flask-Login user loader: resolve a stored session ID to a User.

    Returns None for unknown usernames, which Flask-Login treats as
    "not logged in".
    """
    return User(username) if username in users else None
def login():
    """Handle the login form: verify credentials and start a session.

    On success the per-user folder path is stored in the session and the
    browser is sent to the protected landing route.
    """
    error_message = None
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        stored_hash = users.get(username)
        # Check if user exists and password is correct
        if stored_hash is not None and check_password_hash(stored_hash, password):
            login_user(User(username))  # Log the user in
            # Assign folder paths based on the user
            folder_by_user = {'ADMIN': 'Health_files', 'USER1': 'Health_files2'}
            if username in folder_by_user:
                session['folderpath'] = f"{os.getcwd()}/{folder_by_user[username]}"
            return redirect(url_for('protected'))
        error_message = "Password ⚠️"
    return render_template('login.html', error_message=error_message)
| """ | |
| #@app.before_first_request | |
| def create_users(): | |
| db.create_all() | |
| #admin = User(username='ADMIN', password='health') | |
| admin = User(username='ADMIN', password=bcrypt.generate_password_hash('health').decode('utf-8')) | |
| #admin = User(username='USER1', password='wellness') | |
| user1 = User(username='USER1', password=bcrypt.generate_password_hash('wellness').decode('utf-8')) | |
| db.session.add(admin) | |
| db.session.add(user1) | |
| db.session.commit() | |
| @app.route('/login', methods=['GET', 'POST']) | |
| def login(): | |
| error_message = None | |
| if request.method == 'POST': | |
| username = request.form['username'] | |
| password = request.form['password'] | |
| user = User.query.filter_by(username=username).first() | |
| if user and bcrypt.check_password_hash(user.password, password): | |
| login_user(user) | |
| if username == "ADMIN": | |
| session['folderpath'] = f"{os.getcwd()}/Health_files" | |
| elif username == "USER1": | |
| session['folderpath'] = f"{os.getcwd()}/Health_files2" | |
| return redirect(url_for('protected')) | |
| else: | |
| error_message = "Password ⚠️" | |
| return render_template('login.html', error_message=error_message) | |
| """ | |
def protected():
    """Post-login landing route — forwards straight to the home page."""
    destination = url_for('home')
    return redirect(destination)
def unauthorized():
    """Flask-Login unauthorized handler: show the login page instead."""
    return render_template('login.html')
def logout():
    """End the current user's session and return to the login page."""
    logout_user()
    return redirect('/login')
def shutdown():
    """Best-effort shutdown of the companion processes and their ports.

    First asks pkill to stop the darnabot/darna python processes, then kills
    whatever is still listening on the known ports (3001, 3012). Returns a
    space-joined status message describing each step.
    """
    messages = []
    # Attempt to gracefully shut down the mainapp.py process
    try:
        subprocess.run(["pkill", "-f", "python.*darnabot.py"], check=True)
        messages.append("mainapp.py shut down successfully.")
        subprocess.run(["pkill", "-f", "python.*darna.py"], check=True)
        messages.append("app2.py shut down successfully.")
    except subprocess.CalledProcessError as e:
        messages.append(f"Failed to shut down processes: {e}")

    # List of ports to shut down processes on
    ports = [3001, 3012]
    for port in ports:
        try:
            raw = subprocess.check_output(["lsof", "-t", f"-i:{port}"]).decode().strip()
        except subprocess.CalledProcessError:
            # lsof exits non-zero when nothing listens on the port.
            # BUGFIX: the original fell into its force-kill branch here and
            # reused `pids` from a previous iteration (or hit NameError on the
            # first), potentially force-killing the wrong processes.
            messages.append(f"No process found on port {port}.")
            continue
        if not raw:
            messages.append(f"No process found on port {port}.")
            continue
        pids = raw.split('\n')  # lsof may report several PIDs
        try:
            for pid in pids:
                subprocess.run(["kill", pid], check=True)
            messages.append(f"Process on port {port} shut down successfully.")
        except subprocess.CalledProcessError:
            try:
                # If the graceful kill fails, attempt a forceful kill
                for pid in pids:
                    subprocess.run(["kill", "-9", pid], check=True)
                messages.append(f"Process on port {port} force-killed successfully.")
            except subprocess.CalledProcessError as e:
                messages.append(f"Forceful kill failed on port {port}: {e}")
    return ' '.join(messages)
def home():
    """Serve the dashboard, or bounce unauthenticated visitors to login."""
    if not current_user.is_authenticated:
        # User is not logged in
        return redirect(url_for('login'))
    # User is logged in
    return render_template('index.html')
#making links for folder directory and files
def folder_index():
    """List the user's folder as links; directories route to /folder/<name>."""
    folder_path = session.get('folderpath')
    if folder_path is None:
        return "No folder path set in the session, please log in again.", 400
    file_links = []
    for filename in os.listdir(folder_path):
        is_directory = os.path.isdir(os.path.join(folder_path, filename))
        # NOTE(review): the recovered source contained literal "(unknown)"
        # placeholders in these links; they are rebuilt from the entry name,
        # URL-quoted to match serve_file's unquote() — confirm against the
        # original routes.
        if is_directory:
            file_links.append({'filename': filename, 'path': f'/folder/{quote(filename)}', 'is_folder': True})
        else:
            file_links.append({'filename': filename, 'path': f'/{quote(filename)}', 'is_folder': False})
    return render_template('folder_index.html', files=file_links)
#serving files from folder directory
def serve_file(filename):
    """Send one file from the logged-in user's folder (login required)."""
    if not current_user.is_authenticated:
        return redirect('/login')
    base_dir = session.get('folderpath')
    return send_from_directory(base_dir, unquote(filename), as_attachment=False)
#making file links in subdirectory
def subfolder_index(subfolder):
    """List a first-level subfolder of the user's folder as links."""
    folderpath = session.get('folderpath')
    folder_path = os.path.join(folderpath, subfolder)
    files = os.listdir(folder_path) if os.path.exists(folder_path) else []
    file_links = []
    for filename in files:
        is_directory = os.path.isdir(os.path.join(folder_path, filename))
        # NOTE(review): the recovered source contained literal "(unknown)"
        # placeholders here (identical for files and folders); links are
        # rebuilt as /folder/<subfolder>/<name>, which
        # serve_file_or_subfolder handles for both cases — confirm.
        file_links.append({
            'filename': filename,
            'path': f'/folder/{subfolder}/{quote(filename)}',
            'is_folder': is_directory,
        })
    return render_template('folder_index.html', files=file_links)
def serve_file_or_subfolder(subfolder, filename, nested_subfolder=''):
    """Serve a file, or render an index page when the target is a directory."""
    folderpath = session.get('folderpath')
    folder_path = os.path.join(folderpath, subfolder, nested_subfolder)
    decoded_filename = unquote(filename)
    target = os.path.join(folder_path, decoded_filename)
    if not os.path.isdir(target):
        # Plain file: hand it straight to the client.
        return send_from_directory(folder_path, decoded_filename, as_attachment=False)
    # Directory: build an index of its entries.
    base_url = f'/folder/{subfolder}/{nested_subfolder}/{decoded_filename}'
    file_links = [
        {
            'filename': entry,
            'path': f'{base_url}/{entry}',
            'is_folder': os.path.isdir(os.path.join(target, entry)),
        }
        for entry in os.listdir(target)
    ]
    return render_template('folder_index.html', files=file_links)
def zip_summary():
    """Report the summary-ZIP export result page.

    NOTE: the actual AES-zip creation is disabled in this build (it was a
    string-quoted no-op in the original); the route currently only renders
    the success page.
    """
    folder = session.get('folderpath')
    zip_password = f'{create_zip_password}'.encode('utf-8')  # PASSWORD TO YOUR CHOICE HERE FOR ZIP ENCRYPT
    folder_to_zip = f'{folder}/summary'
    zip_filename = f'{folder}/my_summary.zip'
    try:
        # Disabled zip logic, kept for reference:
        # with pyzipper.AESZipFile(zip_filename, 'w',
        #                          compression=pyzipper.ZIP_DEFLATED,
        #                          encryption=pyzipper.WZ_AES) as zipf:
        #     zipf.setpassword(zip_password)
        #     for root, _, files in os.walk(folder_to_zip):
        #         for file in files:
        #             zipf.write(os.path.join(root, file),
        #                        os.path.relpath(os.path.join(root, file),
        #                                        folder_to_zip))
        return render_template('success.html', message="ZIP file created and encrypted.")
    except Exception as e:
        return render_template('error.html', message=str(e))
| #The Following 3 sections are removed in ver 2.2 as sudo in app is security risk | |
#the following are to chart your medications and past medical history in fhir format
def chart():
    """Render the chart page backed by the user's summary/chart.json."""
    chart_json_url = url_for('custom_static', filename='chart.json')
    #vitals_json_url = url_for('custom_static', filename='vitals.json')
    return render_template('chart.html', chart_json_url=chart_json_url)
def save_edits():
    """Persist edited chart data (request JSON body) to summary/chart.json."""
    folderpath = session.get('folderpath', '')
    target_path = os.path.join(folderpath, 'summary', 'chart.json')
    updated = request.json  # Get the updated data from the request
    try:
        with open(target_path, 'w') as fh:
            json.dump(updated, fh, indent=4)
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
    return jsonify({"status": "success", "message": "Data saved successfully"})
#Portal access viewer in Sky
def pabv():
    """Render the portal-access viewer, or send visitors to the login page."""
    if not current_user.is_authenticated:
        # User is not logged in
        return redirect(url_for('login'))
    # User is logged in
    return render_template('pabv.html')
#Launches the darnabot with user ID
def gradio_user():
    """Redirect to the darnabot UI on port 3012, tagged with the user's ID."""
    host_ip = request.host.split(':')[0]  # Extract the IP the client used
    return redirect(f"http://{host_ip}:3012?user={current_user.id}")
# Keep the original custom_static function
def custom_static(filename):
    """Serve a file from the logged-in user's summary directory."""
    summary_dir = os.path.join(session.get('folderpath', ''), 'summary')
    return send_from_directory(summary_dir, filename)
#AI TAG ASSISTANCE
def get_db_connection(db_path):
    """Open medical_records.db with dict-like (sqlite3.Row) row access."""
    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row
    return connection
def format_file_size(size_in_bytes):
    """Convert a byte count into a human readable string (B/KB/MB/GB/TB)."""
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size_in_bytes < 1024.0:
            return f"{size_in_bytes:.1f} {unit}"
        size_in_bytes /= 1024.0
    # Anything that survived four divisions is reported in terabytes.
    return f"{size_in_bytes:.1f} TB"
def format_date(date_string):
    """Format an ISO-8601 date string for display.

    Returns e.g. "January 15, 2023 at 10:30 AM"; on any parse failure the
    input is returned unchanged (narrowed from the original bare `except`).
    """
    try:
        dt = datetime.fromisoformat(date_string)
    except (TypeError, ValueError):
        # Not an ISO date (or not a string at all) — show it as-is.
        return date_string
    return dt.strftime("%B %d, %Y at %I:%M %p")
def format_json_metadata(json_string):
    """Pretty-print a JSON string for display.

    Returns None for empty input, the re-indented JSON on success, and the
    original value unchanged when it is not valid JSON (narrowed from the
    original bare `except`).
    """
    if not json_string:
        return None
    try:
        return json.dumps(json.loads(json_string), indent=2)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return json_string
def search_files(search_term, folderpath):
    """Query medical_records.db for file records, optionally filtered.

    The term is LIKE-matched against filename, embedded_metadata and
    ai_metadata. Returns a list of dicts with extra *_formatted fields for
    display. Raises FileNotFoundError when the database is missing.

    Improvements over the original: the near-identical SELECT was written
    out twice (only the WHERE filter differed) — it is now built once; the
    connection is closed even when a query raises; debug prints removed.
    """
    db_path = f"{folderpath}/medical_records.db"
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"Database not found at {db_path}")

    # Base query shared by both the filtered and unfiltered paths.
    query = """
        SELECT
            filename, file_path, file_size, file_type,
            upload_date, embedded_metadata, ai_metadata
        FROM files
        WHERE filename != '.db'
          AND filename != 'chart.json'
          AND filename != 'medical_records.db'
    """
    params = []
    if search_term:
        query += """
          AND (filename LIKE ?
               OR embedded_metadata LIKE ?
               OR ai_metadata LIKE ?)
        """
        params = [f"%{search_term}%"] * 3
    query += " ORDER BY upload_date DESC"

    conn = get_db_connection(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        files = cursor.fetchall()
    finally:
        conn.close()

    # Convert rows to dicts and attach display-friendly formatted fields.
    file_list = []
    for file in files:
        file_dict = dict(file)
        file_dict['file_size_formatted'] = format_file_size(file_dict.get('file_size', 0))
        file_dict['upload_date_formatted'] = format_date(file_dict.get('upload_date', ''))
        file_dict['embedded_metadata_formatted'] = format_json_metadata(file_dict.get('embedded_metadata', ''))
        file_dict['ai_metadata_formatted'] = format_json_metadata(file_dict.get('ai_metadata', ''))
        file_list.append(file_dict)
    return file_list
def categorize_files_from_db(files):
    """Bucket DB file records into pdf/xml/dicom/image/other by extension."""
    # Extension -> bucket lookup replaces the original if/elif chain.
    extension_map = {
        '.pdf': 'pdf_files',
        '.xml': 'xml_files',
        '.dcm': 'dicom_files', '.dicom': 'dicom_files',
        '.jpg': 'image_files', '.jpeg': 'image_files', '.png': 'image_files',
        '.gif': 'image_files', '.bmp': 'image_files', '.tiff': 'image_files',
        '.svg': 'image_files', '.webp': 'image_files',
    }
    categories = {
        'pdf_files': [], 'xml_files': [], 'dicom_files': [],
        'image_files': [], 'other_files': [],
    }
    for file in files:
        extension = Path(file['filename'].lower()).suffix.lower()
        categories[extension_map.get(extension, 'other_files')].append(file)
    return categories
# View dicom files in Health files - MAIN SUMMARY ROUTE
def dicom_files():
    """Main summary page with search functionality.

    Prefers the medical_records.db index; falls back to scanning the
    summary directory when the database is missing or errors out.
    """
    folderpath = session.get('folderpath', '')
    directory = os.path.join(folderpath, 'summary')
    # Get search term from query parameters
    search_term = request.args.get('search', '').strip()

    use_database = False
    total_results = 0
    try:
        db_path = f"{directory}/medical_records.db"
        if not os.path.exists(db_path):
            raise FileNotFoundError("Database not found, using filesystem")
        # Database path: fetch (optionally filtered) records and bucket them.
        files = search_files(search_term or None, directory)
        categories = categorize_files_from_db(files)
        total_results = len(files)
        use_database = True
        pdf_list = categories['pdf_files']
        xml_list = categories['xml_files']
        # Renamed from `dicom_files` — the original local shadowed this
        # view function's own name.
        dicom_list = categories['dicom_files']
        image_list = categories['image_files']
        other_list = categories['other_files']
    except Exception as e:
        # Fallback to filesystem if database fails
        print(f"Database error, falling back to filesystem: {str(e)}")
        pdf_list, xml_list, dicom_list, image_list, other_list = [], [], [], [], []
        if os.path.exists(directory):
            for f in os.listdir(directory):
                # Skip database files
                if f in ['medical_records.db', '.db', 'chart.json']:
                    continue
                file_dict = {'filename': f}  # template-compatible shape
                lower = f.lower()
                if lower.endswith('.pdf'):
                    pdf_list.append(file_dict)
                elif lower.endswith('.xml'):
                    xml_list.append(file_dict)
                elif lower.endswith(('.dcm', '.dicom')):
                    dicom_list.append(file_dict)
                elif lower.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.svg', '.webp')):
                    image_list.append(file_dict)
                else:
                    other_list.append(file_dict)
        use_database = False
        # NOTE(review): original behavior preserved — the fallback discards
        # the search term entirely (no filtering of the directory listing).
        search_term = None
        total_results = len(pdf_list) + len(xml_list) + len(dicom_list) + len(image_list) + len(other_list)

    # Debug prints (like original)
    print(f"PDF files: {len(pdf_list)}")
    print(f"DICOM files: {len(dicom_list)}")
    print(f"Using database: {use_database}")

    # Pass the lists to the template
    return render_template('summary.html',
                           pdf_files=pdf_list,
                           xml_files=xml_list,
                           dicom_files=dicom_list,
                           image_files=image_list,
                           other_files=other_list,
                           search_term=search_term,
                           total_results=total_results,
                           use_database=use_database)
def display_file(filename):
    """Display or serve a single file from the user's summary directory."""
    folderpath = session.get('folderpath', '')
    directory = os.path.join(folderpath, 'summary')
    file_path = os.path.join(directory, filename)
    lower_name = filename.lower()

    # DICOM files get an interactive viewer with a slice slider.
    if lower_name.endswith(('.dcm', '.dicom')):
        dicom_data = pydicom.dcmread(file_path)
        pixels = dicom_data.pixel_array
        # Multi-frame data exposes one slider position per slice;
        # single-slice (2D) images get max_slice = 0.
        max_slice = pixels.shape[0] - 1 if pixels.ndim > 2 else 0
        return render_template('view_dicom.html', filename=filename, max_slice=max_slice)

    # PDFs and XML are streamed directly with an explicit mimetype.
    if lower_name.endswith('.pdf'):
        return send_from_directory(directory, filename, mimetype='application/pdf')
    if lower_name.endswith('.xml'):
        return send_from_directory(directory, filename, mimetype='application/xml')

    # Image formats: pick the matching mimetype from the lookup table.
    image_mimetypes = {
        '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
        '.gif': 'image/gif', '.bmp': 'image/bmp', '.tiff': 'image/tiff',
        '.svg': 'image/svg+xml', '.webp': 'image/webp',
    }
    extension = Path(lower_name).suffix
    if extension in image_mimetypes:
        return send_from_directory(directory, filename, mimetype=image_mimetypes[extension])

    # Fallback for unsupported file types
    return 'Unsupported file type', 404
def serve_dicom_slice(filename, slice_index):
    """Render one slice of a DICOM file as a grayscale PNG.

    BUGFIX: the original indexed 2D (single-slice) images with the slice
    index, returning a single pixel row instead of the image; 2D data now
    renders whole. Negative indices are rejected, and flat images (min ==
    max) are handled without feeding np.interp a zero-width range.
    """
    folderpath = session.get('folderpath', '')
    directory = os.path.join(folderpath, 'summary')
    dicom_file_path = os.path.join(directory, filename)
    dicom_data = pydicom.dcmread(dicom_file_path)
    image_3d = apply_voi_lut(dicom_data.pixel_array, dicom_data)

    if image_3d.ndim > 2:
        # Selecting the requested slice from the volume
        if not 0 <= slice_index < image_3d.shape[0]:
            return "Slice index out of range", 400
        image_slice = image_3d[slice_index]
    else:
        # Single-slice (2D) DICOM: serve the whole image (viewer sends 0).
        image_slice = image_3d

    # Normalize to 0-255 and convert to uint8
    lo, hi = image_slice.min(), image_slice.max()
    if hi > lo:
        image_slice = np.interp(image_slice, (lo, hi), (0, 255))
    else:
        image_slice = np.zeros_like(image_slice, dtype=float)
    image_slice = image_slice.astype(np.uint8)

    # Convert to PNG in memory
    buf = io.BytesIO()
    plt.imsave(buf, image_slice, cmap='gray', format='png')
    buf.seek(0)
    return send_file(buf, mimetype='image/png')
def upload_file():
    """Handle file uploads: virus-scan with clamscan, then file the upload.

    'HL_File' uploads land in ocr_files/ and are copied into summary/;
    everything else is saved directly into the user's folder. If clamscan is
    unavailable the upload proceeds unscanned (original behavior).

    SECURITY FIX: the client-supplied filename was joined into filesystem
    paths unsanitized, allowing path traversal (e.g. "../../x"); it is now
    passed through werkzeug's secure_filename.
    """
    folderpath = session.get('folderpath', '')
    destination_dir1 = os.path.join(folderpath, 'ocr_files')
    upload_dir = os.path.join(folderpath, 'upload')
    destination_dir3 = os.path.join(folderpath, 'summary')
    if request.method == 'POST':
        file_type = request.form.get('Type')
        file = request.files.get('File')
        if file and file.filename:
            from werkzeug.utils import secure_filename
            filename = secure_filename(file.filename)
            file_path1 = os.path.join(destination_dir1 if file_type == 'HL_File' else folderpath, filename)
            file.save(file_path1)  # Save the file temporarily
            try:
                # Run clamscan on the uploaded file
                result = subprocess.run(['clamscan', '-r', file_path1], capture_output=True, text=True, check=True)
                if "Infected files: 0" in result.stdout:
                    if file_type == 'HL_File':
                        _copy_upload_to_summary(file_path1, destination_dir3, filename)
                    return render_template('success.html')
                else:
                    os.remove(file_path1)  # Remove the infected file
                    return render_template('upload.html', message='File is infected!')
            except subprocess.CalledProcessError as e:
                print(f"An error occurred during file operation: {e}")
                os.remove(file_path1)  # Ensure the source file is removed in case of error
                return render_template('upload.html', message='An error occurred during file processing!')
            except Exception as e:
                # clamscan missing or unusable: proceed without a scan.
                print(f"An error occurred: {e}")
                if file_type == 'HL_File':
                    _copy_upload_to_summary(file_path1, destination_dir3, filename)
                return render_template('success.html')  # Proceed if ClamAV scan is not performed
    return render_template('upload.html')

def _copy_upload_to_summary(src_path, summary_dir, filename):
    """Copy an uploaded file into the summary directory (platform-aware)."""
    dst_path = os.path.join(summary_dir, filename)
    if os.name == 'posix':  # Unix-based system
        subprocess.run(['cp', src_path, dst_path], check=True)
    elif os.name == 'nt':  # Windows
        subprocess.run(['copy', src_path, dst_path], shell=True, check=True)
def connect_nc():
    """Generate a QR code pointing at this server and show the connect page."""
    client_ip = request.remote_addr
    # Fall back to the configured ip_address when the client address is the
    # wildcard; otherwise reuse the client's own address.
    target = ip_address if client_ip == '0.0.0.0' else client_ip
    url = f"http://{target}:7860"
    print(url)
    qr = qrcode.QRCode()
    qr.add_data(url)
    qr.make()
    qr.make_image().save(f'{HS_path}/static/qrcode.png')
    return render_template('connect_nc.html')
#update password in connect_nc
def register():
    """Change the current user's password.

    BUGFIX: the original called bcrypt / User.query / db — all defined only
    in the commented-out SQLAlchemy section — so every POST raised NameError
    (and current_user.username does not exist on the in-memory User). The
    in-memory `users` store is updated via werkzeug hashing instead.
    """
    if request.method == 'POST':
        # Get the new password from the form and hash it.
        new_password = request.form['password']
        hashed_new_password = generate_password_hash(new_password)
        username = current_user.id  # in-memory User stores the name as .id
        if username in users:
            # Update the password in the in-memory store.
            users[username] = hashed_new_password
            flash('Password change successful. You can now log in.')
            return redirect(url_for('login'))
        flash('Error: User not found.')
        return redirect(url_for('register'))
    return render_template('register.html')
def pi():
    """Serve the raw install-module landing page (index2.html).

    BUGFIX: the original prefixed the already-absolute HS_path with an extra
    '/', producing '//…' paths; the components are joined instead.
    """
    page_path = os.path.join(HS_path, 'install_module', 'templates', 'index2.html')
    with open(page_path, 'r') as f:
        content = f.read()
    return Response(content, content_type='text/html')
| """ | |
| @app.route('/analyze', methods=['GET', 'POST']) | |
| @login_required | |
| def analyze(): | |
| if request.method == 'POST': | |
| age = request.form['age'] | |
| sex = request.form['sex'] | |
| ignore_words = request.form['ignore-words'] | |
| print(f"Age: {age}") | |
| print(f"Sex: {sex}") | |
| print(f"Ignore Words: {ignore_words}") | |
| formatted_ignore_words = ignore_words.replace(' ', '|') | |
| content = f"age = '{age}'\nsex = '{sex}'\nignore_words = '{formatted_ignore_words}'\n" | |
| file_path = f"{HS_path}/variables/variables2.py" | |
| try: | |
| with open(file_path, 'w') as file: | |
| file.write(content) | |
| except Exception as e: | |
| print(f"Error writing to variables2.py: {str(e)}") | |
| return str(e) | |
| # Run the analyze script asynchronously using nohup and pass session cookie | |
| folderpath = session.get('folderpath') | |
| env_vars = os.environ.copy() | |
| env_vars['FOLDERPATH'] = folderpath | |
| command = f'python3 {HS_path}/analyze.py' | |
| #command = f'nohup python3 {HS_path}/analyze.py > /dev/null 2>&1 &' | |
| #subprocess.Popen(command, shell=True, env=env_vars) | |
| print("Process time is 3 minutes") | |
| try: | |
| subprocess.Popen(command, shell=True, env=env_vars) | |
| except Exception as e: | |
| print(f"Error running analyze.py: {str(e)}") | |
| return str(e) | |
| return render_template('success.html', message="Process time is 3 minutes") | |
| return render_template('analyze.html', submitted=True) | |
| """ | |
def serve_install_module(filename):
    """Expose files bundled under the install_module directory."""
    return send_from_directory('install_module', filename)
def install():
    """Show the install page for the app chosen via query parameters.

    BUGFIX: a missing app_name_file query parameter made the original do
    None + '.js' and raise TypeError; it now returns a 400 error page.
    """
    print("Inside /install")
    base_name = request.args.get('app_name_file')
    if not base_name:
        return render_template('errors.html', error_message='Missing app_name_file'), 400
    app_name_file = base_name + '.js'
    app_folder = request.args.get('app_folder')
    # Remember the selection so /execute_script can validate and run it.
    session['app_name_file'] = app_name_file
    print(f"Session variables: {session}")
    # Additional logic to process the app_name_file and app_folder if needed
    return render_template('install.html', app_name_file=app_name_file, app_folder=app_folder)
# executes the install script
def execute_script():
    """Run the installer script for the app selected on /install.

    BUGFIX: the line building `cmd` was commented out in the original, so
    every call raised NameError (silently swallowed by the broad except and
    reported as a generic failure). The command is restored, constrained to
    the allow-listed app names below; the unreachable post-return print was
    removed.
    """
    try:
        print("Inside /execute_script")
        # List of allowed app names
        allowed_app_names = ['Ibs_Module', 'Immunization_Tracker', 'Weight_Tracker', 'Tailscale', 'Dock', 'Strep_Module', 'Anxiety_Module']
        app_name = session.get('app_name_file', '').replace('.js', '')
        if not app_name:
            return jsonify(success=False, error="Missing app_name")
        # Validate app_name against the allow-list (prevents path injection).
        if app_name not in allowed_app_names:
            return jsonify(success=False, error=f"Invalid app_name. Allowed values are: {', '.join(allowed_app_names)}")
        # import ip_address from variables and pass to env_var to install app_name
        url = f"http://{ip_address}"
        env_vars = os.environ.copy()
        env_vars['URL'] = url
        cmd = ['python3', f'install_module/{app_name}/{app_name}.py']
        print(cmd)
        proc = subprocess.Popen(cmd, env=env_vars, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        stdout, stderr = proc.communicate()
        if proc.returncode == 0:
            return jsonify(success=True, message="Please Refresh App")
        print(f'Subprocess output: {stdout}')
        print(f'Subprocess error: {stderr}')
        return jsonify(success=False, error="Non-zero return code")
    except Exception as e:
        return jsonify(success=False, error=str(e))
def page_not_found(error):
    """404 handler: log the miss and render the shared error template."""
    print("Error 404 Encountered")
    return render_template('errors.html', error_message='Page not found'), 404
if __name__ == '__main__':
    # Bind on all interfaces so devices on the LAN can reach the app.
    app.run('0.0.0.0', port=7860)