|
|
import requests |
|
|
import gradio as gr |
|
|
from bs4 import BeautifulSoup |
|
|
import re |
|
|
import pandas as pd |
|
|
import json |
|
|
|
|
|
# Root URL of the FERTIMAP site; the request helpers below append their
# endpoint paths (e.g. "/php/info.phtml") to it.
BASE_URL = "http://www.fertimap.ma"
|
|
|
|
|
|
|
|
def get_location_info(lon, lat):
    """Look up administrative and soil information for a coordinate.

    Requests ``{BASE_URL}/php/info.phtml`` with the coordinate, saves the raw
    response to ``location_info.html`` (best effort, for debugging) and
    scrapes the page for region, province, commune, soil type, texture and
    the province identifier.

    Args:
        lon: Longitude, sent as the ``x`` query parameter.
        lat: Latitude, sent as the ``y`` query parameter.

    Returns:
        A ``(location_data, html)`` tuple. ``location_data`` is a dict with
        the keys ``province_id``, ``province``, ``region``, ``commune``,
        ``soil_type`` and ``texture``; fields that were not found keep their
        defaults (``None`` for ``province_id``, ``''`` for the others).
        On any failure the tuple is ``({'error': message}, None)``.
    """
    url = f"{BASE_URL}/php/info.phtml"
    params = {'x': lon, 'y': lat}

    try:
        response = requests.get(url, params=params, timeout=15)
        # Surface HTTP errors instead of scraping an error page as if it
        # were a valid answer.
        response.raise_for_status()
        response.encoding = 'utf-8'
        html = response.text
        soup = BeautifulSoup(html, 'html.parser')

        # The dump is only a debugging aid: failing to write it must not
        # make the lookup itself fail.
        try:
            with open("location_info.html", "w", encoding="utf-8") as f:
                f.write(html)
        except OSError as dump_error:
            print(f"⚠ Could not write location_info.html: {dump_error}")

        location_data = {
            'province_id': None,
            'province': '',
            'region': '',
            'commune': '',
            'soil_type': '',
            'texture': '',
        }

        full_text = soup.get_text()

        # Each field is the remainder of the line following its label.
        label_patterns = {
            'region': r'Région\s*[:\s]+([^\n\r]+)',
            'province': r'Préfecture\s*/\s*Province\s*[:\s]+([^\n\r]+)',
            'commune': r'Commune\s*[:\s]+([^\n\r]+)',
            'soil_type': r'Type de sol\s*[:\s]+([^\n\r]+)',
            'texture': r'Texture\s*[:\s]+([^\n\r]+)',
        }
        for field, pattern in label_patterns.items():
            label_match = re.search(pattern, full_text, re.IGNORECASE)
            if label_match:
                location_data[field] = label_match.group(1).strip()

        # Province id: three sources, later ones taking precedence.  A later
        # source only overrides when it actually carries a value, so an
        # element without a ``value`` attribute cannot erase an id that was
        # already found.
        province_id_match = re.search(r'id_province["\s:=]+(\d+)', html)
        if province_id_match:
            location_data['province_id'] = province_id_match.group(1)

        province_select = soup.find('select', {'name': re.compile('province', re.I)})
        if province_select:
            selected = province_select.find('option', {'selected': True})
            if selected and selected.get('value'):
                location_data['province_id'] = selected.get('value')

        province_input = soup.find('input', {'name': re.compile('province', re.I)})
        if province_input and province_input.get('value'):
            location_data['province_id'] = province_input.get('value')

        return location_data, html

    except Exception as e:
        # Boundary with the network/scraping layer: callers expect an error
        # dict rather than an exception.
        return {'error': str(e)}, None
|
|
|
|
|
|
|
|
def get_fertilizer_recommendation(lon, lat, province_id, culture_id, ph, mo, p, k, rdt):
    """Request a fertilizer recommendation and parse the response.

    Sends the soil analysis and crop parameters to
    ``{BASE_URL}/php/calcul.php``, saves the raw response to
    ``recommendation.html`` (best effort, for debugging) and hands the HTML
    to ``parse_recommendation_response``.

    Args:
        lon: Longitude; rounded to 4 decimals and sent as ``x_coord``.
        lat: Latitude; rounded to 4 decimals and sent as ``y_coord``.
        province_id: Sent as ``id_province``.
        culture_id: Sent as ``culture``.
        ph: Sent as ``ph``.
        mo: Sent as ``mo``.
        p: Sent as ``p``.
        k: Sent as ``k``.
        rdt: Sent as ``rdt``.

    Returns:
        The dict produced by ``parse_recommendation_response``, or
        ``{'error': message}`` if anything fails (including invalid
        coordinates that cannot be rounded).
    """
    url = f"{BASE_URL}/php/calcul.php"

    try:
        # Built inside the ``try`` so that a non-numeric lon/lat is reported
        # through the error dict like every other failure.
        params = {
            'id_province': province_id,
            'culture': culture_id,
            'ph': ph,
            'mo': mo,
            'p': p,
            'k': k,
            'rdt': rdt,
            'x_coord': round(lon, 4),
            'y_coord': round(lat, 4),
        }

        response = requests.get(url, params=params, timeout=15)
        # Do not try to parse an HTTP error page as a recommendation.
        response.raise_for_status()
        response.encoding = 'utf-8'
        html = response.text

        # The dump is only a debugging aid; a write failure is not fatal.
        try:
            with open("recommendation.html", "w", encoding="utf-8") as f:
                f.write(html)
        except OSError as dump_error:
            print(f"⚠ Could not write recommendation.html: {dump_error}")

        return parse_recommendation_response(html)

    except Exception as e:
        return {'error': str(e)}
|
|
|
|
|
|
|
|
def _extract_products(section_text):
    """Return the de-duplicated product entries found in one section's text."""
    # A product name ends at "comme engrais de fond/couverture", at a line
    # break, or at the end of the text.  The line break may be "\r\n": since
    # the name itself excludes "\r", the terminator has to accept it too,
    # otherwise products on CRLF-terminated lines are never matched.
    name_end = r'(?:\s+comme\s+engrais\s+de\s+(fond|couverture)|[\r\n]|$)'
    product_patterns = [
        rf'(\d+\.?\d*)\s*qx/ha\s+{article}([^\n\r]+?){name_end}'
        for article in (r'du\s+', r"d'", r'de\s+')
    ]

    seen = set()
    products = []
    for pattern in product_patterns:
        for qty, name, usage in re.findall(pattern, section_text, re.IGNORECASE):
            name = name.strip()
            if not name or (qty, name) in seen:
                continue
            seen.add((qty, name))
            products.append({
                'quantity': float(qty),
                'name': name,
                'type': f'engrais de {usage}' if usage else '',
            })
    return products


def parse_recommendation_response(html):
    """Extract nutrient needs, cost and product lists from a result page.

    Args:
        html: HTML text of the recommendation page.

    Returns:
        A dict with:

        * ``'N'``, ``'P'``, ``'K'``: floats parsed after the
          ``N (kg N/ha)``-style labels, or ``None`` when not found;
        * ``'recommendations'``: dict with the lists ``'regional'``,
          ``'selected_yield'`` and ``'generic'``; each item is a dict with
          ``'quantity'`` (float, qx/ha), ``'name'`` and ``'type'``
          (``'engrais de fond'``, ``'engrais de couverture'`` or ``''``);
        * ``'cost'``: float parsed from the first ``<number> dh/ha``
          occurrence, or ``None``;
        * ``'details'``: the full text of the page.
    """
    soup = BeautifulSoup(html, 'html.parser')
    text = soup.get_text()

    recommendation = {
        'N': None,
        'P': None,
        'K': None,
        'recommendations': {
            'regional': [],
            'selected_yield': [],
            'generic': [],
        },
        'cost': None,
        'details': text,
    }

    # Nutrient requirements, e.g. "N (kg N/ha): 120".
    for element in ('N', 'P', 'K'):
        element_match = re.search(
            rf'{element}\s*\(kg\s*{element}/ha\)\s*[:\t\s]*(\d+\.?\d*)',
            text,
            re.IGNORECASE,
        )
        if element_match:
            recommendation[element] = float(element_match.group(1))

    cost_match = re.search(r'(\d+\.?\d*)\s*dh/ha', text, re.IGNORECASE)
    if cost_match:
        recommendation['cost'] = float(cost_match.group(1))

    # Each section runs from its heading up to the next known heading (or
    # the cost sentence / end of text).
    section_patterns = {
        'regional': r'Recommandations basées sur la formule régionale\s*:(.*?)(?=Les recommandations|Recommandations basées sur les formules génériques|pour un cout|$)',
        'selected_yield': r'Les recommandations pour le rendement s[ée]l[ée]ctionn[ée]\s*:(.*?)(?=Recommandations basées sur les formules génériques|pour un cout|$)',
        'generic': r'Recommandations basées sur les formules génériques\s*:(.*?)(?=pour un cout|$)',
    }
    for section_name, pattern in section_patterns.items():
        section_match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
        if section_match:
            recommendation['recommendations'][section_name] = _extract_products(
                section_match.group(1)
            )

    return recommendation
|
|
|
|
|
|
|
|
def discover_crops_from_website():
    """Scrape the available crops from the site's ``culture`` dropdown.

    Fetches ``{BASE_URL}/php/info.phtml`` for a fixed coordinate and reads
    every ``<select>`` whose name or id contains ``culture``. Options with a
    non-empty label and an all-digit value are kept. The result is also
    written to ``discovered_crops.json`` (best effort, for debugging).

    Returns:
        A dict mapping crop label to integer id, ordered by id. If the
        request or parsing fails, or no crop is found, the result of
        ``get_fallback_crop_mapping()`` is returned instead.
    """
    try:
        url = f"{BASE_URL}/php/info.phtml?x=-7.5&y=33.5"
        response = requests.get(url, timeout=10)
        # Report HTTP failures through the warning below rather than
        # silently finding no <select> in an error page.
        response.raise_for_status()
        response.encoding = 'utf-8'

        soup = BeautifulSoup(response.text, 'html.parser')
        crops = {}

        for select in soup.find_all('select'):
            select_name = select.get('name', '').lower()
            select_id = select.get('id', '').lower()
            if 'culture' not in select_name and 'culture' not in select_id:
                continue

            for option in select.find_all('option'):
                value = option.get('value', '').strip()

                # ``option.string`` is None when the option holds several
                # text nodes; fall back to the first non-empty one.
                if option.string:
                    text = option.string.strip()
                else:
                    texts = [s.strip() for s in option.stripped_strings]
                    text = texts[0] if texts else ''
                text = text.replace('\n', ' ').strip()

                if value and text and value.isdigit():
                    crops[text] = int(value)

    except Exception as e:
        print(f"⚠ Error discovering crops: {e}")
        return get_fallback_crop_mapping()

    if not crops:
        return get_fallback_crop_mapping()

    crops_sorted = dict(sorted(crops.items(), key=lambda item: item[1]))

    # The JSON dump is only a debugging aid: a write failure must not throw
    # away crops that were scraped successfully.
    try:
        with open("discovered_crops.json", "w", encoding="utf-8") as f:
            json.dump(crops_sorted, f, indent=2, ensure_ascii=False)
    except OSError as dump_error:
        print(f"⚠ Could not write discovered_crops.json: {dump_error}")

    return crops_sorted
|
|
|
|
|
|
|
|
def get_fallback_crop_mapping():
    """Return the built-in crop label -> culture id table.

    A fresh dict is built on every call, so callers may mutate the result.
    """
    known_crops = (
        ('Blé bour', 1),
        ('Blé irrigué', 2),
        ('Orge bour', 3),
        ('Tournesol', 4),
        ('Colza', 5),
        ('Maïs grains', 6),
        ('Maïs ensilage', 7),
        ('Pomme de terre', 8),
        ('Fraisier', 9),
        ('Oignion', 11),
        ('Olivier Bour', 13),
        ('Olivier Irrigué', 14),
    )
    return dict(known_crops)
|
|
|
|
|
|
|
|
def complete_recommendation(lon, lat, ph, mo, p, k, crop_name, rdt):
    """Run the full pipeline: location lookup, crop lookup, recommendation.

    Args:
        lon: Longitude of the plot.
        lat: Latitude of the plot.
        ph: Soil pH.
        mo: Organic matter value.
        p: Phosphorus value.
        k: Potassium value.
        crop_name: Crop label; must be a key of the mapping returned by
            ``discover_crops_from_website()``.
        rdt: Target yield.

    Returns:
        A dict with the keys ``'location'``, ``'recommendation'`` and
        ``'error'``. On success ``'error'`` is ``None``. On failure
        ``'error'`` is a message and ``'recommendation'`` is ``None``;
        ``'location'`` is filled in whenever the location lookup itself
        succeeded.
    """
    location_data, _ = get_location_info(lon, lat)

    if 'error' in location_data:
        return {
            'error': f"Location lookup failed: {location_data['error']}",
            'location': None,
            'recommendation': None,
        }

    province_id = location_data.get('province_id')
    if not province_id:
        return {
            'error': 'Could not determine province ID. Check location_info.html',
            'location': location_data,
            'recommendation': None,
        }

    crop_mapping = discover_crops_from_website()
    culture_id = crop_mapping.get(crop_name)
    if culture_id is None:
        return {
            'error': f'Unknown crop: {crop_name}',
            'location': location_data,
            'recommendation': None,
        }

    recommendation = get_fertilizer_recommendation(
        lon, lat, province_id, culture_id, ph, mo, p, k, rdt
    )

    # get_fertilizer_recommendation reports failures as {'error': ...}.
    # Propagate them so callers checking result['error'] do not mistake a
    # failed request for an (empty) successful recommendation.
    if 'error' in recommendation:
        return {
            'error': f"Recommendation request failed: {recommendation['error']}",
            'location': location_data,
            'recommendation': None,
        }

    return {
        'location': location_data,
        'recommendation': recommendation,
        'error': None,
    }
|
|
|
|
|
|
|
|
def create_gradio_app():
    """Build the Gradio Blocks UI for interactive recommendations.

    The crop list for the dropdown is fetched once, while the app is being
    built, through ``discover_crops_from_website()``.

    Returns:
        The configured ``gr.Blocks`` application (not launched).
    """
    print("🔍 Discovering crops from website...")
    crop_mapping = discover_crops_from_website()
    crop_names = sorted(crop_mapping.keys())

    def error_markdown(message):
        """Render an error message together with the debug-file hint."""
        return (
            f"## ❌ Error\n\n{message}\n\n**Debug files:**\n"
            "- location_info.html\n- recommendation.html\n- discovered_crops.json"
        )

    def or_na(value):
        """Return *value*, or 'N/A' when it is None or an empty string."""
        # The parsed dicts always contain their keys (holding None or ''),
        # so ``dict.get(key, 'N/A')`` would never fall back.
        return 'N/A' if value is None or value == '' else value

    def product_lines(products):
        """Render a list of product dicts as Markdown bullet lines."""
        rendered = []
        for product in products:
            prod_type = f" *({product['type']})*" if product.get('type') else ""
            rendered.append(
                f"- **{product['quantity']} qx/ha** de **{product['name']}**{prod_type}\n"
            )
        return "".join(rendered)

    def recommend_wrapper(lon, lat, ph, mo, p, k, crop_name, rdt):
        """Run a recommendation for the form values and return Markdown."""
        # An emptied Number field yields None, which cannot be formatted
        # with ':.4f' below.
        if lon is None or lat is None:
            return error_markdown("Longitude and latitude are required.")

        result = complete_recommendation(lon, lat, ph, mo, p, k, crop_name, rdt)
        if result['error']:
            return error_markdown(result['error'])

        loc = result['location']
        rec = result['recommendation']

        # A failed recommendation request is reported as {'error': ...}.
        if rec.get('error'):
            return error_markdown(rec['error'])

        header_lines = [
            "",
            "# 🌾 FERTIMAP - Fertilizer Recommendation",
            "",
            "## 📍 Location Information",
            f"- **Coordinates**: {lon:.4f}, {lat:.4f}",
            f"- **Région**: {or_na(loc.get('region'))}",
            f"- **Préfecture/Province**: {or_na(loc.get('province'))}",
            f"- **Commune**: {or_na(loc.get('commune'))}",
            f"- **Province ID**: {or_na(loc.get('province_id'))}",
            "",
            "**Soil Information:**",
            f"- **Type**: {or_na(loc.get('soil_type'))}",
            f"- **Texture**: {or_na(loc.get('texture'))}",
            "",
            "---",
            "",
            "## 🧪 Input Parameters",
            f"- **Culture**: {crop_name}",
            f"- **pH**: {ph}",
            f"- **Matière organique (MO)**: {mo}%",
            f"- **Phosphore (P₂O₅)**: {p} mg/kg",
            f"- **Potassium (K₂O)**: {k} mg/kg",
            f"- **Rendement espéré**: {rdt} qx/ha",
            "",
            "---",
            "",
            "## 🌱 Fertilizer Requirements (Besoins)",
            "",
            "| Element | Amount (kg/ha) |",
            "|---------|----------------|",
            f"| **Nitrogen (N)** | {or_na(rec.get('N'))} |",
            f"| **Phosphorus (P)** | {or_na(rec.get('P'))} |",
            f"| **Potassium (K)** | {or_na(rec.get('K'))} |",
            "",
            "---",
            "",
            "## 📦 Recommended Products (Recommandations)",
            "",
        ]
        parts = ["\n".join(header_lines)]

        recommendations = rec.get('recommendations', {})

        if recommendations.get('regional'):
            parts.append("\n### 🌍 Recommandations basées sur la formule régionale\n")
            parts.append("*Pour un rendement optimal de la région*\n\n")
            parts.append(product_lines(recommendations['regional']))

        if recommendations.get('selected_yield'):
            parts.append(
                f"\n### 🎯 Recommandations pour le rendement sélectionné ({rdt} qx/ha)\n\n"
            )
            parts.append(product_lines(recommendations['selected_yield']))

        if recommendations.get('generic'):
            parts.append("\n### 🧪 Recommandations basées sur les formules génériques\n\n")
            parts.append(product_lines(recommendations['generic']))

        if not any(recommendations.values()):
            parts.append("\n*Aucun produit trouvé - vérifiez recommendation.html*\n")

        footer_lines = [
            "",
            "",
            "---",
            "",
            "## 💰 Estimated Cost (Coût)",
            f"**{or_na(rec.get('cost'))} DH/ha**",
            "",
            "---",
            "",
            "*💡 Note: Plusieurs recommandations peuvent être fournies selon la méthodologie (régionale, rendement sélectionné, formules génériques)*",
            "",
            "*Debug files: `location_info.html`, `recommendation.html`, `discovered_crops.json`*",
            "",
        ]
        parts.append("\n".join(footer_lines))

        return "".join(parts)

    # Only hint at more crops when the preview is actually truncated.
    crop_preview = ', '.join(crop_names[:10])
    if len(crop_names) > 10:
        crop_preview += "..."

    title_md = "\n".join([
        "",
        "# 🇲🇦 FERTIMAP Morocco - Fertilizer Recommendation System",
        "### Système de recommandation en fertilisation basé sur l'analyse du sol",
        "",
    ])

    instructions_md = "\n".join([
        "",
        f"### Cultures disponibles ({len(crop_names)}):",
        f"{crop_preview} (voir discovered_crops.json)",
        "",
        "### Types de recommandations:",
        "",
        "1. **Formule régionale** 🌍",
        "- Basée sur les données régionales",
        "- Rendement optimal de la région",
        "",
        "2. **Rendement sélectionné** 🎯",
        "- Adapté à votre objectif de rendement",
        "- Plus précis pour votre cas",
        "",
        "3. **Formules génériques** 🧪",
        "- Basées sur des formules standards",
        "- Engrais simples (TSP, Ammonitrates, etc.)",
        "",
        "### Utilisation:",
        "1. **Coordonnées**: Longitude/Latitude",
        "2. **Analyse du sol**: Résultats de laboratoire",
        "- **pH**: 4.0 - 9.0",
        "- **MO**: 0.5 - 5%",
        "- **P₂O₅**: 5 - 100 mg/kg",
        "- **K₂O**: 50 - 500 mg/kg",
        "3. **Culture**: Choisir irrigué vs Bour",
        "4. **Rendement**: Objectif en qx/ha",
        "",
        "### Fichiers de debug:",
        "- `location_info.html` - Informations géographiques",
        "- `recommendation.html` - Calcul complet",
        "- `discovered_crops.json` - Liste des cultures",
        "",
    ])

    quick_locations_md = "\n".join([
        "",
        "| Ville | Longitude | Latitude |",
        "|-------|-----------|----------|",
        "| Marrakech | -8.0089 | 31.6295 |",
        "| Casablanca | -7.6114 | 33.5731 |",
        "| Rabat | -6.8498 | 34.0209 |",
        "| Fès | -5.0003 | 34.0181 |",
        "| Meknès | -5.5471 | 33.8935 |",
        "| Agadir | -9.5981 | 30.4278 |",
        "",
    ])

    with gr.Blocks(title="FERTIMAP Morocco", theme=gr.themes.Soft()) as app:
        gr.Markdown(title_md)

        with gr.Row():
            # Left column: input form.
            with gr.Column(scale=1):
                gr.Markdown("### 📍 Location")
                lon = gr.Number(label="Longitude", value=-8.6106, precision=4)
                lat = gr.Number(label="Latitude", value=31.9631, precision=4)

                gr.Markdown("### 🧪 Soil Analysis")
                ph = gr.Slider(minimum=4.0, maximum=9.0, value=7.0, step=0.1, label="pH")
                mo = gr.Number(label="Matière organique (%)", value=1.25, precision=2)
                p = gr.Number(label="Phosphore P₂O₅ (mg/kg)", value=30.2, precision=1)
                k = gr.Number(label="Potassium K₂O (mg/kg)", value=411.2, precision=1)

                gr.Markdown("### 🌾 Crop Planning")
                crop = gr.Dropdown(
                    choices=crop_names,
                    label="Culture",
                    value=crop_names[0] if crop_names else "Blé irrigué",
                    filterable=True,
                )
                rdt = gr.Number(label="Rendement espéré (qx/ha)", value=40, precision=0)

                btn = gr.Button("🔬 Get Recommendation", variant="primary", size="lg")

            # Right column: rendered report.
            with gr.Column(scale=2):
                output = gr.Markdown()

        btn.click(
            fn=recommend_wrapper,
            inputs=[lon, lat, ph, mo, p, k, crop, rdt],
            outputs=output,
        )

        with gr.Accordion("📖 Instructions", open=False):
            gr.Markdown(instructions_md)

        with gr.Accordion("🗺️ Emplacements rapides", open=False):
            gr.Markdown(quick_locations_md)

    return app
|
|
|
|
|
|
|
|
def batch_recommendations(csv_file):
    """Run ``complete_recommendation`` for every row of a CSV file.

    The CSV must provide the columns ``lon``, ``lat``, ``ph``, ``mo``, ``p``,
    ``k``, ``crop`` and ``rdt``. Results are written to
    ``fertilizer_recommendations_batch.csv`` in the current directory.

    Args:
        csv_file: Path or file-like object accepted by ``pandas.read_csv``.

    Returns:
        A DataFrame with one row per input row. Successful rows carry the
        location fields, ``N_kg_ha``/``P_kg_ha``/``K_kg_ha``, ``cost_dh_ha``
        and a ``products`` summary string; failed rows carry ``lon``,
        ``lat`` and ``error``.
    """
    df = pd.read_csv(csv_file)
    total = len(df)
    results = []

    # Count rows by position rather than by index label so the progress
    # message stays correct whatever the DataFrame index is.
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        print(f"Processing {position}/{total}...")

        result = complete_recommendation(
            row['lon'], row['lat'], row['ph'], row['mo'],
            row['p'], row['k'], row['crop'], row['rdt']
        )

        rec = result.get('recommendation') or {}
        # A failed recommendation request is reported as {'error': ...} and
        # has no 'recommendations' key; record it as an error row instead of
        # raising KeyError.
        error = result.get('error') or rec.get('error')
        if error:
            results.append({
                'lon': row['lon'],
                'lat': row['lat'],
                'error': error,
            })
            continue

        loc = result['location']

        all_products = []
        for rec_type, products in rec.get('recommendations', {}).items():
            for item in products:
                all_products.append(
                    f"[{rec_type}] {item['quantity']} qx/ha {item['name']} ({item.get('type', '')})"
                )
        products_str = ' | '.join(all_products)

        results.append({
            'lon': row['lon'],
            'lat': row['lat'],
            'region': loc.get('region'),
            'province': loc.get('province'),
            'commune': loc.get('commune'),
            'province_id': loc.get('province_id'),
            'N_kg_ha': rec.get('N'),
            'P_kg_ha': rec.get('P'),
            'K_kg_ha': rec.get('K'),
            'cost_dh_ha': rec.get('cost'),
            'products': products_str,
        })

    results_df = pd.DataFrame(results)
    results_df.to_csv('fertilizer_recommendations_batch.csv', index=False)
    print(f"✅ Saved {len(results_df)} recommendations")

    return results_df
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Smoke test: run one recommendation with sample values and print the
    # raw result before starting the UI.
    print("🧪 Testing API...")
    sample_inputs = {
        'lon': -8.6106,
        'lat': 31.9631,
        'ph': 7.0,
        'mo': 1.25,
        'p': 30.2,
        'k': 411.2,
        'crop_name': 'Blé irrigué',
        'rdt': 40,
    }
    test_result = complete_recommendation(**sample_inputs)

    print("\n📊 Test Result:")
    print(json.dumps(test_result, indent=2, ensure_ascii=False))

    # Build the interface, then serve it.
    print("\n🚀 Launching Gradio interface...")
    app = create_gradio_app()
    app.launch()