"""Upload pipeline: read an uploaded file, preview its geometry, store it in PostGIS."""

import json
import os
import pandas as pd
import geopandas as gpd
import numpy as np
import re
import zipfile
import tempfile
import asyncio
from pyproj import CRS
from shapely.geometry.base import BaseGeometry
from shapely.geometry import base as shapely_base
from fastapi import Depends, File, Form, UploadFile, HTTPException
from api.routers.datasets_router import cleansing_data, publish_layer, query_cleansing_data, upload_to_main
from core.config import UPLOAD_FOLDER, MAX_FILE_MB, VALID_WKT_PREFIXES, GEONETWORK_URL
from services.upload_file.ai_generate import send_metadata
from services.upload_file.readers.reader_csv import read_csv
from services.upload_file.readers.reader_shp import read_shp
from services.upload_file.readers.reader_gdb import read_gdb
from services.upload_file.readers.reader_mpk import read_mpk
from services.upload_file.readers.reader_pdf import convert_df, read_pdf
from services.upload_file.utils.geometry_detector import detect_and_build_geometry, attach_polygon_geometry_auto
from database.connection import engine, sync_engine
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
from shapely import MultiLineString, MultiPolygon, wkt
from sqlalchemy import text
from datetime import datetime
from response import successRes, errorRes
from utils.logger_config import log_activity


# Lower-cased member suffixes that mark a ZIP as holding a geodatabase (GDB).
_GDB_MEMBER_SUFFIXES = (".gdbtable", ".gdbtablx", ".gdbindexes", ".spx")


def is_geom_empty(g):
    """Return True when *g* carries no usable geometry.

    ``None``, a float NaN, and a shapely geometry whose ``is_empty`` flag is
    set all count as empty. Every other value (a WKT string included) is
    reported as non-empty, so call this on geometry objects, not on text.
    """
    if g is None:
        return True
    if isinstance(g, float) and pd.isna(g):
        return True
    if isinstance(g, BaseGeometry):
        return g.is_empty
    return False


def safe_json(value):
    """Convert numpy / pandas / shapely values into JSON-serializable ones.

    * dicts, lists, tuples, sets and numpy arrays are converted element-wise
      (sequences come back as lists);
    * shapely geometries become their string (WKT) form;
    * missing scalars (``None``, NaN, ``pd.NA``, ``pd.NaT``) and non-finite
      floats become ``None``;
    * numpy booleans, integers and floats become the matching Python type;
    * ``pd.Timestamp`` becomes an ISO-8601 string.

    Anything else is returned unchanged.
    """
    if isinstance(value, dict):
        return {key: safe_json(item) for key, item in value.items()}
    if isinstance(value, np.ndarray):
        # tolist() also unwraps 0-d arrays into a plain scalar.
        return safe_json(value.tolist())
    if isinstance(value, (list, tuple, set)):
        return [safe_json(item) for item in value]
    if isinstance(value, shapely_base.BaseGeometry):
        return str(value)  # str() of a shapely geometry is its WKT

    # pd.isna answers with an array for array-likes; only trust a scalar
    # answer so an unexpected container can never raise "truth value is
    # ambiguous" here.
    missing = pd.isna(value)
    if isinstance(missing, (bool, np.bool_)) and missing:
        return None

    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, (float, np.floating)):
        number = float(value)
        # inf / -inf have no JSON representation either.
        return number if np.isfinite(number) else None
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    return value


def detect_zip_type(zip_path: str) -> str:
    """Classify a ZIP archive by the names of its members.

    Args:
        zip_path: Path of the archive on disk.

    Returns:
        ``"gdb"`` when a member lives under a ``*.gdb/`` folder or ends with
        one of the geodatabase suffixes, ``"shp"`` when a ``.shp`` member is
        present, otherwise ``"unknown"``. The geodatabase check wins when
        both kinds are present.
    """
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # Normalise case and back-slash separators so archives written with
        # Windows-style member paths are recognised as well.
        members = [name.lower().replace("\\", "/") for name in zip_ref.namelist()]

    if any(".gdb/" in name for name in members):
        return "gdb"
    if any(name.endswith(_GDB_MEMBER_SUFFIXES) for name in members):
        return "gdb"
    if any(name.endswith(".shp") for name in members):
        return "shp"
    return "unknown"
# File suffixes the upload endpoint knows how to read.
_SUPPORTED_UPLOAD_EXTENSIONS = (".csv", ".xlsx", ".mpk", ".pdf", ".zip")

# PostgreSQL keeps at most 63 bytes of an identifier; stay below that so the
# "_<n>" suffix added by generate_unique_table_name still fits.
_MAX_TABLE_BASE_BYTES = 55


def _normalize_geom_type(geom_type):
    """Map a ``Multi*`` geometry type name onto its single-part name."""
    if geom_type.startswith("Multi"):
        return geom_type.replace("Multi", "")
    return geom_type


def process_data(df: pd.DataFrame, ext: str, filename: str, fileDesc: str):
    """Attach geometry to *df* and build the analysis/preview payload.

    Args:
        df: Table read from the uploaded file.
        ext: File suffix (including the dot) reported back as ``file_type``.
        filename: Name of the uploaded file, forwarded to the metadata helper.
        fileDesc: Short user description, forwarded to the metadata helper.

    Returns:
        On success a ``dict`` with row/column info, geometry statistics,
        warnings, every row as ``preview`` and the suggestion returned by
        ``send_metadata``. When no geometry column could be built, the value
        of ``errorRes(..., status_code=422)`` is returned instead, so callers
        must check the type before wrapping the result.
    """
    result = detect_and_build_geometry(df, master_polygons=None)

    # Second attempt when the first pass produced no geometry at all.
    if not hasattr(result, "geometry") or result.geometry.isna().all():
        result = attach_polygon_geometry_auto(result)

    if not (isinstance(result, gpd.GeoDataFrame) and "geometry" in result.columns):
        res = {
            "message": "Tidak menemukan tabel yang relevan.",
            "file_type": ext,
            "rows": 0,
            "columns": 0,
            "geometry_valid": 0,
            "geometry_empty": 0,
            "geometry_valid_percent": 0,
            "warnings": [],
            "warning_examples": [],
            "preview": []
        }
        return errorRes(message="Tidak berhasil mencocokan geometry pada tabel.", details=res, status_code=422)

    geom_types = (
        result.geometry
        .dropna()
        .geom_type
        .apply(_normalize_geom_type)
        .unique()
    )
    geom_type = geom_types[0] if len(geom_types) > 0 else "None"
    null_geom = result.geometry.isna().sum()
    print(f"[INFO] Tipe Geometry: {geom_type}")
    print(f"[INFO] Jumlah geometry kosong: {null_geom}")

    result = result.replace([pd.NA, float('inf'), float('-inf')], None)

    # Work on a plain DataFrame: the geometry column is about to hold text.
    table = pd.DataFrame(result)
    shapes = table["geometry"]

    # Decide emptiness while the values are still geometry objects. Once they
    # are WKT strings an empty geometry ("POLYGON EMPTY") can no longer be
    # told apart from a real one.
    empty_mask = shapes.apply(is_geom_empty).astype(bool)
    table["geometry"] = shapes.apply(
        lambda g: g.wkt if isinstance(g, BaseGeometry) else None
    )

    total_rows = len(table)
    empty_count = int(empty_mask.sum())
    valid_count = total_rows - empty_count
    # Guard the percentage against a zero-row table.
    match_percentage = (valid_count / total_rows) * 100 if total_rows else 0.0

    warnings = []
    warning_examples = []
    if empty_count > 0:
        warnings.append(
            f"{empty_count} dari {total_rows} baris tidak memiliki geometry yang valid "
            f"({100 - match_percentage:.2f}% data gagal cocok)."
        )
        # Cap the examples so the response stays bounded.
        warning_examples = table[empty_mask].head(500).to_dict(orient="records")

    # The whole table is returned as the preview (no row limit).
    preview_data = table.to_dict(orient="records")

    preview_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in preview_data
    ]
    warning_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in warning_examples
    ]

    ai_context = {
        "nama_file_peta": filename,
        "nama_opd": "Badan Penanggulangan Bencana Daerah (BPBD) Provinsi Jatim",
        "tipe_data_spasial": geom_type,
        "deskripsi_singkat": fileDesc,
        "struktur_atribut_data": {},
    }
    ai_suggest = send_metadata(ai_context)

    response = {
        "message": "File berhasil dibaca dan dianalisis.",
        "file_name": filename,
        "file_type": ext,
        "rows": int(total_rows),
        "columns": list(map(str, table.columns)),
        "geometry_valid": int(valid_count),
        "geometry_empty": int(empty_count),
        "geometry_valid_percent": float(round(match_percentage, 2)),
        "geometry_type": geom_type,
        "warnings": warnings,
        "warning_rows": warning_safe,
        "preview": preview_safe,
        "metadata_suggest": ai_suggest
    }
    return response


async def handle_upload_file(file: UploadFile = File(...), page: Optional[str] = Form(""), sheet: Optional[str] = Form(""), fileDesc: Optional[str] = Form("")):
    """Store an uploaded file temporarily, read it and analyse its geometry.

    Args:
        file: The uploaded file (.csv, .xlsx, .mpk, .pdf or .zip).
        page: Page selector handed to the PDF reader.
        sheet: Sheet selector handed to the reader for .xlsx files.
        fileDesc: Free-text description forwarded to ``process_data``.

    Returns:
        The value of ``successRes``/``errorRes`` describing the outcome. A PDF
        with several tables returns the tables themselves instead of an
        analysis. Any exception raised while reading or analysing is reported
        as a 500 ``errorRes``.

    Raises:
        Whatever ``errorRes`` builds, for an oversized file (413) or an
        unsupported suffix (400).
    """
    # Keep only the last path component: the client controls this name and it
    # is joined to UPLOAD_FOLDER below.
    fname = os.path.basename((file.filename or "").replace("\\", "/"))
    ext = os.path.splitext(fname)[1].lower()

    contents = await file.read()
    size_mb = len(contents) / (1024 * 1024)
    # NOTE(review): errorRes is raised here but returned elsewhere in this
    # module; raising only works if it builds an exception instance - confirm.
    if size_mb > MAX_FILE_MB:
        raise errorRes(status_code=413, message="Ukuran File Terlalu Besar")
    # Rejected before the try block so the 400 is not rewritten into a 500 by
    # the catch-all handler, and before anything is written to disk.
    if ext not in _SUPPORTED_UPLOAD_EXTENSIONS:
        raise errorRes(status_code=400, message="Unsupported file type")

    tmp_path = UPLOAD_FOLDER / fname
    with open(tmp_path, "wb") as f:
        f.write(contents)

    try:
        df = None
        print('ext', ext)

        if ext == ".csv":
            df = read_csv(str(tmp_path))
        elif ext == ".xlsx":
            df = read_csv(str(tmp_path), sheet)
        elif ext == ".mpk":
            df = read_mpk(str(tmp_path))
        elif ext == ".pdf":
            tbl = read_pdf(tmp_path, page)
            if len(tbl) == 0:
                res = {
                    "message": "Tidak ditemukan tabel valid pada halaman yang dipilih",
                    "tables": {},
                    "file_type": ext
                }
                return successRes(message="Tidak ditemukan tabel valid pada halaman yang dipilih", data=res)
            elif len(tbl) > 1:
                # Several candidate tables: hand them all back.
                res = {
                    "message": "File berhasil dibaca dan dianalisis.",
                    "tables": tbl,
                    "file_type": ext
                }
                return successRes(data=res, message="File berhasil dibaca dan dianalisis.")
            else:
                df = convert_df(tbl[0])
        elif ext == ".zip":
            zip_type = detect_zip_type(str(tmp_path))
            if zip_type == "shp":
                print("[INFO] ZIP terdeteksi sebagai Shapefile.")
                df = read_shp(str(tmp_path))
            elif zip_type == "gdb":
                print("[INFO] ZIP terdeteksi sebagai Geodatabase (GDB).")
                df = read_gdb(str(tmp_path))
            else:
                return successRes(message="ZIP file tidak mengandung SHP / GDB valid.")

        if df is None or (hasattr(df, "empty") and df.empty):
            return successRes(message="File berhasil dibaca, Tetapi tidak ditemukan tabel valid")

        res = process_data(df, ext, fname, fileDesc)
        # process_data hands back an error response (not a dict) when no
        # geometry could be matched; pass it through unwrapped.
        if not isinstance(res, dict):
            return res
        return successRes(data=res)

    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(
            message="Internal Server Error",
            details=str(e),
            status_code=500
        )
    finally:
        # Remove the temporary copy on every exit path, not only on success.
        tmp_path.unlink(missing_ok=True)


class PdfRequest(BaseModel):
    """One table picked from a previously uploaded PDF."""

    title: str
    columns: List[str]
    rows: List[List]
    fileName: str
    fileDesc: str


async def handle_process_pdf(payload: PdfRequest):
    """Analyse a single PDF table supplied in the request body.

    Returns:
        ``successRes`` with the analysis from ``process_data``, the error
        response produced by ``process_data`` when no geometry matched,
        ``errorRes`` when the payload converts to an empty table, or a 500
        ``errorRes`` when anything raises.
    """
    try:
        df = convert_df(payload.model_dump())
        if df is None or (hasattr(df, "empty") and df.empty):
            return errorRes(message="Tidak ada tabel")

        res = process_data(df, '.pdf', payload.fileName, payload.fileDesc)
        if not isinstance(res, dict):
            return res
        return successRes(data=res)

    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(message="Internal Server Error", details=str(e), status_code=500)


class UploadRequest(BaseModel):
    """Rows plus author metadata and style to be published as a layer."""

    title: str
    rows: List[dict]
    columns: List[str]
    author: Dict[str, Any]
    style: str


async def generate_unique_table_name(base_name: str):
    """Derive a table name from *base_name* that is not taken yet.

    The name is lower-cased and every character that is not a letter, digit
    or underscore is replaced by ``_`` (this covers the spaces and hyphens
    handled before and also neutralises quotes, dots and semicolons, since
    the name is later placed inside quoted SQL). It is then truncated to
    ``_MAX_TABLE_BASE_BYTES`` bytes. When ``to_regclass`` reports the name as
    existing, ``_2``, ``_3``, ... are appended until a free one is found.

    Returns:
        The first free table name.
    """
    base_name = re.sub(r"\W", "_", base_name.lower())
    # Cut on the encoded form: the PostgreSQL limit is in bytes, not characters.
    base_name = base_name.encode("utf-8")[:_MAX_TABLE_BASE_BYTES].decode("utf-8", "ignore")
    if not base_name:
        base_name = "dataset"

    table_name = base_name
    counter = 2
    async with engine.connect() as conn:
        while True:
            result = await conn.execute(
                text("SELECT to_regclass(:tname)"),
                {"tname": table_name}
            )
            if not result.scalar():
                return table_name
            table_name = f"{base_name}_{counter}"
            counter += 1


def str_to_date(raw_date: str):
    """Parse a ``YYYY-MM-DD`` string into a ``date``.

    Returns:
        The parsed date, or ``None`` when *raw_date* is empty or cannot be
        parsed (a warning is printed in the latter case).
    """
    if raw_date:
        try:
            return datetime.strptime(raw_date, "%Y-%m-%d").date()
        except (TypeError, ValueError) as e:
            print("[WARNING] Tidak bisa parse dateCreated:", e)
    return None


def generate_job_id(user_id: str) -> str:
    """Build a job id of the form ``<user_id>_<YYYYmmddHHMMSS>``.

    The timestamp has one-second resolution, so two jobs started by the same
    user within one second receive the same id.
    """
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return f"{user_id}_{timestamp}"


def save_xml_to_sld(xml_string, filename):
    """Write *xml_string* to ``style_temp/<filename>.sld`` and return that path.

    The ``style_temp`` folder is created relative to the current working
    directory when it does not exist yet.
    """
    folder_path = 'style_temp'
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, f"{filename}.sld")
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(xml_string)
    return file_path


def _safe_load_wkt(value):
    """Parse a WKT string; return ``None`` for non-strings and unparsable text."""
    if not isinstance(value, str):
        return None
    try:
        return wkt.loads(value)
    except Exception:
        return None


def _unify_geometry_type(geom):
    """Wrap single polygons / lines into their Multi* form; pass others through."""
    gtype = geom.geom_type.upper()
    if gtype == "POLYGON":
        return MultiPolygon([geom])
    if gtype == "LINESTRING":
        return MultiLineString([geom])
    return geom  # already a Multi* geometry, or a point


def _rows_to_geodataframe(rows, crs):
    """Turn request rows into a GeoDataFrame with a ``geom`` geometry column.

    Column names are upper-cased, the ``GEOMETRY`` column is parsed from WKT,
    rows without a parsable geometry are dropped, invalid geometries are
    repaired with ``buffer(0)`` and polygon / line types are unified.

    Raises:
        HTTPException: 400 when there is no GEOMETRY column, when no row has
            a usable geometry, or when *crs* is rejected.
    """
    df = pd.DataFrame(rows)
    df.columns = [col.upper() for col in df.columns]
    if "GEOMETRY" not in df.columns:
        raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan")

    df["GEOMETRY"] = df["GEOMETRY"].apply(_safe_load_wkt)
    df = df.rename(columns={"GEOMETRY": "geom"})

    # copy(): the filtered frame gets columns assigned below.
    df = df[df["geom"].notnull()].copy()
    if df.empty:
        raise HTTPException(400, "Semua geometry invalid atau NULL")

    df["geom"] = df["geom"].apply(lambda g: g if g.is_valid else g.buffer(0))
    df["geom"] = df["geom"].apply(_unify_geometry_type)

    gdf = gpd.GeoDataFrame(df, geometry="geom", crs=crs)
    try:
        _ = gdf.to_crs(gdf.crs)  # round-trip to prove the CRS is usable
    except Exception:
        raise HTTPException(400, f"CRS {crs} tidak valid")
    return gdf


async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
    """Store the uploaded rows in PostGIS, then cleanse, publish and register them.

    Steps: pick a free table name, build the GeoDataFrame, write it with
    ``to_postgis`` in a worker thread, add a serial primary key, insert the
    author metadata, log the upload, save the SLD style, run the cleansing
    query, publish the layer and push the mapset record.

    Args:
        payload: Rows, author metadata and SLD style of the dataset.
        user_id: Id recorded with the metadata and the activity log.

    Returns:
        ``successRes`` carrying the job id, table name, row count, geometry
        types, CRS and the published metadata uuid.

    Raises:
        HTTPException: the original 400 for invalid input, otherwise 500 with
            the error text. Every failure is written to the activity log
            first.
    """
    try:
        table_name = await generate_unique_table_name(payload.title)

        author = payload.author
        print('crs', author.get("crs"))
        # NOTE(review): the CRS supplied in the author metadata is only
        # printed; the data has always been stored as EPSG:4326 because the
        # detected value was overwritten unconditionally. Behaviour kept as
        # is - confirm whether the supplied CRS should be honoured.
        detected_crs = 'EPSG:4326'

        gdf = _rows_to_geodataframe(payload.rows, detected_crs)
        row_count = len(gdf)

        # to_postgis is blocking; keep it off the event loop.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: gdf.to_postgis(
                table_name,
                sync_engine,
                if_exists="replace",
                index=False
            )
        )

        # Identifiers cannot be bound as parameters, so double any embedded
        # quote as a second line of defence behind the name sanitising.
        quoted_table = table_name.replace('"', '""')
        async with engine.begin() as conn:
            await conn.execute(text(
                f'ALTER TABLE "{quoted_table}" ADD COLUMN _ID SERIAL PRIMARY KEY;'
            ))

        unified_geom_type = list(gdf.geom_type.unique())

        # Accept a list, a single string or nothing for the topic category.
        topic_category = author.get("topicCategory") or []
        if not isinstance(topic_category, str):
            topic_category = ", ".join(map(str, topic_category))

        async with engine.begin() as conn:
            await conn.execute(text("""
                INSERT INTO backend.author_metadata (
                    table_title,
                    dataset_title,
                    dataset_abstract,
                    keywords,
                    topic_category,
                    date_created,
                    dataset_status,
                    organization_name,
                    contact_person_name,
                    contact_email,
                    contact_phone,
                    geom_type,
                    user_id,
                    process,
                    geometry_count
                ) VALUES (
                    :table_title,
                    :dataset_title,
                    :dataset_abstract,
                    :keywords,
                    :topic_category,
                    :date_created,
                    :dataset_status,
                    :organization_name,
                    :contact_person_name,
                    :contact_email,
                    :contact_phone,
                    :geom_type,
                    :user_id,
                    :process,
                    :geometry_count
                )
            """), {
                "table_title": table_name,
                "dataset_title": payload.title,
                "dataset_abstract": author.get("abstract"),
                "keywords": author.get("keywords"),
                "topic_category": topic_category,
                "date_created": str_to_date(author.get("dateCreated")),
                "dataset_status": author.get("status"),
                "organization_name": author.get("organization"),
                "contact_person_name": author.get("contactName"),
                "contact_email": author.get("contactEmail"),
                "contact_phone": author.get("contactPhone"),
                "geom_type": json.dumps(unified_geom_type),
                "user_id": user_id,
                "process": 'CLEANSING',
                "geometry_count": row_count
            })

        await log_activity(
            user_id=user_id,
            action_type="UPLOAD",
            action_title=f"Upload dataset {table_name}",
            details={"table_name": table_name, "rows": len(gdf)}
        )

        job_id = generate_job_id(str(user_id))
        result = {
            "job_id": job_id,
            "job_status": "wait",
            "table_name": table_name,
            "status": "success",
            "message": f"Tabel '{table_name}' berhasil dibuat.",
            "total_rows": len(gdf),
            "geometry_type": unified_geom_type,
            "crs": detected_crs,
            "metadata_uuid": ""
        }
        save_xml_to_sld(payload.style, job_id)

        cleansing = await query_cleansing_data(table_name)
        result['job_status'] = cleansing

        publish = await publish_layer(table_name, job_id)
        result['metadata_uuid'] = publish['uuid']

        mapset = {
            "name": payload.title,
            "description": author.get("abstract"),
            "scale": "1:25000",
            "projection_system_id": "0196c746-d1ba-7f1c-9706-5df738679cc7",
            "category_id": author.get("mapsetCategory"),
            "data_status": "sementara",
            "classification_id": "01968b4b-d3f9-76c9-888c-ee887ac31ce4",
            "producer_id": "019bd4ea-eb33-704e-83c3-8253d457b187",
            "layer_type": unified_geom_type[0],
            "source_id": ["019bd4e7-3df8-75c8-9b89-3f310967649c"],
            "layer_url": publish['geos_link'],
            "metadata_url": f"{GEONETWORK_URL}/srv/eng/catalog.search#/metadata/{publish['uuid']}",
            "coverage_level": "provinsi",
            "coverage_area": "kabupaten",
            "data_update_period": "Tahunan",
            "data_version": "2026",
            "is_popular": False,
            "is_active": True,
            "regional_id": "01968b53-a910-7a67-bd10-975b8923b92e",
            "notes": "Mapset baru dibuat",
            "status_validation": "on_verification",
        }
        print("mapset data",mapset)
        await upload_to_main(mapset)

        return successRes(data=result)

    except Exception as e:
        await log_activity(
            user_id=user_id,
            action_type="ERROR",
            action_title="Upload gagal",
            details={"error": str(e)}
        )
        # Keep the status of deliberate HTTP errors (the 400s raised above)
        # instead of turning every failure into a 500.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e
def slugify(value: str) -> str:
    """Turn a dataset title into a name that is safe to use for a VIEW.

    The title is lower-cased, every run of characters other than ASCII
    letters and digits collapses into a single underscore, and leading or
    trailing underscores are stripped.

    Args:
        value: The dataset title.

    Returns:
        The slug; an empty string when *value* holds no ASCII letter or digit.
    """
    return re.sub(r'[^a-zA-Z0-9]+', '_', value.lower()).strip('_')