"""Geometry cleansing pipelines for PostGIS-backed QGIS vector layers.

The module loads a table as a ``QgsVectorLayer`` and runs a fixed chain of
QGIS Processing algorithms over it, collecting per-step feature counts in a
summary dictionary.
"""
from typing import Dict

from qgis.core import (
    QgsDataSourceUri,
    QgsFeature,
    QgsVectorLayer,
    QgsVectorLayerExporter,
    QgsVectorFileWriter,
    QgsWkbTypes,
)
import processing

from config import HOST, PORT, DB, USER, PWD, SCHEMA, GEOM_COL

# CRS every cleaned layer is reprojected to.
_TARGET_CRS = "EPSG:4326"

# Lower-cased field type names whose values get whitespace-trimmed.
_STRING_TYPE_NAMES = ("text", "varchar")


def load_layer(table_name: str) -> QgsVectorLayer:
    """Open ``SCHEMA.table_name`` from the configured PostgreSQL database.

    The connection parameters and geometry column come from ``config``;
    ``_id`` is passed as the key column. The layer's validity is printed but
    not enforced, so callers receive the layer even when it is invalid.

    Args:
        table_name: Name of the table to load; also used as the layer name.

    Returns:
        The ``QgsVectorLayer`` created with the ``postgres`` provider.
    """
    uri = QgsDataSourceUri()
    uri.setConnection(HOST, PORT, DB, USER, PWD)
    uri.setDataSource(SCHEMA, table_name, GEOM_COL, "", "_id")

    layer = QgsVectorLayer(uri.uri(), table_name, "postgres")
    print("Layer valid:", layer.isValid())
    return layer


def _run_to_memory(algorithm_id: str, params: dict) -> QgsVectorLayer:
    """Run a single-output Processing algorithm into a memory layer."""
    return processing.run(algorithm_id, {**params, "OUTPUT": "memory:"})["OUTPUT"]


def _field_expression(field) -> str:
    """Return the refactor expression for *field*, trimming string fields."""
    quoted = f'"{field.name()}"'
    if field.typeName().lower() in _STRING_TYPE_NAMES:
        return f"trim({quoted})"
    return quoted


def _trim_string_fields(layer: QgsVectorLayer) -> QgsVectorLayer:
    """Copy *layer* with every text/varchar field passed through ``trim()``.

    All other fields are mapped to themselves; name, type, length and
    precision are carried over unchanged.
    """
    mapping = [
        {
            "expression": _field_expression(field),
            "name": field.name(),
            "type": field.type(),
            "length": field.length(),
            "precision": field.precision(),
        }
        for field in layer.fields()
    ]
    return _run_to_memory(
        "qgis:refactorfields",
        {
            "INPUT": layer,
            "FIELDS_MAPPING": mapping,
            # Marked as mandatory by the original author.
            # NOTE(review): confirm the algorithm actually reads this key.
            "KEEP_GEOMETRY": True,
        },
    )


def cleansing_layer(layer: QgsVectorLayer) -> Dict:
    """Clean a polygon layer and report feature counts for every step.

    Steps, in order: reproject to EPSG:4326, check validity, fix geometries
    (presumably including self-intersections -- per an earlier revision's
    log message), promote to multi-geometries, remove duplicate rows, remove
    duplicate vertices, and trim string fields. Progress is printed to
    stdout.

    Args:
        layer: The input vector layer.

    Returns:
        ``{"summary": <dict of counters>, "clean_layer": <cleaned layer>}``.
        Some summary keys (``after_fixgeometries``, ``after_srid``,
        ``sliver_removed``, ``after_deleteholes``) belong to steps that are
        currently disabled and therefore stay at 0.
    """
    print("\n========== START CLEANSING ==========")
    print("Step 0: Load Layer")
    print(" - Valid:", layer.isValid())
    print(" - Feature Count:", layer.featureCount())
    print(" - type:", layer.geometryType())

    summary = {
        "features": layer.featureCount(),
        "invalid_before": 0,
        "after_fixgeometries": 0,
        "after_fix": 0,
        "after_multipolygon": 0,
        "duplicates_removed": 0,
        "after_remove_vertices": 0,
        "after_srid": 0,
        "sliver_removed": 0,
        "after_deleteholes": 0,
        "valid_after": 0,
    }

    # ========================================================
    # 1. Reproject first (this used to be step 6)
    # ========================================================
    print("\nStep 1: Reproject layer to EPSG:4326 (formerly Step 6)")
    input_crs = layer.crs()
    if input_crs.isValid():
        print(" - Original CRS:", input_crs.authid())
        print(" - Description:", input_crs.description())
    else:
        print(" - Original CRS INVALID or UNDEFINED")

    reprojected = _run_to_memory(
        "native:reprojectlayer",
        {"INPUT": layer, "TARGET_CRS": _TARGET_CRS},
    )
    print(" - Features after reprojection:", reprojected.featureCount())
    summary["after_reproject"] = reprojected.featureCount()

    # ========================================================
    # 2. Geometry validity check
    # ========================================================
    print("\nStep 2: Geometry validity check (QGIS native)")
    validity = processing.run(
        "qgis:checkvalidity",
        {
            "INPUT_LAYER": reprojected,
            "METHOD": 2,  # GEOS, per the original author's note
            "IGNORE_RING_SELF_INTERSECTION": False,
            "VALID_OUTPUT": "memory:",
            "INVALID_OUTPUT": "memory:",
            "ERROR_OUTPUT": "memory:",
        },
    )
    invalid_count = validity["INVALID_OUTPUT"].featureCount()
    summary["invalid_before"] = invalid_count
    print(" - Invalid geometries found:", invalid_count)
    print(" - Total error messages:", validity["ERROR_OUTPUT"].featureCount())

    # ========================================================
    # 3. Fix geometries
    # ========================================================
    print("\nStep 3: Fix geometries")
    fixed = _run_to_memory("native:fixgeometries", {"INPUT": reprojected})
    print(" - Valid after fix:", fixed.isValid())
    print(" - Features after fix:", fixed.featureCount())
    summary["after_fix"] = fixed.featureCount()

    # ========================================================
    # 4. Ensure MULTIPOLYGON (LTR compatible)
    # ========================================================
    print("\nStep 4: Ensure MULTIPOLYGON (LTR-safe method)")

    # 4.1 Split multipart -> singlepart.
    # NOTE(review): this result is only used for the count printed below;
    # step 4.2 promotes `fixed`, not `singleparts`. Confirm whether the
    # split was meant to feed the promotion before changing it, because
    # doing so alters feature counts.
    singleparts = _run_to_memory("native:multiparttosingleparts", {"INPUT": fixed})
    print(" - After multipart to single:", singleparts.featureCount())

    # 4.2 Promote all polygons -> multipolygon.
    multipolygon = _run_to_memory("native:promotetomulti", {"INPUT": fixed})
    print(" - After promotetomulti:", multipolygon.featureCount())
    print(" - Valid:", multipolygon.isValid())
    summary["after_multipolygon"] = multipolygon.featureCount()

    # ========================================================
    # 5. Remove duplicate rows & vertices
    # ========================================================
    print("\nStep 5: Remove duplicate rows")
    all_fields = [f.name() for f in multipolygon.fields()]
    print(" - All fields:", all_fields)

    # Duplicate key: prefer "id", else the first integer column, else the
    # full attribute row.
    if "id" in all_fields:
        key_fields = ["id"]
    else:
        int_cols = [
            f.name()
            for f in multipolygon.fields()
            if f.typeName().lower() in ["int", "integer", "bigint"]
        ]
        key_fields = [int_cols[0]] if int_cols else all_fields
    print(" - Using duplicate key:", key_fields)

    dedup = _run_to_memory(
        "native:removeduplicatesbyattribute",
        {"INPUT": multipolygon, "FIELDS": key_fields, "METHOD": 0},
    )
    duplicates_removed = multipolygon.featureCount() - dedup.featureCount()
    summary["duplicates_removed"] = duplicates_removed
    print(" - Features before:", multipolygon.featureCount())
    print(" - Features after:", dedup.featureCount())
    print(" - Duplicates removed:", duplicates_removed)

    print("\nStep 5.5: Remove duplicate vertices")
    no_dup_vertices = _run_to_memory(
        "native:removeduplicatevertices",
        {"INPUT": dedup, "VERTICES": 0},
    )
    print(" - Features:", no_dup_vertices.featureCount())
    summary["after_remove_vertices"] = no_dup_vertices.featureCount()

    # ========================================================
    # 6. Final step: the `final_proj` name is kept on purpose
    # ========================================================
    print("\nStep 6: Finalize (using final_proj variable as requested)")
    final_proj = no_dup_vertices
    print(" - Final features:", final_proj.featureCount())
    summary["after_final"] = final_proj.featureCount()

    # NOTE: the former steps 7-9 (drop sliver polygons < 1 m2, delete holes
    # < 1 m2, reproject back) are disabled. Their area thresholds are in
    # square metres, so re-enabling them requires a projected (metric) CRS;
    # EPSG:4326 is a geographic CRS with degree units.

    print("\nFinal Step: Trim string fields")
    trimmed = _trim_string_fields(final_proj)

    # QgsFeature.geometry() never returns None, so test hasGeometry().
    summary["valid_after"] = sum(
        1
        for f in trimmed.getFeatures()
        if f.hasGeometry() and f.geometry().isGeosValid()
    )

    print(" - Final feature count:", trimmed.featureCount())
    print("========== CLEANSING DONE ==========\n")

    return {"summary": summary, "clean_layer": trimmed}


def cleansing_points(layer: QgsVectorLayer) -> Dict:
    """Clean a point layer and report feature counts for every step.

    Steps, in order: check validity, fix geometries, delete features with
    duplicate geometries, reproject to EPSG:4326, and trim string fields.

    Args:
        layer: The input point layer.

    Returns:
        ``{"summary": <dict of counters>, "clean_layer": <cleaned layer>}``.
        ``valid_after`` is the number of output features that have a
        geometry.
    """
    print("\n=== POINT CLEANING PIPELINE ===")

    summary = {
        "features_before": layer.featureCount(),
        "invalid_before": 0,
        "after_fix": 0,
        "after_dedup": 0,
        "after_reproject": 0,
        "valid_after": 0,
    }

    # 1. Check validity (the original author expects 0 errors for points).
    validity = processing.run(
        "qgis:checkvalidity",
        {
            "INPUT_LAYER": layer,
            "METHOD": 2,
            "VALID_OUTPUT": "memory:",
            "INVALID_OUTPUT": "memory:",
            "ERROR_OUTPUT": "memory:",
        },
    )
    invalid = validity["INVALID_OUTPUT"].featureCount()
    summary["invalid_before"] = invalid
    print("- Invalid points:", invalid)

    # 2. Fix geometries (safe).
    fixed = _run_to_memory("native:fixgeometries", {"INPUT": layer})
    summary["after_fix"] = fixed.featureCount()

    # 3. Remove duplicate coordinates (points only). The algorithm id is
    #    "deleteduplicategeometries"; the previous "removedduplicategeometries"
    #    does not exist and made processing.run() fail.
    dedup = _run_to_memory("native:deleteduplicategeometries", {"INPUT": fixed})
    summary["after_dedup"] = dedup.featureCount()

    # 4. Reproject.
    reproject = _run_to_memory(
        "native:reprojectlayer",
        {"INPUT": dedup, "TARGET_CRS": _TARGET_CRS},
    )
    summary["after_reproject"] = reproject.featureCount()

    # 5. Trim string fields.
    trimmed = _trim_string_fields(reproject)

    # 6. Simple validity check for points: count features that carry a
    #    geometry. QgsFeature.geometry() never returns None, so the old
    #    `is not None` test counted every feature.
    summary["valid_after"] = sum(1 for f in trimmed.getFeatures() if f.hasGeometry())

    return {"summary": summary, "clean_layer": trimmed}