file_table_reader/services/upload_file/utils/geometry_detector.py

460 lines
16 KiB
Python
Raw Normal View History

2025-10-29 10:07:48 +00:00
import geopandas as gpd
from shapely.geometry import Point, LineString
import pandas as pd
2025-10-30 03:37:45 +00:00
import numpy as np
2025-10-29 10:07:48 +00:00
import re
2025-10-30 04:05:55 +00:00
import os
2025-10-29 10:07:48 +00:00
from shapely import wkt
from rapidfuzz import process, fuzz
from sqlalchemy import create_engine
from shapely.geometry.base import BaseGeometry
2025-11-06 07:23:24 +00:00
from core.config import REFERENCE_DB_URL, REFERENCE_SCHEMA, DESA_REF, KEC_REF, KAB_REF
2025-10-29 10:07:48 +00:00
# ============================================================
# KONFIGURASI DAN KONSTANTA
# ============================================================
# Header spellings recognised for each administrative level.
# Keys are the level names used throughout this module; values are the
# alias strings that find_admin_column() looks for inside column names.
COLUMN_ALIASES = {
    # desa / kelurahan level
    'desa': [
        'desa',
        'kelurahan',
        'desa_kelurahan',
        'desa/kelurahan',
        'nama_desa',
        'nama_kelurahan',
        'Desa/Kel',
    ],
    # kecamatan level
    'kecamatan': [
        'kec',
        'kecamatan',
        'nama_kec',
        'nama_kecamatan',
    ],
    # kabupaten / kota level
    'kabupaten': [
        'kab',
        'kabupaten',
        'kota',
        'kabupaten_kota',
        'kota_kabupaten',
        'kab/kota',
        'kota/kabupaten',
        'kota/kab',
    ],
}
# ============================================================
# FUNGSI BANTU ADMINISTRATIF
# ============================================================
def find_admin_column(df, aliases):
    """Find, per administrative level, the first column whose name contains an alias.

    Column names and aliases are compared in the same canonical form
    (stripped, lower-cased, spaces and slashes turned into underscores), so an
    alias written as ``'Desa/Kel'`` or ``'kab/kota'`` can actually match.
    Matching is by substring: an alias matches when it occurs anywhere inside
    the canonical column name.

    Args:
        df: Object exposing ``.columns`` (labels are passed through ``str()``,
            so non-string labels are tolerated).
        aliases: Mapping ``level -> list of alias strings``.

    Returns:
        dict mapping each level that matched to the ORIGINAL column label.
        Levels without a matching column are absent from the result.
    """
    def _canon(label):
        # One normalisation shared by both sides of the comparison.
        return str(label).strip().lower().replace(' ', '_').replace('/', '_')

    # Canonicalise every column once instead of once per level.
    canon_columns = [(col, _canon(col)) for col in df.columns]

    matched = {}
    for level, alias_list in aliases.items():
        canon_aliases = [_canon(alias) for alias in alias_list]
        for col, col_norm in canon_columns:
            if any(alias in col_norm for alias in canon_aliases):
                matched[level] = col
                break  # first matching column wins for this level
    return matched
def detect_smallest_admin_level(df):
    """Return the smallest administrative level present in the column names.

    Levels are checked from smallest to largest and the first hit wins:
    ``'desa'`` (name contains "desa" or "kelurahan"), then ``'kecamatan'``
    (name contains "kec"), then ``'kabupaten'`` (name contains "kab" or
    "kota"). The "kec" test deliberately mirrors the abbreviation accepted by
    COLUMN_ALIASES and by the column lookup in attach_polygon_geometry_auto,
    so a header such as ``Kec`` is not mistaken for a kabupaten-only table.

    Args:
        df: Object exposing ``.columns``; labels are passed through ``str()``.

    Returns:
        ``'desa'``, ``'kecamatan'``, ``'kabupaten'`` or ``None`` when no
        administrative column is recognised.
    """
    cols = [str(c).lower() for c in df.columns]

    if any('desa' in c or 'kelurahan' in c for c in cols):
        return 'desa'
    if any('kec' in c for c in cols):
        return 'kecamatan'
    if any('kab' in c or 'kota' in c for c in cols):
        return 'kabupaten'
    return None
def fuzzy_merge(df, master, left_key, right_key, threshold=85):
    """Left-join ``df`` to ``master`` using fuzzy matching of region names.

    For every value in ``df[left_key]`` the best candidate in
    ``master[right_key]`` is looked up with ``process.extractOne`` using
    ``threshold`` as the score cutoff. The chosen candidate is stored in a
    helper column ``'match'`` and the join is done on that column.

    The input ``df`` is NOT modified; the helper column exists only in the
    returned frame. Missing values are never fuzzy-matched (they would
    otherwise be compared as the literal text "nan"/"None").

    Args:
        df: Left table.
        master: Reference table containing ``right_key``.
        left_key: Column in ``df`` holding the names to match.
        right_key: Column in ``master`` holding the reference names.
        threshold: Minimum score passed to ``extractOne`` as ``score_cutoff``.

    Returns:
        The merged frame (left join), including the ``'match'`` column.
    """
    choices = master[right_key]

    def _best_match(value):
        if pd.isna(value):
            return None
        hit = process.extractOne(str(value), choices, score_cutoff=threshold)
        # extractOne yields None below the cutoff; element 0 is the matched choice.
        return hit[0] if hit else None

    # assign() returns a new frame, leaving the caller's DataFrame untouched.
    with_match = df.assign(match=df[left_key].apply(_best_match))
    return with_match.merge(master, left_on='match', right_on=right_key, how='left')
def normalize_name(name: str, level: str = None):
    """Normalise an administrative region name for joining.

    Steps, in order:
      1. Non-strings and blank strings yield ``None``.
      2. Parenthesised notes are removed (replaced by a space so the words on
         either side are not glued together).
      3. A leading level word is stripped: desa/kelurahan/kel/dusun/kampung,
         then kecamatan/kec, then kabupaten/kab - each optionally followed by
         a dot ("Kec. X", "Kab. X"). For kabupaten/kota level a leading
         "kota" is stripped as well.
      4. Everything except ``a-z`` and whitespace is dropped and whitespace
         is collapsed.
      5. Adjacent tokens whose ``fuzz.ratio`` exceeds 75 are concatenated,
         then a token that is >95 similar to the previously kept token is
         dropped.
      6. The result is title-cased. For kabupaten/kota level, the prefix
         "Kota " is restored when the input contained the standalone word
         "kota".

    Args:
        name: Raw name.
        level: Administrative level; only ``"kabupaten"`` / ``"kota"`` change
            the behaviour (kota handling).

    Returns:
        The normalised name, or ``None`` for non-string / blank input.
    """
    if not isinstance(name, str):
        return None
    name = name.strip()
    if not name:
        return None

    # Replace with a space (not ''), otherwise "A (x) B" collapses to "AB".
    name = re.sub(r'\s*\([^)]*\)\s*', ' ', name).strip()
    lowered = name.lower()
    kab_level = level in ("kabupaten", "kota")

    raw = lowered
    # \.? lets dotted abbreviations ("Kec.", "Kel.", "Kab.") be stripped alike.
    raw = re.sub(r'^(desa|kelurahan|kel|dusun|kampung)\.?\s+', '', raw)
    raw = re.sub(r'^(kecamatan|kec)\.?\s+', '', raw)
    raw = re.sub(r'^(kabupaten|kab)\.?\s+', '', raw)
    if kab_level:
        raw = re.sub(r'^(kota\s+)', '', raw)
    raw = re.sub(r'[^a-z\s]', '', raw)
    raw = re.sub(r'\s+', ' ', raw).strip()

    # Pass 1: concatenate neighbouring tokens that are near-identical.
    tokens = raw.split()
    merged_tokens = []
    i = 0
    while i < len(tokens):
        if i < len(tokens) - 1 and fuzz.ratio(tokens[i], tokens[i + 1]) > 75:
            merged_tokens.append(tokens[i] + tokens[i + 1])
            i += 2
            continue
        merged_tokens.append(tokens[i])
        i += 1

    # Pass 2: drop a token that practically repeats the last kept one.
    cleaned_tokens = []
    prev = None
    for tok in merged_tokens:
        if prev and fuzz.ratio(prev, tok) > 95:
            continue
        cleaned_tokens.append(tok)
        prev = tok

    formatted = " ".join(cleaned_tokens).title()

    if kab_level:
        # Word-boundary test: "Kotabaru" must not count as the word "kota".
        if re.search(r'\bkota\b', lowered):
            if not formatted.startswith("Kota "):
                formatted = f"Kota {formatted}"
        else:
            formatted = formatted.replace("Kota ", "")
    return formatted
def is_geom_empty(g):
    """Return True when ``g`` is None, a float NaN, or an empty Shapely geometry.

    Any other value (including non-geometry objects) yields False.
    """
    is_missing = g is None or (isinstance(g, float) and pd.isna(g))
    if is_missing:
        return True
    # Only real Shapely geometries can report emptiness.
    return g.is_empty if isinstance(g, BaseGeometry) else False
2025-10-30 10:14:53 +00:00
import math
2025-11-04 15:19:25 +00:00
def normalize_lon(val, is_lat=False):
    """Bring a coordinate whose decimal point was lost back into range.

    A value already inside the valid range is returned unchanged. Otherwise
    it is divided by 10, 100, ... 1e9 in turn and the first quotient that
    falls inside the range is returned.

    Args:
        val: Raw coordinate (number or numeric string).
        is_lat: When True the valid range is [-90, 90]; otherwise [-180, 180].

    Returns:
        The coordinate as ``float``, or ``None`` when ``val`` is missing, not
        numeric, or cannot be scaled into range.
    """
    if pd.isna(val):
        return None
    try:
        v = float(val)
    except (TypeError, ValueError, OverflowError):
        # Not a number: report "no coordinate" rather than hiding every error.
        return None

    limit = 90 if is_lat else 180
    if abs(v) <= limit:
        return v

    # First in-range quotient wins, i.e. the largest plausible magnitude.
    for factor in (10, 100, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9):
        nv = v / factor
        if abs(nv) <= limit:
            return nv
    return None
def normalize_lat(val):
    """Rescale a latitude that lost its decimal point.

    Magnitudes above 1e9 are divided by 1e9 (e.g. -8167413802, ten digits)
    and magnitudes above 1e8 by 1e8; every other value is returned as is.

    Args:
        val: Raw latitude (number or numeric string).

    Returns:
        The latitude as ``float``, or ``None`` when ``val`` is missing or not
        numeric (same contract as ``normalize_lon``).
    """
    if pd.isna(val):
        return None
    try:
        v = float(val)
    except (TypeError, ValueError, OverflowError):
        return None

    magnitude = abs(v)
    if magnitude > 1e9:  # e.g. -8167413802 (10 digits)
        return v / 1e9
    if magnitude > 1e8:  # fallback for 9-digit variants
        return v / 1e8
    return v
2025-10-29 10:07:48 +00:00
# ============================================================
# FUNGSI UTAMA GEOMETRY DETECTION (LAT/LON / PATH)
# ============================================================
def detect_and_build_geometry(df: pd.DataFrame, master_polygons: gpd.GeoDataFrame = None):
    """Detect a geometry source in ``df`` and build a GeoDataFrame from it.

    Strategies are tried in this order; the first that applies returns:
      1. ``df`` is already a GeoDataFrame with at least one non-null
         geometry -> returned unchanged.
      2. A latitude and a longitude column are found by header name ->
         values are coerced to numbers, rescaled with ``normalize_lat`` /
         ``normalize_lon`` and turned into points (EPSG:4326).
      3. A column whose header looks like geometry/WKT/path/coordinates:
         - first non-null value starts with ``[`` -> each cell is parsed as
           a literal list of points and turned into a LineString;
         - first non-null value mentions POINT/LINESTRING/POLYGON -> cells
           are parsed as WKT.
      4. ``master_polygons`` is given -> join on the desa, kecamatan or
         kabupaten name column (desa additionally falls back to
         ``fuzzy_merge`` when the exact join leaves gaps).

    NOTE: ``df`` is modified in place by strategies 2-4 (coordinate columns
    are overwritten, a ``geometry`` column is added, headers are
    lower-cased/underscored in strategy 4).

    Args:
        df: Input table.
        master_polygons: Optional reference polygons with ``nama_desa`` /
            ``nama_kecamatan`` / ``nama_kabupaten`` and ``geometry`` columns.

    Returns:
        A GeoDataFrame, or ``df`` itself when no geometry source is found.
    """
    import ast  # only needed by the list-of-points branch

    if isinstance(df, gpd.GeoDataFrame):
        if "geometry" in df.columns and df.geometry.notna().any():
            geom_count = df.geometry.notna().sum()
            geom_type = list(df.geom_type.unique())
            print(f"[INFO] Detected existing geometry in GeoDataFrame ({geom_count} features, {geom_type}).")
            return df

    # --- Strategy 2: latitude / longitude columns --------------------------
    # str(c) keeps the header scan from crashing on non-string column labels.
    lat_col = next(
        (c for c in df.columns if re.search(r'\b(lat|latitude|y[_\s]*coord|y$)\b', str(c).lower())), None
    )
    lon_col = next(
        (c for c in df.columns if re.search(r'\b(lon|long|longitude|x[_\s]*coord|x$)\b', str(c).lower())), None
    )

    if lat_col and lon_col:
        df[lat_col] = pd.to_numeric(df[lat_col], errors='coerce')
        df[lon_col] = pd.to_numeric(df[lon_col], errors='coerce')

        df[lon_col] = df[lon_col].apply(lambda x: normalize_lon(x, is_lat=False))
        df[lat_col] = df[lat_col].apply(normalize_lat)

        gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs="EPSG:4326")
        print("[INFO] Geometry dibangun dari kolom lat/lon.")
        return gdf

    # --- Strategy 3: a single geometry-like column -------------------------
    coord_col = next(
        (c for c in df.columns if re.search(r'(geom|geometry|wkt|shp|shape|path|coord)', str(c).lower())), None
    )

    if coord_col and df[coord_col].notnull().any():
        # The first non-null cell decides how the whole column is interpreted.
        sample_val = str(df[coord_col].dropna().iloc[0]).strip()

        if sample_val.startswith('['):
            def parse_geom(val):
                # SECURITY: this used to be eval(val). Cell text comes from the
                # input table (this module sits under an upload service), so
                # eval() would execute arbitrary code. ast.literal_eval only
                # accepts Python literals such as [(x, y), ...].
                try:
                    pts = ast.literal_eval(val)
                    return LineString(pts)
                except Exception:
                    # Unparsable cell or invalid point list -> no geometry.
                    return None

            df['geometry'] = df[coord_col].apply(parse_geom)
            gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
            print("[INFO] Geometry dibangun dari kolom koordinat/path (list of points).")
            return gdf

        elif any(x in sample_val.upper() for x in ["POINT", "LINESTRING", "POLYGON"]):
            try:
                df['geometry'] = df[coord_col].apply(
                    lambda g: wkt.loads(g) if isinstance(g, str) and any(
                        x in g.upper() for x in ["POINT", "LINESTRING", "POLYGON"]
                    ) else None
                )
                gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
                print("[INFO] Geometry dibangun dari kolom WKT (Point/Line/Polygon/MultiPolygon).")
                return gdf
            except Exception as e:
                # One bad WKT cell aborts this strategy; fall through to the next.
                print(f"[WARN] Gagal parsing kolom geometry sebagai WKT: {e}")

    # --- Strategy 4: join against reference polygons -----------------------
    if master_polygons is not None:
        df.columns = df.columns.str.lower().str.strip().str.replace(' ', '_').str.replace('/', '_')
        matches = find_admin_column(df, COLUMN_ALIASES)

        if 'desa' in matches:
            admin_col = matches['desa']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_desa', how='left')
            if merged['geometry'].isna().sum() > 0:
                # Exact join left gaps: redo the whole join with fuzzy matching.
                merged = fuzzy_merge(df, master_polygons, admin_col, 'nama_desa')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

        elif 'kecamatan' in matches:
            admin_col = matches['kecamatan']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kecamatan', how='left')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

        elif 'kabupaten' in matches:
            admin_col = matches['kabupaten']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kabupaten', how='left')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

    print("[WARN] Tidak ditemukan geometry (lat/lon, path, atau master).")
    return df
2025-10-30 04:05:55 +00:00
# def get_reference_polygons(level):
# """Mengambil data batas wilayah (MultiPolygon) dari DB referensi"""
# table_map = {
# 'desa': f"{REFERENCE_SCHEMA}.administrasi_ar_keldesa_jatim",
# 'kecamatan': f"{REFERENCE_SCHEMA}.administrasi_ar_kec_jatim",
# 'kabupaten': f"{REFERENCE_SCHEMA}.administrasi_ar_kabkot_jatim"
# }
# table_name = table_map.get(level)
# if not table_name:
# raise ValueError(f"Tidak ada tabel referensi untuk level '{level}'.")
# engine = create_engine(REFERENCE_DB_URL)
# query = f"SELECT *, ST_Multi(geom) AS geometry FROM {table_name}"
# gdf = gpd.read_postgis(query, engine, geom_col='geometry')
# print(f"[INFO] {len(gdf)} data referensi '{level}' berhasil dimuat dari {table_name}.")
# return gdf
from functools import lru_cache
@lru_cache(maxsize=3)
def get_reference_polygons(level):
    """Load the reference boundary polygons for one administrative level.

    Lookup order:
      1. in-process ``lru_cache`` (one entry per level);
      2. local parquet file ``cache/<level>_ref.parquet``;
      3. the reference database, after which the parquet file is written.

    Because of ``lru_cache`` every caller receives the SAME GeoDataFrame
    object; callers that need to modify it should work on a copy.

    Args:
        level: ``"desa"``, ``"kecamatan"`` or ``"kabupaten"``.

    Returns:
        GeoDataFrame whose ``geometry`` column is ``ST_Multi(geom)``.

    Raises:
        ValueError: ``level`` has no reference table.
    """
    table_by_level = {
        "desa": "administrasi_ar_keldesa_jatim",
        "kecamatan": "administrasi_ar_kec_jatim",
        "kabupaten": "administrasi_ar_kabkot_jatim",
    }
    # Fail early: an unknown level would otherwise produce "FROM None" below
    # (and would also end up in the cache file name).
    if level not in table_by_level:
        raise ValueError(f"Tidak ada tabel referensi untuk level '{level}'.")

    local_path = f"cache/{level}_ref.parquet"
    if os.path.exists(local_path):
        print(f"[CACHE] Memuat referensi '{level}' dari file lokal.")
        return gpd.read_parquet(local_path)

    print(f"[DB] Mengambil data referensi '{level}' dari database...")
    table_name = f"{REFERENCE_SCHEMA}.{table_by_level[level]}"
    query = f"SELECT *, ST_Multi(geom) AS geometry FROM {table_name}"

    engine = create_engine(REFERENCE_DB_URL)
    try:
        gdf = gpd.read_postgis(query, engine, geom_col="geometry")
    finally:
        # The engine is used for this single query; release its pool.
        engine.dispose()

    # The cache directory may not exist on a fresh deployment.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    gdf.to_parquet(local_path)
    print(f"[CACHE] Disimpan ke {local_path}")
    return gdf
2025-10-30 04:05:55 +00:00
2025-10-30 03:37:45 +00:00
# ============================================================
2025-10-30 04:05:55 +00:00
# Optimize Join
2025-10-30 03:37:45 +00:00
# ============================================================
def build_join_key(df, cols):
    """Build a ``"a|b|c"`` join key per row from the given columns.

    Each cell is converted to text; missing cells (NaN/None/NA) and cells
    whose text is exactly ``"nan"`` contribute an empty string. Parts are
    joined with ``"|"``.

    Implemented with plain object-array concatenation, which works on every
    NumPy version (``np.char.add`` only has ``.reduce`` where it is a ufunc).

    Args:
        df: Source table.
        cols: Column names, in key order.

    Returns:
        1-D NumPy object array with one key string per row of ``df``.
    """
    cols = list(cols)
    if not cols:
        return np.full(len(df), "", dtype=object)

    frame = df[cols]
    missing = frame.isna().to_numpy()
    text = frame.astype(str).replace("nan", "", regex=False).to_numpy(dtype=object)
    # np.where builds a new array, so no (possibly read-only) view is written to.
    parts = np.where(missing, "", text)

    key = parts[:, 0]
    for idx in range(1, parts.shape[1]):
        key = key + "|" + parts[:, idx]
    return key
2025-10-29 10:07:48 +00:00
# ============================================================
# FUNGSI: AUTO ATTACH POLYGON KE DATAFRAME NON-SPASIAL
# ============================================================
def attach_polygon_geometry_auto(df: pd.DataFrame):
    """Attach reference polygons to a non-spatial table by administrative names.

    The smallest administrative level present in ``df`` selects the reference
    layer. Names on both sides are normalised with ``normalize_name``, joined
    exactly on a combined ``desa|kecamatan|kabupaten`` key (only as many
    trailing levels as both sides provide), and rows still without geometry
    get a fuzzy match (``token_set_ratio`` >= 85) on the same key. Reference
    keys are de-duplicated first, so the join never multiplies rows.

    Neither the caller's ``df`` nor the cached reference GeoDataFrame is
    modified; all work happens on copies.

    Args:
        df: Table with desa/kelurahan, kecamatan and/or kabupaten/kota columns.

    Returns:
        GeoDataFrame (CRS label EPSG:4326) with normalised name columns and a
        ``geometry`` column; or ``df`` unchanged when the administrative
        columns are missing/incomplete or no join key can be built.
    """
    level = detect_smallest_admin_level(df)
    if not level:
        print("[WARN] Tidak ditemukan kolom administratif (desa/kecamatan/kabupaten).")
        return df
    print(f"[INFO] Detected smallest admin level: {level}")

    headers = [(c, str(c).lower()) for c in df.columns]
    desa_col = next((c for c, low in headers if any(x in low for x in ['desa', 'kelurahan'])), None)
    kec_col = next((c for c, low in headers if 'kec' in low), None)
    kab_col = next((c for c, low in headers if any(x in low for x in ['kab', 'kota'])), None)

    # --- validate the column hierarchy before touching reference data ------
    if desa_col and (not kec_col or not kab_col):
        print("[ERROR] Kolom 'Desa' ditemukan tetapi kolom 'Kecamatan' dan/atau 'Kabupaten' tidak lengkap.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df
    elif not desa_col and kec_col and not kab_col:
        print("[ERROR] Kolom 'Kecamatan' ditemukan tetapi kolom 'Kabupaten/Kota' tidak ditemukan.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df
    elif kab_col and not desa_col and not kec_col:
        print("[INFO] Struktur kolom administratif valid (minimal Kabupaten/Kota ditemukan).")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
    elif not desa_col and not kec_col and not kab_col:
        print("[WARN] Tidak ditemukan kolom administratif apapun (Desa/Kecamatan/Kabupaten).")
        print(f"[DEBUG] Kolom CSV: {list(df.columns)}")
        return df

    cached_ref = get_reference_polygons(level)

    # Reference name columns that this layer actually has, smallest level first.
    ref_levels = {
        col: lvl
        for col, lvl in ((DESA_REF, "desa"), (KEC_REF, "kecamatan"), (KAB_REF, "kabupaten"))
        if col and col in cached_ref.columns
    }

    join_cols_df = [col for col in [desa_col, kec_col, kab_col] if col]
    join_cols_ref = list(ref_levels)
    common_depth = min(len(join_cols_df), len(join_cols_ref))
    if common_depth == 0:
        # Guard needed because seq[-0:] would select the WHOLE list.
        print("[ERROR] Tidak ada kolom administratif yang bisa digunakan untuk join key.")
        return df
    # Keep the largest shared levels (the tail: ..., kecamatan, kabupaten).
    join_cols_df = join_cols_df[-common_depth:]
    join_cols_ref = join_cols_ref[-common_depth:]

    def _normalized(series, lvl):
        # Missing cells stay missing instead of turning into the name "Nan".
        return series.map(lambda v: normalize_name(str(v), lvl) if pd.notna(v) else None)

    # Work on copies: the reference frame is shared through lru_cache and the
    # input frame belongs to the caller.
    df = df.copy()
    ref_gdf = cached_ref[join_cols_ref + ["geometry"]].copy()

    for col, lvl in ((desa_col, "desa"), (kec_col, "kecamatan"), (kab_col, "kabupaten")):
        if col is not None:
            df[col] = _normalized(df[col], lvl)
    for col in join_cols_ref:
        ref_gdf[col] = _normalized(ref_gdf[col], ref_levels[col])

    # Missing names contribute an empty key part.
    df["_join_key"] = build_join_key(df[join_cols_df].fillna(""), join_cols_df)
    ref_gdf["_join_key"] = build_join_key(ref_gdf[join_cols_ref].fillna(""), join_cols_ref)

    # One geometry per key keeps the left join from duplicating rows.
    ref_lookup = ref_gdf[["_join_key", "geometry"]].drop_duplicates(subset=["_join_key"])
    df = df.merge(ref_lookup, how="left", on="_join_key")

    missing = df["geometry"].isna()
    if missing.any():
        ref_keys = ref_lookup["_join_key"].tolist()  # built once, not per row
        ref_dict = dict(zip(ref_keys, ref_lookup["geometry"]))
        fuzzy_cache = {}  # identical keys are matched only once

        def find_fuzzy_geom(key):
            if not isinstance(key, str):
                return None
            if key not in fuzzy_cache:
                # Cutoff 85 equals the former "cutoff 80, then require >= 85".
                match = process.extractOne(
                    key, ref_keys, scorer=fuzz.token_set_ratio, score_cutoff=85
                )
                fuzzy_cache[key] = ref_dict[match[0]] if match else None
            return fuzzy_cache[key]

        df.loc[missing, "geometry"] = df.loc[missing, "_join_key"].apply(find_fuzzy_geom)

    df = df.drop(columns=["_join_key"], errors="ignore")

    # NOTE(review): the CRS is labelled EPSG:4326 regardless of the reference
    # layer's own CRS - confirm the reference tables are stored in 4326.
    return gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")