file_table_reader/main.py
2025-11-29 11:44:08 +07:00

195 lines
4.5 KiB
Python

from fastapi import FastAPI, BackgroundTasks
import psycopg2
import requests
from uuid import uuid4
from qgis_bootstrap import start_qgis
# from cleansing_service import load_layer, cleansing_layer
from full_cleansing_service import load_layer, cleansing_layer
from qgis.core import (
QgsVectorLayer,
QgsVectorLayerExporter,
QgsDataSourceUri,
QgsProviderRegistry,
QgsCoordinateReferenceSystem
)
from qgis.PyQt.QtCore import QByteArray
from core.config import HOST,PORT,DB,USER,PWD,SCHEMA,GEOM_COL
app = FastAPI()
# Bootstrap a headless QGIS application once at import time; the handle is
# kept for the whole process lifetime so layer APIs work inside handlers.
qgs = start_qgis()
@app.get("/")
def root():
    """Health-check endpoint confirming the service is running."""
    status_payload = {"status": "QGIS Cleansing API Running"}
    return status_payload
@app.get("/clean/{table_name}")
def clean_table(table_name: str):
    """Run the cleansing pipeline synchronously for one table.

    Loads `table_name` as a QGIS layer, runs `cleansing_layer` on it and
    returns the summary. When the layer cannot be loaded, an error payload
    is returned instead (still HTTP 200 — callers inspect the body).
    """
    layer = load_layer(table_name)
    if not layer.isValid():
        return {"error": f"Table '{table_name}' tidak valid atau tidak ditemukan."}
    # (leftover debug `print(layer)` removed)
    result = cleansing_layer(layer)
    return {
        "table": table_name,
        "summary": result["summary"],
        "message": "Cleansing selesai",
    }
@app.post("/process/{table_name}")
def process_table(table_name: str, background: BackgroundTasks):
    """Accept a cleansing job for `table_name` and run it in the background.

    Returns immediately with a freshly generated job id; the actual work is
    performed by `run_clean_table` via FastAPI's BackgroundTasks.
    """
    job_id = uuid4().hex
    background.add_task(run_clean_table, table_name, job_id)
    accepted = {
        "status": "ACCEPTED",
        "job_id": job_id,
        "table": table_name,
    }
    return accepted
def run_clean_table(table_name: str, job_id: str):
    """Background job: cleanse a table, persist the result, notify the backend.

    Loads the layer, runs the cleansing pipeline, writes the cleaned layer
    back to PostGIS, then POSTs a completion callback for `job_id`.
    A failed callback is logged instead of raising out of the background task.
    """
    print(f"\n=== Mulai cleansing untuk tabel: {table_name} ===")
    layer = load_layer(table_name)
    if not layer.isValid():
        # NOTE(review): no FAILED callback is sent here, so the backend never
        # learns this job died — confirm whether that is intended.
        print(f"[ERROR] Table '{table_name}' tidak valid.")
        return
    result = cleansing_layer(layer)
    summary = result["summary"]
    clean_layer = result["clean_layer"]
    # STEP 1 — simpan hasil ke PostGIS
    save_to_postgis(clean_layer, table_name)
    # STEP 2 — kirim callback ke backend utama
    callback_payload = {
        "job_id": job_id,
        "table": table_name,
        "summary": summary,
        "status": "FINISHED"
    }
    try:
        # timeout keeps a hung backend from pinning this worker forever;
        # requests.post without timeout blocks indefinitely by default.
        requests.post(
            "http://localhost:8000/jobs/callback",
            json=callback_payload,
            timeout=10,
        )
    except requests.RequestException as exc:
        # A notification failure must not masquerade as a cleansing failure.
        print(f"[ERROR] Callback gagal untuk job {job_id}: {exc}")
    print(f"=== Cleansing selesai untuk tabel: {table_name} ===\n")
def to_python(v):
    """Coerce a possibly-QVariant value to a native Python object.

    Returns None for Python None and for null QVariants (objects exposing
    a truthy `isNull()`), unwraps QVariant wrappers via `toPyObject()`,
    and passes every other value through unchanged.
    """
    if v is None:
        return None
    # Empty QVariant counts as NULL
    if hasattr(v, "isNull") and v.isNull():
        return None
    # Unwrap QVariant wrappers; anything else is already native
    return v.toPyObject() if hasattr(v, "toPyObject") else v
def save_to_postgis(layer, table_name):
    """Persist a cleaned QGIS vector layer into PostGIS.

    Drops and recreates "<SCHEMA>"."<table_name>", mapping the layer's
    fields to INTEGER / DOUBLE PRECISION / TEXT columns plus a
    geometry(MultiPolygon, srid) column, then inserts every feature.
    Connection parameters come from core.config.

    Args:
        layer: QgsVectorLayer whose features are written out.
        table_name: destination table name.
    """
    srid = layer.crs().postgisSrid()
    fields = layer.fields()
    geom_col = GEOM_COL
    # CONNECT — close connection/cursor even when an SQL statement fails
    conn = psycopg2.connect(
        dbname=DB,
        host=HOST,
        port=PORT,
        user=USER,
        password=PWD
    )
    try:
        cur = conn.cursor()
        try:
            # DROP TABLE
            cur.execute(f'DROP TABLE IF EXISTS "{SCHEMA}"."{table_name}" CASCADE')
            # CREATE TABLE — map QGIS field types to Postgres column types.
            # attr_fields keeps (attribute index, sanitized column name) so the
            # per-row loop below needs no name comparisons.
            field_defs = []
            attr_fields = []
            for idx, f in enumerate(fields):
                if f.name() == geom_col:
                    continue
                t = f.typeName().lower()
                if "int" in t:
                    pg_type = "INTEGER"
                elif "double" in t or "float" in t or "real" in t:
                    pg_type = "DOUBLE PRECISION"
                else:
                    pg_type = "TEXT"
                col = f.name().replace(" ", "_")
                attr_fields.append((idx, col))
                field_defs.append(f'"{col}" {pg_type}')
            # geometry column
            field_defs.append(f'"{geom_col}" geometry(MultiPolygon,{srid})')
            cur.execute(
                f'CREATE TABLE "{SCHEMA}"."{table_name}" ({",".join(field_defs)});'
            )
            # Prepare INSERT. ST_GeomFromWKB stamps the layer SRID onto the
            # plain WKB from QgsGeometry.asWkb() (which carries no SRID);
            # inserting the raw bytea would fail the geometry(..., srid)
            # typmod check whenever srid != 0.
            # NOTE(review): features must already be MultiPolygon — a plain
            # Polygon still violates the typmod; confirm cleansing guarantees
            # this (ST_Multi(...) would be the fix otherwise).
            insert_columns = [f'"{col}"' for _, col in attr_fields] + [f'"{geom_col}"']
            placeholders = ["%s"] * (len(insert_columns) - 1) + ["ST_GeomFromWKB(%s, %s)"]
            insert_sql = (
                f'INSERT INTO "{SCHEMA}"."{table_name}" '
                f'({",".join(insert_columns)}) VALUES ({",".join(placeholders)})'
            )
            # INSERT ROWS
            count = 0
            for feat in layer.getFeatures():
                attrs = feat.attributes()
                row = [to_python(attrs[idx]) for idx, _ in attr_fields]
                wkb_bytes = feat.geometry().asWkb()
                if isinstance(wkb_bytes, QByteArray):
                    wkb_bytes = bytes(wkb_bytes)
                row.append(psycopg2.Binary(wkb_bytes))
                row.append(srid)
                cur.execute(insert_sql, row)
                count += 1
            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()
    print(f"[DB] Inserted features: {count}")