2025-11-25 08:33:38 +00:00
|
|
|
from fastapi import FastAPI, BackgroundTasks
|
2025-11-29 04:44:08 +00:00
|
|
|
import psycopg2
|
|
|
|
|
import requests
|
2025-11-26 07:18:46 +00:00
|
|
|
from uuid import uuid4
|
2025-11-29 04:44:08 +00:00
|
|
|
from qgis_bootstrap import start_qgis
|
2025-11-25 08:33:38 +00:00
|
|
|
# from cleansing_service import load_layer, cleansing_layer
|
|
|
|
|
from full_cleansing_service import load_layer, cleansing_layer
|
2025-11-29 04:44:08 +00:00
|
|
|
from qgis.core import (
|
|
|
|
|
QgsVectorLayer,
|
|
|
|
|
QgsVectorLayerExporter,
|
|
|
|
|
QgsDataSourceUri,
|
|
|
|
|
QgsProviderRegistry,
|
|
|
|
|
QgsCoordinateReferenceSystem
|
|
|
|
|
)
|
|
|
|
|
from qgis.PyQt.QtCore import QByteArray
|
2025-12-01 03:02:48 +00:00
|
|
|
from config import HOST,PORT,DB,USER,PWD,SCHEMA,GEOM_COL
|
2025-11-29 04:44:08 +00:00
|
|
|
|
2025-11-25 08:33:38 +00:00
|
|
|
|
|
|
|
|
# Application object; the route decorators in this module register their
# handlers on it.
app = FastAPI()

# Runs once at import time. Presumably bootstraps the QGIS runtime needed by
# the layer helpers — TODO confirm against qgis_bootstrap.start_qgis.
# The returned handle is kept at module level.
qgs = start_qgis()
|
|
|
|
|
|
|
|
|
|
@app.get("/")
def root():
    """Liveness endpoint: return a fixed status message."""
    status_payload = {"status": "QGIS Cleansing API Running"}
    return status_payload
|
|
|
|
|
|
|
|
|
|
@app.get("/clean/{table_name}")
def clean_table(table_name: str):
    """Cleanse one table synchronously and return the cleansing summary.

    Args:
        table_name: Name of the table passed to ``load_layer``.

    Returns:
        A dict with the table name, the ``"summary"`` entry produced by
        ``cleansing_layer`` and a completion message, or a dict with a single
        ``"error"`` key when the loaded layer is not valid.
    """
    source_layer = load_layer(table_name)

    if source_layer.isValid():
        print(source_layer)
        outcome = cleansing_layer(source_layer)
        return {
            "table": table_name,
            "summary": outcome["summary"],
            "message": "Cleansing selesai",
        }

    return {"error": f"Table '{table_name}' tidak valid atau tidak ditemukan."}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/process/{table_name}")
def process_table(table_name: str, background: BackgroundTasks):
    """Queue a cleansing job for a table and acknowledge it immediately.

    Args:
        table_name: Name of the table to cleanse.
        background: Task queue on which ``run_clean_table`` is scheduled.

    Returns:
        A dict holding the acceptance status, the freshly generated job id
        and the table name.
    """
    new_job_id = uuid4().hex
    background.add_task(run_clean_table, table_name, new_job_id)

    acknowledgement = dict(
        status="ACCEPTED",
        job_id=new_job_id,
        table=table_name,
    )
    return acknowledgement
|
2025-11-25 08:33:38 +00:00
|
|
|
|
|
|
|
|
|
2025-11-26 07:18:46 +00:00
|
|
|
|
|
|
|
|
def run_clean_table(table_name: str, job_id: str):
    """Cleanse a table, store the result in PostGIS and notify the backend.

    The function loads the layer, runs ``cleansing_layer`` on it, writes the
    resulting ``"clean_layer"`` back with ``save_to_postgis`` and finally
    POSTs a completion callback. It returns early, without a callback, when
    the loaded layer is not valid.

    Args:
        table_name: Name of the table to load, cleanse and overwrite.
        job_id: Identifier echoed back in the callback payload.
    """
    print(f"\n=== Mulai cleansing untuk tabel: {table_name} ===")

    layer = load_layer(table_name)
    if not layer.isValid():
        print(f"[ERROR] Table '{table_name}' tidak valid.")
        return

    result = cleansing_layer(layer)

    # STEP 1 — persist the cleansed layer to PostGIS.
    save_to_postgis(result["clean_layer"], table_name)

    # STEP 2 — send the completion callback to the main backend.
    # The cleansing summary is currently not part of the payload.
    callback_payload = {
        "job_id": job_id,
        "table": table_name,
        "status": "FINISHED",
    }

    try:
        # Without a timeout a stalled backend would block this worker forever.
        # The tuple is (connect timeout, read timeout) in seconds.
        response = requests.post(
            "http://localhost:8000/jobs/callback",
            json=callback_payload,
            timeout=(5, 30),
        )
        response.raise_for_status()
    except requests.RequestException as exc:
        # The data is already saved at this point, so report the failed
        # notification instead of letting the background task die silently.
        print(f"[ERROR] Callback untuk job '{job_id}' gagal: {exc}")

    print(f"=== Cleansing selesai untuk tabel: {table_name} ===\n")
|
2025-11-26 07:18:46 +00:00
|
|
|
|
2025-11-29 04:44:08 +00:00
|
|
|
def to_python(v):
    """Convert an attribute value into a plain Python value.

    Args:
        v: The value to convert; may be ``None``, a native Python value or a
            Qt wrapper object.

    Returns:
        ``None`` when ``v`` is ``None`` or reports ``isNull()``; the result of
        ``toPyObject()`` when that method exists; the result of
        ``toPyDateTime()``, ``toPyDate()`` or ``toPyTime()`` (checked in that
        order) when one of them exists; otherwise ``v`` unchanged.
    """
    if v is None:
        return None

    # Null wrapper objects (e.g. an empty QVariant) map to None.
    if hasattr(v, "isNull") and v.isNull():
        return None

    # Generic wrapper-to-native conversion.
    if hasattr(v, "toPyObject"):
        return v.toPyObject()

    # Qt date/time wrappers expose dedicated converters. Without this step
    # they would be returned as-is, and the database driver cannot adapt them.
    for converter_name in ("toPyDateTime", "toPyDate", "toPyTime"):
        converter = getattr(v, converter_name, None)
        if callable(converter):
            return converter()

    # Already a native value.
    return v
|
|
|
|
|
|
|
|
|
|
def _pg_column_type(type_name):
    """Map a layer field type name to the PostgreSQL column type used for it."""
    lowered = type_name.lower()
    if "int" in lowered:
        # 64-bit integer fields would overflow a 4-byte INTEGER column.
        if "64" in lowered or "int8" in lowered or "bigint" in lowered:
            return "BIGINT"
        return "INTEGER"
    if "double" in lowered or "float" in lowered or "real" in lowered:
        return "DOUBLE PRECISION"
    return "TEXT"


def save_to_postgis(layer, table_name):
    """Replace a PostGIS table with the features of ``layer``.

    The target table ``SCHEMA.table_name`` is dropped (``CASCADE``) if it
    exists, recreated from the layer's fields plus a ``MultiPolygon``
    geometry column named ``GEOM_COL``, and filled with one row per feature.
    Drop, create and inserts run in one transaction: it is committed only if
    every statement succeeds and rolled back otherwise. The connection is
    always closed.

    Args:
        layer: Layer providing ``crs()``, ``fields()`` and ``getFeatures()``.
        table_name: Name of the table to overwrite inside ``SCHEMA``.

    Raises:
        psycopg2.Error: If connecting or any SQL statement fails.
    """
    # Imported locally: the module only imports the top-level psycopg2 package.
    from psycopg2 import sql

    srid = int(layer.crs().postgisSrid())
    fields = layer.fields()

    # Identifiers are composed with psycopg2.sql so that table and column
    # names are quoted and escaped by the driver, never interpolated raw.
    table_ident = sql.SQL(".").join(
        [sql.Identifier(SCHEMA), sql.Identifier(table_name)]
    )
    geom_ident = sql.Identifier(GEOM_COL)

    # One flag per field, True for attribute fields; a field named like the
    # geometry column is skipped because that column is added explicitly.
    is_attribute = [f.name() != GEOM_COL for f in fields]

    column_idents = []
    column_defs = []
    for f, keep in zip(fields, is_attribute):
        if not keep:
            continue
        ident = sql.Identifier(f.name().replace(" ", "_"))
        column_idents.append(ident)
        column_defs.append(
            sql.SQL("{} {}").format(ident, sql.SQL(_pg_column_type(f.typeName())))
        )

    column_idents.append(geom_ident)
    column_defs.append(
        sql.SQL("{} geometry(MultiPolygon,{})").format(geom_ident, sql.SQL(str(srid)))
    )

    drop_sql = sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(table_ident)
    create_sql = sql.SQL("CREATE TABLE {} ({})").format(
        table_ident, sql.SQL(",").join(column_defs)
    )
    insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        table_ident,
        sql.SQL(",").join(column_idents),
        sql.SQL(",").join(sql.Placeholder() * len(column_idents)),
    )

    count = 0
    conn = psycopg2.connect(
        dbname=DB,
        host=HOST,
        port=PORT,
        user=USER,
        password=PWD,
    )
    try:
        # `with conn` commits on success and rolls back on error;
        # `with conn.cursor()` closes the cursor in both cases.
        with conn, conn.cursor() as cur:
            cur.execute(drop_sql)
            cur.execute(create_sql)

            for feat in layer.getFeatures():
                row = [
                    to_python(value)
                    for keep, value in zip(is_attribute, feat.attributes())
                    if keep
                ]

                geom = feat.geometry()
                if geom is None or geom.isNull():
                    # Features without geometry are stored as NULL instead of
                    # sending an empty WKB payload.
                    row.append(None)
                else:
                    wkb_bytes = geom.asWkb()
                    if isinstance(wkb_bytes, QByteArray):
                        wkb_bytes = bytes(wkb_bytes)
                    row.append(psycopg2.Binary(wkb_bytes))

                cur.execute(insert_sql, row)
                count += 1
    finally:
        conn.close()

    print(f"[DB] Inserted features: {count}")
|
2025-11-26 07:18:46 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|