import ast
import re

import geopandas as gpd
import pandas as pd
from rapidfuzz import process, fuzz
from shapely import wkt
from shapely.geometry import Point, LineString
from shapely.geometry.base import BaseGeometry
from sqlalchemy import create_engine

from core.config import REFERENCE_DB_URL, REFERENCE_SCHEMA, REF_COLUMN_MAP

# ============================================================
# CONFIGURATION AND CONSTANTS
# ============================================================

# Known spellings of the administrative columns, keyed by admin level.
COLUMN_ALIASES = {
    'desa': ['desa', 'kelurahan', 'desa_kelurahan', 'desa/kelurahan', 'nama_desa', 'nama_kelurahan', 'Desa/Kel'],
    'kecamatan': ['kec', 'kecamatan', 'nama_kec', 'nama_kecamatan'],
    'kabupaten': ['kab', 'kabupaten', 'kota', 'kabupaten_kota', 'kota_kabupaten', 'kab/kota', 'kota/kabupaten', 'kota/kab']
}


# ============================================================
# ADMINISTRATIVE HELPER FUNCTIONS
# ============================================================

def _normalize_column_name(col):
    """Return a column label as a lower-case, stripped string with ' ' and '/' replaced by '_'."""
    # str() keeps non-string labels (e.g. integer column names) from raising.
    return str(col).strip().lower().replace(' ', '_').replace('/', '_')


def find_admin_column(df, aliases):
    """
    Find, for each admin level, the first column whose name contains one of its aliases.

    Args:
        df: DataFrame whose columns are inspected.
        aliases: mapping of level name -> list of alias strings
            (see COLUMN_ALIASES).

    Returns:
        dict mapping level name -> original column label. Levels with no
        matching column are absent from the result.
    """
    matched = {}
    for level, alias_list in aliases.items():
        # Aliases are normalized exactly like the column names; otherwise
        # aliases containing '/' or capital letters could never match.
        norm_aliases = [_normalize_column_name(alias) for alias in alias_list]
        for col in df.columns:
            col_norm = _normalize_column_name(col)
            if any(alias in col_norm for alias in norm_aliases):
                matched[level] = col
                break
    return matched


def detect_smallest_admin_level(df):
    """
    Detect the smallest administrative level present in the DataFrame columns.

    Returns:
        'desa', 'kecamatan', 'kabupaten', or None when no column name
        contains any of the recognised substrings.
    """
    cols = [str(c).lower() for c in df.columns]
    if any('desa' in c or 'kelurahan' in c for c in cols):
        return 'desa'
    # 'kec' (not only 'kecamatan') so that detection agrees with the column
    # lookup performed in attach_polygon_geometry_auto and COLUMN_ALIASES.
    if any('kec' in c for c in cols):
        return 'kecamatan'
    if any('kab' in c or 'kota' in c for c in cols):
        return 'kabupaten'
    return None


def fuzzy_merge(df, master, left_key, right_key, threshold=85):
    """
    Left-join `df` to `master` using fuzzy matching of region names.

    Args:
        df: DataFrame holding the names to look up in column `left_key`.
        master: DataFrame holding the reference names in column `right_key`.
        left_key: column name in `df`.
        right_key: column name in `master`.
        threshold: minimum rapidfuzz score for a candidate to be accepted.

    Returns:
        A new merged DataFrame containing an extra 'match' column with the
        matched reference name (None when nothing reached the threshold).
        The input `df` is not modified.
    """
    choices = master[right_key]

    def best_match(value):
        # Null names must not be matched as the literal text 'nan' / 'None'.
        if value is None or (isinstance(value, float) and pd.isna(value)):
            return None
        hit = process.extractOne(str(value), choices, score_cutoff=threshold)
        return hit[0] if hit else None

    # Work on a copy so the caller's frame does not gain a 'match' column.
    result = df.copy()
    result['match'] = result[left_key].apply(best_match)
    return result.merge(master, left_on='match', right_on=right_key, how='left')


def normalize_name(name: str, level: str = None):
    """
    Normalize a region name for joining.

    Steps: strip and lower-case, remove leading administrative prefixes
    (desa/kelurahan/kel/dusun/kampung, kecamatan/kec, kabupaten/kab), drop
    every character other than a-z and whitespace, merge adjacent tokens
    that are similar to each other, drop near-duplicate consecutive tokens,
    and title-case the result.

    Args:
        name: raw name; non-string or blank input yields None.
        level: optional admin level. For "kabupaten"/"kota" a leading
            "kota" is stripped before cleaning and the "Kota " prefix is
            re-added when the original name contained "kota".

    Returns:
        The normalized name, or None for non-string / blank input.
    """
    if not isinstance(name, str):
        return None
    name = name.strip()
    if not name:
        return None

    raw = name.lower()
    raw = re.sub(r'^(desa|kelurahan|kel|dusun|kampung)\s+', '', raw)
    raw = re.sub(r'^(kecamatan|kec)\s+', '', raw)
    raw = re.sub(r'^(kabupaten|kab\.?|kab)\s+', '', raw)
    if level in ["kabupaten", "kota"]:
        raw = re.sub(r'^(kota\s+)', '', raw)
    raw = re.sub(r'[^a-z\s]', '', raw)
    raw = re.sub(r'\s+', ' ', raw).strip()

    # Concatenate adjacent tokens whose similarity ratio exceeds 75.
    tokens = raw.split()
    merged_tokens = []
    i = 0
    while i < len(tokens):
        if i < len(tokens) - 1:
            sim = fuzz.ratio(tokens[i], tokens[i + 1])
            if sim > 75:
                merged_tokens.append(tokens[i] + tokens[i + 1])
                i += 2
                continue
        merged_tokens.append(tokens[i])
        i += 1

    # Skip a token that is (almost) identical to the last token kept.
    cleaned_tokens = []
    prev = None
    for tok in merged_tokens:
        if prev and fuzz.ratio(prev, tok) > 95:
            continue
        cleaned_tokens.append(tok)
        prev = tok

    raw = " ".join(cleaned_tokens)
    formatted = raw.title()

    if level in ["kabupaten", "kota"]:
        if "kota" in name.lower():
            if not formatted.startswith("Kota "):
                formatted = f"Kota {formatted}"
        else:
            formatted = formatted.replace("Kota ", "")
    return formatted


def is_geom_empty(g):
    """Return True if `g` is None, a float NaN, or an empty Shapely geometry; False otherwise."""
    if g is None:
        return True
    if isinstance(g, float) and pd.isna(g):
        return True
    if isinstance(g, BaseGeometry):
        return g.is_empty
    return False


# ============================================================
# MAIN GEOMETRY DETECTION (LAT/LON / PATH)
# ============================================================

def detect_and_build_geometry(df: pd.DataFrame, master_polygons: gpd.GeoDataFrame = None):
    """
    Detect and build geometry for a DataFrame.

    Sources are tried in this order:
      1. an existing, non-null geometry column of a GeoDataFrame
         (the input is returned unchanged);
      2. latitude / longitude columns (points, EPSG:4326);
      3. a coordinate column holding either a list of points (LineString)
         or WKT text (EPSG:4326);
      4. a join against `master_polygons` on the desa / kecamatan /
         kabupaten name column, when `master_polygons` is provided.

    Args:
        df: input table.
        master_polygons: optional reference polygons with 'nama_desa',
            'nama_kecamatan' or 'nama_kabupaten' columns and a 'geometry'
            column.

    Returns:
        A GeoDataFrame when geometry could be built; otherwise a plain
        DataFrame (a warning is printed). Except for case 1, the result is
        built from a copy, so the caller's `df` is left untouched.
    """
    if isinstance(df, gpd.GeoDataFrame):
        if "geometry" in df.columns and df.geometry.notna().any():
            geom_count = df.geometry.notna().sum()
            geom_type = list(df.geom_type.unique())
            print(f"[INFO] Detected existing geometry in GeoDataFrame ({geom_count} features, {geom_type}).")
            return df

    # Everything below assigns columns; do it on a copy, not on the caller's frame.
    df = df.copy()

    lat_col = next(
        (c for c in df.columns if re.search(r'\b(lat|latitude|y[_\s]*coord|y$)\b', str(c).lower())),
        None
    )
    lon_col = next(
        (c for c in df.columns if re.search(r'\b(lon|long|longitude|x[_\s]*coord|x$)\b', str(c).lower())),
        None
    )

    if lat_col and lon_col:
        # Unparseable values become NaN instead of raising.
        df[lat_col] = pd.to_numeric(df[lat_col], errors='coerce')
        df[lon_col] = pd.to_numeric(df[lon_col], errors='coerce')
        gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs="EPSG:4326")
        print("[INFO] Geometry dibangun dari kolom lat/lon.")
        return gdf

    coord_col = next(
        (c for c in df.columns if re.search(r'(geom|geometry|wkt|shp|shape|path|coord)', str(c).lower())),
        None
    )

    if coord_col and df[coord_col].notnull().any():
        # The first non-null value decides how the whole column is parsed.
        sample_val = str(df[coord_col].dropna().iloc[0]).strip()

        if sample_val.startswith('['):
            def parse_geom(val):
                try:
                    # SECURITY: this used to be eval(val), which executes
                    # arbitrary code found in the data. literal_eval only
                    # accepts Python literals (lists/tuples of numbers).
                    pts = ast.literal_eval(val)
                    return LineString(pts)
                except Exception:
                    # Best effort: rows that cannot be parsed get no geometry.
                    return None

            df['geometry'] = df[coord_col].apply(parse_geom)
            gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
            print("[INFO] Geometry dibangun dari kolom koordinat/path (list of points).")
            return gdf

        elif any(x in sample_val.upper() for x in ["POINT", "LINESTRING", "POLYGON"]):
            try:
                df['geometry'] = df[coord_col].apply(
                    lambda g: wkt.loads(g) if isinstance(g, str) and any(
                        x in g.upper() for x in ["POINT", "LINESTRING", "POLYGON"]
                    ) else None
                )
                gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
                print("[INFO] Geometry dibangun dari kolom WKT (Point/Line/Polygon/MultiPolygon).")
                return gdf
            except Exception as e:
                # On failure fall through to the master-polygon join below.
                print(f"[WARN] Gagal parsing kolom geometry sebagai WKT: {e}")

    if master_polygons is not None:
        # Plain-Python normalization so non-string labels do not break `.str`.
        df.columns = [str(c).lower().strip().replace(' ', '_').replace('/', '_') for c in df.columns]
        matches = find_admin_column(df, COLUMN_ALIASES)

        if 'desa' in matches:
            admin_col = matches['desa']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_desa', how='left')
            # If any row failed the exact join, redo the join with fuzzy matching.
            if merged['geometry'].isna().sum() > 0:
                merged = fuzzy_merge(df, master_polygons, admin_col, 'nama_desa')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

        elif 'kecamatan' in matches:
            admin_col = matches['kecamatan']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kecamatan', how='left')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

        elif 'kabupaten' in matches:
            admin_col = matches['kabupaten']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kabupaten', how='left')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

    print("[WARN] Tidak ditemukan geometry (lat/lon, path, atau master).")
    return df


def get_reference_polygons(level):
    """
    Load administrative boundaries (as MultiPolygon) from the reference database.

    Args:
        level: 'desa', 'kecamatan' or 'kabupaten'.

    Returns:
        GeoDataFrame with every column of the reference table plus a
        'geometry' column computed as ST_Multi(geom).

    Raises:
        ValueError: if there is no reference table for `level`.
    """
    table_map = {
        'desa': f"{REFERENCE_SCHEMA}.administrasi_ar_keldesa_jatim",
        'kecamatan': f"{REFERENCE_SCHEMA}.administrasi_ar_kec_jatim",
        'kabupaten': f"{REFERENCE_SCHEMA}.administrasi_ar_kabkot_jatim"
    }

    table_name = table_map.get(level)
    if not table_name:
        raise ValueError(f"Tidak ada tabel referensi untuk level '{level}'.")

    # The table name comes only from the fixed map above, never from user input.
    query = f"SELECT *, ST_Multi(geom) AS geometry FROM {table_name}"

    engine = create_engine(REFERENCE_DB_URL)
    try:
        gdf = gpd.read_postgis(query, engine, geom_col='geometry')
    finally:
        # A new engine is created per call; release its connection pool.
        engine.dispose()

    print(f"[INFO] {len(gdf)} data referensi '{level}' berhasil dimuat dari {table_name}.")
    return gdf


# ============================================================
# AUTO-ATTACH POLYGONS TO A NON-SPATIAL DATAFRAME
# ============================================================

def attach_polygon_geometry_auto(df: pd.DataFrame):
    """
    Add a MultiPolygon 'geometry' column based on the combination of
    (desa/kelurahan + kecamatan + kabupaten/kota) names, without
    duplicating rows.

    Rows are first joined exactly on a '|'-joined key of the normalized
    names; rows still unmatched are then resolved with a fuzzy match on
    that key (token_set_ratio, minimum score 85).

    Args:
        df: table with administrative name columns.

    Returns:
        GeoDataFrame built from a copy of `df` (names normalized, geometry
        attached). When the administrative columns are missing or
        incomplete, the original `df` is returned unchanged and a message
        is printed.

    Raises:
        KeyError: if the reference table lacks a column needed for the join.
    """
    level = detect_smallest_admin_level(df)
    if not level:
        print("[WARN] Tidak ditemukan kolom administratif (desa/kecamatan/kabupaten).")
        return df

    print(f"[INFO] Detected smallest admin level: {level}")

    desa_col = next((c for c in df.columns if any(x in str(c).lower() for x in ['desa', 'kelurahan'])), None)
    kec_col = next((c for c in df.columns if 'kec' in str(c).lower()), None)
    kab_col = next((c for c in df.columns if any(x in str(c).lower() for x in ['kab', 'kota'])), None)

    # Validate the column structure before touching the database.
    if desa_col and (not kec_col or not kab_col):
        print("[ERROR] Kolom 'Desa' ditemukan tetapi kolom 'Kecamatan' dan/atau 'Kabupaten' tidak lengkap.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df
    elif not desa_col and kec_col and not kab_col:
        print("[ERROR] Kolom 'Kecamatan' ditemukan tetapi kolom 'Kabupaten/Kota' tidak ditemukan.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df
    elif kab_col and not desa_col and not kec_col:
        print("[INFO] Struktur kolom administratif valid (minimal Kabupaten/Kota ditemukan).")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
    elif not desa_col and not kec_col and not kab_col:
        print("[WARN] Tidak ditemukan kolom administratif apapun (Desa/Kecamatan/Kabupaten).")
        print(f"[DEBUG] Kolom CSV: {list(df.columns)}")
        return df

    ref_gdf = get_reference_polygons(level)

    # Name columns used in the reference tables.
    desa_ref = "WADMKD"
    kec_ref = "WADMKC"
    kab_ref = "WADMKK"

    # From here on columns are rewritten/added; keep the caller's frame intact.
    df = df.copy()

    for col, col_level in ((desa_col, "desa"), (kec_col, "kecamatan"), (kab_col, "kabupaten")):
        if col is not None:
            df[col] = df[col].astype(str).apply(lambda x, lv=col_level: normalize_name(x, lv))

    # Normalize only the reference columns that exist in the loaded table
    # (not every table is guaranteed to carry all three name columns).
    for ref_col, col_level in ((desa_ref, "desa"), (kec_ref, "kecamatan"), (kab_ref, "kabupaten")):
        if ref_col in ref_gdf.columns:
            ref_gdf[ref_col] = ref_gdf[ref_col].astype(str).apply(lambda x, lv=col_level: normalize_name(x, lv))

    # Align both sides on the coarsest shared levels: the last N entries of
    # (desa, kec, kab), where N is the number of admin columns found in df.
    join_cols_df = [col for col in [desa_col, kec_col, kab_col] if col]
    join_cols_ref = [desa_ref, kec_ref, kab_ref][-len(join_cols_df):]

    missing_ref = [col for col in join_cols_ref if col not in ref_gdf.columns]
    if missing_ref:
        raise KeyError(f"Reference table for level '{level}' lacks column(s): {missing_ref}")

    df["_join_key"] = df[join_cols_df].astype(str).agg("|".join, axis=1)
    ref_gdf["_join_key"] = ref_gdf[join_cols_ref].astype(str).agg("|".join, axis=1)

    # One geometry per key so the left merge cannot duplicate rows.
    ref_lookup = ref_gdf[["_join_key", "geometry"]].drop_duplicates(subset=["_join_key"])
    df = df.merge(ref_lookup, how="left", on="_join_key")

    unmatched_mask = df["geometry"].isna()
    if unmatched_mask.any():
        ref_dict = dict(zip(ref_lookup["_join_key"], ref_lookup["geometry"]))
        ref_keys = list(ref_dict)  # built once, not once per row

        def find_fuzzy_geom(key):
            if not isinstance(key, str):
                return None
            match = process.extractOne(
                key,
                ref_keys,
                scorer=fuzz.token_set_ratio,
                score_cutoff=85
            )
            return ref_dict[match[0]] if match else None

        df.loc[unmatched_mask, "geometry"] = df.loc[unmatched_mask, "_join_key"].apply(find_fuzzy_geom)

    df = df.drop(columns=["_join_key"], errors="ignore")

    # The geometries come from ref_gdf, so carry its CRS; fall back to the
    # previously hard-coded EPSG:4326 when the reference has none.
    crs = ref_gdf.crs if ref_gdf.crs is not None else "EPSG:4326"
    return gpd.GeoDataFrame(df, geometry="geometry", crs=crs)