import pandas as pd
import geopandas as gpd
from shapely import wkt
# NOTE(review): WKTReadingError appears to be a deprecated alias in Shapely 2.x;
# it is no longer referenced below but the import is kept so this module's
# import surface is unchanged. Confirm against the installed Shapely version
# before removing it.
from shapely.errors import WKTReadingError  # noqa: F401

# Raised whenever no usable geometry survives validation/cleaning.
_NO_VALID_GEOMETRY_MSG = "Tidak ada data spasial valid yang ditemukan."


def _safe_load_wkt(raw):
    """Parse one WKT string into a geometry; return None if it cannot be parsed."""
    if not isinstance(raw, str):
        return None
    try:
        return wkt.loads(raw)
    except Exception:
        # Deliberately broad (best-effort loading): a corrupt WKT value must
        # drop only that row, never abort the whole export. ``Exception``
        # already covers the Shapely-specific parse errors.
        return None


def _repair_geometry(geom):
    """Return ``geom`` unchanged if valid, otherwise its ``buffer(0)`` repair."""
    # buffer(0) is the common GIS trick for fixing light topology problems
    # such as self-intersecting polygon rings.
    return geom if geom.is_valid else geom.buffer(0)


def process_dataframe_synchronous(df_input: pd.DataFrame, tmp_file) -> int:
    """Validate and clean WKT geometries, then export the rows to Parquet.

    This is a synchronous function; the usage note at the bottom of the file
    shows it being dispatched with ``asyncio.to_thread`` from async code.

    Steps:
      1. Parse the WKT strings in the ``"geometry"`` column into a new
         ``"geom"`` column; unparsable or non-string values become ``None``.
      2. Drop rows whose geometry could not be parsed.
      3. Repair invalid geometries with ``buffer(0)`` and drop empty ones.
      4. Drop the original WKT column, set the CRS to EPSG:4326, upper-case
         (and strip) every attribute column name, and write the Parquet file.

    Args:
        df_input: Source table. It must contain a ``"geometry"`` column of
            WKT strings. The input frame itself is not modified.
        tmp_file: Destination handed to ``GeoDataFrame.to_parquet``
            (presumably a file path -- confirm against the caller).

    Returns:
        The number of rows written to the Parquet file.

    Raises:
        ValueError: If no row has a usable geometry, either because no WKT
            value could be parsed or because every geometry is empty after
            the topology repair.
        KeyError: If ``df_input`` has no ``"geometry"`` column.
    """
    # Work on a copy so the caller's DataFrame is left untouched.
    export_df = df_input.copy()

    # ---------------------------------------------------------------------
    # Step 1: safe WKT loading
    # ---------------------------------------------------------------------
    export_df["geom"] = export_df["geometry"].apply(_safe_load_wkt)

    # ---------------------------------------------------------------------
    # Step 2: filter out rows whose WKT conversion failed (None)
    # ---------------------------------------------------------------------
    export_df = export_df[export_df["geom"].notnull()]
    if export_df.empty:
        raise ValueError(_NO_VALID_GEOMETRY_MSG)

    export_df = gpd.GeoDataFrame(export_df, geometry="geom")

    # ---------------------------------------------------------------------
    # Step 3: fix topology
    # ---------------------------------------------------------------------
    export_df["geom"] = export_df["geom"].apply(_repair_geometry)

    # A repair (or an explicitly empty input such as "POLYGON EMPTY") can
    # leave an empty geometry behind; those rows carry no spatial data.
    export_df = export_df[~export_df["geom"].is_empty]
    if export_df.empty:
        # Same contract as the guard in step 2: never write an empty
        # Parquet file and silently report zero rows.
        raise ValueError(_NO_VALID_GEOMETRY_MSG)

    # ---------------------------------------------------------------------
    # Step 4: finalisation (CRS and column names)
    # ---------------------------------------------------------------------
    # The original WKT string column is no longer needed.
    export_df = export_df.drop(columns=["geometry"])
    export_df = export_df.set_crs("EPSG:4326", allow_override=True)

    # Attribute columns become UPPERCASE with surrounding whitespace removed
    # (" id " -> "ID"); the active geometry column "geom" stays lowercase.
    export_df = export_df.rename(
        columns=lambda c: c if c == "geom" else str(c).strip().upper()
    )

    export_df.to_parquet(tmp_file)

    return len(export_df)


# --- How to call this from an async function ---
# await asyncio.to_thread(process_dataframe_synchronous, result, tmp_file)