bitbcpy / bitbcpy.py
mah000's picture
Initial commit for Hugging Face Space
9167aa8
import streamlit as st
import re
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
import time
import openpyxl
import os
from dotenv import load_dotenv
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
def parse_bitable_url(url):
"""
Extracts app_token and table_id from a Bitable URL.
Expected format: .../base/<app_token>?table=<table_id>...
"""
try:
# Extract app_token
app_token_match = re.search(r'/base/([^/?]+)', url)
if not app_token_match:
return None, None
app_token = app_token_match.group(1)
# Extract table_id
table_id_match = re.search(r'table=([^&]+)', url)
if not table_id_match:
return None, None
table_id = table_id_match.group(1)
return app_token, table_id
except Exception:
return None, None
def get_bitable_client(app_id, app_secret):
return lark.Client.builder().app_id(app_id).app_secret(app_secret).build()
def get_table_fields(client, app_token, table_id):
logger.info(f"Fetching fields for app_token={app_token}, table_id={table_id}")
fields = []
page_token = None
has_more = True
while has_more:
req = ListAppTableFieldRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.page_size(100)
if page_token:
req.page_token(page_token)
req = req.build()
try:
resp = client.bitable.v1.app_table_field.list(req)
except Exception as e:
logger.error(f"API call failed: {e}", exc_info=True)
raise
if not resp.success():
logger.error(f"Failed to list fields: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
raise Exception(f"Failed to list fields: {resp.msg}, log_id: {resp.get_log_id()}")
if resp.data and resp.data.items:
fields.extend(resp.data.items)
logger.info(f"Fetched {len(resp.data.items)} fields. Current total: {len(fields)}")
logger.info(f"Page token: {page_token} -> {resp.data.page_token}, Has more: {resp.data.has_more}")
# Determine if we should continue based on result count
# Some APIs return has_more=True even if they return fewer items than page_size on the last page.
# But generally, if len(items) < page_size, it's the last page.
# Or if len(items) == 0, we are done.
items_count = len(resp.data.items) if (resp.data and resp.data.items) else 0
if items_count < 100:
has_more = False
else:
has_more = resp.data.has_more
# Still keep the infinite loop safeguard just in case
new_page_token = resp.data.page_token
if has_more and new_page_token == page_token:
logger.warning("Infinite loop detected: page_token did not update. Breaking.")
has_more = False
page_token = new_page_token
logger.info(f"Total fields fetched: {len(fields)}")
return fields
def validate_primary_keys(excel_columns, target_fields):
# For Excel, we assume the first column is the primary key (or at least the one to check against)
# But usually Excel doesn't have strict schema.
# However, the user asked to check "Primary Key Name Type".
# We can check if the Target PK name exists in Excel columns.
target_pk = next((f for f in target_fields if f.is_primary), None)
if not target_pk:
return False, "Target table has no primary key."
if target_pk.field_name not in excel_columns:
return False, f"Target Primary Key '{target_pk.field_name}' not found in Excel columns."
# We can't strictly validate TYPE from Excel as Excel types are loose,
# but we validated the existence of the PK column.
return True, f"Primary key '{target_pk.field_name}' found in Excel."
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _send_batch(client, app_token, table_id, records):
req = BatchCreateAppTableRecordRequest.builder() \
.app_token(app_token) \
.table_id(table_id) \
.request_body(BatchCreateAppTableRecordRequestBody.builder().records(records).build()) \
.build()
resp = client.bitable.v1.app_table_record.batch_create(req)
if not resp.success():
logger.error(f"Batch create failed: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
if resp.error:
logger.error(f"Error details: {resp.error}")
raise Exception(f"Batch create failed: {resp.msg}, log_id: {resp.get_log_id()}")
def sync_data(client, excel_rows, excel_columns, special_col_name, manual_source_value, excel_file_name, target_app_token, target_table_id, target_fields):
target_field_names = {f.field_name: f for f in target_fields}
batch_records = []
total_records = len(excel_rows)
if total_records == 0:
st.warning("No records to sync.")
return
progress_bar = st.progress(0)
status_text = st.empty()
processed_count = 0
for row in excel_rows:
new_fields = {}
# row is a tuple/list matching excel_columns order
row_dict = dict(zip(excel_columns, row))
# 1. Copy common fields
for col_name, val in row_dict.items():
if col_name in target_field_names and val is not None:
# Basic type conversion for API
# Excel reads everything as is (int, float, datetime, string)
# API expects specific formats sometimes.
# For simplicity, we send as is, assuming Lark SDK or API handles basic types.
# DateTime might need conversion to timestamp if API requires it.
# Lark API expects Unix timestamp (ms) for Date fields usually.
target_type = target_field_names[col_name].type
if target_type == 5: # Date
if hasattr(val, 'timestamp'): # datetime object
new_fields[col_name] = int(val.timestamp() * 1000)
else:
new_fields[col_name] = val
elif target_type == 4: # MultiSelect
# Excel might have comma separated string?
if isinstance(val, str):
new_fields[col_name] = [v.strip() for v in val.split(',')]
else:
new_fields[col_name] = val
elif target_type == 11: # User
# User field is complex (list of objects). Excel might just have names.
# We might skip or try to pass simple text?
# Writing text to User field usually fails or requires specific format.
# Let's skip complex fields for now or assume user knows what they are doing.
pass
else:
new_fields[col_name] = val
# 2. Handle Special "Source Record" Column
if special_col_name and special_col_name in target_field_names:
target_col_def = target_field_names[special_col_name]
# Use manual value provided by user if available, otherwise fallback to filename
final_val = manual_source_value if manual_source_value else f"Imported from {excel_file_name}"
if target_col_def.type == 15: # Hyperlink
# If final_val is a string that looks like a URL, use it
# If it's just text, treat as text
link_url = str(final_val) if final_val else ""
if not link_url.startswith("http"):
link_url = "" # Can't link if no URL
new_fields[special_col_name] = {
"link": link_url,
"text": str(final_val)
}
else:
new_fields[special_col_name] = final_val
batch_records.append(AppTableRecord.builder().fields(new_fields).build())
if len(batch_records) >= 100:
_send_batch(client, target_app_token, target_table_id, batch_records)
processed_count += len(batch_records)
progress_bar.progress(processed_count / total_records)
status_text.text(f"Synced {processed_count}/{total_records} records")
batch_records = []
if batch_records:
_send_batch(client, target_app_token, target_table_id, batch_records)
processed_count += len(batch_records)
progress_bar.progress(1.0)
status_text.text(f"Synced {processed_count}/{total_records} records")
def main():
st.set_page_config(page_title="Bitbcpy - 多维表格合并工具", layout="wide")
st.title("Bitbcpy - 多维表格合并脚本")
st.markdown("将本地 Excel 的行抄到目标表,同名列合并,忽略目标表不存在的列。")
if 'target_fields' not in st.session_state:
st.session_state.target_fields = []
if 'target_fields_loaded' not in st.session_state:
st.session_state.target_fields_loaded = False
with st.sidebar:
st.header("配置")
default_app_id = os.getenv("LARK_OAPI_APPID", "")
default_app_secret = os.getenv("LARK_OAPI_SECRET", "")
app_id = st.text_input("App ID", value=default_app_id)
app_secret = st.text_input("App Secret", type="password", value=default_app_secret)
target_url = st.text_input("目标文档链接")
if st.button("读取目标表字段"):
if not app_id or not app_secret:
st.error("请填写 App ID 和 App Secret")
elif not target_url:
st.error("请填写目标文档链接")
else:
t_token, t_table = parse_bitable_url(target_url)
if not (t_token and t_table):
st.error("目标链接解析失败")
else:
client = get_bitable_client(app_id, app_secret)
try:
with st.spinner("Fetching schema..."):
fields = get_table_fields(client, t_token, t_table)
st.session_state.target_fields = fields
st.session_state.target_fields_loaded = True
st.success("字段加载成功")
except Exception as e:
st.error(f"Error: {str(e)}")
source_col = None
manual_source_val = None
if st.session_state.target_fields_loaded:
field_names = [f.field_name for f in st.session_state.target_fields]
source_col = st.selectbox("哪一列记录来源", options=field_names)
manual_source_val = st.text_input("该列统一填写的来源信息", help="例如:2023年Q1销售数据")
uploaded_file = st.file_uploader("上传来源 Excel (xlsx)", type=['xlsx'])
if st.button("开始合并"):
if not app_id or not app_secret:
st.error("请在左侧侧边栏填写 App ID 和 App Secret")
return
if not uploaded_file:
st.error("请上传来源 Excel 文件")
return
t_token, t_table = parse_bitable_url(target_url)
if not (t_token and t_table):
st.error("目标链接解析失败,请检查链接格式")
return
client = get_bitable_client(app_id, app_secret)
try:
# Read Excel
wb = openpyxl.load_workbook(uploaded_file, data_only=True)
sheet = wb.active
# Assuming first row is header
rows = list(sheet.iter_rows(values_only=True))
if not rows:
st.error("Excel 文件为空")
return
header = rows[0]
data_rows = rows[1:]
# Filter None headers
excel_columns = [str(h) for h in header if h is not None]
# Use cached fields if available, else fetch
t_fields = st.session_state.target_fields
if not t_fields:
with st.spinner("Fetching target table schema..."):
t_fields = get_table_fields(client, t_token, t_table)
valid, msg = validate_primary_keys(excel_columns, t_fields)
if not valid:
st.error(f"主键校验失败: {msg}")
return
else:
st.success("主键校验通过")
st.info(f"Found {len(data_rows)} records in Excel.")
sync_data(client, data_rows, excel_columns, source_col, manual_source_val, uploaded_file.name, t_token, t_table, t_fields)
st.success("合并完成!")
except Exception as e:
logger.error(f"Error in main loop: {e}", exc_info=True)
st.error(f"Error: {str(e)}")
if __name__ == "__main__":
main()