Spaces:
Sleeping
Sleeping
import json
import os

import requests
import streamlit as st
from azure.cosmos import CosmosClient, PartitionKey
from azure.storage.blob import BlobServiceClient
# Environment Variables
# Both connection strings come from the process environment; os.getenv
# returns None when a variable is not set.
COSMOS_CONNECTION_STRING = os.getenv('COSMOS_CONNECTION_STRING')
BLOB_STORAGE_CONNECTION_STRING = os.getenv('BLOB_STORAGE_CONNECTION_STRING')

# Fail fast with a readable message rather than handing a missing (None or
# empty) connection string to the SDK constructors below.
_missing_settings = [
    name
    for name, value in (
        ('COSMOS_CONNECTION_STRING', COSMOS_CONNECTION_STRING),
        ('BLOB_STORAGE_CONNECTION_STRING', BLOB_STORAGE_CONNECTION_STRING),
    )
    if not value
]
if _missing_settings:
    st.error(
        'Missing required environment variable(s): '
        + ', '.join(_missing_settings)
    )
    # Halt this script run; nothing below can work without the clients.
    st.stop()

# Initialize Azure Cosmos DB Client
cosmos_client = CosmosClient.from_connection_string(COSMOS_CONNECTION_STRING)

# Initialize Azure Blob Storage Client
blob_service = BlobServiceClient.from_connection_string(
    BLOB_STORAGE_CONNECTION_STRING
)
# Streamlit UI
st.title('Azure Services Integration with Streamlit')

# Azure Cosmos DB - CRUD Operations
# Only the Create and Read actions are wired up in this section, although the
# input labels also mention Update and Delete.
st.subheader('Azure Cosmos DB - CRUD Operations')
cosmos_db = st.text_input('Database Name')
cosmos_container = st.text_input('Container Name')
item_id = st.text_input("Item ID (for Read, Update, Delete)")
item_data = st.text_area("Item Data (JSON format, for Create and Update)")

# Both actions need a database and a container to talk to.
_cosmos_target_given = bool(cosmos_db and cosmos_container)

if st.button('Create Item in Cosmos DB'):
    # The text area yields a plain string; create_item needs the decoded
    # JSON object (a dict), so parse and validate before calling the SDK.
    new_item = None
    if not _cosmos_target_given:
        st.error('Database Name and Container Name are required.')
    else:
        try:
            parsed = json.loads(item_data)
        except json.JSONDecodeError as exc:
            st.error(f'Item Data is not valid JSON: {exc}')
        else:
            if isinstance(parsed, dict):
                new_item = parsed
            else:
                st.error('Item Data must be a JSON object, e.g. {"id": "1"}.')

    if new_item is not None:
        container = cosmos_client.get_database_client(
            cosmos_db
        ).get_container_client(cosmos_container)
        container.create_item(new_item)
        st.success('Item created.')

if st.button('Read Item from Cosmos DB'):
    if not _cosmos_target_given:
        st.error('Database Name and Container Name are required.')
    elif not item_id:
        st.error('Item ID is required to read an item.')
    else:
        container = cosmos_client.get_database_client(
            cosmos_db
        ).get_container_client(cosmos_container)
        # read_item takes the partition key *value*. PartitionKey(...) builds
        # a partition key definition (used when creating a container) and is
        # not a valid value here.
        # NOTE(review): assumes the container is partitioned on the item id
        # (path "/id"), matching the original intent -- confirm for your data.
        item = container.read_item(item_id, partition_key=item_id)
        st.json(item)
# Azure Blob Storage - Upload/Download
# Only upload is implemented in this section.
st.subheader('Azure Blob Storage - Upload/Download')
blob_container = st.text_input('Blob Container')
blob_file = st.file_uploader('Upload file to Blob')

# The upload button is only rendered once a file has been selected.
if blob_file is not None and st.button('Upload to Blob'):
    if not blob_container:
        # Avoid sending a request with an empty container name.
        st.error('Blob Container is required.')
    else:
        # The blob is named after the uploaded file.
        blob_client = blob_service.get_blob_client(
            container=blob_container, blob=blob_file.name
        )
        # No overwrite flag is passed, so the SDK's default behaviour for an
        # already-existing blob applies (unchanged from the original code).
        blob_client.upload_blob(blob_file.getvalue())
        st.success(f'Uploaded {blob_file.name} to {blob_container}.')
# Azure Functions - Trigger
# Issues a plain HTTP GET against the URL entered by the user and shows the
# response body.
st.subheader('Azure Functions - Trigger')
function_url = st.text_input('Function URL')

# Upper bound (seconds) for the HTTP call; without a timeout requests.get can
# block the Streamlit script indefinitely on an unresponsive endpoint.
_FUNCTION_TIMEOUT_SECONDS = 30

if st.button('Call Azure Function'):
    target_url = function_url.strip()
    if not target_url:
        st.error('Function URL is required.')
    else:
        try:
            response = requests.get(target_url, timeout=_FUNCTION_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            # Covers malformed URLs, connection failures and timeouts.
            st.error(f'Request to Azure Function failed: {exc}')
        else:
            if not response.ok:
                st.warning(f'Function returned HTTP {response.status_code}.')
            st.write('Function Response:', response.text)