from typing import Dict

import processing
from qgis.core import (
    QgsFeature,
    QgsVectorFileWriter,
    QgsVectorLayer,
    QgsVectorLayerExporter,
)

from database import build_uri


def load_layer(table_name: str) -> QgsVectorLayer:
    """Load a PostGIS table as a QGIS vector layer.

    Args:
        table_name: Database table name; also used as the layer name.

    Returns:
        The loaded layer. NOTE: the returned layer may be invalid (bad
        URI, missing table, connection failure) — this function only
        prints ``layer.isValid()``, it does not raise; callers must
        check validity themselves.
    """
    uri = build_uri(table_name)
    print('uri', uri)
    layer = QgsVectorLayer(uri, table_name, "postgres")
    print("Layer valid:", layer.isValid())
    return layer


# NOTE(review): two superseded, fully commented-out drafts of
# cleansing_layer() previously lived here. They were dead code and have
# been removed — recover them from version control if ever needed. The
# active implementation follows below.
def cleansing_layer(layer: QgsVectorLayer) -> Dict:
    """Run the full geometry/attribute cleansing pipeline on *layer*.

    Pipeline: detect invalid geometries, fix them, promote to
    multipart, remove duplicate rows and duplicate vertices, reproject
    to EPSG:4326, drop sliver polygons and tiny holes, then trim all
    string attributes.

    Args:
        layer: Source polygon layer (typically loaded from PostGIS).

    Returns:
        Dict with two keys:
        - "summary": per-step feature counters (see keys below);
        - "clean_layer": the cleansed in-memory QgsVectorLayer.
    """
    # ========================================================
    # INITIAL STATE
    # ========================================================
    print("\n========== START CLEANSING ==========")
    print("Step 0: Load Layer")
    print(" - Valid:", layer.isValid())
    print(" - Feature Count:", layer.featureCount())

    summary = {
        "step0_features": layer.featureCount(),
        "step1_invalid_before": 0,
        "step1_5_self_intersections": 0,
        "step2_after_fix": 0,
        "step3_after_multipolygon": 0,
        "step4_duplicates_removed": 0,
        "step5_after_remove_vertices": 0,
        "step6_after_srid": 0,
        "step7_sliver_removed": 0,
        "step8_after_deleteholes": 0,
    }

    # ========================================================
    # 1. VALIDATE GEOMETRY
    #    Single GEOS pass — the original ran the identical
    #    isGeosValid() loop twice (steps 1 and 1.5) with the same
    #    result; compute once and reuse.
    # ========================================================
    print("\nStep 1: Identify invalid geometries")
    invalid_ids = [
        f.id() for f in layer.getFeatures()
        if not f.geometry().isGeosValid()
    ]
    summary["step1_invalid_before"] = len(invalid_ids)
    print(" - Invalid geometries found:", len(invalid_ids))

    print("\nStep 1.5: Detect geometry errors (universal GEOS-safe method)")
    # BUG FIX: the summary declared "step1_5_self_intersections" but the
    # code only ever wrote "step1_5_geometry_errors". Populate both so
    # whichever key a caller reads is filled.
    summary["step1_5_geometry_errors"] = len(invalid_ids)
    summary["step1_5_self_intersections"] = len(invalid_ids)
    print(" - Geometry errors detected:", len(invalid_ids))
    print(" - Invalid feature IDs (first 10):", invalid_ids[:10])

    # ========================================================
    # 2. FIX GEOMETRIES (INCLUDES SELF-INTERSECTION FIX)
    #    BUG FIX: native:fixgeometries was run twice back-to-back
    #    (steps 1.6 and 2); the second pass over already-repaired
    #    output is redundant. Run once and record both counters.
    # ========================================================
    print("\nStep 2: Fix geometries (including self-intersections)")
    fixed = processing.run(
        "native:fixgeometries",
        {"INPUT": layer, "OUTPUT": "memory:"}
    )["OUTPUT"]
    summary["step1_6_after_fixgeometries"] = fixed.featureCount()
    summary["step2_after_fix"] = fixed.featureCount()
    print(" - Valid after fix:", fixed.isValid())
    print(" - Features after fix:", fixed.featureCount())

    # ========================================================
    # 3. ENSURE MULTIPOLYGON
    #    BUG FIX: "native:collect" with no grouping field merges ALL
    #    features into a single multipart feature, destroying the
    #    per-row structure the later dedup steps rely on.
    #    "native:promotetomulti" converts each feature to multipart
    #    individually, which is what "ensure MULTIPOLYGON" means.
    # ========================================================
    print("\nStep 3: Ensure MULTIPOLYGON")
    multipolygon = processing.run(
        "native:promotetomulti",
        {"INPUT": fixed, "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Valid:", multipolygon.isValid())
    print(" - Features:", multipolygon.featureCount())
    summary["step3_after_multipolygon"] = multipolygon.featureCount()

    # ========================================================
    # 4. REMOVE DUPLICATE ROWS
    # ========================================================
    print("\nStep 4: Remove duplicate rows")
    all_fields = [f.name() for f in multipolygon.fields()]
    print(" - All fields:", all_fields)
    # Key selection: prefer an explicit "id" column, else the first
    # integer column, else fall back to comparing every attribute.
    if "id" in all_fields:
        key_fields = ["id"]
    else:
        int_cols = [
            f.name() for f in multipolygon.fields()
            if f.typeName().lower() in ("int", "integer", "bigint")
        ]
        key_fields = [int_cols[0]] if int_cols else all_fields
    print(" - Using duplicate key:", key_fields)
    dedup = processing.run(
        "native:removeduplicatesbyattribute",
        {"INPUT": multipolygon, "FIELDS": key_fields, "METHOD": 0,
         "OUTPUT": "memory:"}
    )["OUTPUT"]
    duplicates_removed = multipolygon.featureCount() - dedup.featureCount()
    summary["step4_duplicates_removed"] = duplicates_removed
    print(" - Features before:", multipolygon.featureCount())
    print(" - Features after:", dedup.featureCount())
    print(" - Duplicates removed:", duplicates_removed)

    # ========================================================
    # 5. REMOVE DUPLICATE VERTICES
    #    BUG FIX: the algorithm has no "VERTICES" parameter — the
    #    original value was silently ignored. The snapping threshold
    #    parameter is "TOLERANCE"; use a tiny value for (near-)exact
    #    duplicate removal.
    # ========================================================
    print("\nStep 5: Remove duplicate vertices")
    no_dup_vertices = processing.run(
        "native:removeduplicatevertices",
        {"INPUT": dedup, "TOLERANCE": 0.000001, "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Features:", no_dup_vertices.featureCount())
    summary["step5_after_remove_vertices"] = no_dup_vertices.featureCount()

    # ========================================================
    # 6. FIX SRID / REPROJECT
    # ========================================================
    print("\nStep 6: Reproject (Fix SRID to EPSG:4326)")
    reprojected = processing.run(
        "native:reprojectlayer",
        {"INPUT": no_dup_vertices, "TARGET_CRS": "EPSG:4326",
         "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Features:", reprojected.featureCount())
    summary["step6_after_srid"] = reprojected.featureCount()

    # ========================================================
    # 7. REMOVE SLIVER POLYGONS (< 1 m2)
    #    One extract run yields both the keepers (OUTPUT) and the
    #    slivers (FAIL_OUTPUT) instead of evaluating the expression
    #    over the layer twice.
    #    NOTE(review): after reprojecting to EPSG:4326, "$area" may be
    #    evaluated in square degrees rather than m² depending on
    #    ellipsoidal-measurement settings — confirm, or filter by area
    #    BEFORE the reprojection step.
    # ========================================================
    print("\nStep 7: Remove sliver polygons (<1 m²)")
    split = processing.run(
        "native:extractbyexpression",
        {"INPUT": reprojected, "EXPRESSION": "$area >= 1",
         "OUTPUT": "memory:", "FAIL_OUTPUT": "memory:"}
    )
    no_sliver = split["OUTPUT"]
    slivers = split["FAIL_OUTPUT"]
    summary["step7_sliver_removed"] = slivers.featureCount()
    print(" - Slivers found:", slivers.featureCount())
    print(" - Features left after removing slivers:", no_sliver.featureCount())

    # ========================================================
    # 8. REMOVE TINY HOLES (< 1 m2)
    # ========================================================
    print("\nStep 8: Remove tiny holes")
    no_holes = processing.run(
        "native:deleteholes",
        # MIN_AREA: holes with area below this threshold are deleted
        {"INPUT": no_sliver, "MIN_AREA": 1, "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Features:", no_holes.featureCount())
    summary["step8_after_deleteholes"] = no_holes.featureCount()

    # ========================================================
    # FINAL: TRIM STRING FIELDS
    # ========================================================
    print("\nFinal Step: Trim string fields")
    trimmed = processing.run(
        "qgis:refactorfields",
        {
            "INPUT": no_holes,
            "FIELDS_MAPPING": [
                {
                    # trim() only string-typed fields; pass every other
                    # field through unchanged
                    "expression": (
                        f"trim(\"{field.name()}\")"
                        if field.typeName().lower() in ("text", "varchar")
                        else f"\"{field.name()}\""
                    ),
                    "name": field.name(),
                    "type": field.type(),
                    "length": field.length(),
                    "precision": field.precision(),
                }
                for field in no_holes.fields()
            ],
            "OUTPUT": "memory:",
        }
    )["OUTPUT"]
    print(" - Final feature count:", trimmed.featureCount())
    print("========== CLEANSING DONE ==========\n")

    return {
        "summary": summary,
        "clean_layer": trimmed,
    }