# NOTE(review): the three lines that were here ("Spaces:" / "Runtime error" x2)
# were status text captured from the hosting platform, not Python code.
| import json | |
| import os | |
| import geopandas as gpd | |
| import numpy as np | |
| import pandas as pd | |
| import pyproj | |
| import requests | |
| from shapely.geometry import Point | |
| from shapely.ops import transform | |
| import gradio as gr | |
| from io import StringIO, BytesIO | |
| from pandas import ExcelFile | |
| from datetime import datetime | |
| import boto3 | |
| import logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
def load_csv(url):
    """Read a CSV file into a DataFrame.

    :param url: path or URL of the csv file.
    :return: pandas dataframe
    """
    return pd.read_csv(url)
def input_address_return_parcelle(address, postcode, google_api_key):
    """Geocode a French address and look up its cadastral parcel.

    Queries the Google geocoding API for coordinates, the IGN "apicarto"
    cadastre API for the parcel, and the BAN address API for the INSEE
    city code.

    :param address: street address (number, street, city).
    :param postcode: postal code (int or str).
    :param google_api_key: Google Maps Geocoding API key.
    :return: (parcel idu, department code, insee code, city code,
              longitude, latitude), or six ``None`` values on failure.
    """
    address_response = requests.get(f"https://api-adresse.data.gouv.fr/search/?q={address + ' ' + str(postcode)}")
    address_response_google = requests.get(
        f"https://maps.googleapis.com/maps/api/geocode/json?address={address + ' ' + str(postcode)}&key={google_api_key}")
    try:
        # First geocoding result; raises if the API returned no match
        # or the response body is not valid JSON.
        coords = address_response_google.json()['results'][0]['geometry']['location']
        geom = {'type': 'Point', 'coordinates': [coords['lng'], coords['lat']]}
    except (KeyError, IndexError, ValueError):
        logging.error("Unable to geocode this address")
        return None, None, None, None, None, None
    parcelle_response = requests.get(f"https://apicarto.ign.fr/api/cadastre/parcelle?geom={json.dumps(geom)}")
    try:
        # Parse once and reuse instead of re-decoding the body four times.
        parcelle_props = parcelle_response.json()['features'][0]['properties']
    except (KeyError, IndexError, ValueError):
        # Previously logged "Unable to geocode" here too, which was misleading:
        # at this point geocoding succeeded but no parcel was found.
        logging.error("No cadastral parcel found for this address")
        return None, None, None, None, None, None
    return (parcelle_props['idu'],
            parcelle_props['code_dep'],
            parcelle_props['code_insee'],
            address_response.json()['features'][0]['properties']['citycode'],
            coords['lng'], coords['lat'])
def connect_to_s3():
    """Create a boto3 S3 client for the eu-north-1 region.

    SECURITY: a previous revision embedded live AWS access keys directly in
    this function; those keys must be considered compromised and rotated.
    Credentials are now read from the standard AWS_ACCESS_KEY_ID /
    AWS_SECRET_ACCESS_KEY environment variables; when they are unset, boto3
    falls back to its default credential chain (config files, instance role).

    :return: a configured boto3 S3 client.
    """
    s3client = boto3.client('s3', region_name='eu-north-1',
                            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
                            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"))
    return s3client
def load_and_concatenate_dvf_in_city(department, city_code, lower_date, path_to_root_dvf, upper_date=2024):
    """Load and concatenate all yearly DVF csv extracts for one city from S3.

    Files are laid out as ``csv/<year>/communes/<department>/<city_code>.csv``
    in the ``khome-fraud2`` bucket.

    :param department: department code (e.g. "75"), or None if geocoding failed.
    :param city_code: INSEE city code, or None if geocoding failed.
    :param lower_date: first year (inclusive) to load.
    :param path_to_root_dvf: legacy local root path; unused by the S3 code
        path but kept for interface compatibility.
    :param upper_date: year (exclusive) to stop at; defaults to 2024, which
        was previously hard-coded.
    :return: a single concatenated DataFrame, or None when nothing was found.
    """
    # Validate inputs before opening an S3 client.
    if department is None or city_code is None:
        logging.error("Could not geocode this property and retrieve transactions")
        return None
    s3client = connect_to_s3()
    list_dvf = []
    for year in range(lower_date, upper_date):
        try:
            contents = s3client.list_objects(Bucket='khome-fraud2', Delimiter="/",
                                             Prefix=f"csv/{year}/communes/", MaxKeys=10000)
            # 'CommonPrefixes' is absent when the prefix matches nothing.
            list_deps = [x['Prefix'].split('/')[3] for x in contents['CommonPrefixes']]
        except Exception:  # missing prefix (KeyError) or S3 client error
            logging.warning(f"Year {year} not found in directory")
            continue
        if department not in list_deps:
            logging.warning(f"department {department} not in directory for year {year}")
            continue
        contents_communes = s3client.list_objects_v2(Bucket='khome-fraud2',
                                                     Prefix=f"csv/{year}/communes/{department}/",
                                                     MaxKeys=10000)
        # Object keys look like csv/<year>/communes/<dep>/<city>.csv
        list_communes = [x['Key'].split('/')[-1].split('.')[0] for x in contents_communes['Contents']]
        if city_code not in list_communes:
            logging.warning(f"city {city_code} not in directory for year {year}")
            continue
        logging.info(f"Appending transactions for year {year} and city {city_code}")
        response = s3client.get_object(Bucket='khome-fraud2',
                                       Key=f"csv/{year}/communes/{department}/{city_code}.csv")
        csv_string = response['Body'].read().decode('utf-8')
        list_dvf.append(pd.read_csv(StringIO(csv_string)))
    if not list_dvf:
        logging.warning("Did not find any transactions in dvf database in this city")
        return None
    return pd.concat(list_dvf, axis=0)
# Build a buffer geometry around the point and keep rows inside it.
def get_rows_within_radius(df, latitude, longitude, radius):
    """
    Filters rows in a DataFrame that are within a given radius around a point.

    Parameters:
    df (DataFrame): DataFrame with 'longitude' and 'latitude' columns.
    latitude (float): Latitude of the center point.
    longitude (float): Longitude of the center point.
    radius (float): Radius in meters.

    Returns:
    GeoDataFrame: Filtered GeoDataFrame with rows within the radius.
    """
    # Promote to a GeoDataFrame with WGS84 point geometry.
    df = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326")
    center_point = Point(longitude, latitude)
    # Azimuthal-equidistant projection centred on the query point: distances
    # from the centre are true metres, so a buffer there is a correct circle.
    # Note: pyproj.Proj(init='epsg:4326') is deprecated since pyproj 2; the
    # plain CRS string form is the supported spelling.
    proj_wgs84 = pyproj.Proj("EPSG:4326")
    aeqd_proj = pyproj.Proj(proj="aeqd", lat_0=latitude, lon_0=longitude)
    to_aeqd = pyproj.Transformer.from_proj(proj_wgs84, aeqd_proj, always_xy=True).transform
    to_wgs84 = pyproj.Transformer.from_proj(aeqd_proj, proj_wgs84, always_xy=True).transform
    # Buffer in metric space, then project the circle back to WGS84.
    buffer = transform(to_aeqd, center_point).buffer(radius)
    buffer = transform(to_wgs84, buffer)
    # Keep only rows whose point lies inside the buffer.
    return df[df.geometry.within(buffer)]
def return_list_of_all_transactions(df, parcelle, longitude, latitude):
    """Return every house/flat DVF transaction attached to a parcel.

    When the parcel id is absent from the data, falls back to all
    transactions within a 30 m radius of (longitude, latitude).

    :param df: DVF transactions for the city, or None.
    :param parcelle: cadastral parcel id (idu).
    :param longitude: property longitude (fallback search only).
    :param latitude: property latitude (fallback search only).
    :return: DataFrame of matching transactions, or None when df is None.
    """
    if df is None:
        logging.error('No data found returning None')
        return None
    residential = ['Appartement', 'Maison']
    if parcelle in df.id_parcelle.unique():
        parcel_mask = (df.id_parcelle == parcelle) & (df.type_local.isin(residential))
        suspects = df[parcel_mask].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
        logging.info(f"Parcel {parcelle} found in DVF, detecting fraud at parcel level")
    else:
        logging.warning(
            f'Parcel {parcelle} not found in transactions after {2014} looking for transactions within a {30}m radius')
        nearby = get_rows_within_radius(df, latitude, longitude, 30).drop(columns='geometry')
        suspects = nearby[nearby.type_local.isin(residential)].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
    return suspects
def return_list_of_suspects(df, parcelle, date_transaction, surface, room_count, code_type_local, longitude, latitude):
    """
    Searches for transactions made later than the transaction on the same parcel
    or within a 30m radius if the parcel does not exist.
    Filters the transactions matching the characteristics of the property.

    :param df: DVF transactions for the city, or None.
    :param parcelle: cadastral parcel id (idu).
    :param date_transaction: reference sale date (datetime or "%m/%d/%Y" string).
    :param surface: living surface of the property (square meters).
    :param room_count: number of main rooms.
    :param code_type_local: DVF local-type code; kept for interface
        compatibility, unused in the filtering below.
    :param longitude: property longitude (radius fallback only).
    :param latitude: property latitude (radius fallback only).
    :return: DataFrame of candidate (suspicious) transactions, or None.
    """
    if df is None:
        logging.error('No data found returning None')
        return None
    # Work on a copy so the caller's DataFrame is not mutated by the
    # date conversion below (the original assigned into the argument).
    df = df.copy()
    df.date_mutation = pd.to_datetime(df.date_mutation)
    if isinstance(date_transaction, str):
        date_transaction = datetime.strptime(date_transaction, "%m/%d/%Y")
    # Same parcel when known, otherwise everything within a 30 m radius.
    if parcelle not in df.id_parcelle.unique():
        # Bug fix: the message previously claimed a 20 m radius while the
        # code actually searches within 30 m.
        logging.warning(
            f'Parcel {parcelle} not found in transactions after {date_transaction.year}. Looking for transactions within a {30}m radius')
        suspects = get_rows_within_radius(df, latitude, longitude, 30).drop(columns='geometry')
        suspects = suspects[suspects.type_local.isin(['Appartement', 'Maison'])].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
    else:
        suspects = df[(df.id_parcelle == parcelle) & (df.type_local.isin(['Appartement', 'Maison']))].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
        logging.info(f"Parcel {parcelle} found in DVF, detecting fraud at parcel level")
    # Characteristics match: surface within +/-10 %, rooms within +/-1,
    # and only sales on/after the reference date.
    surface = float(surface)
    room_count = float(room_count)
    suspects = suspects[
        (suspects.surface_reelle_bati >= surface * 0.9) & (suspects.surface_reelle_bati <= surface * 1.1)
        & (suspects.nombre_pieces_principales <= room_count + 1)
        & (suspects.nombre_pieces_principales >= room_count - 1)
        & (suspects.date_mutation >= date_transaction)].copy()
    return suspects
def compute_distance(x, row):
    """
    Squared Euclidean distance between a characteristics vector *x* and the
    (surface, room count, local-type code) characteristics of a DVF row.
    """
    row_vector = np.array([row.surface_reelle_bati,
                           row.nombre_pieces_principales,
                           row.code_type_local])
    diff = x - row_vector
    return np.dot(diff, diff)
def get_suspects(address, postcode, surface, room_count, type_local, transaction_date, google_api_key, path_to_root_dvf):
    """
    Function geocoding the address, loading transactions in city,
    returning the suspicious transactions, evaluating the probability of fraud
    and returning the result.

    :param address: street address of the property.
    :param postcode: postal code.
    :param surface: living surface (square meters).
    :param room_count: number of main rooms.
    :param type_local: DVF local-type code (1 flat, 2 house).
    :param transaction_date: reference sale date (datetime or "%m/%d/%Y" string).
    :param google_api_key: Google geocoding API key.
    :param path_to_root_dvf: legacy local DVF root (passed through).
    :return: [json records of suspect transactions, max fraud probability],
             [suspects DataFrame, 0] when no later sale exists, or
             ['Unable to evaluate this property', 0] on failure.
    """
    logging.info(
        f"Detecting fraudulous activity for transaction: {address}, {postcode}, on {transaction_date}, for a surface of {surface} and {room_count} rooms")
    parcel, dept, insee, citycode, longitude, latitude = input_address_return_parcelle(address, postcode, google_api_key)
    # Bug fix: normalise the reference date ONCE and use it everywhere below.
    # The previous code parsed it into date_of_sale but kept comparing the
    # datetime 'date_mutation' column against the raw (possibly str) argument.
    if isinstance(transaction_date, str):
        date_of_sale = datetime.strptime(transaction_date, "%m/%d/%Y")
    else:
        date_of_sale = transaction_date
    df = load_and_concatenate_dvf_in_city(dept, citycode, date_of_sale.year, path_to_root_dvf)
    if df is None:
        return ['Unable to evaluate this property', 0]
    suspects = return_list_of_suspects(df, parcel, date_of_sale, surface, room_count, type_local, longitude,
                                       latitude)
    if suspects is None:
        return ['Unable to evaluate this property', 0]
    # Convert once up front (previously converted lazily in two places).
    suspects.date_mutation = pd.to_datetime(suspects.date_mutation)
    if suspects[suspects.date_mutation > date_of_sale].shape[0] == 0:
        logging.info('No fraud detected from DVF data')
        return [suspects, 0]
    # Fraud probability: logistic decay of the characteristics distance.
    dist_vector = np.array([surface, room_count, type_local])
    suspects['distance'] = suspects.apply(lambda x: compute_distance(dist_vector, x), axis=1)
    suspects['probabilite_de_fraude'] = suspects.distance.apply(lambda x: 1 / (1 + np.exp(x - 5)))
    suspects = suspects[suspects.date_mutation >= date_of_sale].copy()
    fraud_proba_suspects = suspects[suspects.date_mutation > date_of_sale].copy().probabilite_de_fraude.max()
    suspects["date_mutation"] = suspects["date_mutation"].astype(str)
    return [suspects.loc[:, ['id_mutation', 'date_mutation', 'surface_reelle_bati',
                             'nombre_pieces_principales', 'type_local',
                             'valeur_fonciere', 'adresse_numero', 'adresse_nom_voie',
                             'nom_commune']].to_json(orient='records'), fraud_proba_suspects]
def get_suspects_address_only(address, postcode, google_api_key, path_to_root_dvf):
    """Return the full DVF transaction timeline for an address as JSON records.

    Geocodes the address, loads every transaction recorded in the city since
    2014, and keeps those on the same parcel (or nearby when the parcel is
    unknown). Returns ['Unable to evaluate this property', 0] on failure.
    """
    logging.info(f"Getting timeline for transactions located at: {address} in DVF database")
    parcel, dept, insee, citycode, longitude, latitude = input_address_return_parcelle(
        address, postcode, google_api_key)
    transactions = load_and_concatenate_dvf_in_city(dept, citycode, 2014, path_to_root_dvf)
    if transactions is None:
        return ['Unable to evaluate this property', 0]
    timeline = return_list_of_all_transactions(transactions, parcel, longitude, latitude)
    if timeline is None:
        return ['Unable to evaluate this property', 0]
    timeline.date_mutation = pd.to_datetime(timeline.date_mutation).astype(str)
    wanted_columns = ['id_mutation', 'date_mutation', 'surface_reelle_bati',
                      'nombre_pieces_principales', 'type_local',
                      'valeur_fonciere', 'adresse_numero', 'adresse_nom_voie',
                      'nom_commune']
    return timeline.loc[:, wanted_columns].to_json(orient='records')
def get_fraud_from_csv(data, address_only, google_api_key, path_to_root_dvf="/home/aristote/Downloads/csv"):
    """Run fraud detection on every row of an uploaded transactions DataFrame.

    :param data: DataFrame with columns addresse_complete, code_postal and,
        unless address_only is set, isHouse, surface, room_count, date_mutation.
    :param address_only: when True only the per-address timeline is fetched.
    :param google_api_key: Google geocoding API key.
    :param path_to_root_dvf: legacy local DVF root (passed through).
    :return: the input DataFrame with an info_fraude column added
        (plus proba_fraude in full mode).
    """
    if address_only:
        data['info_fraude'] = data.loc[:, ['addresse_complete', 'code_postal']].apply(
            lambda row: get_suspects_address_only(row.addresse_complete,
                                                  row.code_postal,
                                                  google_api_key,
                                                  path_to_root_dvf),
            axis=1, result_type='expand')
        return data
    # DVF local-type code: 1 = flat, 2 = house.
    data['type_local'] = data.isHouse.apply(lambda is_house: 1 if is_house is False else 2)
    data.surface = data.surface.astype('float')
    data.room_count = data.room_count.astype('int')
    feature_columns = ['addresse_complete', 'code_postal', 'surface',
                       'room_count', 'type_local', 'date_mutation']
    data[['info_fraude', 'proba_fraude']] = data.loc[:, feature_columns].apply(
        lambda row: get_suspects(row.addresse_complete, row.code_postal, row.surface,
                                 row.room_count, row.type_local, row.date_mutation,
                                 google_api_key, path_to_root_dvf),
        axis=1, result_type='expand')
    return data
def process_csv_text(temp_file, address_only, api_key):
    """Gradio callback: run fraud detection on an uploaded csv (raw bytes)
    and expose the enriched result as a downloadable file."""
    frame = pd.read_csv(BytesIO(temp_file), sep=",")
    result = get_fraud_from_csv(frame, address_only, google_api_key=api_key)
    result.to_csv("output.csv")
    return gr.File(value="output.csv", visible=True)
if __name__ == "__main__":
    # Short and long usage descriptions shown at the top of the app (French).
    desc = ("## Cette application vous permet a partir d'une liste de transactions immobilieres sous forme de fichier de csv,\
de savoir si ce bien a ete revendu a partir de la base de donnees DVF. Si seule l'addresse et le code postal sont disponibles, cocher Addresse seulement.")
    long_desc = ("""Entree: Fichier csv contenant les colonnes suivantes: \n
isHouse (Booleen): si le bien est une maison ou pas True/False (cette application fonctionne seulement pour les maisons et les appartements). Dans le cas ou Addresse seuelement n'est pas coche\n
addresse_complete (Texte): numero, rue, nom de ville. Champ obligatoire \n
code_postal (Integer): code postal. Champ obligatoire \n
surface (Float): surface du bien. Dans le cas ou Addresse seuelement n'est pas coche\n
room_count (Integer): nombre de pieces. Dans le cas ou Addresse seuelement n'est pas coche \n
date_mutation (datetime): format %m/%d/%Y. Dans le cas ou Addresse seuelement n'est pas coche
Attention!!! Si le format et le nom des colonnes n'est pas respecte cela risque de causer des erreurs. S'assurer que le fichier est au format csv avec separateur virgule.
""")
    # Build the Gradio UI: csv upload + API key + mode checkbox in,
    # an enriched csv file out.
    with gr.Blocks() as demo:
        gr.Markdown("# Detection de transactions frauduleuses")
        gr.Markdown(desc)
        gr.Markdown(long_desc)
        csv_input = gr.File(label="Input Data", interactive=True, file_types=['.csv'], type="binary")
        google_key_input = gr.Textbox(label="Cle API Google", info="Veuillez entrer votre cle API Google api")
        address_only_input = gr.Checkbox(label="Addresse seulement?", info="Addresse Seulement?")
        result_file = gr.File(label="Output File", file_types=['.', '', '.csv'], type="binary")
        run_button = gr.Button("Obtenir les resultats")
        run_button.click(process_csv_text,
                         inputs=[csv_input, address_only_input, google_key_input],
                         outputs=[result_file])
    demo.launch(share=True)