"""HTTP front-end for the QGIS cleansing service.

Exposes a synchronous endpoint that cleans a table and returns the summary,
and an asynchronous endpoint that cleans a table in a background task, writes
the cleaned layer back to PostGIS and then notifies a callback URL.
"""

# NOTE(review): the import order below follows the original file.
# qgis_bootstrap is imported before qgis.* and may prepare the QGIS
# environment -- confirm before letting a formatter reorder these lines.
from fastapi import FastAPI, BackgroundTasks
import psycopg2
from psycopg2 import sql
import requests
from uuid import uuid4
from pydantic import BaseModel

from qgis_bootstrap import start_qgis
# from cleansing_service import load_layer, cleansing_layer
from full_cleansing_service import load_layer, cleansing_layer

from qgis.core import (
    QgsVectorLayer,
    QgsVectorLayerExporter,
    QgsDataSourceUri,
    QgsProviderRegistry,
    QgsCoordinateReferenceSystem,
    QgsWkbTypes,
    QgsGeometry
)
from qgis.PyQt.QtCore import QByteArray

from config import HOST, PORT, DB, USER, PWD, SCHEMA, GEOM_COL

# Endpoint of the main backend that is notified when a background job ends.
CALLBACK_URL = "http://labai.polinema.ac.id:808/dataset/jobs/callback"

# Upper bound (seconds) for the callback request so that a stalled backend
# cannot block a background worker forever.
CALLBACK_TIMEOUT_SECONDS = 30

app = FastAPI()
qgs = start_qgis()


@app.get("/")
def root():
    """Return a static payload showing that the API is up."""
    return {"status": "QGIS Cleansing API Running"}


@app.get("/clean/{table_name}")
def clean_table(table_name: str):
    """Clean *table_name* synchronously and return the cleansing summary.

    Returns a dict with an ``error`` key when the loaded layer is not valid,
    otherwise a dict with the table name, the ``summary`` entry produced by
    ``cleansing_layer`` and a completion message.
    """
    layer = load_layer(table_name)

    if not layer.isValid():
        return {"error": f"Table '{table_name}' tidak valid atau tidak ditemukan."}

    print(layer)
    result = cleansing_layer(layer)

    return {
        "table": table_name,
        "summary": result["summary"],
        "message": "Cleansing selesai"
    }


class ProcessRequest(BaseModel):
    """Request body of ``POST /process``."""

    # Name of the table to clean.
    table_name: str
    # Caller-supplied identifier echoed back in the response and the callback.
    job_id: str


@app.post("/process")
def process_table(
    payload: ProcessRequest,
    background: BackgroundTasks
):
    """Schedule a background cleansing job and acknowledge it immediately."""
    background.add_task(
        run_clean_table,
        payload.table_name,
        payload.job_id
    )

    return {
        "status": "ACCEPTED",
        "job_id": payload.job_id,
        "table": payload.table_name
    }


# Earlier variant, kept for reference: the table name came from the path and
# the job id was generated here.
# @app.post("/process/{table_name}")
# def process_table(table_name: str, background: BackgroundTasks):
#     job_id = uuid4().hex
#     background.add_task(run_clean_table, table_name, job_id)
#     return {
#         "status": "ACCEPTED",
#         "job_id": job_id,
#         "table": table_name
#     }


def run_clean_table(table_name: str, job_id: str):
    """Clean *table_name*, store the result in PostGIS and send the callback.

    Runs as a background task, so it reports progress with ``print`` and
    returns nothing. If the layer is not valid the function logs the problem
    and returns without writing anything or sending a callback.
    """
    print(f"\n=== Mulai cleansing untuk tabel: {table_name} ===")

    layer = load_layer(table_name)
    if not layer.isValid():
        # NOTE(review): the main backend is not told about this failure;
        # confirm whether it expects a failure callback.
        print(f"[ERROR] Table '{table_name}' tidak valid.")
        return

    result = cleansing_layer(layer)
    clean_layer = result["clean_layer"]

    # STEP 1 - save the result to PostGIS
    save_to_postgis(clean_layer, table_name)

    # STEP 2 - send the callback to the main backend
    callback_payload = {
        "job_id": job_id,
        "table": table_name,
        # "summary": result["summary"],
        "status": "FINISHED"
    }

    try:
        response = requests.post(
            CALLBACK_URL,
            json=callback_payload,
            timeout=CALLBACK_TIMEOUT_SECONDS
        )
        response.raise_for_status()
    except requests.RequestException as exc:
        # The data is already saved at this point; a failed notification is
        # logged instead of being raised out of the background task.
        print(f"[ERROR] Callback for job '{job_id}' failed: {exc}")

    print(f"=== Cleansing selesai untuk tabel: {table_name} ===\n")


def to_python(v):
    """Convert an attribute value into something psycopg2 can adapt.

    ``None`` and values whose ``isNull()`` is true become ``None``. Objects
    exposing ``toPyObject`` are unwrapped, and Qt date/time values are
    converted through their ``toPyDateTime`` / ``toPyDate`` / ``toPyTime``
    method when present. Anything else is returned unchanged.
    """
    # Null
    if v is None:
        return None

    # Empty QVariant
    if hasattr(v, "isNull") and v.isNull():
        return None

    # Convert QVariant to Python native
    if hasattr(v, "toPyObject"):
        return v.toPyObject()

    # Qt date/time wrappers are converted to their datetime equivalents.
    for method_name in ("toPyDateTime", "toPyDate", "toPyTime"):
        convert = getattr(v, method_name, None)
        if convert is not None:
            return convert()

    # Fallback
    return v


# Earlier variant, kept for reference.
# def get_postgis_geom_type(layer):
#     for f in layer.getFeatures():
#         g = f.geometry()
#         if g.isMultipart():
#             return QgsWkbTypes.displayString(g.wkbType()).upper()
#         else:
#             base = QgsWkbTypes.displayString(g.wkbType()).upper()
#             if "POINT" in base:
#                 return "MULTIPOINT"
#             if "LINESTRING" in base:
#                 return "MULTILINESTRING"
#             if "POLYGON" in base:
#                 return "MULTIPOLYGON"
#     return "GEOMETRY"


def get_postgis_geom_type(layer):
    """Return the PostGIS geometry type name to use for *layer*'s column.

    Scans every non-empty feature geometry. When all of them share one base
    type the result is the MULTI form of that type (``MULTIPOINT``,
    ``MULTILINESTRING`` or ``MULTIPOLYGON``). When base types are mixed or not
    one of those three, the result is the generic ``GEOMETRY``. A ``Z``, ``M``
    or ``ZM`` suffix is appended if any feature carries that dimension. A
    layer without non-empty geometries yields plain ``"GEOMETRY"``.
    """
    has_z = False
    has_m = False
    base_type = None   # geometry type shared by the features seen so far
    mixed = False      # True once two different base types were seen

    for feature in layer.getFeatures():
        geometry = feature.geometry()
        if geometry.isEmpty():
            continue

        wkb = geometry.wkbType()

        # Detect Z & M
        has_z = has_z or QgsWkbTypes.hasZ(wkb)
        has_m = has_m or QgsWkbTypes.hasM(wkb)

        # Detect base type (polygon / line / point)
        geom_type = QgsWkbTypes.geometryType(wkb)
        if base_type is None:
            base_type = geom_type
        elif geom_type != base_type:
            mixed = True

    if base_type is None:
        return "GEOMETRY"

    # Convert base_type to PostGIS string
    if mixed:
        # A typed column would reject the features of the other type(s).
        base = "GEOMETRY"
    elif base_type == QgsWkbTypes.PointGeometry:
        base = "POINT"
    elif base_type == QgsWkbTypes.LineGeometry:
        base = "LINESTRING"
    elif base_type == QgsWkbTypes.PolygonGeometry:
        base = "POLYGON"
    else:
        base = "GEOMETRY"

    # Force MULTI
    if base != "GEOMETRY":
        base = "MULTI" + base

    # Add dimensionality
    if has_z and has_m:
        base += "ZM"
    elif has_z:
        base += "Z"
    elif has_m:
        base += "M"

    return base


def _pg_column_type(type_name):
    """Map a field type name to the PostgreSQL column type used on export."""
    lowered = type_name.lower()
    if "int" in lowered:
        # 64-bit integers do not fit into INTEGER.
        if any(tag in lowered for tag in ("int8", "int64", "integer64", "bigint")):
            return "BIGINT"
        return "INTEGER"
    if "double" in lowered or "float" in lowered or "real" in lowered:
        return "DOUBLE PRECISION"
    return "TEXT"


def save_to_postgis(layer, table_name):
    """Replace ``SCHEMA.table_name`` in PostGIS with the contents of *layer*.

    The existing table is dropped (``CASCADE``), recreated from the layer's
    fields plus one geometry column named ``GEOM_COL``, and filled feature by
    feature. Drop, create and inserts run in a single transaction: it is
    committed when everything succeeds and rolled back on any error, and the
    connection is always closed.

    Raises:
        psycopg2.Error: if connecting or any SQL statement fails.
    """
    geom_col = GEOM_COL
    srid = layer.crs().postgisSrid()
    fields = layer.fields()

    # One flag per field, computed once: True when the field is exported as
    # an attribute column (every field except one named like the geometry).
    is_attribute = [f.name() != geom_col for f in fields]
    attribute_columns = [
        (f.name().replace(" ", "_"), _pg_column_type(f.typeName()))
        for f, keep in zip(fields, is_attribute)
        if keep
    ]

    # Identifiers are composed with psycopg2.sql so that a table or column
    # name containing a double quote cannot break out of its quoting; the
    # table name originates from the HTTP request.
    table_ident = sql.SQL(".").join(
        [sql.Identifier(SCHEMA), sql.Identifier(table_name)]
    )
    geom_ident = sql.Identifier(geom_col)

    # CONNECT
    conn = psycopg2.connect(
        dbname=DB,
        host=HOST,
        port=PORT,
        user=USER,
        password=PWD
    )

    count = 0
    try:
        # "with conn" commits on success and rolls back on an exception;
        # it does not close the connection, hence the finally below.
        with conn, conn.cursor() as cur:
            # DROP TABLE
            cur.execute(
                sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(table_ident)
            )
            print(f'Drop table {table_name}')

            column_defs = [
                sql.SQL("{} {}").format(sql.Identifier(name), sql.SQL(pg_type))
                for name, pg_type in attribute_columns
            ]

            # AUTODETECT 2D/3D geometry
            pg_geom_type = get_postgis_geom_type(layer)
            print("get type")

            column_defs.append(
                sql.SQL("{} geometry({},{})").format(
                    geom_ident,
                    sql.SQL(pg_geom_type),
                    sql.SQL(str(int(srid)))
                )
            )

            cur.execute(
                sql.SQL("CREATE TABLE {} ({})").format(
                    table_ident, sql.SQL(",").join(column_defs)
                )
            )

            # Prepare INSERT
            insert_columns = [
                sql.Identifier(name) for name, _ in attribute_columns
            ]
            insert_columns.append(geom_ident)
            insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                table_ident,
                sql.SQL(",").join(insert_columns),
                sql.SQL(",").join(sql.Placeholder() * len(insert_columns))
            )

            # The column is typed MULTI*, so single-part geometries must be
            # promoted before insertion or PostGIS rejects the row.
            promote_to_multi = pg_geom_type.startswith("MULTI")

            # INSERT ROWS
            for feat in layer.getFeatures():
                row = [
                    to_python(value)
                    for keep, value in zip(is_attribute, feat.attributes())
                    if keep
                ]

                geom = feat.geometry()
                if promote_to_multi and not geom.isMultipart():
                    geom.convertToMultiType()

                wkb_bytes = geom.asWkb()
                if isinstance(wkb_bytes, QByteArray):
                    wkb_bytes = bytes(wkb_bytes)

                # A null geometry yields empty WKB, which PostGIS cannot
                # parse; store NULL instead.
                row.append(psycopg2.Binary(wkb_bytes) if wkb_bytes else None)

                cur.execute(insert_sql, row)
                count += 1
    finally:
        conn.close()

    print(f"[DB] Inserted features: {count}")