844 lines
31 KiB
Python
844 lines
31 KiB
Python
|
|
import json
|
||
|
|
import os
|
||
|
|
import random
|
||
|
|
import subprocess
|
||
|
|
import uuid
|
||
|
|
import asyncpg
|
||
|
|
import pandas as pd
|
||
|
|
import geopandas as gpd
|
||
|
|
import numpy as np
|
||
|
|
import re
|
||
|
|
import zipfile
|
||
|
|
import tempfile
|
||
|
|
import asyncio
|
||
|
|
from pyproj import CRS
|
||
|
|
from shapely.geometry.base import BaseGeometry
|
||
|
|
from shapely.geometry import base as shapely_base
|
||
|
|
from fastapi import Depends, File, Form, UploadFile, HTTPException
|
||
|
|
from api.routers.datasets_router import cleansing_data, publish_layer, query_cleansing_data, upload_to_main
|
||
|
|
from core.config import DB_DSN, DB_HOST, DB_NAME, DB_PASS, DB_PORT, DB_USER, UPLOAD_FOLDER, MAX_FILE_MB, GEONETWORK_URL
|
||
|
|
from services.upload_file.ai_generate import send_metadata
|
||
|
|
from services.upload_file.readers.reader_csv import read_csv
|
||
|
|
from services.upload_file.readers.reader_shp import read_shp
|
||
|
|
from services.upload_file.readers.reader_gdb import read_gdb
|
||
|
|
from services.upload_file.readers.reader_mpk import read_mpk
|
||
|
|
from services.upload_file.readers.reader_pdf import convert_df, read_pdf
|
||
|
|
from services.upload_file.utils.df_validation import process_dataframe_synchronous
|
||
|
|
from services.upload_file.utils.geometry_detector import detect_and_build_geometry, attach_polygon_geometry_auto
|
||
|
|
from database.connection import engine, sync_engine
|
||
|
|
from pydantic import BaseModel
|
||
|
|
from typing import Any, Dict, List, Optional
|
||
|
|
from shapely import MultiLineString, MultiPolygon, wkt
|
||
|
|
from sqlalchemy import text
|
||
|
|
from datetime import datetime
|
||
|
|
from response import successRes, errorRes
|
||
|
|
from utils.logger_config import log_activity
|
||
|
|
# Base.metadata.create_all(bind=engine)
|
||
|
|
|
||
|
|
|
||
|
|
def is_geom_empty(g):
    """Return True when *g* carries no geometry: None, NaN, or an empty shape."""
    if g is None:
        return True
    if isinstance(g, float) and pd.isna(g):
        return True
    # Only real shapely geometries expose is_empty; anything else counts as present.
    return g.is_empty if isinstance(g, BaseGeometry) else False
|
||
|
|
|
||
|
|
|
||
|
|
def safe_json(value):
    """Konversi aman untuk semua tipe numpy/pandas/shapely ke tipe JSON-serializable.

    Coerces numpy integer/float scalars to builtin int/float, pandas Timestamps
    to ISO strings, shapely geometries to WKT strings, and missing scalars to
    None. Anything else is returned unchanged.
    """
    # np.integer / np.floating cover every width (int16/int32/int64, float16/...),
    # not just the two sizes the original special-cased.
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    if isinstance(value, shapely_base.BaseGeometry):
        return str(value)  # convert to WKT string
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        # pd.isna on list-likes returns an array whose truth value is ambiguous;
        # treat such container values as non-null and pass them through.
        pass
    return value
|
||
|
|
|
||
|
|
|
||
|
|
def detect_zip_type(zip_path: str) -> str:
    """Classify a ZIP archive as 'gdb', 'shp', or 'unknown' from its member names."""
    with zipfile.ZipFile(zip_path, "r") as archive:
        names = [entry.lower() for entry in archive.namelist()]

    # A .gdb/ directory component anywhere marks an ESRI Geodatabase.
    if any(".gdb/" in name or name.endswith(".gdb/") for name in names):
        return "gdb"

    # Loose geodatabase component files also count as a GDB payload.
    gdb_suffixes = (".gdbtable", ".gdbtablx", ".gdbindexes", ".spx")
    if any(name.endswith(gdb_suffixes) for name in names):
        return "gdb"

    if any(name.endswith(".shp") for name in names):
        return "shp"

    return "unknown"
|
||
|
|
|
||
|
|
|
||
|
|
async def process_data(df: pd.DataFrame, ext: str, filename: str, fileDesc: str):
    """Attach and analyse geometry on *df*, request AI metadata, and stage the frame.

    Pipeline as implemented below:
    1. detect_and_build_geometry() attempts to build a geometry column.
    2. If no geometry resulted, attach_polygon_geometry_auto() is used as fallback.
    3. A single representative geometry type and empty-geometry counts are computed;
       non-spatial results return an errorRes with HTTP 422.
    4. Geometries are serialised to WKT, warning examples and a full preview are
       built via safe_json(), and send_metadata() is asked for metadata suggestions.
    5. The frame is written to a unique tmp parquet path through
       process_dataframe_synchronous() (off the event loop).

    Returns a plain dict (callers wrap it in successRes) or an errorRes response.
    """
    result = detect_and_build_geometry(df, master_polygons=None)

    # Fallback: no geometry column at all, or every geometry value is NaN.
    if not hasattr(result, "geometry") or result.geometry.isna().all():
        result = attach_polygon_geometry_auto(result)

    def normalize_geom_type(geom_type):
        # Collapse Multi* variants so MultiPolygon and Polygon count as one type.
        if geom_type.startswith("Multi"):
            return geom_type.replace("Multi", "")
        return geom_type

    if isinstance(result, gpd.GeoDataFrame) and "geometry" in result.columns:
        geom_types = (
            result.geometry
            .dropna()
            .geom_type
            .apply(normalize_geom_type)
            .unique()
        )

        # Only the first detected type is reported, even for mixed-type layers.
        geom_type = geom_types[0] if len(geom_types) > 0 else "None"
        null_geom = result.geometry.isna().sum()

        print(f"[INFO] Tipe Geometry: {geom_type}")
        print(f"[INFO] Jumlah geometry kosong: {null_geom}")
    else:
        # Not spatial data: answer with an empty analysis payload and HTTP 422.
        res = {
            "message": "Tidak menemukan tabel yang relevan.",
            "file_type": ext,
            "rows": 0,
            "columns": 0,
            "geometry_valid": 0,
            "geometry_empty": 0,
            "geometry_valid_percent": 0,
            "warnings": [],
            "warning_examples": [],
            "preview": []
        }

        return errorRes(message="Tidak berhasil mencocokan geometry pada tabel.", details=res, status_code=422)

    # Normalise missing/infinite values so downstream JSON serialisation is clean.
    result = result.replace([pd.NA, float('inf'), float('-inf')], None)
    if isinstance(result, gpd.GeoDataFrame) and 'geometry' in result.columns:
        # Serialise geometries to WKT strings (JSON/parquet friendly).
        result['geometry'] = result['geometry'].apply(
            lambda g: g.wkt if g is not None else None
        )

    # NOTE(review): geometry is already WKT text at this point, so is_geom_empty
    # only catches None/NaN — a "POLYGON EMPTY" string would count as valid; confirm.
    empty_count = result['geometry'].apply(is_geom_empty).sum()
    valid_count = len(result) - empty_count
    # NOTE(review): raises ZeroDivisionError when result has zero rows — confirm
    # upstream guards (handle_upload_file rejects empty frames before calling).
    match_percentage = (valid_count / len(result)) * 100

    warnings = []
    if empty_count > 0:
        warnings.append(
            f"{empty_count} dari {len(result)} baris tidak memiliki geometry yang valid "
            f"({100 - match_percentage:.2f}% data gagal cocok)."
        )

    # Collect up to 500 sample rows with missing geometry for the client.
    if empty_count > 0:
        examples = result[result['geometry'].apply(is_geom_empty)].head(500)
        warning_examples = examples.to_dict(orient="records")
    else:
        warning_examples = []

    # The full frame (not a head() sample) is returned as the preview.
    # preview_data = result.head(15).to_dict(orient="records")
    preview_data = result.to_dict(orient="records")

    preview_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in preview_data
    ]

    warning_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in warning_examples
    ]

    # Context handed to the AI metadata-suggestion service.
    # NOTE(review): the OPD (agency) name is hard-coded — confirm intended.
    ai_context = {
        "nama_file_peta": filename,
        "nama_opd": "Badan Penanggulangan Bencana Daerah (BPBD) Provinsi Jatim",
        "tipe_data_spasial": geom_type,
        "deskripsi_singkat": fileDesc,
        "struktur_atribut_data": {},
    }
    # Blocking/remote call; returns a dict of suggested title/abstract/keywords etc.
    ai_suggest = send_metadata(ai_context)

    # Stage the analysed frame to a unique tmp parquet path; the sync writer
    # runs in a worker thread so the event loop is not blocked.
    tmp_file = generate_unique_filename()

    await asyncio.to_thread(process_dataframe_synchronous, result, tmp_file)

    response = {
        "message": "File berhasil dibaca dan dianalisis.",
        "file_name": filename,
        "file_type": ext,
        "rows": int(len(result)),
        "columns": list(map(str, result.columns)),
        "geometry_valid": int(valid_count),
        "geometry_empty": int(empty_count),
        "geometry_valid_percent": float(round(match_percentage, 2)),
        "geometry_type": geom_type,
        "warnings": warnings,
        "warning_rows": warning_safe,
        "preview": preview_safe,
        "metadata_suggest": ai_suggest,
        "tmp_path": tmp_file
    }

    # Returned raw; the HTTP wrapper is added by the caller.
    return response
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
async def handle_upload_file(file: UploadFile = File(...), page: Optional[str] = Form(""), sheet: Optional[str] = Form(""), fileDesc: Optional[str] = Form("")):
    """Receive an uploaded dataset file, parse it by extension, and analyse it.

    Supported extensions: .csv, .xlsx, .mpk, .pdf, and .zip (Shapefile or GDB).
    *page* selects the PDF page, *sheet* the Excel sheet, *fileDesc* is passed
    through to the AI metadata suggestion. Returns successRes/errorRes payloads.

    Bug fix: the temporary copy of the upload was previously removed only on
    the single fully-successful path, leaking files on errors and on the PDF
    multi-table / ZIP early returns. Cleanup now happens in ``finally``.
    """
    fname = file.filename
    ext = os.path.splitext(fname)[1].lower()
    contents = await file.read()
    size_mb = len(contents) / (1024 * 1024)
    if size_mb > MAX_FILE_MB:
        # NOTE(review): errorRes is returned elsewhere as a response object;
        # raising it here looks suspect (non-exception raise) — confirm errorRes
        # derives from Exception.
        raise errorRes(status_code=413, message="Ukuran File Terlalu Besar")
    tmp_path = UPLOAD_FOLDER / fname
    with open(tmp_path, "wb") as f:
        f.write(contents)
    try:
        df = None
        print('ext', ext)

        if ext == ".csv":
            df = read_csv(str(tmp_path))
        elif ext == ".xlsx":
            # read_csv also handles Excel when given a sheet name.
            df = read_csv(str(tmp_path), sheet)
        elif ext == ".mpk":
            df = read_mpk(str(tmp_path))
        elif ext == ".pdf":
            tbl = read_pdf(tmp_path, page)
            if len(tbl) == 0:
                res = {
                    "message": "Tidak ditemukan tabel valid pada halaman yang dipilih",
                    "tables": {},
                    "file_type": ext
                }
                return successRes(message="Tidak ditemukan tabel valid pada halaman yang dipilih", data=res)
            elif len(tbl) > 1:
                # Several tables found: send them back so the client can pick one
                # (it then calls handle_process_pdf with the chosen table).
                res = {
                    "message": "File berhasil dibaca dan dianalisis.",
                    "tables": tbl,
                    "file_type": ext
                }
                return successRes(data=res, message="File berhasil dibaca dan dianalisis.")
            else:
                df = convert_df(tbl[0])
        elif ext == ".zip":
            zip_type = detect_zip_type(str(tmp_path))

            if zip_type == "shp":
                print("[INFO] ZIP terdeteksi sebagai Shapefile.")
                df = read_shp(str(tmp_path))
            elif zip_type == "gdb":
                print("[INFO] ZIP terdeteksi sebagai Geodatabase (GDB).")
                df = read_gdb(str(tmp_path))
            else:
                return successRes(message="ZIP file tidak mengandung SHP / GDB valid.")
        else:
            raise errorRes(status_code=400, message="Unsupported file type")

        if df is None or (hasattr(df, "empty") and df.empty):
            return successRes(message="File berhasil dibaca, Tetapi tidak ditemukan tabel valid")

        res = await process_data(df, ext, fname, fileDesc)

        return successRes(data=res)

    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(
            message="Internal Server Error",
            details=str(e),
            status_code=500
        )
    finally:
        # Always remove the temporary upload copy; readers have finished with it
        # by the time any return/raise above unwinds.
        tmp_path.unlink(missing_ok=True)
|
||
|
|
|
||
|
|
# finally:
|
||
|
|
# db_session.close()
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
class PdfRequest(BaseModel):
    """Request body for processing a single table already extracted from a PDF."""

    title: str  # title of the selected table
    columns: List[str]  # column headers of the table
    rows: List[List]  # table data, one inner list per row
    fileName: str  # original PDF file name (reported back in the analysis)
    fileDesc: str  # user-supplied description, forwarded to the AI metadata step
|
||
|
|
|
||
|
|
async def handle_process_pdf(payload: PdfRequest):
    """Convert a client-selected PDF table payload into a DataFrame and analyse it."""
    try:
        frame = convert_df(payload.model_dump())
        no_table = frame is None or (hasattr(frame, "empty") and frame.empty)
        if no_table:
            return errorRes(message="Tidak ada tabel")

        analysed = await process_data(frame, '.pdf', payload.fileName, payload.fileDesc)
        return successRes(data=analysed)
    except Exception as exc:
        print(f"[ERROR] {exc}")
        return errorRes(message="Internal Server Error", details=str(exc), status_code=500)
|
||
|
|
|
||
|
|
# finally:
|
||
|
|
# db_session.close()
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
class UploadRequest(BaseModel):
    """Request body for publishing a cleansed dataset to PostGIS."""

    title: str  # dataset title; basis for the generated table name
    path: str  # staged parquet filename (resolved under tmp/ by the pipeline)
    rows: List[dict]  # records including a GEOMETRY column holding WKT strings
    columns: List[str]  # column names of the dataset
    author: Dict[str, Any]  # metadata form (abstract, keywords, crs, contacts, ...)
    style: str  # SLD/XML style document to persist for the layer
|
||
|
|
|
||
|
|
# generate _2 if exist
|
||
|
|
# generate _2 if exist
async def generate_unique_table_name(base_name: str):
    """Derive a PostgreSQL-safe table name from *base_name*, unique in the DB.

    Lowercases the name, replaces spaces/hyphens with underscores, strips any
    remaining characters illegal in an unquoted identifier, then probes
    ``to_regclass`` and appends ``_2``, ``_3``, ... until the name is free.
    """
    base_name = base_name.lower().replace(" ", "_").replace("-", "_")
    # Security/robustness fix: the resulting name is interpolated into DDL
    # elsewhere, so reduce it to [a-z0-9_] only.
    base_name = re.sub(r"[^a-z0-9_]", "_", base_name)
    table_name = base_name
    counter = 2

    async with engine.connect() as conn:
        while True:
            # to_regclass returns NULL when no relation with that name exists.
            result = await conn.execute(
                text("SELECT to_regclass(:tname)"),
                {"tname": table_name}
            )
            exists = result.scalar()

            if not exists:
                return table_name

            table_name = f"{base_name}_{counter}"
            counter += 1
|
||
|
|
|
||
|
|
def str_to_date(raw_date: str):
    """Parse a 'YYYY-MM-DD' string into a date; None when empty or unparsable."""
    if not raw_date:
        return None
    try:
        parsed = datetime.strptime(raw_date, "%Y-%m-%d")
    except Exception as e:
        print("[WARNING] Tidak bisa parse dateCreated:", e)
        return None
    return parsed.date()
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def generate_unique_filename(folder="tmp", ext="parquet", digits=6):
    """Return a path '<folder>/<uuid-int>.<ext>' that does not yet exist.

    Creates *folder* if needed. ``digits`` is unused (a UUID integer is used
    as the id) and is kept only for backward compatibility with callers.
    """
    os.makedirs(folder, exist_ok=True)
    while True:
        # Fixed a duplicated assignment (`file_id = file_id = ...`).
        file_id = uuid.uuid4().int
        filename = f"{folder}/{file_id}.{ext}"

        # Collision is effectively impossible with a UUID, but check anyway.
        if not os.path.exists(filename):
            return filename
|
||
|
|
|
||
|
|
def generate_job_id(user_id: str) -> str:
    """Build a job identifier of the form '<user_id>_<YYYYmmddHHMMSS>'."""
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return "_".join([user_id, stamp])
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def save_xml_to_sld(xml_string, filename):
    """Persist an SLD/XML style document as style_temp/<filename>.sld.

    Creates the style_temp folder if missing and returns the written path.

    Bug fix: the *filename* argument was previously ignored, so every call
    overwrote the same file; it is now part of the target path.
    """
    folder_path = 'style_temp'
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, f"{filename}.sld")

    with open(file_path, "w", encoding="utf-8") as f:
        f.write(xml_string)

    return file_path
|
||
|
|
|
||
|
|
|
||
|
|
async def process_parquet_upload(filename: str, table_name: str):
    """Load tmp/<filename> (parquet), normalise geometry, and bulk-load into PostGIS.

    Attribute columns are created as TEXT (all values stringified); geometry is
    staged as EWKT text and then ALTERed into a typed geometry(..., 4326) column
    with a GiST index. The parquet file is deleted on success. Errors are only
    printed — this coroutine never raises to its caller.
    """
    # Imported lazily to avoid a circular import with the application entry point.
    from main import db_pool
    file_path = os.path.join("tmp", filename)

    if not os.path.exists(file_path):
        print(f"File {file_path} tidak ditemukan")
        return

    try:
        loop = asyncio.get_running_loop()
        # pd.read_parquet is blocking; run it off the event loop.
        df = await loop.run_in_executor(None, pd.read_parquet, file_path)

        # =====================================================================
        # 1. CLEAN COLUMN NAMES (IMPORTANT!)
        # =====================================================================
        df.columns = [str(col).strip().upper() for col in df.columns]

        # Standardise the geometry column name to "GEOM" after uppercasing.
        # NOTE(review): this renames "GEOM" to itself — it is a no-op; confirm
        # whether a different source name was intended.
        if "GEOM" in df.columns:
            df.rename(columns={"GEOM": "GEOM"}, inplace=True)

        if "GEOM" not in df.columns:
            raise Exception("Kolom GEOM tidak ditemukan")

        # =====================================================================
        # 2. ROW PREPARATION
        # =====================================================================
        clean_rows = []
        geom_types = set()

        # Attribute columns other than GEOM. This exact list is used by BOTH
        # CREATE TABLE and COPY below, keeping the two in sync.
        attr_columns = [col for col in df.columns if col != "GEOM"]

        for row in df.itertuples(index=False):
            # --- Handle GEOM ---
            raw_geom = getattr(row, "GEOM", None)
            if not raw_geom: continue

            try:
                geom = None
                if isinstance(raw_geom, str):
                    geom = wkt.loads(raw_geom)
                elif isinstance(raw_geom, bytes):
                    from shapely import wkb
                    geom = wkb.loads(raw_geom)

                if not geom: continue
                # buffer(0) is the standard fix-up for invalid geometries.
                if not geom.is_valid: geom = geom.buffer(0)

                # Promote singletons so one MULTI* column type fits every row.
                gtype = geom.geom_type.upper()
                if gtype == "POLYGON": geom = MultiPolygon([geom])
                elif gtype == "LINESTRING": geom = MultiLineString([geom])

                geom_types.add(geom.geom_type)
                # NOTE(review): SRID 4326 is assumed, not read from the data — confirm.
                ewkt = f"SRID=4326;{geom.wkt}"

            except Exception:
                # Unparseable geometry: the row is silently dropped.
                continue

            # --- Handle attributes (FORCE STRING) ---
            row_data = []
            for col in attr_columns:
                # getattr uses the uppercased names from attr_columns.
                val = getattr(row, col, None)
                if val is not None:
                    row_data.append(str(val))  # convert int/float to string
                else:
                    row_data.append(None)

            row_data.append(ewkt)
            clean_rows.append(tuple(row_data))

        if not clean_rows:
            raise Exception("Data valid kosong")

        # =====================================================================
        # 3. DATABASE OPERATIONS
        # =====================================================================
        final_geom_type = list(geom_types)[0].upper() if geom_types else "GEOM"
        if "MULTI" not in final_geom_type and final_geom_type != "GEOM":
            final_geom_type = "MULTI" + final_geom_type

        # A. BUILD DDL (CREATE TABLE)
        # Quoting f'"{col}"' keeps the DB column names UPPERCASE ("ID", "NAMA").
        # NOTE(review): table_name and columns are interpolated, not
        # parameterised — they must come from a trusted/sanitised source.
        col_defs = [f'"{col}" TEXT' for col in attr_columns]

        create_sql = f"""
            CREATE TABLE {table_name} (
                _id SERIAL PRIMARY KEY, -- lowercase default
                {', '.join(col_defs)}, -- UPPERCASE (Hasil loop attr_columns)
                geom TEXT -- lowercase
            );
        """

        async with db_pool.acquire() as conn:
            # Drop table if present (dev-time safety; be careful in production).
            # await conn.execute(f"DROP TABLE IF EXISTS {table_name}")

            # 1. Create table
            await conn.execute(create_sql)

            # 2. COPY data
            # target_cols must match attr_columns EXACTLY;
            # asyncpg quotes these identifiers itself ("ID", "NAMA", "geom").
            target_cols = attr_columns + ['geom']
            await conn.copy_records_to_table(
                table_name,
                records=clean_rows,
                columns=target_cols
            )

            # 3. Alter to a typed 2D geometry column + spatial index
            alter_sql = f"""
                ALTER TABLE {table_name}
                ALTER COLUMN geom TYPE geometry({final_geom_type}, 4326)
                USING ST_Force2D(geom::geometry)::geometry({final_geom_type}, 4326);

                CREATE INDEX idx_{table_name}_geom ON {table_name} USING GIST (geom);
            """
            await conn.execute(alter_sql)

        print(f"Sukses upload {len(clean_rows)} baris ke {table_name}.")
        os.remove(file_path)

    except Exception as e:
        # NOTE(review): failures are swallowed — the caller cannot observe them,
        # and the staged parquet file is left behind; confirm intended.
        print(f"Error processing parquet: {e}")
        # Log error
||
|
|
|
||
|
|
async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
    """End-to-end publish pipeline: rows -> PostGIS -> publish -> mapset registration.

    Validates the WKT geometries carried in payload.rows, bulk-loads the staged
    parquet via process_parquet_upload(), inserts author metadata, runs the
    cleansing query, publishes the layer (GeoServer/GeoNetwork), saves the SLD
    style, and registers a mapset record in the main application. Any failure
    is logged and re-raised as HTTPException(500).
    """
    try:
        table_name = await generate_unique_table_name(payload.title)
        # DataFrame
        df = pd.DataFrame(payload.rows)
        df.columns = [col.upper() for col in df.columns]
        if "GEOMETRY" not in df.columns:
            raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan")

        # =====================================================================
        # 1. LOAD WKT → SHAPELY
        # =====================================================================
        def safe_load_wkt(g):
            # None for anything that is not a parseable WKT string.
            if not isinstance(g, str):
                return None
            try:
                geom = wkt.loads(g)
                return geom
            except:
                return None

        df["GEOMETRY"] = df["GEOMETRY"].apply(safe_load_wkt)
        df = df.rename(columns={"GEOMETRY": "geom"})

        # =====================================================================
        # 2. DROP ROWS WITH NULL geometry
        # =====================================================================
        df = df[df["geom"].notnull()]
        if df.empty:
            raise HTTPException(400, "Semua geometry invalid atau NULL")

        # =====================================================================
        # 3. VALIDATE geometry (very important)
        # =====================================================================
        df["geom"] = df["geom"].apply(lambda g: g if g.is_valid else g.buffer(0))

        # =====================================================================
        # 4. UNIFY GEOMETRY TYPES (Polygon→MultiPolygon, Line→MultiLine)
        # =====================================================================
        def unify_geometry_type(g):
            gtype = g.geom_type.upper()
            if gtype == "POLYGON":
                return MultiPolygon([g])
            if gtype == "LINESTRING":
                return MultiLineString([g])
            return g  # already MULTI, or POINT

        df["geom"] = df["geom"].apply(unify_geometry_type)

        # =====================================================================
        # 5. DETECT CRS FROM METADATA / INPUT / DEFAULT
        # =====================================================================
        detected_crs = payload.author.get("crs")

        # NOTE(review): duplicate lookup used only for the debug print below.
        detected = payload.author.get("crs")
        print('crs', detected)

        if not detected_crs:
            detected_crs = "EPSG:4326"

        # NOTE(review): unconditional override — the detection above is dead
        # code and every dataset is treated as EPSG:4326; confirm intended.
        detected_crs = 'EPSG:4326'
        # Build the GeoDataFrame
        gdf = gpd.GeoDataFrame(df, geometry="geom", crs=detected_crs)
        row_count = len(gdf)

        # =====================================================================
        # 6. VERIFY CRS (SRID) IS VALID in PROJ / PostGIS
        # =====================================================================
        try:
            _ = gdf.to_crs(gdf.crs)  # test CRS valid
        except:
            raise HTTPException(400, f"CRS {detected_crs} tidak valid")

        # =====================================================================
        # 7. SAVE TO POSTGIS
        # NOTE: a previous implementation shelled out to ogr2ogr
        # (-f PostgreSQL, PROMOTE_TO_MULTI, EPSG:4326, PG_USE_COPY, password
        # passed via PGPASSWORD env); it was replaced by the
        # process_parquet_upload() call below.
        # =====================================================================
        job_id = generate_job_id(str(user_id))
        await process_parquet_upload(payload.path, table_name)

        # =====================================================================
        # 9. SAVE METADATA (geom_type, author metadata)
        # =====================================================================
        unified_geom_type = list(gdf.geom_type.unique())
        author = payload.author
        async with engine.begin() as conn:
            await conn.execute(text("""
                INSERT INTO backend.author_metadata (
                    table_title,
                    dataset_title,
                    dataset_abstract,
                    keywords,
                    topic_category,
                    date_created,
                    dataset_status,
                    organization_name,
                    contact_person_name,
                    contact_email,
                    contact_phone,
                    geom_type,
                    user_id,
                    process,
                    geometry_count
                ) VALUES (
                    :table_title,
                    :dataset_title,
                    :dataset_abstract,
                    :keywords,
                    :topic_category,
                    :date_created,
                    :dataset_status,
                    :organization_name,
                    :contact_person_name,
                    :contact_email,
                    :contact_phone,
                    :geom_type,
                    :user_id,
                    :process,
                    :geometry_count
                )
            """), {
                "table_title": table_name,
                "dataset_title": payload.title,
                "dataset_abstract": author.get("abstract"),
                "keywords": author.get("keywords"),
                # NOTE(review): raises TypeError when topicCategory is absent — confirm.
                "topic_category": ", ".join(author.get("topicCategory")),
                "date_created": str_to_date(author.get("dateCreated")),
                "dataset_status": author.get("status"),
                "organization_name": author.get("organization"),
                "contact_person_name": author.get("contactName"),
                "contact_email": author.get("contactEmail"),
                "contact_phone": author.get("contactPhone"),
                "geom_type": json.dumps(unified_geom_type),
                "user_id": user_id,
                "process": 'CLEANSING',
                "geometry_count": row_count
            })

        # =====================================================================
        # 10. LOGGING
        # =====================================================================
        await log_activity(
            user_id=user_id,
            action_type="UPLOAD",
            action_title=f"Upload dataset {table_name}",
            details={"table_name": table_name, "rows": len(gdf)}
        )

        result = {
            "job_id": job_id,
            "job_status": "wait",
            "table_name": table_name,
            "status": "success",
            "message": f"Tabel '{table_name}' berhasil dibuat.",
            "total_rows": len(gdf),
            "geometry_type": unified_geom_type,
            "crs": detected_crs,
            "metadata_uuid": ""
        }
        # Persist the SLD style under the job id for later publication.
        save_xml_to_sld(payload.style, job_id)

        # await report_progress(job_id, "upload", 20, "Upload selesai")

        # cleansing_data(table_name, job_id)
        cleansing = await query_cleansing_data(table_name)
        result['job_status'] = cleansing

        publish = await publish_layer(table_name, job_id)
        result['metadata_uuid'] = publish['uuid']

        # Mapset registration payload; several ids are hard-coded environment
        # UUIDs (projection, classification, producer, source, regional) —
        # NOTE(review): confirm these stay valid across environments.
        mapset = {
            "name": payload.title,
            "description": author.get("abstract"),
            "scale": "1:25000",
            'projection_system_id': '0196c746-d1ba-7f1c-9706-5df738679cc7',
            "category_id": author.get("mapsetCategory"),
            "data_status": "sementara",
            'classification_id': '01968b4b-d3f9-76c9-888c-ee887ac31ce4',
            'producer_id': '01968b54-0000-7a67-bd10-975b8923b93e',
            "layer_type": unified_geom_type[0],
            'source_id': ['019c03ef-35e1-738b-858d-871dc7d1e4d6'],
            "layer_url": publish['geos_link'],
            "metadata_url": f"{GEONETWORK_URL}/srv/eng/catalog.search#/metadata/{publish['uuid']}",
            "coverage_level": "provinsi",
            "coverage_area": "kabupaten",
            "data_update_period": "Tahunan",
            "data_version": "2026",
            "is_popular": False,
            "is_active": True,
            'regional_id': '01968b53-a910-7a67-bd10-975b8923b92e',
            "notes": "Mapset baru dibuat",
            "status_validation": "on_verification",
        }

        print("mapset data",mapset)
        await upload_to_main(mapset)

        return successRes(data=result)

    except Exception as e:
        await log_activity(
            user_id=user_id,
            action_type="ERROR",
            action_title="Upload gagal",
            details={"error": str(e)}
        )
        raise HTTPException(status_code=500, detail=str(e))
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
# async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
|
||
|
|
# try:
|
||
|
|
# os.remove(payload.path)
|
||
|
|
# job_id = generate_job_id(str(user_id))
|
||
|
|
# result = {
|
||
|
|
# "job_id": job_id,
|
||
|
|
# "job_status": "done",
|
||
|
|
# "table_name": "just for test",
|
||
|
|
# "status": "success",
|
||
|
|
# "message": f"Tabel test berhasil dibuat.",
|
||
|
|
# "total_rows": 10,
|
||
|
|
# "geometry_type": "Polygon",
|
||
|
|
# "crs": "EPSG 4326",
|
||
|
|
# "metadata_uuid": "-"
|
||
|
|
# }
|
||
|
|
|
||
|
|
# # mapset = {
|
||
|
|
# # 'name': 'TEST Risiko Letusan Gunung Arjuno',
|
||
|
|
# # 'description': 'Peta ini menampilkan area yang berpotensi mengalami letusan Gunung Arjuno dan Gunung Bromo di Provinsi Jawa Timur. Data disusun dalam bentuk poligon yang mengindikasikan tingkat risiko berdasarkan sejarah aktivitas dan karakteristik geologi di wilayah tersebut.',
|
||
|
|
# # 'scale': '1:25000',
|
||
|
|
# # 'projection_system_id': '0196c746-d1ba-7f1c-9706-5df738679cc7',
|
||
|
|
# # 'category_id': '0196c80c-855f-77f9-abd0-0c8a30b8c2f5',
|
||
|
|
# # 'data_status': 'sementara',
|
||
|
|
# # 'classification_id': '01968b4b-d3f9-76c9-888c-ee887ac31ce4',
|
||
|
|
# # 'producer_id': '01968b54-0000-7a67-bd10-975b8923b93e',
|
||
|
|
# # 'layer_type': 'MultiPolygon',
|
||
|
|
# # 'source_id': ['019c03ef-35e1-738b-858d-871dc7d1e4d6'],
|
||
|
|
# # 'layer_url': 'http://192.168.60.24:8888/geoserver/labai/wms?service=WMS&version=1.1.0&request=GetMap&layers=labai:test_risiko_letusan_gunung_arjuno&styles=&bbox=110.89528623700005%2C-8.780412043999945%2C116.26994997700001%2C-5.042971664999925&width=768&height=384&srs=EPSG:4326&format=application/openlayers',
|
||
|
|
# # 'metadata_url': 'http://192.168.60.24:7777/geonetwork/srv/eng/catalog.search#/metadata/123123',
|
||
|
|
# # 'coverage_level': 'provinsi',
|
||
|
|
# # 'coverage_area': 'kabupaten',
|
||
|
|
# # 'data_update_period': 'Tahunan',
|
||
|
|
# # 'data_version': '2026',
|
||
|
|
# # 'is_popular': False,
|
||
|
|
# # 'is_active': True,
|
||
|
|
# # 'regional_id': '01968b53-a910-7a67-bd10-975b8923b92e',
|
||
|
|
# # 'notes': 'Mapset baru dibuat',
|
||
|
|
# # 'status_validation': 'on_verification'
|
||
|
|
# # }
|
||
|
|
|
||
|
|
# # await upload_to_main(mapset)
|
||
|
|
|
||
|
|
# return successRes(data=result)
|
||
|
|
|
||
|
|
# except Exception as e:
|
||
|
|
# print("errot", e)
|
||
|
|
|
||
|
|
|