file_table_reader/services/geometry_detector.py
dmsanhrProject 16f0042508 Init Commit
2025-10-29 17:07:48 +07:00

377 lines
14 KiB
Python

import ast
import re

import geopandas as gpd
import pandas as pd
from rapidfuzz import process, fuzz
from shapely import wkt
from shapely.geometry import Point, LineString
from shapely.geometry.base import BaseGeometry
from sqlalchemy import create_engine

from core.config import REFERENCE_DB_URL, REFERENCE_SCHEMA, REF_COLUMN_MAP
# ============================================================
# KONFIGURASI DAN KONSTANTA
# ============================================================
# Substring aliases used to recognise administrative columns, keyed by level.
# find_admin_column() walks this mapping in insertion order
# (desa -> kecamatan -> kabupaten), so the key order is kept as-is.
COLUMN_ALIASES = {
    "desa": [
        "desa",
        "kelurahan",
        "desa_kelurahan",
        "desa/kelurahan",
        "nama_desa",
        "nama_kelurahan",
        "Desa/Kel",
    ],
    "kecamatan": [
        "kec",
        "kecamatan",
        "nama_kec",
        "nama_kecamatan",
    ],
    "kabupaten": [
        "kab",
        "kabupaten",
        "kota",
        "kabupaten_kota",
        "kota_kabupaten",
        "kab/kota",
        "kota/kabupaten",
        "kota/kab",
    ],
}
# ============================================================
# FUNGSI BANTU ADMINISTRATIF
# ============================================================
def find_admin_column(df, aliases):
    """Find, for each administrative level, the first column matching an alias.

    Column labels and aliases are normalised the same way (converted to
    ``str``, stripped, lower-cased, spaces and slashes replaced by
    underscores) and then compared by substring. Normalising the aliases as
    well means entries such as ``'Desa/Kel'`` or ``'kab/kota'`` can actually
    match; previously only the column side was normalised.

    Args:
        df: Object exposing ``columns`` (e.g. a pandas DataFrame). Labels may
            be non-strings; they are stringified for comparison only.
        aliases: Mapping of level name -> iterable of alias strings.

    Returns:
        dict mapping each level that matched to the ORIGINAL column label.
        Levels without a matching column are absent from the result.
    """
    def _normalize(label):
        return str(label).strip().lower().replace(' ', '_').replace('/', '_')

    # Normalise every column once instead of once per level.
    normalized_columns = [(col, _normalize(col)) for col in df.columns]

    matched = {}
    for level, alias_list in aliases.items():
        alias_keys = [_normalize(alias) for alias in alias_list]
        for col, col_norm in normalized_columns:
            if any(key in col_norm for key in alias_keys):
                # First matching column (in DataFrame order) wins.
                matched[level] = col
                break
    return matched
def detect_smallest_admin_level(df):
    """Detect the smallest administrative level present in the DataFrame columns.

    Column labels are stringified and lower-cased, then checked by substring
    from the finest level to the coarsest:

    * ``'desa'``      - a label contains "desa" or "kelurahan"
    * ``'kecamatan'`` - a label contains "kec"
    * ``'kabupaten'`` - a label contains "kab" or "kota"

    The district check uses the fragment "kec" (which also covers
    "kecamatan") so that abbreviated headers such as ``kec`` / ``nama_kec``
    are recognised, matching how attach_polygon_geometry_auto() picks its
    district column.

    Args:
        df: Object exposing ``columns`` (e.g. a pandas DataFrame).

    Returns:
        ``'desa'``, ``'kecamatan'``, ``'kabupaten'`` or ``None`` when no
        administrative column is found.
    """
    cols = [str(c).lower() for c in df.columns]

    def has_any(*fragments):
        return any(fragment in col for col in cols for fragment in fragments)

    if has_any('desa', 'kelurahan'):
        return 'desa'
    if has_any('kec'):
        return 'kecamatan'
    if has_any('kab', 'kota'):
        return 'kabupaten'
    return None
def fuzzy_merge(df, master, left_key, right_key, threshold=85):
    """Left-join ``df`` to ``master`` using fuzzy matching of area names.

    Each value of ``df[left_key]`` is stringified and matched against
    ``master[right_key]`` with ``process.extractOne``; the best choice scoring
    at least ``threshold`` is stored in a ``'match'`` column (``None`` when
    nothing qualifies), and that column is used as the left join key.

    The input ``df`` is not modified: the ``'match'`` column is added to a new
    frame, so callers do not end up with a stray column on their DataFrame.

    Args:
        df: Left DataFrame.
        master: Right (reference) DataFrame.
        left_key: Column in ``df`` holding the names to match.
        right_key: Column in ``master`` holding the reference names.
        threshold: Minimum score passed to ``extractOne`` as ``score_cutoff``.

    Returns:
        The merged DataFrame (``how='left'``), including the ``'match'`` column.
    """
    choices = master[right_key]

    def best_match(value):
        hit = process.extractOne(str(value), choices, score_cutoff=threshold)
        # extractOne returns a tuple whose first item is the matched choice,
        # or None when no choice reaches the cutoff.
        return hit[0] if hit else None

    keyed = df.assign(match=df[left_key].apply(best_match))
    return keyed.merge(master, left_on='match', right_on=right_key, how='left')
def normalize_name(name: str, level: str = None):
    """Normalise an administrative area name so it can be used as a join key.

    Processing order:

    1. Non-string or blank input returns ``None``.
    2. The name is lower-cased and leading level prefixes are stripped in
       sequence: desa/kelurahan/kel/dusun/kampung, then kecamatan/kec, then
       kabupaten/kab. Abbreviations may carry a trailing dot ("Kel.", "Kec.",
       "Kab.").
    3. When ``level`` is ``"kabupaten"`` or ``"kota"``, a leading "kota" word
       is stripped and remembered.
    4. Every character other than a-z and whitespace is removed and runs of
       whitespace are collapsed.
    5. Adjacent tokens whose ``fuzz.ratio`` is above 75 are concatenated into
       one token; afterwards a token is skipped when its ratio to the
       previously kept token is above 95.
    6. The result is title-cased, and "Kota " is put back in front of names
       that had the leading "kota" word in step 3.

    The city marker is decided by the leading word only. A plain substring
    test would turn e.g. "Kabupaten Kotabaru" into "Kota Kotabaru".

    Args:
        name: Raw area name.
        level: Administrative level of the name; only "kabupaten"/"kota"
            change the behaviour (city prefix handling).

    Returns:
        The normalised name, or ``None`` for non-string / blank input.
    """
    if not isinstance(name, str):
        return None
    name = name.strip()
    if not name:
        return None

    raw = name.lower()
    raw = re.sub(r'^(desa|kelurahan|kel\.?|dusun|kampung)\s+', '', raw)
    raw = re.sub(r'^(kecamatan|kec\.?)\s+', '', raw)
    raw = re.sub(r'^(kabupaten|kab\.?|kab)\s+', '', raw)

    is_city = False
    if level in ("kabupaten", "kota"):
        raw, stripped = re.subn(r'^(kota\s+)', '', raw)
        is_city = stripped > 0

    raw = re.sub(r'[^a-z\s]', '', raw)
    raw = re.sub(r'\s+', ' ', raw).strip()
    tokens = raw.split()

    # Pass 1: glue together neighbouring tokens that look alike (ratio > 75).
    merged_tokens = []
    i = 0
    while i < len(tokens):
        if i < len(tokens) - 1 and fuzz.ratio(tokens[i], tokens[i + 1]) > 75:
            merged_tokens.append(tokens[i] + tokens[i + 1])
            i += 2
            continue
        merged_tokens.append(tokens[i])
        i += 1

    # Pass 2: drop a token that is a near-duplicate (ratio > 95) of the last
    # token that was kept.
    cleaned_tokens = []
    prev = None
    for tok in merged_tokens:
        if prev and fuzz.ratio(prev, tok) > 95:
            continue
        cleaned_tokens.append(tok)
        prev = tok

    formatted = " ".join(cleaned_tokens).title()

    if is_city and not formatted.startswith("Kota "):
        formatted = f"Kota {formatted}"
    return formatted
def is_geom_empty(g):
    """Return True when ``g`` is None, a float NaN, or an empty Shapely geometry.

    Any other value (including non-empty geometries and unrelated objects)
    yields the geometry's own ``is_empty`` flag or ``False`` respectively.
    """
    is_missing = g is None or (isinstance(g, float) and pd.isna(g))
    if is_missing:
        return True
    return g.is_empty if isinstance(g, BaseGeometry) else False
# ============================================================
# FUNGSI UTAMA GEOMETRY DETECTION (LAT/LON / PATH)
# ============================================================
def _parse_path_geometry(val):
    """Build a LineString from a "[(x, y), ...]" cell; return None if unparsable."""
    try:
        # SECURITY: this used to be eval(val), which executes arbitrary Python
        # taken from the cell contents of an input table. literal_eval only
        # accepts literals (lists/tuples/numbers), which is all a coordinate
        # list needs. Values that are already sequences are used as-is.
        pts = ast.literal_eval(val) if isinstance(val, str) else val
        return LineString(pts)
    except Exception:
        # Deliberate best effort: a bad row gets no geometry instead of
        # aborting the whole table.
        return None


def detect_and_build_geometry(df: pd.DataFrame, master_polygons: gpd.GeoDataFrame = None):
    """Detect geometry information in ``df`` and build a GeoDataFrame from it.

    Sources are tried in this order; the first that applies wins:

    1. ``df`` is already a GeoDataFrame with at least one non-null geometry:
       returned unchanged.
    2. A latitude and a longitude column (matched by name): values are
       coerced to numeric and turned into points, CRS ``EPSG:4326``.
    3. A geometry-like column (name contains geom/wkt/shp/shape/path/coord),
       judged by its first non-null value:
       * starts with ``[``: each cell is parsed as a list of points into a
         LineString (unparsable cells become ``None``);
       * contains POINT/LINESTRING/POLYGON: each cell is parsed as WKT. If
         parsing raises, a warning is printed and detection continues.
    4. ``master_polygons`` is given: column labels are normalised and the
       frame is joined to the master polygons on the desa, kecamatan or
       kabupaten name column (first one found). Only the desa join falls back
       to ``fuzzy_merge`` when some rows did not match exactly.

    NOTE: ``df`` is modified in place by steps 2-4 (numeric coercion, a new
    ``geometry`` column, renamed column labels).

    Args:
        df: Input table.
        master_polygons: Optional reference polygons with ``nama_desa`` /
            ``nama_kecamatan`` / ``nama_kabupaten`` and ``geometry`` columns.

    Returns:
        A GeoDataFrame when geometry could be built, otherwise ``df`` itself
        (after printing a warning).
    """
    # 1. Already spatial.
    if isinstance(df, gpd.GeoDataFrame):
        if "geometry" in df.columns and df.geometry.notna().any():
            geom_count = df.geometry.notna().sum()
            geom_type = list(df.geom_type.unique())
            print(f"[INFO] Detected existing geometry in GeoDataFrame ({geom_count} features, {geom_type}).")
            return df

    # 2. Latitude / longitude columns.
    lat_col = next(
        (c for c in df.columns if re.search(r'\b(lat|latitude|y[_\s]*coord|y$)\b', c.lower())), None
    )
    lon_col = next(
        (c for c in df.columns if re.search(r'\b(lon|long|longitude|x[_\s]*coord|x$)\b', c.lower())), None
    )
    if lat_col and lon_col:
        df[lat_col] = pd.to_numeric(df[lat_col], errors='coerce')
        df[lon_col] = pd.to_numeric(df[lon_col], errors='coerce')
        gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs="EPSG:4326")
        print("[INFO] Geometry dibangun dari kolom lat/lon.")
        return gdf

    # 3. A column holding coordinate lists or WKT.
    coord_col = next(
        (c for c in df.columns if re.search(r'(geom|geometry|wkt|shp|shape|path|coord)', c.lower())), None
    )
    if coord_col and df[coord_col].notnull().any():
        wkt_keywords = ["POINT", "LINESTRING", "POLYGON"]
        sample_val = str(df[coord_col].dropna().iloc[0]).strip()
        if sample_val.startswith('['):
            df['geometry'] = df[coord_col].apply(_parse_path_geometry)
            gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
            print("[INFO] Geometry dibangun dari kolom koordinat/path (list of points).")
            return gdf
        elif any(x in sample_val.upper() for x in wkt_keywords):
            try:
                df['geometry'] = df[coord_col].apply(
                    lambda g: wkt.loads(g) if isinstance(g, str) and any(
                        x in g.upper() for x in wkt_keywords
                    ) else None
                )
                gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
                print("[INFO] Geometry dibangun dari kolom WKT (Point/Line/Polygon/MultiPolygon).")
                return gdf
            except Exception as e:
                # Fall through to the master-polygon join / final warning.
                print(f"[WARN] Gagal parsing kolom geometry sebagai WKT: {e}")

    # 4. Join against master polygons by administrative name.
    if master_polygons is not None:
        df.columns = df.columns.str.lower().str.strip().str.replace(' ', '_').str.replace('/', '_')
        matches = find_admin_column(df, COLUMN_ALIASES)
        master_name_columns = (
            ('desa', 'nama_desa'),
            ('kecamatan', 'nama_kecamatan'),
            ('kabupaten', 'nama_kabupaten'),
        )
        for admin_level, master_col in master_name_columns:
            if admin_level not in matches:
                continue
            admin_col = matches[admin_level]
            merged = df.merge(master_polygons, left_on=admin_col, right_on=master_col, how='left')
            # Only village names get a fuzzy retry when the exact join left gaps.
            if admin_level == 'desa' and merged['geometry'].isna().any():
                merged = fuzzy_merge(df, master_polygons, admin_col, master_col)
            return gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs)

    print("[WARN] Tidak ditemukan geometry (lat/lon, path, atau master).")
    return df
def get_reference_polygons(level):
    """Load administrative boundary polygons (as MultiPolygon) from the reference DB.

    Args:
        level: ``'desa'``, ``'kecamatan'`` or ``'kabupaten'``.

    Returns:
        GeoDataFrame with every column of the reference table plus a
        ``geometry`` column computed as ``ST_Multi(geom)``.

    Raises:
        ValueError: if ``level`` has no reference table. This is checked
            before any configuration or database access.
    """
    table_suffixes = {
        'desa': "administrasi_ar_keldesa_jatim",
        'kecamatan': "administrasi_ar_kec_jatim",
        'kabupaten': "administrasi_ar_kabkot_jatim",
    }
    table_suffix = table_suffixes.get(level)
    if not table_suffix:
        raise ValueError(f"Tidak ada tabel referensi untuk level '{level}'.")

    # The table name is assembled only from the fixed mapping above and the
    # configured schema, never from caller-supplied text.
    table_name = f"{REFERENCE_SCHEMA}.{table_suffix}"
    query = f"SELECT *, ST_Multi(geom) AS geometry FROM {table_name}"

    engine = create_engine(REFERENCE_DB_URL)
    try:
        gdf = gpd.read_postgis(query, engine, geom_col='geometry')
    finally:
        # A new engine is created on every call; dispose it so its pooled
        # connections are closed instead of lingering until garbage collection.
        engine.dispose()

    print(f"[INFO] {len(gdf)} data referensi '{level}' berhasil dimuat dari {table_name}.")
    return gdf
# ============================================================
# FUNGSI: AUTO ATTACH POLYGON KE DATAFRAME NON-SPASIAL
# ============================================================
def attach_polygon_geometry_auto(df: pd.DataFrame):
    """Attach a MultiPolygon ``geometry`` column based on administrative names.

    The join key is the combination of the administrative columns found in
    ``df`` (desa/kelurahan + kecamatan + kabupaten/kota, or a coarser subset),
    matched against the reference columns ``WADMKD`` / ``WADMKC`` / ``WADMKK``.
    Names on both sides go through ``normalize_name`` first. Rows without an
    exact key match get a fuzzy retry (``token_set_ratio`` >= 85). The
    reference is de-duplicated per key, so input rows are never multiplied.

    The caller's ``df`` is not modified; work happens on a copy.

    Args:
        df: Non-spatial table with administrative name columns.

    Returns:
        A GeoDataFrame with normalised administrative names and a
        ``geometry`` column (``None`` where nothing matched). When the
        administrative columns are missing or incomplete, a message is
        printed and the original ``df`` is returned unchanged.

    Raises:
        KeyError: if the reference table lacks a column needed for the join.
    """
    level = detect_smallest_admin_level(df)
    if not level:
        print("[WARN] Tidak ditemukan kolom administratif (desa/kecamatan/kabupaten).")
        return df
    print(f"[INFO] Detected smallest admin level: {level}")

    labels = [(c, str(c).lower()) for c in df.columns]
    desa_col = next((c for c, low in labels if any(x in low for x in ['desa', 'kelurahan'])), None)
    kec_col = next((c for c, low in labels if 'kec' in low), None)
    kab_col = next((c for c, low in labels if any(x in low for x in ['kab', 'kota'])), None)

    # A finer level is only usable together with all of its parent levels.
    if desa_col and (not kec_col or not kab_col):
        print("[ERROR] Kolom 'Desa' ditemukan tetapi kolom 'Kecamatan' dan/atau 'Kabupaten' tidak lengkap.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df
    elif not desa_col and kec_col and not kab_col:
        print("[ERROR] Kolom 'Kecamatan' ditemukan tetapi kolom 'Kabupaten/Kota' tidak ditemukan.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df
    elif kab_col and not desa_col and not kec_col:
        print("[INFO] Struktur kolom administratif valid (minimal Kabupaten/Kota ditemukan).")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
    elif not desa_col and not kec_col and not kab_col:
        print("[WARN] Tidak ditemukan kolom administratif apapun (Desa/Kecamatan/Kabupaten).")
        print(f"[DEBUG] Kolom CSV: {list(df.columns)}")
        return df

    # (input column, reference column, level passed to normalize_name),
    # restricted to the levels actually present in the input.
    join_pairs = [
        (col, ref_col, name_level)
        for col, ref_col, name_level in (
            (desa_col, "WADMKD", "desa"),
            (kec_col, "WADMKC", "kecamatan"),
            (kab_col, "WADMKK", "kabupaten"),
        )
        if col is not None
    ]
    if not join_pairs:
        print("[ERROR] Tidak ada kolom administratif yang bisa digunakan untuk join key.")
        return df

    # Hit the database only after the cheap structural checks have passed.
    ref_gdf = get_reference_polygons(level)
    missing_ref = [ref_col for _, ref_col, _ in join_pairs if ref_col not in ref_gdf.columns]
    if missing_ref:
        raise KeyError(
            f"Reference table for level '{level}' is missing join column(s): {missing_ref}"
        )

    # Work on a copy so the caller's frame keeps its original values and does
    # not end up with a leftover "_join_key" column.
    df = df.copy()
    for col, ref_col, name_level in join_pairs:
        df[col] = df[col].astype(str).apply(normalize_name, args=(name_level,))
        # Only reference columns that take part in the join are normalised;
        # the others may not exist in coarser reference tables (assumption -
        # the table schemas are not visible here).
        ref_gdf[ref_col] = ref_gdf[ref_col].astype(str).apply(normalize_name, args=(name_level,))

    join_cols_df = [col for col, _, _ in join_pairs]
    join_cols_ref = [ref_col for _, ref_col, _ in join_pairs]
    df["_join_key"] = df[join_cols_df].astype(str).agg("|".join, axis=1)
    ref_gdf["_join_key"] = ref_gdf[join_cols_ref].astype(str).agg("|".join, axis=1)

    # One geometry per key keeps the left join from duplicating input rows.
    ref_lookup = ref_gdf[["_join_key", "geometry"]].drop_duplicates(subset=["_join_key"])
    df = df.merge(ref_lookup, how="left", on="_join_key")

    unmatched = df["geometry"].isna()
    if unmatched.any():
        ref_dict = dict(zip(ref_lookup["_join_key"], ref_lookup["geometry"]))
        choices = list(ref_dict)  # built once, not once per row

        def find_fuzzy_geom(key):
            if not isinstance(key, str):
                return None
            # Single effective threshold: previously cutoff 80 followed by a
            # ">= 85" check, which is the same as a cutoff of 85.
            match = process.extractOne(
                key, choices, scorer=fuzz.token_set_ratio, score_cutoff=85
            )
            return ref_dict[match[0]] if match else None

        df.loc[unmatched, "geometry"] = df.loc[unmatched, "_join_key"].map(find_fuzzy_geom)

    df = df.drop(columns=["_join_key"], errors="ignore")

    # NOTE(review): geometries come from the reference table, so its CRS is
    # used when known; EPSG:4326 remains the fallback - confirm the reference
    # data really is EPSG:4326 if downstream code relies on that.
    crs = ref_gdf.crs if ref_gdf.crs is not None else "EPSG:4326"
    return gpd.GeoDataFrame(df, geometry="geometry", crs=crs)