file_table_reader/main.py

195 lines
4.5 KiB
Python
Raw Normal View History

2025-11-25 08:33:38 +00:00
from fastapi import FastAPI, BackgroundTasks
2025-11-29 04:44:08 +00:00
import psycopg2
import requests
2025-11-26 07:18:46 +00:00
from uuid import uuid4
2025-11-29 04:44:08 +00:00
from qgis_bootstrap import start_qgis
2025-11-25 08:33:38 +00:00
# from cleansing_service import load_layer, cleansing_layer
from full_cleansing_service import load_layer, cleansing_layer
2025-11-29 04:44:08 +00:00
from qgis.core import (
QgsVectorLayer,
QgsVectorLayerExporter,
QgsDataSourceUri,
QgsProviderRegistry,
QgsCoordinateReferenceSystem
)
from qgis.PyQt.QtCore import QByteArray
2025-12-01 03:02:48 +00:00
from config import HOST,PORT,DB,USER,PWD,SCHEMA,GEOM_COL
2025-11-29 04:44:08 +00:00
2025-11-25 08:33:38 +00:00
# FastAPI application plus the QGIS application handle.
# start_qgis() is called at import time so the QGIS runtime exists before
# any request handler touches qgis.core objects.
app = FastAPI()
qgs = start_qgis()
@app.get("/")
def root():
    """Health-check endpoint: confirms the cleansing API is running."""
    status_payload = {"status": "QGIS Cleansing API Running"}
    return status_payload
@app.get("/clean/{table_name}")
def clean_table(table_name: str):
    """Synchronously load and cleanse a table, returning the summary.

    Args:
        table_name: Name of the table to load via load_layer().

    Returns:
        On success: dict with the table name, the cleansing summary and a
        completion message. On failure: dict with an "error" message.
    """
    layer = load_layer(table_name)
    # Guard both a missing layer object and an invalid QGIS layer, so a
    # None return from load_layer() cannot raise AttributeError here.
    if layer is None or not layer.isValid():
        return {"error": f"Table '{table_name}' tidak valid atau tidak ditemukan."}
    # (removed stray debug print of the layer object)
    result = cleansing_layer(layer)
    summary = result["summary"]
    return {
        "table": table_name,
        "summary": summary,
        "message": "Cleansing selesai"
    }
@app.post("/process/{table_name}")
def process_table(table_name: str, background: BackgroundTasks):
    """Queue an asynchronous cleansing job and return its id immediately.

    The actual work runs in run_clean_table() after the response is sent.
    """
    new_job_id = uuid4().hex
    background.add_task(run_clean_table, table_name, new_job_id)
    response = {
        "status": "ACCEPTED",
        "job_id": new_job_id,
        "table": table_name
    }
    return response
2025-11-25 08:33:38 +00:00
2025-11-26 07:18:46 +00:00
def run_clean_table(table_name: str, job_id: str):
    """Background worker: cleanse a table, persist it, notify the backend.

    Args:
        table_name: Source table to load and cleanse.
        job_id: Identifier previously returned to the client by
            /process/{table_name}; echoed back in the callback payload.
    """
    print(f"\n=== Mulai cleansing untuk tabel: {table_name} ===")
    layer = load_layer(table_name)
    if layer is None or not layer.isValid():
        print(f"[ERROR] Table '{table_name}' tidak valid.")
        return
    result = cleansing_layer(layer)
    summary = result["summary"]  # currently unused; kept for the callback payload below
    clean_layer = result["clean_layer"]

    # STEP 1 — persist the cleansed layer to PostGIS
    save_to_postgis(clean_layer, table_name)

    # STEP 2 — notify the main backend that the job finished
    callback_payload = {
        "job_id": job_id,
        "table": table_name,
        # "summary": summary,
        "status": "FINISHED"
    }
    try:
        # Timeout so a hung backend cannot block this worker forever.
        requests.post(
            "http://localhost:8000/jobs/callback",
            json=callback_payload,
            timeout=30,
        )
    except requests.RequestException as exc:
        # The cleansed data is already saved; a failed callback must not
        # silently kill the background task — report it instead.
        print(f"[ERROR] Callback gagal untuk job {job_id}: {exc}")

    print(f"=== Cleansing selesai untuk tabel: {table_name} ===\n")
2025-11-26 07:18:46 +00:00
2025-11-29 04:44:08 +00:00
def to_python(v):
    """Normalize a QGIS attribute value to a native Python object.

    None and null QVariant-like values map to None; objects exposing
    toPyObject() are unwrapped; everything else passes through unchanged.
    """
    if v is None:
        return None
    null_check = getattr(v, "isNull", None)
    if null_check is not None and null_check():
        # Empty QVariant → None
        return None
    unwrap = getattr(v, "toPyObject", None)
    if unwrap is not None:
        # QVariant → native Python value
        return unwrap()
    return v
def save_to_postgis(layer, table_name):
    """Recreate `table_name` in PostGIS and insert the layer's features.

    The target table (in the configured schema) is dropped and recreated
    with one column per attribute field — spaces in field names replaced
    with underscores — plus a geometry(MultiPolygon, srid) column, then
    every feature is inserted inside a single transaction.

    NOTE(review): the geometry column is hard-coded as MultiPolygon; a
    layer carrying other geometry types would fail the type constraint —
    confirm upstream cleansing always yields multipolygons.

    Args:
        layer: QGIS vector layer holding the cleansed features.
        table_name: Destination table name (arrives from a URL path, so it
            is quoted via psycopg2.sql to prevent identifier injection).
    """
    from psycopg2 import sql  # identifier-safe SQL composition

    schema = SCHEMA
    geom_col = GEOM_COL
    srid = layer.crs().postgisSrid()
    fields = layer.fields()

    def pg_type_for(field):
        # Map a QGIS field type name onto a PostgreSQL column type.
        t = field.typeName().lower()
        if "int" in t:
            return "INTEGER"
        if "double" in t or "float" in t or "real" in t:
            return "DOUBLE PRECISION"
        return "TEXT"

    attr_fields = [f for f in fields if f.name() != geom_col]
    attr_names = [f.name().replace(" ", "_") for f in attr_fields]
    table_ident = sql.Identifier(schema, table_name)

    conn = psycopg2.connect(
        dbname=DB, host=HOST, port=PORT, user=USER, password=PWD
    )
    count = 0
    try:
        # One transaction: commit on success, rollback on any error, and
        # close the connection either way (the original leaked it on error).
        with conn:
            with conn.cursor() as cur:
                # DROP + CREATE. All dynamic identifiers are composed with
                # psycopg2.sql, so a hostile table_name cannot inject SQL.
                cur.execute(
                    sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(table_ident)
                )

                col_defs = [
                    sql.SQL("{} {}").format(
                        sql.Identifier(name), sql.SQL(pg_type_for(field))
                    )
                    for name, field in zip(attr_names, attr_fields)
                ]
                col_defs.append(
                    sql.SQL("{} geometry(MultiPolygon,{})").format(
                        sql.Identifier(geom_col), sql.Literal(srid)
                    )
                )
                cur.execute(
                    sql.SQL("CREATE TABLE {} ({})").format(
                        table_ident, sql.SQL(",").join(col_defs)
                    )
                )

                # INSERT template. ST_GeomFromWKB stamps the table's SRID
                # onto the geometry: QGIS asWkb() emits plain WKB with no
                # SRID, which the typed geometry column would reject.
                value_exprs = [sql.Placeholder()] * len(attr_names)
                value_exprs.append(sql.SQL("ST_GeomFromWKB(%s, %s)"))
                insert_stmt = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                    table_ident,
                    sql.SQL(",").join(
                        [sql.Identifier(n) for n in attr_names]
                        + [sql.Identifier(geom_col)]
                    ),
                    sql.SQL(",").join(value_exprs),
                )

                for feat in layer.getFeatures():
                    row = [
                        to_python(value)
                        for field, value in zip(fields, feat.attributes())
                        if field.name() != geom_col
                    ]
                    wkb_bytes = feat.geometry().asWkb()
                    if isinstance(wkb_bytes, QByteArray):
                        wkb_bytes = bytes(wkb_bytes)
                    row.extend([psycopg2.Binary(wkb_bytes), srid])
                    cur.execute(insert_stmt, row)
                    count += 1
    finally:
        conn.close()

    print(f"[DB] Inserted features: {count}")
2025-11-26 07:18:46 +00:00