# Koaris — Update app.py — commit 6b14f10 (verified)
import json
import os
import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import requests
from shapely.geometry import Point
from shapely.ops import transform
import gradio as gr
from io import StringIO, BytesIO
from pandas import ExcelFile
from datetime import datetime
import boto3
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def load_csv(url):
    """Read a csv file into a DataFrame.

    :param url: path or URL of the csv file to load.
    :return: pandas DataFrame with the file contents.
    """
    return pd.read_csv(url)
def input_address_return_parcelle(address, postcode, google_api_key):
    """Geocode an address and look up the cadastral parcel covering it.

    Coordinates come from the Google Geocoding API; the BAN api-adresse call
    is kept because it supplies the INSEE city code. The parcel comes from
    the IGN apicarto cadastre endpoint.

    :param address: street address (number, street, city).
    :param postcode: postal code (int or str).
    :param google_api_key: Google Maps Geocoding API key.
    :return: tuple (parcel idu, department code, parcel insee code,
             BAN city code, longitude, latitude); six None values when
             geocoding or the parcel lookup fails.
    """
    query = f"{address} {str(postcode)}"
    address_response = requests.get(
        f"https://api-adresse.data.gouv.fr/search/?q={query}", timeout=30)
    address_response_google = requests.get(
        f"https://maps.googleapis.com/maps/api/geocode/json?address={query}&key={google_api_key}",
        timeout=30)
    try:
        coords = address_response_google.json()['results'][0]['geometry']['location']
        # GeoJSON point is (lng, lat) order, as apicarto expects.
        geom = {'type': 'Point', 'coordinates': [coords['lng'], coords['lat']]}
    except (KeyError, IndexError, ValueError):
        logging.error("Unable to geocode this address")
        return None, None, None, None, None, None
    parcelle_response = requests.get(
        f"https://apicarto.ign.fr/api/cadastre/parcelle?geom={json.dumps(geom)}", timeout=30)
    try:
        # Parse each response once and reuse, instead of re-calling .json()
        # per field; also guard the BAN citycode access, which previously
        # could raise outside any try block.
        parcelle = parcelle_response.json()['features'][0]['properties']
        citycode = address_response.json()['features'][0]['properties']['citycode']
    except (KeyError, IndexError, ValueError):
        logging.error("Unable to find a cadastral parcel at this address")
        return None, None, None, None, None, None
    return parcelle['idu'], parcelle['code_dep'], parcelle['code_insee'], \
        citycode, coords['lng'], coords['lat']
def connect_to_s3():
    """Build a boto3 S3 client for the DVF bucket.

    SECURITY: the original code hard-coded a long-lived AWS access key pair
    in source. That key pair must be considered leaked and rotated. The
    client now reads credentials from the standard AWS environment variables
    (or, when they are unset, falls back to boto3's default credential
    chain: shared config files, IAM role, etc.).

    :return: a boto3 S3 client bound to region eu-north-1 by default.
    """
    return boto3.client(
        's3',
        region_name=os.environ.get('AWS_REGION', 'eu-north-1'),
        # None values make boto3 fall back to its normal credential chain.
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
    )
def load_and_concatenate_dvf_in_city(department, city_code, lower_date, path_to_root_dvf, upper_year=2024):
    """Load every yearly DVF csv for one city from S3 and concatenate them.

    :param department: department code (e.g. "75"); may be None when
        geocoding failed upstream.
    :param city_code: INSEE city code; may be None.
    :param lower_date: first year (inclusive) to load.
    :param path_to_root_dvf: legacy local path, kept for interface
        compatibility; data is actually read from the khome-fraud2 bucket.
    :param upper_year: first year (exclusive) NOT to load. Defaults to 2024,
        matching the previously hard-coded upper bound.
    :return: one concatenated DataFrame, or None when nothing was found.
    """
    s3client = connect_to_s3()
    if department is None or city_code is None:
        logging.error("Could not geocode this property and retrieve transactions")
        return None
    list_dvf = []
    for year in range(lower_date, upper_year):
        try:
            contents = s3client.list_objects(Bucket='khome-fraud2', Delimiter="/",
                                             Prefix=f"csv/{year}/communes/", MaxKeys=10000)
            # 'CommonPrefixes' is absent when the prefix matched nothing,
            # which raises KeyError and skips the year.
            list_deps = [x['Prefix'].split('/')[3] for x in contents['CommonPrefixes']]
        except Exception:  # missing year prefix or transient S3 error
            logging.warning(f"Year {year} not found in directory")
            continue
        if department not in list_deps:
            logging.warning(f"department {department} not in directory for year {year}")
            continue
        contents_communes = s3client.list_objects_v2(
            Bucket='khome-fraud2',
            Prefix=f"csv/{year}/communes/{department}/",
            MaxKeys=10000)
        # Object key layout: csv/<year>/communes/<dep>/<insee>.csv
        list_communes = [x['Key'].split('/')[-1].split('.')[0]
                         for x in contents_communes['Contents']]
        if city_code not in list_communes:
            logging.warning(f"city {city_code} not in directory for year {year}")
            continue
        logging.info(f"Appending transactions for year {year} and city {city_code}")
        response = s3client.get_object(
            Bucket='khome-fraud2',
            Key=f"csv/{year}/communes/{department}/{city_code}.csv")
        csv_string = response['Body'].read().decode('utf-8')
        list_dvf.append(pd.read_csv(StringIO(csv_string)))
    if not list_dvf:
        logging.warning("Did not find any transactions in dvf database in this city")
        return None
    return pd.concat(list_dvf, axis=0)
# creer une geometrie autour du point
def get_rows_within_radius(df, latitude, longitude, radius):
    """
    Filters rows in a DataFrame that are within a given radius around a point.

    Parameters:
    df (DataFrame): must carry 'latitude' and 'longitude' columns.
    latitude (float): Latitude of the center point.
    longitude (float): Longitude of the center point.
    radius (float): Radius in meters.

    Returns:
    GeoDataFrame: Filtered GeoDataFrame with rows within the radius.
    """
    # Convert the plain DataFrame to a GeoDataFrame in WGS84.
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.longitude, df.latitude),
                           crs="EPSG:4326")
    center_point = Point(longitude, latitude)
    # Azimuthal-equidistant projection centred on the point: distances from
    # the centre are true metres there, so buffering by `radius` is exact.
    # (Replaces the deprecated pyproj.Proj(init=...) syntax, removed in
    # pyproj 3.x, with Transformer.from_crs.)
    aeqd_crs = pyproj.CRS(proj="aeqd", lat_0=latitude, lon_0=longitude)
    to_aeqd = pyproj.Transformer.from_crs("EPSG:4326", aeqd_crs, always_xy=True).transform
    to_wgs84 = pyproj.Transformer.from_crs(aeqd_crs, "EPSG:4326", always_xy=True).transform
    # Buffer the projected point by the radius, then map back to WGS84.
    buffer = transform(to_aeqd, center_point).buffer(radius)
    buffer = transform(to_wgs84, buffer)
    # Keep only the rows whose geometry falls inside the buffer polygon.
    return gdf[gdf.geometry.within(buffer)]
def return_list_of_all_transactions(df, parcelle, longitude, latitude):
    """Return all house/flat transactions on a parcel (or near the point).

    :param df: DVF transactions DataFrame, or None when loading failed.
    :param parcelle: cadastral parcel id to match against id_parcelle.
    :param longitude: property longitude, used for the radius fallback.
    :param latitude: property latitude, used for the radius fallback.
    :return: filtered DataFrame (possibly empty), or None when df is None.
    """
    if df is None:
        logging.error('No data found returning None')
        return None
    if parcelle not in df.id_parcelle.unique():
        # Parcel id unknown to DVF: fall back to a 30 m radius search.
        logging.warning(
            f'Parcel {parcelle} not found in transactions after {2014} looking for transactions within a {30}m radius')
        suspects = get_rows_within_radius(df, latitude, longitude, 30).drop(columns='geometry')
        suspects = suspects[suspects.type_local.isin(['Appartement', 'Maison'])].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
    else:
        suspects = df[(df.id_parcelle == parcelle) & (df.type_local.isin(['Appartement', 'Maison']))].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
        else:
            # Only announce a parcel-level match when rows actually remain;
            # previously this logged even right after "No transactions".
            logging.info(f"Parcel {parcelle} found in DVF, detecting fraud at parcel level")
    return suspects
def return_list_of_suspects(df, parcelle, date_transaction, surface, room_count, code_type_local, longitude, latitude):
    """
    Searches for transactions made later than the transaction on the same parcel
    or within a 30m radius if the parcel does not exist.
    Filters the transactions matching the characteristics of the property.

    :param df: DVF transactions DataFrame, or None when loading failed.
    :param parcelle: cadastral parcel id to match against id_parcelle.
    :param date_transaction: reference sale date ("%m/%d/%Y" str or datetime).
    :param surface: reference surface in m2; matches within +/-10%.
    :param room_count: reference room count; matches within +/-1.
    :param code_type_local: unused here; kept for interface compatibility.
    :param longitude: property longitude, used for the radius fallback.
    :param latitude: property latitude, used for the radius fallback.
    :return: filtered DataFrame (possibly empty), or None when df is None.
    """
    if df is None:
        logging.error('No data found returning None')
        return None
    df.date_mutation = pd.to_datetime(df.date_mutation)
    if isinstance(date_transaction, str):
        date_transaction = datetime.strptime(date_transaction, "%m/%d/%Y")
    # Filter properties sold from the same parcel or within a 30m radius
    if parcelle not in df.id_parcelle.unique():
        # Message fixed: it previously claimed 20m while the code used 30m.
        logging.warning(
            f'Parcel {parcelle} not found in transactions after {date_transaction.year}. Looking for transactions within a 30m radius')
        suspects = get_rows_within_radius(df, latitude, longitude, 30).drop(columns='geometry')
        suspects = suspects[suspects.type_local.isin(['Appartement', 'Maison'])].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
    else:
        suspects = df[(df.id_parcelle == parcelle) & (df.type_local.isin(['Appartement', 'Maison']))].copy()
        if suspects.shape[0] == 0:
            logging.info('No transactions recorded in DVF for this property')
        else:
            # Only log a parcel-level match when rows actually remain.
            logging.info(f"Parcel {parcelle} found in DVF, detecting fraud at parcel level")
    # Keep only comparable properties: surface within +/-10%, room count
    # within +/-1, sold on or after the reference transaction date.
    surface = float(surface)
    room_count = float(room_count)
    suspects = suspects[
        (suspects.surface_reelle_bati >= surface * 0.9) & (suspects.surface_reelle_bati <= surface * 1.1)
        & (suspects.nombre_pieces_principales <= room_count + 1)
        & (suspects.nombre_pieces_principales >= room_count - 1)
        & (suspects.date_mutation >= date_transaction)].copy()
    return suspects
def compute_distance(x, row):
    """
    Compute Euclidean distance between property characteristics:
    the squared distance between the query vector *x* and the row's
    (surface_reelle_bati, nombre_pieces_principales, code_type_local).
    """
    features = np.array([row.surface_reelle_bati,
                         row.nombre_pieces_principales,
                         row.code_type_local])
    diff = x - features
    return np.sum(diff * diff)
def get_suspects(address, postcode, surface, room_count, type_local, transaction_date, google_api_key, path_to_root_dvf):
    """
    Function geocoding the adress, loading transactions in city,
    returning the suspicious transactions, evaluating the probability of fraud
    and returning the result.

    :param address: street address of the property.
    :param postcode: postal code.
    :param surface: surface in m2.
    :param room_count: number of main rooms.
    :param type_local: numeric local-type code (compared to code_type_local).
    :param transaction_date: sale date ("%m/%d/%Y" str or datetime).
    :param google_api_key: Google geocoding API key.
    :param path_to_root_dvf: legacy local DVF path (data comes from S3).
    :return: [json records, max fraud probability], [suspects DataFrame, 0]
        when no later sale exists, or ['Unable to evaluate this property', 0]
        on failure.
    """
    logging.info(
        f"Detecting fraudulous activity for transaction: {address}, {postcode}, on {transaction_date}, for a surface of {surface} and {room_count} rooms")
    parcel, dept, insee, citycode, longitude, latitude = input_address_return_parcelle(address, postcode, google_api_key)
    if isinstance(transaction_date, str):
        date_of_sale = datetime.strptime(transaction_date, "%m/%d/%Y")
    else:
        date_of_sale = transaction_date
    df = load_and_concatenate_dvf_in_city(dept, citycode, date_of_sale.year, path_to_root_dvf)
    if df is None:
        return ['Unable to evaluate this property', 0]
    suspects = return_list_of_suspects(df, parcel, transaction_date, surface, room_count, type_local, longitude,
                                       latitude)
    if suspects is None:
        return ['Unable to evaluate this property', 0]
    # Normalise dates once, then always compare against the parsed
    # `date_of_sale` — the original compared against the raw (possibly str)
    # `transaction_date`, relying on implicit pandas coercion.
    suspects.date_mutation = pd.to_datetime(suspects.date_mutation)
    if suspects[suspects.date_mutation > date_of_sale].shape[0] == 0:
        logging.info('No fraud detected from DVF data')
        return [suspects, 0]
    dist_vector = np.array([surface, room_count, type_local])
    suspects['distance'] = suspects.apply(lambda x: compute_distance(dist_vector, x), axis=1)
    # Logistic mapping: distance 5 -> probability 0.5; smaller -> closer to 1.
    suspects['probabilite_de_fraude'] = suspects.distance.apply(lambda x: 1 / (1 + np.exp(x - 5)))
    suspects = suspects[suspects.date_mutation >= date_of_sale].copy()
    fraud_proba_suspects = suspects[suspects.date_mutation > date_of_sale].copy().probabilite_de_fraude.max()
    suspects["date_mutation"] = suspects["date_mutation"].astype(str)
    return [suspects.loc[:, ['id_mutation', 'date_mutation', 'surface_reelle_bati',
                             'nombre_pieces_principales', 'type_local',
                             'valeur_fonciere', 'adresse_numero', 'adresse_nom_voie',
                             'nom_commune']].to_json(orient='records'), fraud_proba_suspects]
def get_suspects_address_only(address, postcode, google_api_key, path_to_root_dvf):
    """Return the DVF transaction timeline (since 2014) for an address.

    Same pipeline as get_suspects but without property characteristics:
    geocode, load the city's DVF files from 2014 onwards, then list every
    house/flat transaction found for the parcel.

    :return: json records of the transactions, or
        ['Unable to evaluate this property', 0] on failure.
    """
    logging.info(f"Getting timeline for transactions located at: {address} in DVF database")
    parcel, dept, insee, citycode, longitude, latitude = input_address_return_parcelle(
        address, postcode, google_api_key)
    transactions = load_and_concatenate_dvf_in_city(dept, citycode, 2014, path_to_root_dvf)
    if transactions is None:
        return ['Unable to evaluate this property', 0]
    timeline = return_list_of_all_transactions(transactions, parcel, longitude, latitude)
    if timeline is None:
        return ['Unable to evaluate this property', 0]
    timeline.date_mutation = pd.to_datetime(timeline.date_mutation).astype(str)
    wanted_columns = ['id_mutation', 'date_mutation', 'surface_reelle_bati',
                      'nombre_pieces_principales', 'type_local',
                      'valeur_fonciere', 'adresse_numero', 'adresse_nom_voie',
                      'nom_commune']
    return timeline.loc[:, wanted_columns].to_json(orient='records')
def get_fraud_from_csv(data, address_only, google_api_key, path_to_root_dvf="/home/aristote/Downloads/csv"):
    """Run fraud detection for every row of an uploaded transactions table.

    :param data: DataFrame with at least addresse_complete and code_postal;
        isHouse, surface, room_count and date_mutation are also required
        unless address_only is True.
    :param address_only: when True, only the transaction timeline is fetched.
    :param google_api_key: Google geocoding API key.
    :param path_to_root_dvf: legacy local DVF path (data is read from S3).
    :return: the input DataFrame with info_fraude (and, in full mode,
        proba_fraude) columns added.
    """
    if address_only:
        address_cols = data.loc[:, ['addresse_complete', 'code_postal']]
        data['info_fraude'] = address_cols.apply(
            lambda row: get_suspects_address_only(row.addresse_complete,
                                                  row.code_postal,
                                                  google_api_key,
                                                  path_to_root_dvf),
            axis=1, result_type='expand')
        return data
    # Map the isHouse flag to the numeric code used downstream
    # (False -> 1, anything else -> 2).
    data['type_local'] = data.isHouse.apply(lambda flag: 1 if flag is False else 2)
    data.surface = data.surface.astype('float')
    data.room_count = data.room_count.astype('int')
    feature_cols = data.loc[:, ['addresse_complete', 'code_postal', 'surface',
                                'room_count', 'type_local', 'date_mutation']]
    data[['info_fraude', 'proba_fraude']] = feature_cols.apply(
        lambda row: get_suspects(row.addresse_complete, row.code_postal, row.surface,
                                 row.room_count, row.type_local, row.date_mutation,
                                 google_api_key, path_to_root_dvf),
        axis=1, result_type='expand')
    return data
def process_csv_text(temp_file, address_only, api_key):
    """Gradio callback: parse the uploaded csv bytes, run the fraud
    detection pipeline, and expose the result as a downloadable file.

    :param temp_file: raw bytes of the uploaded csv (comma separated).
    :param address_only: value of the "Addresse seulement?" checkbox.
    :param api_key: Google API key typed by the user.
    :return: a visible gr.File pointing at the generated output.csv.
    """
    uploaded = pd.read_csv(BytesIO(temp_file), sep=",")
    results = get_fraud_from_csv(uploaded, address_only, google_api_key=api_key)
    results.to_csv("output.csv")
    return gr.File(value="output.csv", visible=True)
if __name__ == "__main__":
    # Short French description shown at the top of the Gradio page:
    # "from a csv of real-estate transactions, find whether a property was
    # resold according to the DVF database".
    desc = ("## Cette application vous permet a partir d'une liste de transactions immobilieres sous forme de fichier de csv,\
    de savoir si ce bien a ete revendu a partir de la base de donnees DVF. Si seule l'addresse et le code postal sont disponibles, cocher Addresse seulement.")
    # Detailed French help text listing the required csv columns
    # (isHouse, addresse_complete, code_postal, surface, room_count,
    # date_mutation) and their formats.
    long_desc = ("""Entree: Fichier csv contenant les colonnes suivantes: \n
    isHouse (Booleen): si le bien est une maison ou pas True/False (cette application fonctionne seulement pour les maisons et les appartements). Dans le cas ou Addresse seuelement n'est pas coche\n
    addresse_complete (Texte): numero, rue, nom de ville. Champ obligatoire \n
    code_postal (Integer): code postal. Champ obligatoire \n
    surface (Float): surface du bien. Dans le cas ou Addresse seuelement n'est pas coche\n
    room_count (Integer): nombre de pieces. Dans le cas ou Addresse seuelement n'est pas coche \n
    date_mutation (datetime): format %m/%d/%Y. Dans le cas ou Addresse seuelement n'est pas coche
    Attention!!! Si le format et le nom des colonnes n'est pas respecte cela risque de causer des erreurs. S'assurer que le fichier est au format csv avec separateur virgule.
    """)
    # Build the Gradio UI: file upload, API-key textbox, address-only
    # checkbox, and a button wired to process_csv_text producing output.csv.
    with gr.Blocks() as demo:
        gr.Markdown("# Detection de transactions frauduleuses")
        gr.Markdown(desc)
        gr.Markdown(long_desc)
        # type="binary" hands process_csv_text the raw bytes of the upload.
        upload_button = gr.File(label="Input Data", interactive=True,file_types=['.csv'],type="binary")
        api_key = gr.Textbox(label="Cle API Google", info="Veuillez entrer votre cle API Google api")
        is_address = gr.Checkbox(label="Addresse seulement?", info="Addresse Seulement?")
        output_file = gr.File(label="Output File", file_types=['.','','.csv'], type="binary")
        download_button = gr.Button("Obtenir les resultats")
        download_button.click(process_csv_text,inputs=[upload_button,is_address,api_key], outputs=[output_file])
    # share=True publishes a temporary public URL for the app.
    demo.launch(share=True)