from qgis.core import (
    QgsVectorLayer,
    QgsVectorLayerExporter,
    QgsVectorFileWriter
)
import processing
from typing import Dict

from database import build_uri


# Lower-cased provider type names treated as integer columns when guessing a
# de-duplication key.
# NOTE(review): 64-bit columns may be reported under other names (e.g. "int8")
# depending on the provider; the list is left as-is because widening it would
# change which rows get removed. Confirm against real data.
_INTEGER_TYPE_NAMES = ("int", "integer", "bigint")

# Lower-cased provider type names treated as text columns when trimming.
# "string" is included because the trim step runs on a "memory:" output layer,
# whose string columns are presumably reported as "string" rather than by the
# PostgreSQL names -- verify against the QGIS version in use.
_STRING_TYPE_NAMES = ("text", "varchar", "string")


def load_layer(table_name: str) -> QgsVectorLayer:
    """Build a URI for *table_name* and open it with the "postgres" provider.

    The layer is returned even when it is not valid; validity is only
    printed, so callers that need a usable layer should check
    ``layer.isValid()`` themselves.

    Args:
        table_name: Passed to ``build_uri`` and also used as the layer name.

    Returns:
        The constructed ``QgsVectorLayer``.
    """
    uri = build_uri(table_name)
    # NOTE(review): depending on what build_uri() puts in the URI this may
    # print connection credentials -- confirm before using outside debugging.
    print('uri',uri)

    layer = QgsVectorLayer(uri, table_name, "postgres")
    print("Layer valid:", layer.isValid())

    # Debugging aids, intentionally left disabled:
    # print("Error:", layer.error().summary())
    # print("FIELDS:", [f.name() for f in layer.fields()])

    return layer


def _count_invalid_geometries(layer) -> int:
    """Return how many features of *layer* fail ``isGeosValid()``."""
    # NOTE(review): features without any geometry presumably also fail
    # isGeosValid() and are therefore counted here -- confirm if that matters.
    return sum(
        1
        for feature in layer.getFeatures()
        if not feature.geometry().isGeosValid()
    )


def _detect_key_fields(layer) -> list:
    """Choose the attribute(s) used to decide that two features are duplicates.

    Preference order: a column named "id", then the first column whose type
    name is in ``_INTEGER_TYPE_NAMES``, then every column of the layer.
    """
    all_fields = [field.name() for field in layer.fields()]
    print("Detecting key fields:", all_fields)

    if "id" in all_fields:
        # (1) Prefer an "id" column.
        key_fields = ["id"]
    else:
        # (2) Otherwise pick the first integer column.
        first_int = next(
            (
                field.name()
                for field in layer.fields()
                if field.typeName().lower() in _INTEGER_TYPE_NAMES
            ),
            None,
        )
        # (3) If there is still no candidate, fall back to all columns.
        key_fields = [first_int] if first_int is not None else all_fields

    print("Using key field:", key_fields)
    return key_fields


def _quoted_column(name: str) -> str:
    """Return *name* as a double-quoted column reference for an expression.

    Embedded double quotes are doubled so that a field name containing '"'
    does not terminate the reference early.
    """
    return '"' + name.replace('"', '""') + '"'


def _trim_fields_mapping(fields) -> list:
    """Build a FIELDS_MAPPING that keeps every field and trims text fields.

    Each entry preserves the field's name, type, length and precision; only
    the expression differs: ``trim("col")`` for text columns, ``"col"`` for
    everything else.
    """
    mapping = []
    for field in fields:
        column = _quoted_column(field.name())
        is_text = field.typeName().lower() in _STRING_TYPE_NAMES
        mapping.append(
            {
                "expression": f"trim({column})" if is_text else column,
                "name": field.name(),
                "type": field.type(),
                "length": field.length(),
                "precision": field.precision(),
            }
        )
    return mapping


def cleansing_layer(layer: QgsVectorLayer) -> Dict:
    """Clean *layer* and report what was done.

    Steps, each writing to a new "memory:" output so the input layer is not
    passed as an output target:

    1. Count features whose geometry fails ``isGeosValid()``.
    2. Run ``native:fixgeometries``.
    3. Pick de-duplication key field(s) (see ``_detect_key_fields``).
    4. Run ``native:removeduplicatesbyattribute`` on those fields.
    5. Run ``qgis:refactorfields`` to trim text columns.

    Args:
        layer: The vector layer to clean.

    Returns:
        A dict with two keys:
        ``"summary"`` -- counts with keys ``total_features_before``,
        ``invalid_geometries_before``, ``invalid_geometries_fixed`` and
        ``duplicates_removed``;
        ``"clean_layer"`` -- the output of the final processing step.
    """
    summary = {
        "total_features_before": layer.featureCount(),
        "invalid_geometries_before": 0,
        "invalid_geometries_fixed": 0,
        "duplicates_removed": 0,
    }

    # -------------------------
    # 1. IDENTIFY INVALID GEOMETRY
    # -------------------------
    invalid_before = _count_invalid_geometries(layer)
    summary["invalid_geometries_before"] = invalid_before

    # -------------------------
    # 2. FIX INVALID GEOMETRY
    # -------------------------
    fixed = processing.run(
        "native:fixgeometries",
        {
            "INPUT": layer,
            "OUTPUT": "memory:"
        }
    )["OUTPUT"]

    # Report what was actually repaired instead of assuming every invalid
    # geometry was fixed: re-check the output with the same validity test.
    invalid_after = _count_invalid_geometries(fixed)
    summary["invalid_geometries_fixed"] = max(invalid_before - invalid_after, 0)

    # -------------------------
    # 3. AUTO DETECT DUPLICATE KEY FIELD
    # -------------------------
    key_fields = _detect_key_fields(fixed)

    # -------------------------
    # 4. REMOVE DUPLICATES BY ATTRIBUTE
    # -------------------------
    dedup = processing.run(
        "native:removeduplicatesbyattribute",
        {
            "INPUT": fixed,
            "FIELDS": key_fields,
            "METHOD": 0,
            "OUTPUT": "memory:"
        }
    )["OUTPUT"]

    summary["duplicates_removed"] = (
        fixed.featureCount() - dedup.featureCount()
    )

    # -------------------------
    # 5. TRIM STRING FIELDS
    # -------------------------
    trimmed = processing.run(
        "qgis:refactorfields",
        {
            "INPUT": dedup,
            "FIELDS_MAPPING": _trim_fields_mapping(dedup.fields()),
            "OUTPUT": "memory:"
        }
    )["OUTPUT"]

    # -------------------------
    # RESULT
    # -------------------------
    return {
        "summary": summary,
        "clean_layer": trimmed
    }