"""FastAPI service that runs QGIS layer cleansing and stores results in PostGIS."""

from fastapi import FastAPI, BackgroundTasks
import psycopg2
from psycopg2 import sql
import requests
from uuid import uuid4

from qgis_bootstrap import start_qgis
# from cleansing_service import load_layer, cleansing_layer
from full_cleansing_service import load_layer, cleansing_layer

from qgis.core import (
    QgsVectorLayer,
    QgsVectorLayerExporter,
    QgsDataSourceUri,
    QgsProviderRegistry,
    QgsCoordinateReferenceSystem
)
from qgis.PyQt.QtCore import QByteArray
from core.config import HOST, PORT, DB, USER, PWD, SCHEMA, GEOM_COL

# Endpoint of the main backend that is notified when a background job ends.
CALLBACK_URL = "http://localhost:8000/jobs/callback"
# Upper bound (seconds) for the callback request so a hung backend cannot
# block the background worker forever.
CALLBACK_TIMEOUT_SECONDS = 10

app = FastAPI()
qgs = start_qgis()


@app.get("/")
def root():
    """Return a static status payload (health check)."""
    return {"status": "QGIS Cleansing API Running"}


@app.get("/clean/{table_name}")
def clean_table(table_name: str):
    """Run the cleansing synchronously and return its summary.

    Args:
        table_name: Name of the table to load as a layer.

    Returns:
        A dict with the table name, the cleansing summary and a message, or
        a dict with an ``error`` key when the loaded layer is not valid.
    """
    layer = load_layer(table_name)

    if not layer.isValid():
        return {"error": f"Table '{table_name}' tidak valid atau tidak ditemukan."}

    print(layer)
    result = cleansing_layer(layer)
    summary = result["summary"]

    return {
        "table": table_name,
        "summary": summary,
        "message": "Cleansing selesai"
    }


@app.post("/process/{table_name}")
def process_table(table_name: str, background: BackgroundTasks):
    """Schedule a background cleansing job and return its generated id.

    Args:
        table_name: Name of the table to clean.
        background: FastAPI background task queue for this request.

    Returns:
        A dict with status ``ACCEPTED``, the new ``job_id`` and the table name.
    """
    job_id = uuid4().hex
    background.add_task(run_clean_table, table_name, job_id)
    return {
        "status": "ACCEPTED",
        "job_id": job_id,
        "table": table_name
    }


def run_clean_table(table_name: str, job_id: str):
    """Clean a table, save the result to PostGIS and notify the backend.

    Args:
        table_name: Name of the table to load, clean and overwrite.
        job_id: Identifier echoed back in the callback payload.
    """
    print(f"\n=== Mulai cleansing untuk tabel: {table_name} ===")

    layer = load_layer(table_name)
    if not layer.isValid():
        print(f"[ERROR] Table '{table_name}' tidak valid.")
        return

    result = cleansing_layer(layer)
    summary = result["summary"]
    clean_layer = result["clean_layer"]

    # STEP 1 - save the cleaned layer to PostGIS
    save_to_postgis(clean_layer, table_name)

    # STEP 2 - send the callback to the main backend
    callback_payload = {
        "job_id": job_id,
        "table": table_name,
        "summary": summary,
        "status": "FINISHED"
    }
    try:
        requests.post(
            CALLBACK_URL,
            json=callback_payload,
            timeout=CALLBACK_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        # The data is already committed at this point; report the failed
        # notification instead of letting it surface as a failed job.
        print(f"[ERROR] Callback for job {job_id} failed: {exc}")

    print(f"=== Cleansing selesai untuk tabel: {table_name} ===\n")


def to_python(v):
    """Convert a QGIS/Qt attribute value into a plain Python value.

    Returns ``None`` for ``None`` and for objects whose ``isNull()`` is true,
    the result of ``toPyObject()`` when the object offers it, and the value
    itself otherwise.
    """
    # Null
    if v is None:
        return None

    # Empty QVariant (or any other object reporting itself as null)
    if hasattr(v, "isNull") and v.isNull():
        return None

    # Convert QVariant to Python native
    if hasattr(v, "toPyObject"):
        return v.toPyObject()

    # Fallback
    return v


def _pg_type_for(type_name):
    """Map a QGIS field type name to a PostgreSQL column type."""
    t = type_name.lower()
    if "int" in t:
        # 64-bit integer fields do not fit into a 4-byte INTEGER column.
        if "64" in t or "int8" in t or "big" in t:
            return "BIGINT"
        return "INTEGER"
    if "double" in t or "float" in t or "real" in t:
        return "DOUBLE PRECISION"
    return "TEXT"


def save_to_postgis(layer, table_name):
    """Replace ``SCHEMA.table_name`` with the features of ``layer``.

    The target table is dropped (``CASCADE``), recreated from the layer's
    fields plus a ``geometry(MultiPolygon, srid)`` column, and filled row by
    row. Everything runs in one transaction: it is committed on success and
    rolled back if any statement fails, and the connection is always closed.

    Args:
        layer: Layer providing ``crs()``, ``fields()`` and ``getFeatures()``.
        table_name: Name of the table to (re)create inside ``SCHEMA``.

    Raises:
        psycopg2.Error: If connecting or any SQL statement fails.
    """
    srid = int(layer.crs().postgisSrid())
    fields = layer.fields()

    # Identifiers are composed with psycopg2.sql so that a table or column
    # name containing quotes cannot break out of the statement.
    table = sql.SQL("{}.{}").format(
        sql.Identifier(SCHEMA), sql.Identifier(table_name)
    )
    geom_ident = sql.Identifier(GEOM_COL)

    attr_fields = [f for f in fields if f.name() != GEOM_COL]
    attr_idents = [
        sql.Identifier(f.name().replace(" ", "_")) for f in attr_fields
    ]

    column_defs = [
        sql.SQL("{} {}").format(ident, sql.SQL(_pg_type_for(f.typeName())))
        for ident, f in zip(attr_idents, attr_fields)
    ]
    # NOTE(review): the geometry type is fixed to MultiPolygon; layers with
    # other geometry types will presumably be rejected by PostGIS - confirm.
    column_defs.append(
        sql.SQL("{} geometry(MultiPolygon,{})").format(
            geom_ident, sql.SQL(str(srid))
        )
    )

    drop_sql = sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(table)
    create_sql = sql.SQL("CREATE TABLE {} ({});").format(
        table, sql.SQL(",").join(column_defs)
    )

    insert_idents = attr_idents + [geom_ident]
    insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        table,
        sql.SQL(",").join(insert_idents),
        sql.SQL(",").join(sql.Placeholder() * len(insert_idents)),
    )

    count = 0
    conn = psycopg2.connect(
        dbname=DB,
        host=HOST,
        port=PORT,
        user=USER,
        password=PWD
    )
    try:
        # "with conn" commits on success and rolls back on an exception;
        # it does not close the connection, hence the surrounding finally.
        with conn, conn.cursor() as cur:
            cur.execute(drop_sql)
            cur.execute(create_sql)

            for feat in layer.getFeatures():
                row = [
                    to_python(v)
                    for f, v in zip(fields, feat.attributes())
                    if f.name() != GEOM_COL
                ]

                if feat.hasGeometry():
                    wkb_bytes = feat.geometry().asWkb()
                    if isinstance(wkb_bytes, QByteArray):
                        wkb_bytes = bytes(wkb_bytes)
                    row.append(psycopg2.Binary(wkb_bytes))
                else:
                    # An empty WKB blob is not a valid geometry; store NULL.
                    row.append(None)

                cur.execute(insert_sql, row)
                count += 1
    finally:
        conn.close()

    print(f"[DB] Inserted features: {count}")