import json
import os
import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import requests
from shapely.geometry import Point
from shapely.ops import transform
import gradio as gr
from io import StringIO, BytesIO
from pandas import ExcelFile
from datetime import datetime
import boto3
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Exclusive upper bound on the DVF vintages stored in the S3 bucket
# (range(lower_date, DVF_YEAR_UPPER_BOUND) loads years up to 2023).
DVF_YEAR_UPPER_BOUND = 2024


def load_csv(url):
    """Load a CSV file into a DataFrame.

    :param url: path or URL to the csv file.
    :return: pandas DataFrame
    """
    return pd.read_csv(url)


def input_address_return_parcelle(address, postcode, google_api_key):
    """Geocode an address and return the cadastral parcel it falls in.

    Coordinates come from the Google geocoding API; the cadastral parcel is
    resolved through the IGN apicarto API and the INSEE city code through the
    French BAN (api-adresse) API.

    :param address: street address (number, street, city).
    :param postcode: postal code (int or str).
    :param google_api_key: Google Maps Geocoding API key.
    :return: (parcel idu, department code, insee code, BAN city code,
              longitude, latitude), or six ``None`` values when either
              geocoding step fails.
    """
    query = f"{address} {postcode}"
    address_response = requests.get(
        f"https://api-adresse.data.gouv.fr/search/?q={query}")
    address_response_google = requests.get(
        f"https://maps.googleapis.com/maps/api/geocode/json?address={query}&key={google_api_key}")
    try:
        coords = address_response_google.json()['results'][0]['geometry']['location']
    except (KeyError, IndexError, ValueError):
        # Empty result list or non-JSON body: the address could not be geocoded.
        logging.error("Unable to geocode this address")
        return None, None, None, None, None, None

    geom = {'type': 'Point', 'coordinates': [coords['lng'], coords['lat']]}
    parcelle_response = requests.get(
        f"https://apicarto.ign.fr/api/cadastre/parcelle?geom={json.dumps(geom)}")
    try:
        # Parse once and reuse instead of re-calling .json() per field.
        parcelle_props = parcelle_response.json()['features'][0]['properties']
    except (KeyError, IndexError, ValueError):
        logging.error("Unable to find a cadastral parcel at this address")
        return None, None, None, None, None, None

    return (parcelle_props['idu'],
            parcelle_props['code_dep'],
            parcelle_props['code_insee'],
            address_response.json()['features'][0]['properties']['citycode'],
            coords['lng'],
            coords['lat'])


def connect_to_s3():
    """Create an S3 client for the DVF bucket.

    SECURITY: the original version embedded a live AWS access key and secret
    directly in the source. Credentials are now taken from the environment
    (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY); when unset, boto3 falls back
    to its default credential chain. The leaked key pair must be revoked.
    """
    return boto3.client(
        's3',
        region_name='eu-north-1',
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    )


def load_and_concatenate_dvf_in_city(department, city_code, lower_date, path_to_root_dvf):
    """Concatenate the yearly DVF transaction files of one city from S3.

    :param department: department code (e.g. "75"); ``None`` aborts the lookup.
    :param city_code: INSEE city code of the commune; ``None`` aborts the lookup.
    :param lower_date: first year (inclusive) to load.
    :param path_to_root_dvf: unused since the data moved to S3; kept so the
        call signature stays backward compatible.
    :return: one concatenated DataFrame of all available years, or ``None``
        when no transaction file was found.
    """
    s3client = connect_to_s3()
    if department is None or city_code is None:
        logging.error("Could not geocode this property and retrieve transactions")
        return None

    list_dvf = []
    for year in range(lower_date, DVF_YEAR_UPPER_BOUND):
        try:
            contents = s3client.list_objects(Bucket='khome-fraud2', Delimiter="/",
                                             Prefix=f"csv/{year}/communes/",
                                             MaxKeys=10000)
            # Prefixes look like "csv/<year>/communes/<dep>/"; keep the dep part.
            list_deps = [x['Prefix'].split('/')[3] for x in contents['CommonPrefixes']]
        except Exception:
            # Best-effort per year: a missing vintage is logged and skipped.
            logging.warning(f"Year {year} not found in directory")
            continue

        if department not in list_deps:
            logging.warning(f"department {department} not in directory for year {year}")
            continue

        contents_communes = s3client.list_objects_v2(Bucket='khome-fraud2',
                                                     Prefix=f"csv/{year}/communes/{department}/",
                                                     MaxKeys=10000)
        # Keys look like ".../<city_code>.csv"; strip the path and extension.
        list_communes = [x['Key'].split('/')[-1].split('.')[0]
                         for x in contents_communes['Contents']]
        if city_code not in list_communes:
            logging.warning(f"city {city_code} not in directory for year {year}")
            continue

        logging.info(f"Appending transactions for year {year} and city {city_code}")
        response = s3client.get_object(Bucket='khome-fraud2',
                                       Key=f"csv/{year}/communes/{department}/{city_code}.csv")
        csv_string = response['Body'].read().decode('utf-8')
        list_dvf.append(pd.read_csv(StringIO(csv_string)))

    if len(list_dvf) == 0:
        logging.warning("Did not find any transactions in dvf database in this city")
        return None
    return pd.concat(list_dvf, axis=0)


def get_rows_within_radius(df, latitude, longitude, radius):
    """
    Filters rows in a GeoDataFrame that are within a given radius around a point.

    Parameters:
    df (DataFrame): DataFrame with `longitude` and `latitude` columns.
    latitude (float): Latitude of the center point.
    longitude (float): Longitude of the center point.
    radius (float): Radius in meters.

    Returns:
    GeoDataFrame: Filtered GeoDataFrame with rows within the radius.
    """
    # Convert df to gdf
    df = gpd.GeoDataFrame(df,
                          geometry=gpd.points_from_xy(df.longitude, df.latitude),
                          crs="EPSG:4326")

    # Define the center point
    center_point = Point(longitude, latitude)

    # Azimuthal-equidistant projection centred on the point: distances from
    # the centre are true metres, so buffering by `radius` gives a metric
    # circle. (Replaces the deprecated `Proj(init='epsg:4326')` syntax.)
    proj_wgs84 = pyproj.Proj("epsg:4326")
    aeqd_proj = pyproj.Proj(proj="aeqd", lat_0=latitude, lon_0=longitude)
    to_aeqd = pyproj.Transformer.from_proj(proj_wgs84, aeqd_proj, always_xy=True).transform
    to_wgs84 = pyproj.Transformer.from_proj(aeqd_proj, proj_wgs84, always_xy=True).transform

    # Buffer the point by the radius (in meters), then project back to WGS84.
    buffer = transform(to_aeqd, center_point).buffer(radius)
    buffer = transform(to_wgs84, buffer)

    # Filter the GeoDataFrame to keep only rows within the buffer
    return df[df.geometry.within(buffer)]


def return_list_of_all_transactions(df, parcelle, longitude, latitude):
    """Return every house/flat transaction on a parcel.

    Falls back to a 30 m radius search around (longitude, latitude) when the
    parcel id does not appear in the data.

    :return: DataFrame of transactions (possibly empty), or ``None`` when
        ``df`` is ``None``.
    """
    if df is None:
        logging.error('No data found returning None')
        return None

    if parcelle not in df.id_parcelle.unique():
        logging.warning(
            f'Parcel {parcelle} not found in transactions after {2014} looking for transactions within a {30}m radius')
        suspects = get_rows_within_radius(df, latitude, longitude, 30).drop(columns='geometry')
        suspects = suspects[suspects.type_local.isin(['Appartement', 'Maison'])].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
    else:
        suspects = df[(df.id_parcelle == parcelle)
                      & (df.type_local.isin(['Appartement', 'Maison']))].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
        logging.info(f"Parcel {parcelle} found in DVF, detecting fraud at parcel level")
    return suspects


def return_list_of_suspects(df, parcelle, date_transaction, surface, room_count,
                            code_type_local, longitude, latitude):
    """
    Searches for transactions made later than the transaction on the same
    parcel or within a 30m radius if the parcel does not exist. Filters the
    transactions matching the characteristics of the property
    (surface within +/-10%, room count within +/-1).

    :return: filtered DataFrame of candidate resales, or ``None`` when ``df``
        is ``None``.
    """
    if df is None:
        logging.error('No data found returning None')
        return None

    # Convert dates to datetime
    df.date_mutation = pd.to_datetime(df.date_mutation)
    if isinstance(date_transaction, str):
        date_transaction = datetime.strptime(date_transaction, "%m/%d/%Y")

    # Filter properties sold from the same parcel or within a 30m radius
    if parcelle not in df.id_parcelle.unique():
        # NOTE: the radius actually used is 30 m (the old message said 20 m).
        logging.warning(
            f'Parcel {parcelle} not found in transactions after {date_transaction.year}. Looking for transactions within a {30}m radius')
        suspects = get_rows_within_radius(df, latitude, longitude, 30).drop(columns='geometry')
        suspects = suspects[suspects.type_local.isin(['Appartement', 'Maison'])].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
    else:
        suspects = df[(df.id_parcelle == parcelle)
                      & (df.type_local.isin(['Appartement', 'Maison']))].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
        logging.info(f"Parcel {parcelle} found in DVF, detecting fraud at parcel level")

    # Filter the properties with matching characteristics to get a list of
    # suspicious transactions
    surface = float(surface)
    room_count = float(room_count)
    suspects = suspects[
        (suspects.surface_reelle_bati >= surface * 0.9)
        & (suspects.surface_reelle_bati <= surface * 1.1)
        & (suspects.nombre_pieces_principales <= room_count + 1)
        & (suspects.nombre_pieces_principales >= room_count - 1)
        & (suspects.date_mutation >= date_transaction)].copy()
    return suspects


def compute_distance(x, row):
    """
    Compute squared Euclidean distance between property characteristic
    vectors.

    :param x: numpy array [surface, room_count, code_type_local] of the
        reference property.
    :param row: DVF row providing the same three attributes.
    """
    y = np.array([row.surface_reelle_bati, row.nombre_pieces_principales, row.code_type_local])
    return np.sum((x - y) ** 2)


def get_suspects(address, postcode, surface, room_count, type_local,
                 transaction_date, google_api_key, path_to_root_dvf):
    """
    Geocode the address, load the transactions of the city, select the
    suspicious resales and score a probability of fraud.

    :return: [json records of suspicious transactions, max fraud probability],
        or ['Unable to evaluate this property', 0] when evaluation fails.
        NOTE(review): on the "no later resale" path the first element is a
        DataFrame rather than a JSON string — callers must tolerate both.
    """
    logging.info(
        f"Detecting fraudulous activity for transaction: {address}, {postcode}, on {transaction_date}, for a surface of {surface} and {room_count} rooms")
    parcel, dept, insee, citycode, longitude, latitude = input_address_return_parcelle(
        address, postcode, google_api_key)

    # Parse the sale date once and use the datetime everywhere below instead
    # of mixing string/datetime comparisons against date_mutation.
    if isinstance(transaction_date, str):
        date_of_sale = datetime.strptime(transaction_date, "%m/%d/%Y")
    else:
        date_of_sale = transaction_date

    df = load_and_concatenate_dvf_in_city(dept, citycode, date_of_sale.year, path_to_root_dvf)
    if df is None:
        return ['Unable to evaluate this property', 0]

    suspects = return_list_of_suspects(df, parcel, date_of_sale, surface,
                                       room_count, type_local, longitude, latitude)
    if suspects is None:
        return ['Unable to evaluate this property', 0]

    # date_mutation is already datetime here (converted in return_list_of_suspects)
    if suspects[suspects.date_mutation > date_of_sale].shape[0] == 0:
        logging.info('No fraud detected from DVF data')
        return [suspects, 0]

    # Score each candidate: closer characteristics => higher fraud probability
    # (logistic transform of the squared distance).
    dist_vector = np.array([surface, room_count, type_local])
    suspects['distance'] = suspects.apply(lambda x: compute_distance(dist_vector, x), axis=1)
    suspects['probabilite_de_fraude'] = suspects.distance.apply(lambda x: 1 / (1 + np.exp(x - 5)))

    suspects.date_mutation = pd.to_datetime(suspects.date_mutation)
    suspects = suspects[suspects.date_mutation >= date_of_sale].copy()
    fraud_proba_suspects = suspects[suspects.date_mutation > date_of_sale].copy().probabilite_de_fraude.max()
    suspects["date_mutation"] = pd.to_datetime(suspects["date_mutation"]).astype(str)

    return [suspects.loc[:, ['id_mutation', 'date_mutation', 'surface_reelle_bati',
                             'nombre_pieces_principales', 'type_local', 'valeur_fonciere',
                             'adresse_numero', 'adresse_nom_voie',
                             'nom_commune']].to_json(orient='records'),
            fraud_proba_suspects]


def get_suspects_address_only(address, postcode, google_api_key, path_to_root_dvf):
    """Return the full DVF transaction timeline of an address as JSON records.

    Loads every vintage from 2014 on; no characteristic filtering is applied.
    """
    logging.info(f"Getting timeline for transactions located at: {address} in DVF database")
    parcel, dept, insee, citycode, longitude, latitude = input_address_return_parcelle(
        address, postcode, google_api_key)
    df = load_and_concatenate_dvf_in_city(dept, citycode, 2014, path_to_root_dvf)
    if df is None:
        return ['Unable to evaluate this property', 0]
    suspects = return_list_of_all_transactions(df, parcel, longitude, latitude)
    if suspects is None:
        return ['Unable to evaluate this property', 0]
    suspects.date_mutation = pd.to_datetime(suspects.date_mutation).astype(str)
    return suspects.loc[:, ['id_mutation', 'date_mutation', 'surface_reelle_bati',
                            'nombre_pieces_principales', 'type_local', 'valeur_fonciere',
                            'adresse_numero', 'adresse_nom_voie',
                            'nom_commune']].to_json(orient='records')


def get_fraud_from_csv(data, address_only, google_api_key,
                       path_to_root_dvf="/home/aristote/Downloads/csv"):
    """Run the fraud detection on every row of an input DataFrame.

    :param data: DataFrame with columns addresse_complete, code_postal and,
        when ``address_only`` is False, isHouse, surface, room_count and
        date_mutation.
    :param address_only: when True only the transaction timeline is fetched.
    :param path_to_root_dvf: legacy local path, unused by the S3-backed loader.
    :return: the input DataFrame augmented with the result columns.
    """
    if address_only:
        data['info_fraude'] = data.loc[:, ['addresse_complete', 'code_postal']].apply(
            lambda x: get_suspects_address_only(x.addresse_complete, x.code_postal,
                                                google_api_key, path_to_root_dvf),
            axis=1, result_type='expand')
        return data

    # Encode the property kind: 1 = Appartement, 2 = Maison.
    data['type_local'] = data.isHouse.apply(lambda x: 1 if x is False else 2)
    data.surface = data.surface.astype('float')
    data.room_count = data.room_count.astype('int')
    data[['info_fraude', 'proba_fraude']] = data.loc[
        :, ['addresse_complete', 'code_postal', 'surface', 'room_count',
            'type_local', 'date_mutation']].apply(
        lambda x: get_suspects(x.addresse_complete, x.code_postal, x.surface,
                               x.room_count, x.type_local, x.date_mutation,
                               google_api_key, path_to_root_dvf),
        axis=1, result_type='expand')
    return data


def process_csv_text(temp_file, address_only, api_key):
    """Gradio callback: run the pipeline on an uploaded CSV and return a file.

    :param temp_file: raw bytes of the uploaded CSV (comma-separated).
    :param address_only: checkbox value, timeline-only mode when True.
    :param api_key: Google geocoding API key entered by the user.
    """
    df = pd.read_csv(BytesIO(temp_file), sep=",")
    data = get_fraud_from_csv(df, address_only, google_api_key=api_key)
    # Results are written next to the app; Gradio serves the file back.
    data.to_csv("output.csv")
    return gr.File(value="output.csv", visible=True)


if __name__ == "__main__":
    desc = ("## Cette application vous permet a partir d'une liste de transactions immobilieres sous forme de fichier de csv, de savoir si ce bien a ete revendu a partir de la base de donnees DVF. Si seule l'addresse et le code postal sont disponibles, cocher Addresse seulement.")
    long_desc = ("""Entree: Fichier csv contenant les colonnes suivantes: \n isHouse (Booleen): si le bien est une maison ou pas True/False (cette application fonctionne seulement pour les maisons et les appartements). Dans le cas ou Addresse seuelement n'est pas coche\n addresse_complete (Texte): numero, rue, nom de ville. Champ obligatoire \n code_postal (Integer): code postal. Champ obligatoire \n surface (Float): surface du bien. Dans le cas ou Addresse seuelement n'est pas coche\n room_count (Integer): nombre de pieces. Dans le cas ou Addresse seuelement n'est pas coche \n date_mutation (datetime): format %m/%d/%Y. Dans le cas ou Addresse seuelement n'est pas coche Attention!!! Si le format et le nom des colonnes n'est pas respecte cela risque de causer des erreurs. S'assurer que le fichier est au format csv avec separateur virgule. """)
    with gr.Blocks() as demo:
        gr.Markdown("# Detection de transactions frauduleuses")
        gr.Markdown(desc)
        gr.Markdown(long_desc)
        upload_button = gr.File(label="Input Data", interactive=True,
                                file_types=['.csv'], type="binary")
        api_key = gr.Textbox(label="Cle API Google",
                             info="Veuillez entrer votre cle API Google api")
        is_address = gr.Checkbox(label="Addresse seulement?", info="Addresse Seulement?")
        output_file = gr.File(label="Output File", file_types=['.', '', '.csv'], type="binary")
        download_button = gr.Button("Obtenir les resultats")
        download_button.click(process_csv_text,
                              inputs=[upload_button, is_address, api_key],
                              outputs=[output_file])
    demo.launch(share=True)