file_table_reader/main.py

295 lines
6.9 KiB
Python
Raw Normal View History

2025-11-25 08:33:38 +00:00
from fastapi import FastAPI, BackgroundTasks
2025-11-29 04:44:08 +00:00
import psycopg2
import requests
2025-11-26 07:18:46 +00:00
from uuid import uuid4
2025-11-29 04:44:08 +00:00
from qgis_bootstrap import start_qgis
2025-11-25 08:33:38 +00:00
# from cleansing_service import load_layer, cleansing_layer
from full_cleansing_service import load_layer, cleansing_layer
2025-11-29 04:44:08 +00:00
from qgis.core import (
QgsVectorLayer,
QgsVectorLayerExporter,
QgsDataSourceUri,
QgsProviderRegistry,
2025-12-22 08:24:15 +00:00
QgsCoordinateReferenceSystem,
QgsWkbTypes,
QgsGeometry
2025-11-29 04:44:08 +00:00
)
from qgis.PyQt.QtCore import QByteArray
2025-12-01 03:02:48 +00:00
from config import HOST,PORT,DB,USER,PWD,SCHEMA,GEOM_COL
2025-11-29 04:44:08 +00:00
2025-11-25 08:33:38 +00:00
# ASGI application object; the route decorators below register their handlers on it.
app = FastAPI()
# Start QGIS once, at import time, through the project's bootstrap helper and keep
# the returned handle at module level (presumably so the QGIS application object
# stays alive for the lifetime of the process - TODO confirm in qgis_bootstrap).
qgs = start_qgis()
@app.get("/")
def root():
    """Liveness endpoint: return a fixed status message for ``GET /``."""
    status_payload = {"status": "QGIS Cleansing API Running"}
    return status_payload
@app.get("/clean/{table_name}")
def clean_table(table_name: str):
    """Cleanse one table synchronously and return the cleansing summary.

    Args:
        table_name: Name of the table to load through ``load_layer``.

    Returns:
        A dict with ``table``, ``summary`` and ``message`` keys on success, or a
        dict with a single ``error`` key when the loaded layer is not valid.
    """
    source_layer = load_layer(table_name)

    if source_layer.isValid():
        print(source_layer)
        outcome = cleansing_layer(source_layer)
        return {
            "table": table_name,
            "summary": outcome["summary"],
            "message": "Cleansing selesai",
        }

    # Invalid layer: report the problem in the response body instead of cleansing.
    return {"error": f"Table '{table_name}' tidak valid atau tidak ditemukan."}
2025-12-22 08:24:15 +00:00
from pydantic import BaseModel
2025-11-25 08:33:38 +00:00
2025-12-22 08:24:15 +00:00
# Request body accepted by POST /process.
class ProcessRequest(BaseModel):
    # Name of the table to cleanse; passed on to run_clean_table.
    table_name: str
    # Job identifier supplied by the caller; echoed back in the /process
    # response and passed on to run_clean_table.
    job_id: str
@app.post("/process")
def process_table(
    payload: ProcessRequest,
    background: BackgroundTasks,
):
    """Schedule a cleansing job in the background and acknowledge it right away.

    Args:
        payload: Request body carrying the table name and the caller's job id.
        background: FastAPI background-task queue for this request.

    Returns:
        A dict with ``status`` set to ``"ACCEPTED"`` plus the job id and table name.
    """
    table = payload.table_name
    job = payload.job_id

    # The actual work happens in run_clean_table after the response is sent.
    background.add_task(run_clean_table, table, job)

    return {"status": "ACCEPTED", "job_id": job, "table": table}
2025-11-25 08:33:38 +00:00
2025-12-22 08:24:15 +00:00
# @app.post("/process/{table_name}")
# def process_table(table_name: str, background: BackgroundTasks):
# job_id = uuid4().hex
# background.add_task(run_clean_table, table_name, job_id)
# return {
# "status": "ACCEPTED",
# "job_id": job_id,
# "table": table_name
# }
2025-11-25 08:33:38 +00:00
2025-11-26 07:18:46 +00:00
def run_clean_table(table_name: str, job_id: str):
    """Cleanse a table, store the result in PostGIS and notify the main backend.

    Scheduled as a background task by ``process_table``.

    Args:
        table_name: Name of the table to load, cleanse and overwrite.
        job_id: Job identifier that is sent back in the completion callback.

    Returns:
        None. Progress and errors are reported on stdout.

    NOTE(review): no callback is sent when the layer is invalid or when
    cleansing/saving raises, so the backend only ever hears about success.
    Reporting failures needs a status value agreed with the backend.
    """
    # Seconds to wait for the backend before giving up; without a timeout
    # requests may block this worker indefinitely on an unresponsive backend.
    callback_timeout = 30

    print(f"\n=== Mulai cleansing untuk tabel: {table_name} ===")

    layer = load_layer(table_name)
    if not layer.isValid():
        print(f"[ERROR] Table '{table_name}' tidak valid.")
        return

    result = cleansing_layer(layer)

    # STEP 1 - store the cleaned layer in PostGIS
    save_to_postgis(result["clean_layer"], table_name)

    # STEP 2 - send the completion callback to the main backend
    callback_payload = {
        "job_id": job_id,
        "table": table_name,
        "status": "FINISHED",
    }
    try:
        response = requests.post(
            "http://localhost:8000/dataset/jobs/callback",
            json=callback_payload,
            timeout=callback_timeout,
        )
        # Surface 4xx/5xx answers instead of silently ignoring them.
        response.raise_for_status()
    except requests.RequestException as exc:
        # The data is already saved at this point; report the failed
        # notification rather than losing it in an unhandled exception.
        print(f"[ERROR] Callback for job '{job_id}' failed: {exc}")

    print(f"=== Cleansing selesai untuk tabel: {table_name} ===\n")
2025-11-26 07:18:46 +00:00
2025-11-29 04:44:08 +00:00
def to_python(v):
    """Convert a Qt/QGIS attribute value into a plain Python value.

    Conversion rules, applied in order:

    * ``None`` stays ``None``.
    * Any object whose ``isNull()`` returns true becomes ``None``.
    * Objects exposing ``toPyObject()`` (QVariant wrappers) are unwrapped.
    * Qt date/time objects are converted through ``toPyDateTime()``,
      ``toPyDate()`` or ``toPyTime()`` so they reach the database driver as
      standard ``datetime`` values instead of Qt objects.
    * Everything else is returned unchanged.

    Args:
        v: Attribute value as returned by a QGIS feature.

    Returns:
        The equivalent native Python value, or ``None`` for null values.
    """
    if v is None:
        return None

    # Null QVariant (or any other Qt value that reports itself as null)
    if hasattr(v, "isNull") and v.isNull():
        return None

    # QVariant wrapper -> native Python object
    if hasattr(v, "toPyObject"):
        return v.toPyObject()

    # QDateTime / QDate / QTime -> datetime.datetime / date / time
    for converter_name in ("toPyDateTime", "toPyDate", "toPyTime"):
        converter = getattr(v, converter_name, None)
        if callable(converter):
            return converter()

    # Already a native value
    return v
2025-12-22 08:24:15 +00:00
# def get_postgis_geom_type(layer):
# for f in layer.getFeatures():
# g = f.geometry()
# if g.isMultipart():
# return QgsWkbTypes.displayString(g.wkbType()).upper()
# else:
# base = QgsWkbTypes.displayString(g.wkbType()).upper()
# if "POINT" in base:
# return "MULTIPOINT"
# if "LINESTRING" in base:
# return "MULTILINESTRING"
# if "POLYGON" in base:
# return "MULTIPOLYGON"
# return "GEOMETRY"
def get_postgis_geom_type(layer):
    """Derive the PostGIS geometry column type for the features of *layer*.

    Every non-empty feature geometry is inspected. The result is always the
    MULTI variant of the layer's geometry kind (``MULTIPOINT``,
    ``MULTILINESTRING`` or ``MULTIPOLYGON``), followed by ``Z``, ``M`` or
    ``ZM`` when at least one geometry carries that dimension.

    ``GEOMETRY`` (plus the same dimension suffix) is returned when the layer
    mixes several geometry kinds or contains a kind other than
    point/line/polygon, because no single typed column can hold them all.
    Plain ``GEOMETRY`` is returned when there is no non-empty geometry.

    Args:
        layer: QGIS vector layer whose features are scanned.

    Returns:
        The geometry type name as a string, e.g. ``"MULTIPOLYGONZ"``.
    """
    has_z = False
    has_m = False
    # Distinct geometry kinds found; compared with == so it works regardless
    # of how the enum values are implemented.
    kinds_seen = []

    for feature in layer.getFeatures():
        geometry = feature.geometry()
        if geometry.isEmpty():
            continue

        wkb = geometry.wkbType()

        # Detect Z & M
        if QgsWkbTypes.hasZ(wkb):
            has_z = True
        if QgsWkbTypes.hasM(wkb):
            has_m = True

        # Detect the geometry kind (point / line / polygon)
        kind = QgsWkbTypes.geometryType(wkb)
        if kind not in kinds_seen:
            kinds_seen.append(kind)

    if not kinds_seen:
        return "GEOMETRY"

    if len(kinds_seen) > 1:
        # Mixed layer: do not let whichever feature came last pick the type.
        base = "GEOMETRY"
    elif kinds_seen[0] == QgsWkbTypes.PointGeometry:
        base = "MULTIPOINT"
    elif kinds_seen[0] == QgsWkbTypes.LineGeometry:
        base = "MULTILINESTRING"
    elif kinds_seen[0] == QgsWkbTypes.PolygonGeometry:
        base = "MULTIPOLYGON"
    else:
        base = "GEOMETRY"

    # Add dimensionality
    if has_z and has_m:
        base += "ZM"
    elif has_z:
        base += "Z"
    elif has_m:
        base += "M"

    return base
2025-11-29 04:44:08 +00:00
def save_to_postgis(layer, table_name):
    """Replace the PostGIS table ``SCHEMA.table_name`` with the contents of *layer*.

    The existing table (if any) is dropped, a new table is created from the
    layer's fields plus one geometry column, and every feature is inserted.
    All statements run in one transaction that is committed only after the
    last insert and rolled back if anything fails; the connection is closed
    in either case.

    Attribute columns are typed from the field's type name: names containing
    ``int`` become ``INTEGER``; ``double``/``float``/``real`` become
    ``DOUBLE PRECISION``; everything else becomes ``TEXT``. Spaces in field
    names are replaced by underscores.

    Args:
        layer: QGIS vector layer to export.
        table_name: Name of the target table inside the configured schema.

    Raises:
        psycopg2.Error: if connecting or any SQL statement fails.
    """
    # Local import: psycopg2 is already a dependency of this module, and the
    # sql helpers quote identifiers safely (table_name comes from a request).
    from psycopg2 import sql

    geom_col = GEOM_COL
    srid = int(layer.crs().postgisSrid())
    fields = layer.fields()

    # One flag per field, aligned with feature.attributes(): keep or skip.
    keep_field = []
    column_names = []
    column_defs = []
    for field in fields:
        if field.name() == geom_col:
            keep_field.append(False)
            continue
        keep_field.append(True)

        type_name = field.typeName().lower()
        if "int" in type_name:
            pg_type = "INTEGER"
        elif "double" in type_name or "float" in type_name or "real" in type_name:
            pg_type = "DOUBLE PRECISION"
        else:
            pg_type = "TEXT"

        column = sql.Identifier(field.name().replace(" ", "_"))
        column_names.append(column)
        column_defs.append(sql.SQL("{} {}").format(column, sql.SQL(pg_type)))

    # Geometry column, typed from the layer contents (2D/3D/measured).
    pg_geom_type = get_postgis_geom_type(layer)
    geom_ident = sql.Identifier(geom_col)
    column_defs.append(
        sql.SQL("{} geometry({},{})").format(
            geom_ident, sql.SQL(pg_geom_type), sql.Literal(srid)
        )
    )

    table = sql.SQL(".").join([sql.Identifier(SCHEMA), sql.Identifier(table_name)])
    drop_stmt = sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(table)
    create_stmt = sql.SQL("CREATE TABLE {} ({})").format(
        table, sql.SQL(",").join(column_defs)
    )

    # The geometry is sent as WKB and given the column SRID explicitly, so the
    # stored value never depends on how the server treats SRID-less input.
    value_slots = [sql.Placeholder()] * len(column_names)
    value_slots.append(
        sql.SQL("ST_GeomFromWKB({}, {})").format(sql.Placeholder(), sql.Literal(srid))
    )
    insert_stmt = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        table,
        sql.SQL(",").join(column_names + [geom_ident]),
        sql.SQL(",").join(value_slots),
    )

    count = 0
    conn = psycopg2.connect(
        dbname=DB,
        host=HOST,
        port=PORT,
        user=USER,
        password=PWD,
    )
    try:
        # "with conn" commits on success and rolls back on error;
        # the cursor context closes the cursor.
        with conn, conn.cursor() as cur:
            cur.execute(drop_stmt)
            print(f'Drop table {table_name}')
            cur.execute(create_stmt)

            for feat in layer.getFeatures():
                row = [
                    to_python(value)
                    for keep, value in zip(keep_field, feat.attributes())
                    if keep
                ]

                geom = feat.geometry()
                if geom.isNull():
                    # No geometry at all -> SQL NULL instead of empty WKB.
                    row.append(None)
                else:
                    # The column is declared as a MULTI type, so single-part
                    # geometries must be promoted before insertion.
                    geom.convertToMultiType()
                    wkb_bytes = geom.asWkb()
                    if isinstance(wkb_bytes, QByteArray):
                        wkb_bytes = bytes(wkb_bytes)
                    row.append(psycopg2.Binary(wkb_bytes))

                cur.execute(insert_stmt, row)
                count += 1
    finally:
        conn.close()

    print(f"[DB] Inserted features: {count}")
2025-11-26 07:18:46 +00:00