File size: 6,808 Bytes
7860c2e
c83b398
 
7860c2e
 
 
7c8002a
c83b398
7860c2e
 
 
 
 
 
 
 
 
 
c83b398
 
 
 
 
 
 
7860c2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c83b398
7860c2e
962e6f5
 
 
 
7860c2e
 
 
 
c83b398
962e6f5
c83b398
7860c2e
 
c83b398
 
7860c2e
c83b398
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97cf335
 
 
 
 
 
 
 
 
 
 
 
c83b398
97cf335
 
 
 
 
 
 
 
 
 
 
 
 
c83b398
 
 
 
 
 
97cf335
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c83b398
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
962e6f5
 
c83b398
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import os
import zipfile
from io import BytesIO

import pandas as pd
import streamlit as st

# === Functions ===


def find_header_row(df, keyword="Dist_Name"):
    """Locate the header row of a raw (header-less) sheet.

    Only the first 20 rows are inspected. A row matches when any of its
    cells, converted to text, stripped and lower-cased, contains ``keyword``
    (compared case-insensitively) as a substring.

    Args:
        df: DataFrame holding the raw sheet content.
        keyword: Text that identifies the header row.

    Returns:
        The positional index of the first matching row.

    Raises:
        ValueError: If none of the inspected rows contains ``keyword``.
    """
    needle = keyword.lower()
    scan_limit = min(20, len(df))
    for position in range(scan_limit):
        cells = df.iloc[position].astype(str).str.strip().str.lower()
        for cell in cells:
            if needle in str(cell):
                return position
    raise ValueError(f"No row with '{keyword}' found.")


def read_sheet_fallback(file_bytes, sheet):
    """Read one sheet of a workbook as a raw, header-less DataFrame.

    Args:
        file_bytes: Seekable binary stream holding the workbook. It is
            rewound to the start before reading, so the same stream can be
            passed again for another sheet.
        sheet: Name of the sheet to read.

    Returns:
        A DataFrame with no header row applied, read with the "calamine"
        engine.
    """
    file_bytes.seek(0)
    raw_frame = pd.read_excel(
        file_bytes,
        sheet_name=sheet,
        header=None,
        engine="calamine",
    )
    return raw_frame


def load_clean_df(file_bytes, sheet):
    """Load a sheet and return its data rows with cleaned headers and cells.

    The header row is located with ``find_header_row``; everything up to and
    including it is discarded. Column names are converted to text, stripped,
    and have non-breaking spaces replaced by regular spaces. Every non-missing
    cell is converted to stripped text.

    Missing cells are kept as missing values rather than being turned into
    the literal text "nan", so that ``isna()`` / ``notna()`` checks performed
    on the result still recognise them.

    Args:
        file_bytes: Seekable binary stream holding the workbook.
        sheet: Name of the sheet to load.

    Returns:
        A DataFrame containing only the rows below the header row.

    Raises:
        ValueError: If no header row is found (raised by ``find_header_row``).
    """
    df_raw = read_sheet_fallback(file_bytes, sheet)
    header_row = find_header_row(df_raw)

    column_names = [
        str(name).strip().replace("\xa0", " ") for name in df_raw.iloc[header_row]
    ]

    # find_header_row returns a positional index, so slice positionally.
    df = df_raw.iloc[header_row + 1 :].copy()
    df.columns = column_names

    def _clean_cell(value):
        # Stringify real values only; a missing cell must stay missing.
        return value if pd.isna(value) else str(value).strip()

    return df.apply(lambda col: col.map(_clean_cell))


def detect_dist_col(columns):
    """Return the first column name that looks like the Dist_Name column.

    A name qualifies when its lower-cased form contains both "dist" and
    "name".

    Args:
        columns: Iterable of column names (strings).

    Returns:
        The first qualifying column name, unchanged.

    Raises:
        ValueError: If no column name qualifies.
    """
    required_parts = ("dist", "name")
    candidates = (
        name
        for name in columns
        if all(part in name.lower() for part in required_parts)
    )
    match = next(candidates, None)
    if match is None:
        raise ValueError("Dist_Name column not found.")
    return match


def _compare_sheet(df_old, df_new, sheet, old_label, new_label, logs):
    """Diff one sheet of the old dump against the same sheet of the new dump.

    Rows are matched on the column found by ``detect_dist_col``. Rows present
    on one side only are reported as presence differences; for rows present
    on both sides, every shared column except "file_name" is compared cell by
    cell.

    Args:
        df_old: Cleaned sheet from the old dump.
        df_new: Cleaned sheet from the new dump.
        sheet: Sheet name, used in log messages only.
        old_label: Result-column name holding old-dump values.
        new_label: Result-column name holding new-dump values.
        logs: List of log messages; informational messages are appended to
            it in place so they survive even if a later step raises.

    Returns:
        A list of change records (dicts keyed by "Dist_Name", "Parameter",
        ``old_label`` and ``new_label``).

    Raises:
        ValueError: If either sheet has no Dist_Name-like column (raised by
            ``detect_dist_col``).
    """
    dist_col_old = detect_dist_col(df_old.columns)
    dist_col_new = detect_dist_col(df_new.columns)

    df_old = df_old[df_old[dist_col_old].notna()].set_index(dist_col_old)
    df_new = df_new[df_new[dist_col_new].notna()].set_index(dist_col_new)

    missing_in_new = df_old.index.difference(df_new.index)
    missing_in_old = df_new.index.difference(df_old.index)

    if not missing_in_new.empty:
        logs.append(
            f"{len(missing_in_new)} entrées présentes dans l'ancien dump mais absentes dans le nouveau pour '{sheet}'."
        )
    if not missing_in_old.empty:
        logs.append(
            f"{len(missing_in_old)} entrées présentes dans le nouveau dump mais absentes dans l'ancien pour '{sheet}'."
        )

    # Presence differences are recorded first so they are still exported when
    # the two sheets turn out to share no rows or no columns.
    changes = [
        {
            "Dist_Name": dist,
            "Parameter": "Présence ligne",
            old_label: "Présent",
            new_label: "Manquant",
        }
        for dist in missing_in_new
    ]
    changes.extend(
        {
            "Dist_Name": dist,
            "Parameter": "Présence ligne",
            old_label: "Manquant",
            new_label: "Présent",
        }
        for dist in missing_in_old
    )

    common = df_old.index.intersection(df_new.index)
    if common.empty:
        logs.append(f"Aucun Dist_Name commun trouvé pour '{sheet}'.")
        return changes

    common_cols = df_old.columns.intersection(df_new.columns)
    if common_cols.empty:
        logs.append(f"Aucune colonne commune entre les dumps pour '{sheet}'.")
        return changes

    df_old_common = df_old.loc[common, common_cols]
    df_new_common = df_new.loc[common, common_cols]

    # A cell counts as changed when the values differ, except when both
    # sides are missing.
    mask = (df_old_common != df_new_common) & ~(
        df_old_common.isna() & df_new_common.isna()
    )

    for dist in mask.index:
        for param in mask.columns[mask.loc[dist]]:
            # The file-name column is not treated as a parameter change.
            if param.strip().lower() == "file_name":
                continue
            changes.append(
                {
                    "Dist_Name": dist,
                    "Parameter": param,
                    old_label: df_old_common.loc[dist, param],
                    new_label: df_new_common.loc[dist, param],
                }
            )

    return changes


# === Streamlit interface ===

st.title("📊 Dump Compare Tool")
st.markdown(
    ":blue[**Upload the old and new dumps, then input the object class (comma-separated) to compare**]"
)

old_file = st.file_uploader("Upload Old Dump (.xlsb)", type=["xlsb"], key="old")
new_file = st.file_uploader("Upload New Dump (.xlsb)", type=["xlsb"], key="new")

sheet_list_input = st.text_input(
    "Enter object class (comma-separated)", placeholder="e.g. BCF, BTS, CELL"
)

if st.button("Run Comparison", type="primary", use_container_width=True):
    # Parse before validating so an input such as "," is rejected, and
    # de-duplicate (order preserved) so no sheet is compared or counted twice.
    sheet_names = list(
        dict.fromkeys(s.strip() for s in sheet_list_input.split(",") if s.strip())
    )

    if old_file is None or new_file is None or not sheet_names:
        st.warning("Please upload both files and provide at least one sheet name.")
    else:
        # getvalue() returns the full upload regardless of the current
        # stream position.
        old_bytes = BytesIO(old_file.getvalue())
        new_bytes = BytesIO(new_file.getvalue())

        # The file names become result-column names. If both uploads share a
        # name, the two dict keys would collide and the old value would be
        # silently overwritten, so make the labels distinct.
        old_label = os.path.basename(old_file.name)
        new_label = os.path.basename(new_file.name)
        if old_label == new_label:
            old_label = f"{old_label} (old)"
            new_label = f"{new_label} (new)"

        logs = []
        total = 0
        all_results = {}

        for sheet in sheet_names:
            # One failing sheet must not abort the others: report and move on.
            try:
                # The buffers are reused for every sheet; read_sheet_fallback
                # rewinds them before each read.
                df_old = load_clean_df(old_bytes, sheet)
                df_new = load_clean_df(new_bytes, sheet)
                changes = _compare_sheet(
                    df_old, df_new, sheet, old_label, new_label, logs
                )
            except Exception as e:
                logs.append(f"❌ Error in '{sheet}': {e}")
                continue

            df_changes = pd.DataFrame(changes)
            if not df_changes.empty:
                all_results[sheet] = df_changes
                logs.append(f"{len(df_changes)} changes in '{sheet}'")
                total += len(df_changes)
            else:
                logs.append(f"No changes in '{sheet}'")

        st.success(f"✅ Comparison completed. Total changes: {total}")
        for log in logs:
            st.write(log)

        if all_results:
            # Bundle one workbook per sheet into a single in-memory zip.
            output_buffer = BytesIO()
            with zipfile.ZipFile(output_buffer, mode="w") as zf:
                for sheet, df in all_results.items():
                    file_buffer = BytesIO()
                    df.to_excel(file_buffer, index=False)
                    zf.writestr(f"{sheet}_differences.xlsx", file_buffer.getvalue())

            st.download_button(
                "Download Results (.zip)",
                data=output_buffer.getvalue(),
                file_name="differences.zip",
                mime="application/zip",
                type="primary",
                on_click="ignore",
            )