2025-11-17 03:53:15 +00:00
import json
2025-11-06 07:23:24 +00:00
import os
import pandas as pd
import geopandas as gpd
import numpy as np
2025-11-17 03:53:15 +00:00
import re
2025-11-06 07:23:24 +00:00
import zipfile
2025-12-22 08:19:20 +00:00
import tempfile
import asyncio
from pyproj import CRS
2025-11-06 07:23:24 +00:00
from shapely . geometry . base import BaseGeometry
from shapely . geometry import base as shapely_base
2026-01-28 05:42:46 +00:00
from fastapi import Depends , File , Form , UploadFile , HTTPException
from api . routers . datasets_router import cleansing_data , publish_layer , query_cleansing_data , upload_to_main
from core . config import UPLOAD_FOLDER , MAX_FILE_MB , VALID_WKT_PREFIXES , GEONETWORK_URL
2025-12-22 08:19:20 +00:00
from services . upload_file . ai_generate import send_metadata
2025-11-17 03:53:15 +00:00
from services . upload_file . readers . reader_csv import read_csv
from services . upload_file . readers . reader_shp import read_shp
from services . upload_file . readers . reader_gdb import read_gdb
from services . upload_file . readers . reader_mpk import read_mpk
from services . upload_file . readers . reader_pdf import convert_df , read_pdf
2026-01-28 05:42:46 +00:00
from services . upload_file . utils . geometry_detector import detect_and_build_geometry , attach_polygon_geometry_auto
2025-12-22 08:19:20 +00:00
from services . upload_file . upload_ws import report_progress
2025-11-24 01:57:43 +00:00
from database . connection import engine , sync_engine
2025-11-06 07:23:24 +00:00
from database . models import Base
from pydantic import BaseModel
2025-11-24 01:57:43 +00:00
from typing import Any , Dict , List , Optional
2025-12-01 02:22:43 +00:00
from shapely import MultiLineString , MultiPolygon , wkt
2025-11-06 07:23:24 +00:00
from sqlalchemy import text
2025-11-17 03:53:15 +00:00
from datetime import datetime
from response import successRes , errorRes
2025-11-24 01:57:43 +00:00
from utils . logger_config import log_activity
2025-11-17 03:53:15 +00:00
# Base.metadata.create_all(bind=engine)
2025-11-06 07:23:24 +00:00
def is_geom_empty(g):
    """Tell whether ``g`` should be treated as a missing or empty geometry.

    Returns True for ``None``, for a float that pandas reports as NA
    (e.g. ``NaN``), and for a shapely geometry whose ``is_empty`` flag is
    set. Every other value (strings included) yields False.
    """
    # Missing markers first: None, or a NaN float left behind by pandas.
    if g is None or (isinstance(g, float) and pd.isna(g)):
        return True
    # Only real shapely objects can report emptiness themselves.
    return g.is_empty if isinstance(g, BaseGeometry) else False
def safe_json(value):
    """Convert a numpy/pandas/shapely value into something JSON-serializable.

    Args:
        value: A single cell value taken from a DataFrame record.

    Returns:
        * ``bool`` / ``int`` for numpy boolean / integer scalars of any width;
        * ``float`` for finite Python or numpy floats, ``None`` for NaN and
          +/-inf (neither is representable in strict JSON);
        * an ISO-8601 string for ``pd.Timestamp``;
        * ``str(value)`` (WKT) for shapely geometries;
        * ``None`` for any other missing scalar (``None``, ``pd.NA``,
          ``pd.NaT``);
        * the value itself otherwise (lists, dicts, strings, ...).
    """
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, (float, np.floating)):
        # Handle non-finite numbers here: np.float64 NaN would otherwise be
        # returned as a float and break strict JSON encoding.
        return float(value) if np.isfinite(value) else None
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    if isinstance(value, shapely_base.BaseGeometry):
        return str(value)  # convert to WKT string
    # pd.isna() returns an array for list-likes, which cannot be used in a
    # boolean test; restrict the missing-value check to scalars.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value
def detect_zip_type(zip_path: str) -> str:
    """Classify a ZIP archive by the names of its members.

    Returns ``"gdb"`` when any member path contains a ``.gdb/`` folder or
    ends with a FileGDB-specific suffix, ``"shp"`` when any member ends with
    ``.shp``, and ``"unknown"`` otherwise. The GDB checks take precedence.
    Matching is case-insensitive.
    """
    gdb_suffixes = (".gdbtable", ".gdbtablx", ".gdbindexes", ".spx")

    with zipfile.ZipFile(zip_path, "r") as archive:
        lowered_names = [member.lower() for member in archive.namelist()]

    # A path ending in ".gdb/" also contains ".gdb/", so one containment
    # test covers both folder checks.
    inside_gdb_folder = any(".gdb/" in name for name in lowered_names)
    has_gdb_file = any(name.endswith(gdb_suffixes) for name in lowered_names)
    if inside_gdb_folder or has_gdb_file:
        return "gdb"

    for name in lowered_names:
        if name.endswith(".shp"):
            return "shp"
    return "unknown"
2025-12-22 08:19:20 +00:00
# def detect_zip_type(zip_path: str) -> str:
# with zipfile.ZipFile(zip_path, "r") as zip_ref:
# files = zip_ref.namelist()
# # -------------------------------------------------------------
# # 1) DETECT FileGDB
# # -------------------------------------------------------------
# is_gdb = (
# any(".gdb/" in f.lower() for f in files)
# or any(f.lower().endswith(ext) for ext in
# [".gdbtable", ".gdbtablx", ".gdbindexes", ".spx"] for f in files)
# )
# if is_gdb:
# print("\n[INFO] ZIP terdeteksi berisi FileGDB.")
# with tempfile.TemporaryDirectory() as temp_dir:
# # extract ZIP
# with zipfile.ZipFile(zip_path, "r") as zip_ref:
# zip_ref.extractall(temp_dir)
# # find folder *.gdb
# gdb_path = None
# for root, dirs, _ in os.walk(temp_dir):
# for d in dirs:
# if d.lower().endswith(".gdb"):
# gdb_path = os.path.join(root, d)
# break
# if not gdb_path:
# print("[ERROR] Folder .gdb tidak ditemukan.")
# return "gdb"
# print(f"[INFO] GDB Path: {gdb_path}")
# # Cari seluruh file .gdbtable
# table_files = [
# os.path.join(gdb_path, f)
# for f in os.listdir(gdb_path)
# if f.lower().endswith(".gdbtable")
# ]
# if not table_files:
# print("[ERROR] Tidak ada file .gdbtable ditemukan.")
# return "gdb"
# # Scan semua table file untuk mencari SpatialReference
# found_crs = False
# for table_file in table_files:
# try:
# with open(table_file, "rb") as f:
# raw = f.read(15000) # baca awal file, cukup untuk header JSON
# text = raw.decode("utf-8", errors="ignore")
# start = text.find("{")
# end = text.rfind("}") + 1
# if start == -1 or end == -1:
# continue
# json_str = text[start:end]
# meta = json.loads(json_str)
# spatial_ref = meta.get("SpatialReference")
# if not spatial_ref:
# continue
# wkt = spatial_ref.get("WKT")
# if not wkt:
# continue
# print(f"[FOUND] CRS metadata pada: {os.path.basename(table_file)}")
# print(f"[CRS WKT] {wkt[:200]}...")
# # Convert to EPSG
# try:
# epsg = CRS.from_wkt(wkt).to_epsg()
# print(f"[EPSG] {epsg}")
# except:
# print("[EPSG] Tidak ditemukan EPSG.")
# found_crs = True
# break
# except Exception:
# continue
# if not found_crs:
# print("[WARNING] Tidak ditemukan CRS di file .gdbtable manapun.")
# return "gdb"
# # -----------------------------------------------------
# # 2. DETEKSI SHP
# # -----------------------------------------------------
# if any(f.lower().endswith(".shp") for f in files):
# print("\n[INFO] ZIP terdeteksi berisi SHP.")
# # cari file .prj
# prj_files = [f for f in files if f.lower().endswith(".prj")]
# if not prj_files:
# print("[WARNING] Tidak ada file .prj → CRS tidak diketahui.")
# return "shp"
# with zipfile.ZipFile(zip_path, "r") as zip_ref:
# with tempfile.TemporaryDirectory() as temp_dir:
# prj_path = os.path.join(temp_dir, os.path.basename(prj_files[0]))
# zip_ref.extract(prj_files[0], temp_dir)
# # baca isi prj
# with open(prj_path, "r") as f:
# prj_text = f.read()
# try:
# crs = CRS.from_wkt(prj_text)
# print(f"[CRS WKT] {crs.to_wkt()[:200]}...")
# epsg = crs.to_epsg()
# if epsg:
# print(f"[EPSG] {epsg}")
# else:
# print("[EPSG] Tidak ditemukan dalam database EPSG.")
# except Exception as e:
# print("[ERROR] Gagal membaca CRS dari file PRJ:", e)
# return "shp"
# # -----------------------------------------------------
# # 3. UNKNOWN
# # -----------------------------------------------------
# return "unknown"
def process_data(df: pd.DataFrame, ext: str, filename: str, fileDesc: str):
    """Attach geometry to an uploaded table and summarise the outcome.

    Args:
        df: Table read from the uploaded file.
        ext: File extension of the upload (e.g. ``".csv"``); echoed back as
            ``file_type``.
        filename: Name of the uploaded file; echoed back and sent to the
            metadata suggestion service.
        fileDesc: Free-text description sent to the metadata suggestion
            service.

    Returns:
        A dict with row/column counts, geometry statistics, warnings, all
        rows (``preview``), the rows lacking geometry (``warning_rows``, at
        most 500) and ``metadata_suggest`` from ``send_metadata``. When no
        geometry column could be built, the value of ``errorRes(...)`` with
        status 422 is returned instead of a dict.
    """
    result = detect_and_build_geometry(df, master_polygons=None)

    # Fall back to automatic polygon matching when detection produced no
    # geometry column, or only missing geometries.
    if not hasattr(result, "geometry") or result.geometry.isna().all():
        result = attach_polygon_geometry_auto(result)

    def normalize_geom_type(geom_type):
        # Report "MultiPolygon" and "Polygon" (etc.) as the same base type.
        if geom_type.startswith("Multi"):
            return geom_type.replace("Multi", "")
        return geom_type

    if not (isinstance(result, gpd.GeoDataFrame) and "geometry" in result.columns):
        res = {
            "message": "Tidak menemukan tabel yang relevan.",
            "file_type": ext,
            "rows": 0,
            "columns": 0,
            "geometry_valid": 0,
            "geometry_empty": 0,
            "geometry_valid_percent": 0,
            "warnings": [],
            "warning_examples": [],
            "preview": []
        }
        return errorRes(message="Tidak berhasil mencocokan geometry pada tabel.", details=res, status_code=422)

    geom_types = (
        result.geometry
        .dropna()
        .geom_type
        .apply(normalize_geom_type)
        .unique()
    )
    # Only the first distinct type is reported.
    geom_type = geom_types[0] if len(geom_types) > 0 else "None"
    null_geom = result.geometry.isna().sum()

    print(f"[INFO] Tipe Geometry: {geom_type}")
    print(f"[INFO] Jumlah geometry kosong: {null_geom}")

    result = result.replace([pd.NA, float('inf'), float('-inf')], None)

    # Decide which rows lack a usable geometry while the column still holds
    # shapely objects. After the WKT conversion below every value is a plain
    # string and is_geom_empty() can no longer recognise an empty geometry.
    empty_mask = result['geometry'].apply(is_geom_empty).astype(bool)

    if isinstance(result, gpd.GeoDataFrame) and 'geometry' in result.columns:
        result['geometry'] = result['geometry'].apply(
            lambda g: g.wkt if isinstance(g, BaseGeometry) else None
        )

    total_rows = len(result)
    empty_count = int(empty_mask.sum())
    valid_count = total_rows - empty_count
    # Guard against a zero-row result (would otherwise divide by zero).
    match_percentage = (valid_count / total_rows) * 100 if total_rows else 0.0

    warnings = []
    warning_examples = []
    if empty_count > 0:
        warnings.append(
            f"{empty_count} dari {total_rows} baris tidak memiliki geometry yang valid "
            f"({100 - match_percentage:.2f}% data gagal cocok)."
        )
        warning_examples = result[empty_mask].head(500).to_dict(orient="records")

    # The full table is returned as the preview (not just the first rows).
    preview_data = result.to_dict(orient="records")

    preview_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in preview_data
    ]
    warning_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in warning_examples
    ]

    ai_context = {
        "nama_file_peta": filename,
        "nama_opd": "Badan Penanggulangan Bencana Daerah (BPBD) Provinsi Jatim",
        "tipe_data_spasial": geom_type,
        "deskripsi_singkat": fileDesc,
        "struktur_atribut_data": {},
    }
    # NOTE(review): send_metadata presumably returns a dict of suggested
    # metadata (title, abstract, keywords, categories) -- confirm against
    # services.upload_file.ai_generate.
    ai_suggest = send_metadata(ai_context)

    response = {
        "message": "File berhasil dibaca dan dianalisis.",
        "file_name": filename,
        "file_type": ext,
        "rows": int(total_rows),
        "columns": list(map(str, result.columns)),
        "geometry_valid": int(valid_count),
        "geometry_empty": int(empty_count),
        "geometry_valid_percent": float(round(match_percentage, 2)),
        "geometry_type": geom_type,
        "warnings": warnings,
        "warning_rows": warning_safe,
        "preview": preview_safe,
        "metadata_suggest": ai_suggest
    }

    return response
2025-12-22 08:19:20 +00:00
async def handle_upload_file(
    file: UploadFile = File(...),
    page: Optional[str] = Form(""),
    sheet: Optional[str] = Form(""),
    fileDesc: Optional[str] = Form(""),
):
    """Read an uploaded file, detect its geometry and return the analysis.

    Supported extensions are ``.csv``, ``.xlsx``, ``.mpk``, ``.pdf`` and
    ``.zip`` (Shapefile or FileGDB). The upload is written to
    ``UPLOAD_FOLDER`` while it is parsed and removed again before returning.

    Args:
        file: The uploaded file.
        page: Page selector forwarded to ``read_pdf`` for PDF uploads.
        sheet: Sheet selector forwarded to ``read_csv`` for ``.xlsx`` uploads.
        fileDesc: Free-text description forwarded to ``process_data``.

    Returns:
        The value of ``successRes(...)`` / ``errorRes(...)``. A PDF with more
        than one table returns the raw tables so the client can pick one.
    """
    # Keep only the last path component: the name comes from the client and
    # must not be able to point outside UPLOAD_FOLDER.
    fname = os.path.basename((file.filename or "").replace("\\", "/"))
    ext = os.path.splitext(fname)[1].lower()

    contents = await file.read()
    size_mb = len(contents) / (1024 * 1024)
    if size_mb > MAX_FILE_MB:
        raise errorRes(status_code=413, message="Ukuran File Terlalu Besar")
    if fname in ("", ".", ".."):
        # No usable file name means no extension to dispatch on.
        raise errorRes(status_code=400, message="Unsupported file type")

    tmp_path = UPLOAD_FOLDER / fname

    try:
        with open(tmp_path, "wb") as f:
            f.write(contents)

        df = None
        print('ext', ext)

        if ext == ".csv":
            df = read_csv(str(tmp_path))
        elif ext == ".xlsx":
            df = read_csv(str(tmp_path), sheet)
        elif ext == ".mpk":
            df = read_mpk(str(tmp_path))
        elif ext == ".pdf":
            tbl = read_pdf(tmp_path, page)
            if len(tbl) == 0:
                res = {
                    "message": "Tidak ditemukan tabel valid pada halaman yang dipilih",
                    "tables": {},
                    "file_type": ext
                }
                return successRes(message="Tidak ditemukan tabel valid pada halaman yang dipilih", data=res)
            elif len(tbl) > 1:
                res = {
                    "message": "File berhasil dibaca dan dianalisis.",
                    "tables": tbl,
                    "file_type": ext
                }
                return successRes(data=res, message="File berhasil dibaca dan dianalisis.")
            else:
                df = convert_df(tbl[0])
        elif ext == ".zip":
            zip_type = detect_zip_type(str(tmp_path))

            if zip_type == "shp":
                print("[INFO] ZIP terdeteksi sebagai Shapefile.")
                df = read_shp(str(tmp_path))
            elif zip_type == "gdb":
                print("[INFO] ZIP terdeteksi sebagai Geodatabase (GDB).")
                df = read_gdb(str(tmp_path))
            else:
                return successRes(message="ZIP file tidak mengandung SHP / GDB valid.")
        else:
            # NOTE(review): errorRes is both raised and returned in this
            # module; its definition is not visible here -- confirm which
            # form is correct.
            raise errorRes(status_code=400, message="Unsupported file type")

        if df is None or (hasattr(df, "empty") and df.empty):
            return successRes(message="File berhasil dibaca, Tetapi tidak ditemukan tabel valid")

        res = process_data(df, ext, fname, fileDesc)
        if not isinstance(res, dict):
            # process_data hands back an error response when no geometry
            # could be matched; pass it through instead of wrapping it in a
            # success envelope.
            return res
        return successRes(data=res)

    except HTTPException:
        # Keep the status code of deliberate HTTP errors instead of
        # converting them into a generic 500 below.
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(
            message="Internal Server Error",
            details=str(e),
            status_code=500
        )
    finally:
        # Remove the temporary copy on every exit path, not only on success.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError as cleanup_error:
            print(f"[WARNING] {cleanup_error}")
class PdfRequest(BaseModel):
    """Request body for processing one table extracted from a PDF.

    The whole model is dumped to a dict and handed to ``convert_df``;
    ``fileName`` and ``fileDesc`` are additionally forwarded to
    ``process_data``.
    """

    # Title of the table (presumably as shown to the user -- confirm).
    title: str
    # Column headers of the table.
    columns: List[str]
    # Table body, one inner list per row.
    rows: List[List]
    # Name of the originating file.
    fileName: str
    # Free-text description of the file.
    fileDesc: str
2025-11-06 07:23:24 +00:00
async def handle_process_pdf(payload: PdfRequest):
    """Run geometry detection on a single table chosen from a PDF upload.

    Args:
        payload: The selected table plus the originating file name and
            description.

    Returns:
        The value of ``successRes(data=...)`` carrying the ``process_data``
        summary, or an ``errorRes(...)`` value when the table is empty,
        geometry matching fails, or an unexpected error occurs.
    """
    try:
        df = convert_df(payload.model_dump())
        if df is None or (hasattr(df, "empty") and df.empty):
            return errorRes(message="Tidak ada tabel")

        res = process_data(df, '.pdf', payload.fileName, payload.fileDesc)
        if not isinstance(res, dict):
            # process_data hands back an error response when no geometry
            # could be matched; do not wrap it in a success envelope.
            return res
        return successRes(data=res)

    except HTTPException:
        # Preserve the status code of deliberate HTTP errors.
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(message="Internal Server Error", details=str(e), status_code=500)
class UploadRequest(BaseModel):
    """Request body for storing a cleaned dataset in PostGIS."""

    # Dataset title; also the basis for the generated table name.
    title: str
    # One dict per record; a "geometry" key (any letter case) must hold WKT.
    rows: List[dict]
    # Column names (not read by handle_to_postgis in this module).
    columns: List[str]
    # Author/dataset metadata. Keys read by handle_to_postgis: abstract,
    # keywords, topicCategory, dateCreated, status, organization,
    # contactName, contactEmail, contactPhone, crs, mapsetCategory.
    author: Dict[str, Any]
    # Style document text, written to a ".sld" file named after the job id.
    style: str
2025-11-06 07:23:24 +00:00
2025-12-01 02:22:43 +00:00
# Generate a unique table name: append _2, _3, ... while the name already exists
2025-11-24 01:57:43 +00:00
async def generate_unique_table_name(base_name: str):
    """Derive a table name from ``base_name`` that does not exist yet.

    The name is lower-cased and every character outside ``[a-z0-9_]`` is
    replaced with ``_``. Names that were already safe are unchanged, while
    quotes, dots and other punctuation can no longer reach SQL. If the
    resulting name is taken, ``_2``, ``_3``, ... are appended until a free
    one is found.

    Args:
        base_name: Human-readable title to turn into a table name.

    Returns:
        The first candidate name for which ``to_regclass`` finds no relation.
    """
    # An empty title would produce an empty identifier; fall back to a
    # fixed stem instead.
    base_name = re.sub(r"[^a-z0-9_]", "_", base_name.lower()) or "dataset"
    table_name = base_name
    counter = 2

    async with engine.connect() as conn:
        while True:
            # Pass the name double-quoted so it is looked up literally even
            # when it starts with a digit or matches an SQL keyword; the
            # sanitised name cannot itself contain a quote.
            result = await conn.execute(
                text("SELECT to_regclass(:tname)"),
                {"tname": f'"{table_name}"'}
            )
            exists = result.scalar()

            if not exists:
                return table_name

            table_name = f"{base_name}_{counter}"
            counter += 1
2025-11-17 03:53:15 +00:00
2025-11-24 01:57:43 +00:00
def str_to_date(raw_date: str):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.date``.

    Returns ``None`` when ``raw_date`` is empty/falsy or cannot be parsed;
    a parse failure is reported on stdout.
    """
    if not raw_date:
        return None
    try:
        parsed = datetime.strptime(raw_date, "%Y-%m-%d")
    except Exception as e:
        print("[WARNING] Tidak bisa parse dateCreated:", e)
        return None
    return parsed.date()
2025-11-06 07:23:24 +00:00
2025-11-17 03:53:15 +00:00
2025-12-22 08:19:20 +00:00
def generate_job_id(user_id: str) -> str:
    """Build a job id of the form ``<user_id>_<YYYYmmddHHMMSS>``.

    The timestamp is the current local time at one-second resolution.
    """
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return "{}_{}".format(user_id, stamp)
def save_xml_to_sld(xml_string, filename):
    """Write ``xml_string`` to ``style_temp/<filename>.sld`` as UTF-8.

    The ``style_temp`` folder (relative to the current working directory) is
    created when missing. Returns the path of the written file.
    """
    target_dir = 'style_temp'
    target_path = os.path.join(target_dir, "{}.sld".format(filename))

    os.makedirs(target_dir, exist_ok=True)
    with open(target_path, "w", encoding="utf-8") as sld_file:
        sld_file.write(xml_string)

    return target_path
2025-12-01 02:22:43 +00:00
2025-11-24 01:57:43 +00:00
async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
    """Store an uploaded dataset in PostGIS and publish it.

    Steps: parse the WKT geometries, drop rows without geometry, repair
    invalid geometries, promote Polygon/LineString to their Multi* form,
    write the table, add a primary key, record author metadata, log the
    activity, save the style as SLD, run cleansing, publish the layer and
    register the mapset.

    Args:
        payload: Dataset rows, title, author metadata and style document.
        user_id: Id of the uploading user (recorded in metadata and logs).

    Returns:
        The value of ``successRes(data=...)`` with job id, table name, row
        count, geometry types, CRS and the published metadata UUID.

    Raises:
        HTTPException: 400 when the geometry column is missing, no geometry
            is usable, or the CRS is rejected; 500 for any other failure.
            Every failure is recorded through ``log_activity`` first.
    """
    try:
        table_name = await generate_unique_table_name(payload.title)

        # DataFrame
        df = pd.DataFrame(payload.rows)
        df.columns = [str(col).upper() for col in df.columns]

        if "GEOMETRY" not in df.columns:
            raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan")

        # =====================================================================
        # 1. LOAD WKT -> SHAPELY
        # =====================================================================
        def safe_load_wkt(g):
            # Anything that is not a parsable WKT string becomes None and is
            # dropped in step 2.
            if not isinstance(g, str):
                return None
            try:
                return wkt.loads(g)
            except Exception:
                return None

        df["GEOMETRY"] = df["GEOMETRY"].apply(safe_load_wkt)
        df = df.rename(columns={"GEOMETRY": "geom"})

        # =====================================================================
        # 2. DROP ROWS WITHOUT GEOMETRY
        # =====================================================================
        # .copy() so the column assignments below act on an independent frame.
        df = df[df["geom"].notnull()].copy()
        if df.empty:
            raise HTTPException(400, "Semua geometry invalid atau NULL")

        # =====================================================================
        # 3. REPAIR INVALID GEOMETRY
        # =====================================================================
        df["geom"] = df["geom"].apply(lambda g: g if g.is_valid else g.buffer(0))

        # =====================================================================
        # 4. UNIFY GEOMETRY TYPE (Polygon -> MultiPolygon, Line -> MultiLine)
        # =====================================================================
        def unify_geometry_type(g):
            gtype = g.geom_type.upper()
            if gtype == "POLYGON":
                return MultiPolygon([g])
            if gtype == "LINESTRING":
                return MultiLineString([g])
            return g  # already MULTI*, or a point

        df["geom"] = df["geom"].apply(unify_geometry_type)

        # =====================================================================
        # 5. CRS
        # =====================================================================
        detected = payload.author.get("crs")
        print('crs', detected)
        # NOTE(review): the CRS sent by the client is only printed; the data
        # is always stored as EPSG:4326, as in the original code which
        # overwrote any detected value. Confirm this is intended.
        detected_crs = 'EPSG:4326'

        gdf = gpd.GeoDataFrame(df, geometry="geom", crs=detected_crs)
        row_count = len(gdf)

        # =====================================================================
        # 6. VERIFY THE CRS
        # =====================================================================
        try:
            _ = gdf.to_crs(gdf.crs)  # test CRS valid
        except Exception as crs_error:
            raise HTTPException(400, f"CRS {detected_crs} tidak valid") from crs_error

        # =====================================================================
        # 7. WRITE TO POSTGIS (synchronous call, run in the default executor)
        # =====================================================================
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: gdf.to_postgis(
                table_name,
                sync_engine,
                if_exists="replace",
                index=False
            )
        )

        # =====================================================================
        # 8. ADD PRIMARY KEY
        # =====================================================================
        # Identifiers cannot be bound as parameters; double any embedded
        # quote so the name cannot terminate the quoted identifier.
        quoted_table = '"{}"'.format(table_name.replace('"', '""'))
        async with engine.begin() as conn:
            await conn.execute(text(
                f'ALTER TABLE {quoted_table} ADD COLUMN _ID SERIAL PRIMARY KEY;'
            ))

        # =====================================================================
        # 9. STORE METADATA (geom_type, author metadata)
        # =====================================================================
        unified_geom_type = list(gdf.geom_type.unique())
        author = payload.author

        # topicCategory may be absent, or already a single string; only join
        # real sequences.
        topic_category = author.get("topicCategory") or []
        if not isinstance(topic_category, str):
            topic_category = ", ".join(topic_category)

        async with engine.begin() as conn:
            await conn.execute(text("""
                INSERT INTO backend.author_metadata (
                    table_title,
                    dataset_title,
                    dataset_abstract,
                    keywords,
                    topic_category,
                    date_created,
                    dataset_status,
                    organization_name,
                    contact_person_name,
                    contact_email,
                    contact_phone,
                    geom_type,
                    user_id,
                    process,
                    geometry_count
                ) VALUES (
                    :table_title,
                    :dataset_title,
                    :dataset_abstract,
                    :keywords,
                    :topic_category,
                    :date_created,
                    :dataset_status,
                    :organization_name,
                    :contact_person_name,
                    :contact_email,
                    :contact_phone,
                    :geom_type,
                    :user_id,
                    :process,
                    :geometry_count
                )
            """), {
                "table_title": table_name,
                "dataset_title": payload.title,
                "dataset_abstract": author.get("abstract"),
                "keywords": author.get("keywords"),
                "topic_category": topic_category,
                "date_created": str_to_date(author.get("dateCreated")),
                "dataset_status": author.get("status"),
                "organization_name": author.get("organization"),
                "contact_person_name": author.get("contactName"),
                "contact_email": author.get("contactEmail"),
                "contact_phone": author.get("contactPhone"),
                "geom_type": json.dumps(unified_geom_type),
                "user_id": user_id,
                "process": 'CLEANSING',
                "geometry_count": row_count
            })

        # =====================================================================
        # 10. LOGGING
        # =====================================================================
        await log_activity(
            user_id=user_id,
            action_type="UPLOAD",
            action_title=f"Upload dataset {table_name}",
            details={"table_name": table_name, "rows": len(gdf)}
        )

        job_id = generate_job_id(str(user_id))
        result = {
            "job_id": job_id,
            "job_status": "wait",
            "table_name": table_name,
            "status": "success",
            "message": f"Tabel '{table_name}' berhasil dibuat.",
            "total_rows": len(gdf),
            "geometry_type": unified_geom_type,
            "crs": detected_crs,
            "metadata_uuid": ""
        }
        save_xml_to_sld(payload.style, job_id)

        await report_progress(job_id, "upload", 20, "Upload selesai")

        cleansing = await query_cleansing_data(table_name)
        result['job_status'] = cleansing

        publish = await publish_layer(table_name, job_id)
        result['metadata_uuid'] = publish['uuid']

        mapset = {
            "name": payload.title,
            "description": author.get("abstract"),
            "scale": "1:25000",
            "projection_system_id": "0196c746-d1ba-7f1c-9706-5df738679cc7",
            "category_id": author.get("mapsetCategory"),
            "data_status": "sementara",
            "classification_id": "01968b4b-d3f9-76c9-888c-ee887ac31ce4",
            "producer_id": "019bd4ea-eb33-704e-83c3-8253d457b187",
            "layer_type": unified_geom_type[0],
            "source_id": ["019bd4e7-3df8-75c8-9b89-3f310967649c"],
            "layer_url": publish['geos_link'],
            "metadata_url": f"{GEONETWORK_URL}/srv/eng/catalog.search#/metadata/{publish['uuid']}",
            "coverage_level": "provinsi",
            "coverage_area": "kabupaten",
            "data_update_period": "Tahunan",
            "data_version": "2026",
            "is_popular": False,
            "is_active": True,
            "regional_id": "01968b53-a910-7a67-bd10-975b8923b92e",
            "notes": "Mapset baru dibuat",
            "status_validation": "on_verification",
        }

        print("mapset data", mapset)
        await upload_to_main(mapset)

        return successRes(data=result)

    except Exception as e:
        await log_activity(
            user_id=user_id,
            action_type="ERROR",
            action_title="Upload gagal",
            details={"error": str(e)}
        )
        if isinstance(e, HTTPException):
            # Validation errors raised above keep their own status code
            # (e.g. 400) instead of being rewritten as 500.
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e
2025-11-17 03:53:15 +00:00
2026-01-28 05:42:46 +00:00
# async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
# try:
# job_id = generate_job_id(str(user_id))
# result = {
# "job_id": job_id,
# "job_status": "done",
# "table_name": "just for test",
# "status": "success",
# "message": f"Tabel test berhasil dibuat.",
# "total_rows": 10,
# "geometry_type": "Polygon",
# "crs": "EPSG 4326",
# "metadata_uuid": "-"
# }
# mapset = {
# "name": "Resiko Letusan Gunung Arjuno",
# "description": "Testing Automation Upload",
# "scale": "1:25000",
# "projection_system_id": "0196c746-d1ba-7f1c-9706-5df738679cc7",
# "category_id": "0196c80c-855f-77f9-abd0-0c8a30b8c2f5",
# "data_status": "sementara",
# "classification_id": "01968b4b-d3f9-76c9-888c-ee887ac31ce4",
# "producer_id": "019bd4ea-eb33-704e-83c3-8253d457b187",
# "layer_type": "polygon",
# "source_id": ["019bd4e7-3df8-75c8-9b89-3f310967649c"],
# "layer_url": "http://192.168.60.24:8888/geoserver/wms?service=WMS&version=1.1.0&request=GetMap&layers=labai:risiko_letusan_gunung_arjuno_bromo&bbox=110.89528623700005,-8.780412043999945,116.26994997700001,-5.042971664999925&width=768&height=534&srs=EPSG:4326&styles=&format=application/openlayers",
# "metadata_url": "http://192.168.60.24:7777/geonetwork/srv/eng/catalog.search#/metadata/9e5e2f09-13ef-49b5-bb49-1cb12136f63b",
# "coverage_level": "provinsi",
# "coverage_area": "kabupaten",
# "data_update_period": "Tahunan",
# "data_version": "2026",
# "is_popular": False,
# "is_active": True,
# "regional_id": "01968b53-a910-7a67-bd10-975b8923b92e",
# "notes": "Mapset baru dibuat",
# "status_validation": "on_verification",
# }
# await upload_to_main(mapset)
# return successRes(data=result)
# except Exception as e:
# print("errot", e)
2025-11-06 07:23:24 +00:00
2025-11-17 03:53:15 +00:00
2025-11-06 07:23:24 +00:00
2025-12-01 02:22:43 +00:00
# ===================================
# partition +VIEW
# ===================================
# Daftar prefix WKT yang valid
# VALID_WKT_PREFIXES = ("POINT", "LINESTRING", "POLYGON", "MULTIPOLYGON", "MULTILINESTRING")
def slugify(value: str) -> str:
    """Turn a dataset title into a name that is safe to use for a VIEW.

    The title is lower-cased, each run of non-alphanumeric characters is
    collapsed into one underscore, and leading/trailing underscores are
    removed.
    """
    lowered = value.lower()
    collapsed = re.sub(r'[^a-zA-Z0-9]+', '_', lowered)
    return collapsed.strip('_')
# Partition + VIEW
# async def create_dataset_view_from_metadata(conn, metadata_id: int, user_id: int, title: str):
# norm_title = slugify(title)
# view_name = f"v_user_{user_id}_{norm_title}"
# base_table = f"test_partition_user_{user_id}"
# # Ambil daftar field
# result = await conn.execute(text("SELECT fields FROM dataset_metadata WHERE id=:mid"), {"mid": metadata_id})
# fields_json = result.scalar_one_or_none()
# base_columns = {"id", "user_id", "metadata_id", "geom"}
# columns_sql = ""
# field_list = []
# if fields_json:
# fields = json.loads(fields_json) if isinstance(fields_json, str) else fields_json
# field_list = fields
# for f in field_list:
# safe_col = slugify(f)
# alias_name = safe_col if safe_col not in base_columns else f"attr_{safe_col}"
# # CAST otomatis
# if safe_col in ["longitude", "latitude", "lon", "lat"]:
# columns_sql += f", (p.attributes->>'{f}')::float AS {alias_name}"
# else:
# columns_sql += f", p.attributes->>'{f}' AS {alias_name}"
# # Drop view lama
# await conn.execute(text(f"DROP VIEW IF EXISTS {view_name} CASCADE;"))
# # 🔥 Buat VIEW baru yang punya FID unik
# create_view_query = f"""
# CREATE OR REPLACE VIEW {view_name} AS
# SELECT
# row_number() OVER() AS fid, -- FID unik untuk QGIS
# p.id,
# p.user_id,
# p.metadata_id,
# p.geom
# {columns_sql},
# m.title,
# m.year,
# m.description
# FROM {base_table} p
# JOIN dataset_metadata m ON m.id = p.metadata_id
# WHERE p.metadata_id = {metadata_id};
# """
# await conn.execute(text(create_view_query))
# # Register geometry untuk QGIS
# await conn.execute(text(f"DELETE FROM geometry_columns WHERE f_table_name = '{view_name}';"))
# await conn.execute(text(f"""
# INSERT INTO geometry_columns
# (f_table_schema, f_table_name, f_geometry_column, coord_dimension, srid, type)
# VALUES ('public', '{view_name}', 'geom', 2, 4326, 'GEOMETRY');
# """))
# print(f"[INFO] VIEW {view_name} dibuat dengan FID unik dan kompatibel dengan QGIS.")
# async def handle_to_postgis(payload, engine, user_id: int = 3):
# """
# Menangani upload data spasial ke PostGIS (dengan partition per user).
# - Jika partisi belum ada, akan dibuat otomatis
# - Metadata dataset disimpan di tabel dataset_metadata
# - Data spasial dimasukkan ke tabel partisi (test_partition_user_{id})
# - VIEW otomatis dibuat untuk QGIS
# """
# try:
# df = pd.DataFrame(payload.rows)
# print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
# # --- Validasi kolom geometry ---
# if "geometry" not in df.columns:
# raise errorRes(status_code=400, message="Kolom 'geometry' tidak ditemukan dalam data.")
# # --- Parsing geometry ke objek shapely ---
# df["geometry"] = df["geometry"].apply(
# lambda g: wkt.loads(g)
# if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES)
# else None
# )
# # --- Buat GeoDataFrame ---
# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
# # --- Metadata info dari payload ---
# # dataset_title = getattr(payload, "dataset_title", None)
# # dataset_year = getattr(payload, "dataset_year", None)
# # dataset_desc = getattr(payload, "dataset_description", None)
# dataset_title = "hujan 2045"
# dataset_year = 2045
# dataset_desc = "test metadata"
# if not dataset_title:
# raise errorRes(status_code=400, detail="Field 'dataset_title' wajib ada untuk metadata.")
# async with engine.begin() as conn:
# fields = [col for col in df.columns if col != "geometry"]
# # 💾 1️ ⃣ Simpan Metadata Dataset
# print("[INFO] Menyimpan metadata dataset...")
# result = await conn.execute(
# text("""
# INSERT INTO dataset_metadata (user_id, title, year, description, fields, created_at)
# VALUES (:user_id, :title, :year, :desc, :fields, :created_at)
# RETURNING id;
# """),
# {
# "user_id": user_id,
# "title": dataset_title,
# "year": dataset_year,
# "desc": dataset_desc,
# "fields": json.dumps(fields),
# "created_at": datetime.utcnow(),
# },
# )
# metadata_id = result.scalar_one()
# print(f"[INFO] Metadata disimpan dengan ID {metadata_id}")
# # ⚙️ 2️ ⃣ Auto-create Partisi Jika Belum Ada
# print(f"[INFO] Memastikan partisi test_partition_user_{user_id} tersedia...")
# await conn.execute(
# text(f"""
# DO $$
# BEGIN
# IF NOT EXISTS (
# SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}'
# ) THEN
# EXECUTE format('
# CREATE TABLE test_partition_user_%s
# PARTITION OF test_partition
# FOR VALUES IN (%s);
# ', {user_id}, {user_id});
# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_geom ON test_partition_user_%s USING GIST (geom);', {user_id}, {user_id});
# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_metadata ON test_partition_user_%s (metadata_id);', {user_id}, {user_id});
# END IF;
# END
# $$;
# """)
# )
# # 🧩 3️ ⃣ Insert Data Spasial ke Partisi
# print(f"[INFO] Memasukkan data ke test_partition_user_{user_id} ...")
# insert_count = 0
# for _, row in gdf.iterrows():
# geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None
# attributes = row.drop(labels=["geometry"]).to_dict()
# await conn.execute(
# text("""
# INSERT INTO test_partition (user_id, metadata_id, geom, attributes, created_at)
# VALUES (:user_id, :metadata_id, ST_Force2D(ST_GeomFromText(:geom, 4326)),
# CAST(:attr AS jsonb), :created_at);
# """),
# {
# "user_id": user_id,
# "metadata_id": metadata_id,
# "geom": geom_wkt,
# "attr": json.dumps(attributes),
# "created_at": datetime.utcnow(),
# },
# )
# insert_count += 1
# # 🧩 4️ ⃣ Membuat VIEW untuk dataset baru di QGIS
# await create_dataset_view_from_metadata(conn, metadata_id, user_id, dataset_title)
# print(f"[INFO] ✅ Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id} (metadata_id={metadata_id}).")
# return {
# "status": "success",
# "user_id": user_id,
# "metadata_id": metadata_id,
# "dataset_title": dataset_title,
# "inserted_rows": insert_count,
# "geometry_type": list(gdf.geom_type.unique()),
# }
# except Exception as e:
# print(f"[ERROR] Gagal upload ke PostGIS partition: {e}")
# raise errorRes(status_code=500, message="Gagal upload ke PostGIS partition", details=str(e))