import geopandas as gpd
from shapely.geometry import Point, LineString
import pandas as pd
import numpy as np
import re
import os
from shapely import wkt
from rapidfuzz import process, fuzz
from sqlalchemy import create_engine
from shapely.geometry.base import BaseGeometry
from core.config import REFERENCE_DB_URL, REFERENCE_SCHEMA

# ============================================================
# CONFIGURATION AND CONSTANTS
# ============================================================

COLUMN_ALIASES = {
    'desa': ['desa', 'kelurahan', 'desa_kelurahan', 'desa/kelurahan', 'nama_desa', 'nama_kelurahan', 'Desa/Kel'],
    'kecamatan': ['kec', 'kecamatan', 'nama_kec', 'nama_kecamatan'],
    'kabupaten': ['kab', 'kabupaten', 'kota', 'kabupaten_kota', 'kota_kabupaten', 'kab/kota', 'kota/kabupaten', 'kota/kab']
}

# ============================================================
# ADMINISTRATIVE HELPER FUNCTIONS
# ============================================================

def find_admin_column(df, aliases):
    """Find the best-matching column for each admin level (desa/kecamatan/kabupaten)."""
    def _norm(text):
        # Apply the same normalization to column names and aliases, so that
        # aliases such as 'Desa/Kel' or 'kab/kota' can actually match.
        return str(text).strip().lower().replace(' ', '_').replace('/', '_')

    matched = {}
    for level, alias_list in aliases.items():
        norm_aliases = [_norm(alias) for alias in alias_list]
        for col in df.columns:
            col_norm = _norm(col)
            if any(alias in col_norm for alias in norm_aliases):
                matched[level] = col
                break
    return matched


def detect_smallest_admin_level(df):
    """Detect the smallest administrative level present in the DataFrame."""
    cols = [str(c).lower() for c in df.columns]
    if any('desa' in c or 'kelurahan' in c for c in cols):
        return 'desa'
    # 'kec' also covers 'kecamatan' and matches the 'kec' alias above.
    elif any('kec' in c for c in cols):
        return 'kecamatan'
    elif any('kab' in c or 'kota' in c for c in cols):
        return 'kabupaten'
    return None


def fuzzy_merge(df, master, left_key, right_key, threshold=85):
    """Fuzzy-match area names in `df` against `master`, then merge on the best match."""
    # extractOne returns (choice, score, index) or None when no choice
    # reaches the score cutoff.
    matches = df[left_key].apply(
        lambda x: process.extractOne(str(x), master[right_key], score_cutoff=threshold)
    )
    df['match'] = matches.apply(lambda m: m[0] if m else None)
    merged = df.merge(master, left_on='match',
                      right_on=right_key, how='left')
    return merged


def normalize_name(name: str, level: str = None):
    """Normalize an administrative area name so that spelling variants compare equal."""
    if not isinstance(name, str):
        return None

    name = name.strip()
    if not name:
        return None

    raw = name.lower()
    # Strip administrative-level prefixes.
    raw = re.sub(r'^(desa|kelurahan|kel|dusun|kampung)\s+', '', raw)
    raw = re.sub(r'^(kecamatan|kec)\s+', '', raw)
    raw = re.sub(r'^(kabupaten|kab\.?)\s+', '', raw)

    if level in ["kabupaten", "kota"]:
        raw = re.sub(r'^(kota\s+)', '', raw)

    # Keep letters and single spaces only.
    raw = re.sub(r'[^a-z\s]', '', raw)
    raw = re.sub(r'\s+', ' ', raw).strip()

    # Merge adjacent tokens that are highly similar (ratio > 75).
    tokens = raw.split()
    merged_tokens = []
    i = 0
    while i < len(tokens):
        if i < len(tokens) - 1:
            sim = fuzz.ratio(tokens[i], tokens[i + 1])
            if sim > 75:
                merged_tokens.append(tokens[i] + tokens[i + 1])
                i += 2
                continue
        merged_tokens.append(tokens[i])
        i += 1

    # Drop a token that nearly duplicates the previously kept token (ratio > 95).
    cleaned_tokens = []
    prev = None
    for tok in merged_tokens:
        if prev and fuzz.ratio(prev, tok) > 95:
            continue
        cleaned_tokens.append(tok)
        prev = tok

    raw = " ".join(cleaned_tokens)
    formatted = raw.title()

    # Restore the "Kota " prefix only when the original name carried it.
    if level in ["kabupaten", "kota"]:
        if "kota" in name.lower():
            if not formatted.startswith("Kota "):
                formatted = f"Kota {formatted}"
        else:
            formatted = formatted.replace("Kota ", "")

    return formatted


def is_geom_empty(g):
    """Return True if the geometry is None, NaN, or an empty Shapely geometry."""
    if g is None:
        return True
    if isinstance(g, float) and pd.isna(g):
        return True
    if isinstance(g, BaseGeometry):
        return g.is_empty
    return False


# ============================================================
# MAIN GEOMETRY DETECTION FUNCTIONS (LAT/LON / PATH)
# ============================================================

def detect_and_build_geometry(df: pd.DataFrame, master_polygons: gpd.GeoDataFrame = None):
    """
    Detect and build geometry from a DataFrame.
    Sources: lat/lon columns, WKT, or a join against master polygons (if provided).
    """
    if isinstance(df, gpd.GeoDataFrame):
        if "geometry" in df.columns and df.geometry.notna().any():
            geom_count = df.geometry.notna().sum()
            geom_type = list(df.geom_type.unique())
            print(f"[INFO] Detected existing geometry in GeoDataFrame ({geom_count} features, {geom_type}).")
            return df

    lat_col = next(
        (c for c in df.columns if re.search(r'\b(lat|latitude|y[_\s]*coord|y$)\b', c.lower())), None
    )
    lon_col = next(
        (c for c in df.columns if re.search(r'\b(lon|long|longitude|x[_\s]*coord|x$)\b', c.lower())), None
    )

    if lat_col and lon_col:
        df[lat_col] = pd.to_numeric(df[lat_col], errors='coerce')
        df[lon_col] = pd.to_numeric(df[lon_col], errors='coerce')
        gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs="EPSG:4326")
        print("[INFO] Geometry built from lat/lon columns.")
        return gdf

    coord_col = next(
        (c for c in df.columns if re.search(r'(geom|geometry|wkt|shp|shape|path|coord)', c.lower())), None
    )

    if coord_col and df[coord_col].notnull().any():
        sample_val = str(df[coord_col].dropna().iloc[0]).strip()

        if sample_val.startswith('['):
            # literal_eval only parses Python literals; eval would execute
            # arbitrary code found in the data.
            from ast import literal_eval

            def parse_geom(val):
                try:
                    pts = literal_eval(val)
                    return LineString(pts)
                except Exception:
                    return None

            df['geometry'] = df[coord_col].apply(parse_geom)
            gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
            print("[INFO] Geometry built from coordinate/path column (list of points).")
            return gdf

        elif any(x in sample_val.upper() for x in ["POINT", "LINESTRING", "POLYGON"]):
            try:
                df['geometry'] = df[coord_col].apply(
                    lambda g: wkt.loads(g) if isinstance(g, str) and any(
                        x in g.upper() for x in ["POINT", "LINESTRING", "POLYGON"]
                    ) else None
                )
                gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
                print("[INFO] Geometry built from WKT column (Point/Line/Polygon/MultiPolygon).")
                return gdf
            except Exception as e:
                print(f"[WARN] Failed to parse geometry column as WKT: {e}")

    if master_polygons is not None:
        df.columns = df.columns.str.lower().str.strip().str.replace(' ', '_').str.replace('/', '_')
        matches = find_admin_column(df, COLUMN_ALIASES)

        if 'desa' in matches:
            admin_col = matches['desa']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_desa', how='left')
            # Fall back to fuzzy matching when the exact join leaves gaps.
            if merged['geometry'].isna().sum() > 0:
                merged = fuzzy_merge(df, master_polygons, admin_col, 'nama_desa')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

        elif 'kecamatan' in matches:
            admin_col = matches['kecamatan']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kecamatan', how='left')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

        elif 'kabupaten' in matches:
            admin_col = matches['kabupaten']
            merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kabupaten', how='left')
            gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)
            return gdf

    print("[WARN] No geometry found (lat/lon, path, or master).")
    return df


from functools import lru_cache


@lru_cache(maxsize=3)
def get_reference_polygons(level):
    """Load boundary polygons (MultiPolygon) for `level` from the local cache or the reference DB."""
    table_map = {
        "desa": f"{REFERENCE_SCHEMA}.administrasi_ar_keldesa_jatim",
        "kecamatan": f"{REFERENCE_SCHEMA}.administrasi_ar_kec_jatim",
        "kabupaten": f"{REFERENCE_SCHEMA}.administrasi_ar_kabkot_jatim"
    }
    table_name = table_map.get(level)
    if not table_name:
        raise ValueError(f"No reference table for level '{level}'.")

    local_path = f"cache/{level}_ref.parquet"
    if os.path.exists(local_path):
        print(f"[CACHE] Loading '{level}' reference from local file.")
        return gpd.read_parquet(local_path)

    print(f"[DB] Fetching '{level}' reference data from the database...")
    engine = create_engine(REFERENCE_DB_URL)
    query = f"SELECT *, ST_Multi(geom) AS geometry FROM {table_name}"
    gdf = gpd.read_postgis(query, engine, geom_col="geometry")

    # Make sure the cache directory exists before writing.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    gdf.to_parquet(local_path)
    print(f"[CACHE] Saved to {local_path}")
    return gdf


# ============================================================
# Optimize Join
# ============================================================

def build_join_key(df, cols):
    """Build a '|'-separated composite key from `cols`, treating missing values as empty strings."""
    arr = (
        df[cols]
        .fillna("")
        .astype(str)
        .replace("nan", "", regex=False)
        .to_numpy(dtype=object)
    )
    # np.char.add has no .reduce, so concatenate column by column instead;
    # object arrays add Python strings elementwise.
    key = arr[:, 0]
    for i in range(1, len(cols)):
        key = key + "|" + arr[:, i]
    return key


# ============================================================
# FUNCTION: AUTO-ATTACH POLYGONS TO A NON-SPATIAL DATAFRAME
# ============================================================

def attach_polygon_geometry_auto(df: pd.DataFrame):
    """
    Add a MultiPolygon geometry column based on the combination
    (desa/kelurahan + kecamatan + kabupaten/kota), without duplicating rows.
    """
    level = detect_smallest_admin_level(df)
    if not level:
        print("[WARN] No administrative column found (desa/kecamatan/kabupaten).")
        return df

    print(f"[INFO] Detected smallest admin level: {level}")
    # Copy both frames: the reference is cached by lru_cache and the input
    # belongs to the caller, so neither should be mutated in place.
    ref_gdf = get_reference_polygons(level).copy()
    df = df.copy()

    desa_col = next((c for c in df.columns if any(x in c.lower() for x in ['desa', 'kelurahan'])), None)
    kec_col = next((c for c in df.columns if 'kec' in c.lower()), None)
    kab_col = next((c for c in df.columns if any(x in c.lower() for x in ['kab', 'kota'])), None)

    if desa_col and (not kec_col or not kab_col):
        print("[ERROR] 'Desa' column found, but the 'Kecamatan' and/or 'Kabupaten' column is missing.")
        print(f"[DEBUG] Found: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df

    elif not desa_col and kec_col and not kab_col:
        print("[ERROR] 'Kecamatan' column found, but no 'Kabupaten/Kota' column.")
        print(f"[DEBUG] Found: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df

    elif kab_col and not desa_col and not kec_col:
        print("[INFO] Administrative column structure is valid (at least Kabupaten/Kota found).")
        print(f"[DEBUG] Found: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")

    elif not desa_col and not kec_col and not kab_col:
        print("[WARN] No administrative columns found at all (Desa/Kecamatan/Kabupaten).")
        print(f"[DEBUG] CSV columns: {list(df.columns)}")
        return df

    # Column names in the reference tables.
    desa_ref = "WADMKD"
    kec_ref = "WADMKC"
    kab_ref = "WADMKK"

    if desa_col is not None:
        df[desa_col] = df[desa_col].astype(str).apply(lambda x: normalize_name(x, "desa"))
    if kec_col is not None:
        df[kec_col] = df[kec_col].astype(str).apply(lambda x: normalize_name(x, "kecamatan"))
    if kab_col is not None:
        df[kab_col] = df[kab_col].astype(str).apply(lambda x: normalize_name(x, "kabupaten"))

    # Assumption: a coarser-level reference table may lack the finer-level
    # columns, so only normalize the ones that exist.
    if desa_ref in ref_gdf.columns:
        ref_gdf[desa_ref] = ref_gdf[desa_ref].astype(str).apply(lambda x: normalize_name(x, "desa"))
    if kec_ref in ref_gdf.columns:
        ref_gdf[kec_ref] = ref_gdf[kec_ref].astype(str).apply(lambda x: normalize_name(x,
                                                                                     "kecamatan"))
    if kab_ref in ref_gdf.columns:
        ref_gdf[kab_ref] = ref_gdf[kab_ref].astype(str).apply(lambda x: normalize_name(x, "kabupaten"))

    join_cols_df = [col for col in [desa_col, kec_col, kab_col] if col]
    join_cols_ref = [col for col in [desa_ref, kec_ref, kab_ref] if col in ref_gdf.columns]
    if not join_cols_df or not join_cols_ref:
        print("[ERROR] No administrative column can be used to build the join key.")
        return df

    # Join on the deepest levels both sides share (for example kec + kab).
    common_depth = min(len(join_cols_df), len(join_cols_ref))
    join_cols_df = join_cols_df[-common_depth:]
    join_cols_ref = join_cols_ref[-common_depth:]

    # print(f"[DEBUG] Join columns DF  : {join_cols_df}")
    # print(f"[DEBUG] Join columns REF : {join_cols_ref}")

    df["_join_key"] = build_join_key(df, join_cols_df)
    ref_gdf["_join_key"] = build_join_key(ref_gdf, join_cols_ref)

    # One geometry per key, so the left join cannot duplicate rows.
    ref_lookup = ref_gdf[["_join_key", "geometry"]].drop_duplicates(subset=["_join_key"])
    df = df.merge(ref_lookup, how="left", on="_join_key")
    matched = df["geometry"].notna().sum()

    # print(f"[INFO] {matched} of {len(df)} rows matched directly on (desa + kec + kab/kota).")

    if matched < len(df):
        ref_dict = dict(zip(ref_lookup["_join_key"], ref_lookup["geometry"]))
        # Build the choice list once instead of once per row.
        ref_keys = list(ref_dict.keys())

        def find_fuzzy_geom(row):
            key = row["_join_key"]
            if not isinstance(key, str):
                return None
            # Previous scorer: fuzz.token_sort_ratio without a cutoff.
            # score_cutoff=85 gives the same effective threshold as the old
            # cutoff of 80 followed by a ">= 85" check.
            match = process.extractOne(
                key, ref_keys, scorer=fuzz.token_set_ratio, score_cutoff=85
            )
            return ref_dict[match[0]] if match else None

        missing = df["geometry"].isna()
        df.loc[missing, "geometry"] = df[missing].apply(find_fuzzy_geom,
                                                        axis=1)

    df = df.drop(columns=["_join_key"], errors="ignore")

    # admin_cols = [col for col in [desa_col, kec_col, kab_col] if col and col in df.columns]
    # if matched < len(df):
    #     diff = df[df['geometry'].isna()][admin_cols]
    #     print("[DEBUG] Rows that did not match:")
    #     if diff.empty:
    #         print("(all rows matched)")
    #     else:
    #         print(diff.to_string(index=False))
    # print(f"[REPORT] Total match: {df['geometry'].notna().sum()} / {len(df)} ({df['geometry'].notna().mean()*100:.2f}%)")

    return gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")