# Spaces: Runtime error
# (page-status banner captured together with the file; not part of the program)
import json
import re
import time

from specklepy.api import operations
from specklepy.transports.server import ServerTransport
def get_database_properties(database_pages):
    """
    Lists the property names found on the first page of a database.

    Only the first page is inspected; later pages are ignored
    (presumably every row shares the same property schema - confirm).

    :param database_pages: Iterable of Notion page dicts, each carrying a 'properties' mapping.
    :return: A list of property names in the order they appear on the first page,
             or an empty list when there are no pages.
    """
    first_page = next(iter(database_pages), None)
    if first_page is None:
        return []
    # Iterating a dict yields its keys, i.e. the property names.
    return list(first_page['properties'])
# query full database
def fetch_all_database_pages(client, database_id):
    """
    Fetches all pages from a specified Notion database.

    Repeats the query, following 'next_cursor', until the response reports
    that no more results are available.

    :param client: Initialized Notion client (must expose ``databases.query``).
    :param database_id: The ID of the Notion database to query.
    :return: A list containing all pages from the database.
    """
    all_pages = []
    start_cursor = None
    while True:
        query_args = {"database_id": database_id}
        # Only send a cursor once we actually have one; an explicit
        # "start_cursor": None on the first request is not a valid cursor.
        if start_cursor is not None:
            query_args["start_cursor"] = start_cursor
        response = client.databases.query(**query_args)
        all_pages.extend(response['results'])
        # Check if there's more data to fetch
        if not response['has_more']:
            break
        start_cursor = response['next_cursor']
        if start_cursor is None:
            # Without a cursor the next request would restart from the first
            # page and never terminate.
            break
    return all_pages
def get_property_value(page, property_name):
    """
    Extracts the value from a specific property in a Notion page based on its type.

    :param page: The Notion page data as retrieved from the API.
    :param property_name: The name of the property whose value is to be fetched.
    :return: Depending on the property type:
             - title / rich_text: the concatenated text as one string
             - number: the stored number
             - select: the option name, or None when nothing is selected
             - multi_select: list of option names
             - date: the start value, or a (start, end) tuple when an end is set
             - relation: list of related page ids
             - people: list of names (entries without a name are skipped)
             None when the property does not exist on the page, and the string
             "NA" for unsupported types or when the data cannot be read.
    """
    try:
        properties = page['properties']
        # Check if the property exists in the page
        if property_name not in properties:
            return None
        property_data = properties[property_name]
        prop_type = property_data['type']

        if prop_type in ('title', 'rich_text'):
            # Fragments without a 'text' entry (e.g. mentions or equations)
            # fall back to their 'plain_text' rendering instead of making the
            # whole property unreadable.
            return ''.join(
                fragment['text']['content'] if 'text' in fragment
                else fragment.get('plain_text', '')
                for fragment in property_data[prop_type]
            )
        if prop_type == 'number':
            return property_data[prop_type]
        if prop_type == 'select':
            selected = property_data[prop_type]
            return selected['name'] if selected else None
        if prop_type == 'multi_select':
            return [option['name'] for option in property_data[prop_type]]
        if prop_type == 'date':
            date_value = property_data[prop_type]
            if date_value['end']:
                return (date_value['start'], date_value['end'])
            return date_value['start']
        if prop_type == 'relation':
            return [relation['id'] for relation in property_data[prop_type]]
        if prop_type == 'people':
            return [person['name'] for person in property_data[prop_type] if 'name' in person]
        # Add more handlers as needed for other property types
        return "NA"
    except Exception:
        # Best-effort lookup: malformed or unexpected data yields "NA".
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        print("atttribute probably not found")
        return "NA"
def parse_invalid_json(json_string):
    """
    Parses a JSON-like string, repairing common hand-typed mistakes if needed.

    The string is first parsed as-is. Only when that fails are the repairs
    applied (typographic/single quotes -> double quotes, unquoted keys ->
    quoted keys), so that valid JSON containing apostrophes or colons inside
    string values is not damaged by the repair step.

    :param json_string: The text to parse. None, "none", "NA" and "" are
                        treated as "no value".
    :return: The decoded Python object, or None when there is no value or the
             text cannot be parsed even after the repairs.
    """
    if json_string is None or json_string in ("none", "NA", ""):
        return None

    try:
        # Already valid JSON: return it untouched.
        return json.loads(json_string)
    except json.JSONDecodeError:
        pass  # fall through to the repair heuristics

    # Replace fancy quotes and single quotes with standard double quotes
    json_string = re.sub(r"[‘’“”]", '"', json_string)
    json_string = json_string.replace("'", '"')
    # Add quotes around any unquoted keys
    json_string = re.sub(r'(?<!")(\b\w+\b)(?!"):', r'"\1":', json_string)

    try:
        # Try parsing the corrected string
        return json.loads(json_string)
    except json.JSONDecodeError as e:
        # Report the failure together with the repaired text for debugging
        print("JSON parsing error:", e)
        print(json_string)
        return None
def _level_is_set(value):
    """Return True when a hierarchy level holds a usable value (neither None nor "NA")."""
    return value is not None and value != "NA"


def _build_kpi_entry(kpi_page):
    """Collect the fields of one KPI page into a plain dict."""
    return {
        "name": get_property_value(kpi_page, "name"),
        "type": get_property_value(kpi_page, "type"),
        "unit": get_property_value(kpi_page, "unit"),
        "color": parse_invalid_json(get_property_value(kpi_page, "color")),
        "nameShort": get_property_value(kpi_page, "nameShort"),
        "quality": get_property_value(kpi_page, "quality"),
        "args": parse_invalid_json(get_property_value(kpi_page, "args")),
        "description": get_property_value(kpi_page, "description"),
        "interpretrationHigh": get_property_value(kpi_page, "interpretrationHigh"),
        "interpretationLow": get_property_value(kpi_page, "interpretationLow"),
    }


def notionTable2JSON(databaseFUll_pages, kpi_database_pages):
    """
    Converts the pages of an attribute database into two dictionaries.

    Pages whose "isUsed" value is "False", "false" or False are skipped.

    :param databaseFUll_pages: Iterable of attribute pages (Notion page dicts).
    :param kpi_database_pages: Pages of the KPI database; the ids listed in an
                               attribute's "KPI" property are looked up here.
    :return: A tuple (attributeMetaData, availableAttributes):
             - attributeMetaData maps each attribute name to a dict of its
               properties, including a "KPI" list with one entry per KPI id
               that could be resolved.
             - availableAttributes is a tree keyed by level_1 / level_2 /
               level_3. Every node has a "values" list of attribute names, and
               the first two levels also have a "sub-levels" dict. An attribute
               is stored at the deepest level that is set; attributes without
               level_1 are not listed in the tree.
    """
    attributeMetaData = {}
    availableAttributes = {}
    cnt = 0

    for page in databaseFUll_pages:
        curAttrName = get_property_value(page, "name")
        print(curAttrName)
        lev1 = get_property_value(page, "level_1")
        lev2 = get_property_value(page, "level_2")
        lev3 = get_property_value(page, "level_3")
        isUsed = get_property_value(page, "isUsed")

        # skip if isUsed == False
        if isUsed in ("False", "false", False):
            continue

        # Register the attribute at the deepest hierarchy level that is set.
        if _level_is_set(lev1):
            node = availableAttributes.setdefault(lev1, {"sub-levels": {}, "values": []})
            if _level_is_set(lev2):
                node = node["sub-levels"].setdefault(lev2, {"sub-levels": {}, "values": []})
                if _level_is_set(lev3):
                    node = node["sub-levels"].setdefault(lev3, {"values": []})
            node["values"].append(curAttrName)

        # attributeMetadata
        attributeData = {
            "name": curAttrName,
            "nameShort": get_property_value(page, "nameShort"),
            "nameLong": get_property_value(page, "nameLong"),
            "description": get_property_value(page, "description"),
            "indicator": get_property_value(page, "indicator"),
            "unit": get_property_value(page, "unit"),
            "unitShort": get_property_value(page, "unitShort"),
            "spatialUnit": get_property_value(page, "spatialUnit"),
            "method": get_property_value(page, "method"),
            "type": get_property_value(page, "type"),
            "colorMapping": parse_invalid_json(get_property_value(page, "colorMapping")),
            "parameter": parse_invalid_json(get_property_value(page, "parameter")),
            "level_1": lev1,
            "level_2": lev2,
            "level_3": lev3,
            "isUsed": isUsed,
            "dataType": get_property_value(page, "dataType"),
            "dataSet": "NA",
            "dataSource": "NA",
            "isWeight": get_property_value(page, "isWeight"),
            "KPI": [],
            "visualisation": []
        }
        try:
            attributeData["categoryData"] = parse_invalid_json(get_property_value(page, "categoryData"))
        except Exception:
            # e.g. the property holds a non-string value that cannot be parsed
            attributeData["categoryData"] = "NA"

        # Iterate through the list of KPI reference ids. get_property_value
        # yields None / "NA" when the property is missing or unreadable; those
        # are not id lists and must not be iterated.
        kpiIDs = get_property_value(page, "KPI")
        if isinstance(kpiIDs, (list, tuple)):
            for kpiID in kpiIDs:
                curKPI = get_page_by_id(kpi_database_pages, kpiID)
                if curKPI is None:
                    # An unknown id would only yield an entry full of "NA".
                    print("KPI page not found:", kpiID)
                    continue
                # add KPI data to attributeData
                attributeData["KPI"].append(_build_kpi_entry(curKPI))

        # add to main dictionary
        attributeMetaData[curAttrName] = attributeData
        cnt += 1

    print("processed pages:", cnt)
    return attributeMetaData, availableAttributes
def get_page_by_id(notion_db_pages, page_id):
    """
    Finds a page by its id.

    :param notion_db_pages: Iterable of Notion page dicts, each with an "id" key.
    :param page_id: The id to look for.
    :return: The first page whose "id" equals page_id, or None if there is none.
    """
    return next((pg for pg in notion_db_pages if pg["id"] == page_id), None)
def getSpeckleStream(stream_id,
                     branch_name,
                     client,
                     commit_id=""
                     ):
    """
    Retrieves data from a specific branch of a speckle stream.

    Args:
        stream_id (str): The ID of the speckle stream.
        branch_name (str): The name of the branch within the speckle stream.
        client: A speckle client exposing ``branch.get`` and ``commit.get``.
        commit_id (str | int | None): Which commit to receive.
            - "" or None: the first commit in the branch listing
              (presumably the most recent one - confirm ordering).
            - str: the id of the commit to fetch.
            - int: index into the fetched commit listing. Only a few commits
              are fetched (see below), so large indices raise IndexError.

    Returns:
        tuple: (received speckle data, id of the commit that was used).

    Raises:
        TypeError: If commit_id is neither a string, an int nor None.

    The branch is requested with a limit argument of 3 and, if that fails,
    again with 1. The branch details, the creation dates of the fetched
    commits and the chosen commit are printed for debugging purposes.
    """
    print("updated A")

    # set stream and branch
    try:
        branch = client.branch.get(stream_id, branch_name, 3)
        print(branch)
    except Exception:
        # Fall back to a smaller request if the first one fails.
        branch = client.branch.get(stream_id, branch_name, 1)
        print(branch)

    print("last three commits:")
    for ite in branch.commits.items:
        print(ite.createdAt)

    if commit_id is None or commit_id == "":
        latest_commit = branch.commits.items[0]
        choosen_commit_id = latest_commit.id
        commit = client.commit.get(stream_id, choosen_commit_id)
        print("latest commit ", branch.commits.items[0].createdAt, " was choosen")
    elif isinstance(commit_id, str):  # string, commit uuid
        choosen_commit_id = commit_id
        commit = client.commit.get(stream_id, choosen_commit_id)
        print("provided commit ", choosen_commit_id, " was choosen")
    elif isinstance(commit_id, int) and not isinstance(commit_id, bool):  # int
        latest_commit = branch.commits.items[commit_id]
        choosen_commit_id = latest_commit.id
        commit = client.commit.get(stream_id, choosen_commit_id)
    else:
        # Previously this case failed later with an unbound-variable error.
        raise TypeError(
            "commit_id must be a commit id string, an int index or empty, got "
            + type(commit_id).__name__
        )

    print(commit)
    print(commit.referencedObject)

    # get transport
    transport = ServerTransport(client=client, stream_id=stream_id)
    # speckle stream
    res = operations.receive(commit.referencedObject, transport)
    return res, choosen_commit_id