From 9eb1b71786fc8cb27d1c5a2b70f55c967afaad12 Mon Sep 17 00:00:00 2001 From: DmsAnhr Date: Mon, 1 Dec 2025 09:22:43 +0700 Subject: [PATCH] update geometry saver --- api/routers/datasets_router.py | 7 + api/routers/system_router.py | 18 +- core/config.py | 1 + requirements.txt | 2 + services/upload_file/upload.py | 426 +++++++++++++++++- .../upload_file/utils/geometry_detector.py | 9 +- 6 files changed, 438 insertions(+), 25 deletions(-) diff --git a/api/routers/datasets_router.py b/api/routers/datasets_router.py index 6d2d61a..8c462f1 100644 --- a/api/routers/datasets_router.py +++ b/api/routers/datasets_router.py @@ -89,3 +89,10 @@ async def delete_dataset(user_id: int, metadata_id: int, title: str): except Exception as e: print(f"[ERROR] Gagal hapus dataset: {e}") raise errorRes(status_code=500, details=str(e), message="Gagal hapus dataset") + + + + + +# @router.get("/upload/geoserver") +# async def \ No newline at end of file diff --git a/api/routers/system_router.py b/api/routers/system_router.py index 27c272d..ca291e4 100644 --- a/api/routers/system_router.py +++ b/api/routers/system_router.py @@ -1,6 +1,7 @@ +import httpx from fastapi import APIRouter from datetime import datetime, timedelta -from core.config import API_VERSION +from core.config import API_VERSION, GEOSERVER_URL router = APIRouter() @@ -21,6 +22,21 @@ async def server_status(): } +@router.get("/status/geoserver") +async def check_geoserver_auth(): + url = f"{GEOSERVER_URL}/geoserver/rest/about/version." + auth = ("admin", "geoserver") + + try: + async with httpx.AsyncClient() as client: + response = await client.get(url, auth=auth, timeout=5) + return { + "status": "OK" if response.status_code == 200 else "ERROR", + "code": response.status_code, + "response": response.text + } + except Exception as e: + return {"status": "FAILED", "error": str(e)} diff --git a/core/config.py b/core/config.py index 4a8258a..9880cce 100644 --- a/core/config.py +++ b/core/config.py @@ -7,6 +7,7 @@ load_dotenv() API_VERSION = "2.1.3" POSTGIS_URL = os.getenv("POSTGIS_URL") +GEOSERVER_URL = os.getenv("GEOSERVER_PATH") POSTGIS_SYNC_URL = os.getenv("SYNC_URL") UPLOAD_FOLDER = Path(os.getenv("UPLOAD_FOLDER", "./uploads")) MAX_FILE_MB = int(os.getenv("MAX_FILE_MB", 30)) diff --git a/requirements.txt b/requirements.txt index 54d3c3d..187b7d3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,6 +26,8 @@ asyncpg py7zr bcrypt==4.0.1 passlib == 1.7.4 +urllib3<2 + # --- jika menggunakan ai --- groq diff --git a/services/upload_file/upload.py b/services/upload_file/upload.py index 01905d0..ccc5089 100644 --- a/services/upload_file/upload.py +++ b/services/upload_file/upload.py @@ -20,7 +20,7 @@ from database.connection import engine, sync_engine from database.models import Base from pydantic import BaseModel from typing import Any, Dict, List, Optional -from shapely import wkt +from shapely import MultiLineString, MultiPolygon, wkt from sqlalchemy import text from datetime import datetime from response import successRes, errorRes @@ -120,8 +120,8 @@ def process_data(df: pd.DataFrame, ext: str): else: warning_examples = [] - preview_data = result.head(15).to_dict(orient="records") - # preview_data = result.to_dict(orient="records") + # preview_data = result.head(15).to_dict(orient="records") + preview_data = result.to_dict(orient="records") preview_safe = [ {k: safe_json(v) for k, v in row.items()} for row in preview_data @@ -272,7 +272,7 @@ class UploadRequest(BaseModel): columns: List[str] author: Dict[str, Any] - +# generate _2 if exist async def generate_unique_table_name(base_name: str): base_name = base_name.lower().replace(" ", "_").replace("-", "_") table_name = base_name @@ -302,44 +302,222 @@ def str_to_date(raw_date: str): import asyncio +# async def handle_to_postgis(payload: UploadRequest, user_id: int = 2): +# try: +# table_name = await generate_unique_table_name(payload.title) + +# df = pd.DataFrame(payload.rows) +# df.columns = [col.upper() for col in df.columns] + +# if "GEOMETRY" not in df.columns: +# raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan") + +# df["GEOMETRY"] = df["GEOMETRY"].apply( +# lambda g: wkt.loads(g) +# if isinstance(g, str) else None +# ) + +# # ====================================================== +# # RENAME kolom GEOMETRY → geom (WAJIB) +# # ====================================================== +# df = df.rename(columns={"GEOMETRY": "geom"}) + +# gdf = gpd.GeoDataFrame(df, geometry="geom", crs="EPSG:4326") + +# # --- Wajib: gunakan engine sync TANPA asyncpg +# loop = asyncio.get_running_loop() +# await loop.run_in_executor( +# None, +# lambda: gdf.to_postgis( +# table_name, +# sync_engine, +# if_exists="replace", +# index=False +# ) +# ) + +# # === STEP 4: add ID column === +# async with engine.begin() as conn: +# await conn.execute(text( +# f'ALTER TABLE "{table_name}" ADD COLUMN _ID SERIAL PRIMARY KEY;' +# )) + +# # === STEP 5: save author metadata === +# author = payload.author + +# async with engine.begin() as conn: +# await conn.execute(text(""" +# INSERT INTO backend.author_metadata ( +# table_title, +# dataset_title, +# dataset_abstract, +# keywords, +# topic_category, +# date_created, +# dataset_status, +# organization_name, +# contact_person_name, +# contact_email, +# contact_phone, +# geom_type, +# user_id +# ) VALUES ( +# :table_title, +# :dataset_title, +# :dataset_abstract, +# :keywords, +# :topic_category, +# :date_created, +# :dataset_status, +# :organization_name, +# :contact_person_name, +# :contact_email, +# :contact_phone, +# :geom_type, +# :user_id +# ) +# """), { +# "table_title": table_name, +# "dataset_title": author.get("title") or payload.title, +# "dataset_abstract": author.get("abstract"), +# "keywords": author.get("keywords"), +# "topic_category": author.get("topicCategory"), +# "date_created": str_to_date(author.get("dateCreated")), +# "dataset_status": author.get("status"), +# "organization_name": author.get("organization"), +# "contact_person_name": author.get("contactName"), +# "contact_email": author.get("contactEmail"), +# "contact_phone": author.get("contactPhone"), +# "geom_type": json.dumps(list(gdf.geom_type.unique())), +# "user_id": user_id +# }) + +# # === STEP 6: log success === +# await log_activity( +# user_id=user_id, +# action_type="UPLOAD", +# action_title=f"Upload dataset {table_name}", +# details={"table_name": table_name, "rows": len(gdf)} +# ) + +# res = { +# "table_name": table_name, +# "status": "success", +# "message": f"Tabel '{table_name}' berhasil dibuat.", +# "total_rows": len(gdf), +# "geometry_type": list(gdf.geom_type.unique()), +# } + +# return successRes(data=res) + +# except Exception as e: +# await log_activity( +# user_id=user_id, +# action_type="ERROR", +# action_title="Upload gagal", +# details={"error": str(e)} +# ) +# print(f"error : {str(e)}") +# raise HTTPException(status_code=500, detail=str(e)) + async def handle_to_postgis(payload: UploadRequest, user_id: int = 2): try: table_name = await generate_unique_table_name(payload.title) - + # DataFrame df = pd.DataFrame(payload.rows) df.columns = [col.upper() for col in df.columns] - if "GEOMETRY" not in df.columns: raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan") - df["GEOMETRY"] = df["GEOMETRY"].apply( - lambda g: wkt.loads(g) - if isinstance(g, str) else None - ) + # ===================================================================== + # 1. LOAD WKT → SHAPELY + # ===================================================================== + def safe_load_wkt(g): + if not isinstance(g, str): + return None + try: + geom = wkt.loads(g) + return geom + except: + return None - gdf = gpd.GeoDataFrame(df, geometry="GEOMETRY", crs="EPSG:4326") + df["GEOMETRY"] = df["GEOMETRY"].apply(safe_load_wkt) + df = df.rename(columns={"GEOMETRY": "geom"}) - # --- Wajib: gunakan engine sync TANPA asyncpg + # ===================================================================== + # 2. DROP ROW geometry NULL + # ===================================================================== + df = df[df["geom"].notnull()] + if df.empty: + raise HTTPException(400, "Semua geometry invalid atau NULL") + + # ===================================================================== + # 3. VALIDATE geometry (very important) + # ===================================================================== + df["geom"] = df["geom"].apply(lambda g: g if g.is_valid else g.buffer(0)) + + # ===================================================================== + # 4. SERAGAMKAN TIPE GEOMETRY (Polygon→MultiPolygon, Line→MultiLine) + # ===================================================================== + def unify_geometry_type(g): + gtype = g.geom_type.upper() + if gtype == "POLYGON": + return MultiPolygon([g]) + if gtype == "LINESTRING": + return MultiLineString([g]) + return g # sudah MULTI atau POINT + df["geom"] = df["geom"].apply(unify_geometry_type) + + # ===================================================================== + # 5. DETEKSI CRS DARI METADATA / INPUT / DEFAULT + # ===================================================================== + detected_crs = payload.author.get("crs") + + detected = payload.author.get("crs") + print('crs', detected) + + if not detected_crs: + detected_crs = "EPSG:4326" + + + # Buat GeoDataFrame + gdf = gpd.GeoDataFrame(df, geometry="geom", crs=detected_crs) + + # ===================================================================== + # 6. VERIFY CRS (SRID) VALID di PROJ / PostGIS + # ===================================================================== + try: + _ = gdf.to_crs(gdf.crs) # test CRS valid + except: + raise HTTPException(400, f"CRS {detected_crs} tidak valid") + + # ===================================================================== + # 7. SIMPAN KE POSTGIS (synchronous) + # ===================================================================== loop = asyncio.get_running_loop() await loop.run_in_executor( None, lambda: gdf.to_postgis( table_name, - sync_engine, # JANGAN ENGINE ASYNC + sync_engine, if_exists="replace", index=False ) ) - # === STEP 4: add ID column === + # ===================================================================== + # 8. ADD PRIMARY KEY (wajib untuk QGIS API) + # ===================================================================== async with engine.begin() as conn: await conn.execute(text( f'ALTER TABLE "{table_name}" ADD COLUMN _ID SERIAL PRIMARY KEY;' )) - # === STEP 5: save author metadata === + # ===================================================================== + # 9. SIMPAN METADATA (geom_type, author metadata) + # ===================================================================== + unified_geom_type = list(gdf.geom_type.unique()) author = payload.author - async with engine.begin() as conn: await conn.execute(text(""" INSERT INTO backend.author_metadata ( @@ -383,11 +561,14 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2): "contact_person_name": author.get("contactName"), "contact_email": author.get("contactEmail"), "contact_phone": author.get("contactPhone"), - "geom_type": json.dumps(list(gdf.geom_type.unique())), + "geom_type": json.dumps(unified_geom_type), "user_id": user_id }) - # === STEP 6: log success === + + # ===================================================================== + # 10. LOGGING + # ===================================================================== await log_activity( user_id=user_id, action_type="UPLOAD", @@ -395,15 +576,15 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2): details={"table_name": table_name, "rows": len(gdf)} ) - res = { + result = { "table_name": table_name, "status": "success", "message": f"Tabel '{table_name}' berhasil dibuat.", "total_rows": len(gdf), - "geometry_type": list(gdf.geom_type.unique()), + "geometry_type": unified_geom_type, + "crs": detected_crs, } - - return successRes(data=res) + return successRes(data=result) except Exception as e: await log_activity( @@ -412,7 +593,6 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2): action_title="Upload gagal", details={"error": str(e)} ) - print(f"error : {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @@ -429,3 +609,203 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2): + +# =================================== +# partition +VIEW +# =================================== + + +# Daftar prefix WKT yang valid +# VALID_WKT_PREFIXES = ("POINT", "LINESTRING", "POLYGON", "MULTIPOLYGON", "MULTILINESTRING") + + +def slugify(value: str) -> str: + """Mengubah judul dataset jadi nama aman untuk VIEW""" + return re.sub(r'[^a-zA-Z0-9]+', '_', value.lower()).strip('_') + + + +# Partition + VIEW +# async def create_dataset_view_from_metadata(conn, metadata_id: int, user_id: int, title: str): +# norm_title = slugify(title) +# view_name = f"v_user_{user_id}_{norm_title}" +# base_table = f"test_partition_user_{user_id}" + +# # Ambil daftar field +# result = await conn.execute(text("SELECT fields FROM dataset_metadata WHERE id=:mid"), {"mid": metadata_id}) +# fields_json = result.scalar_one_or_none() + +# base_columns = {"id", "user_id", "metadata_id", "geom"} +# columns_sql = "" +# field_list = [] + +# if fields_json: +# fields = json.loads(fields_json) if isinstance(fields_json, str) else fields_json +# field_list = fields + +# for f in field_list: +# safe_col = slugify(f) +# alias_name = safe_col if safe_col not in base_columns else f"attr_{safe_col}" + +# # CAST otomatis +# if safe_col in ["longitude", "latitude", "lon", "lat"]: +# columns_sql += f", (p.attributes->>'{f}')::float AS {alias_name}" +# else: +# columns_sql += f", p.attributes->>'{f}' AS {alias_name}" + +# # Drop view lama +# await conn.execute(text(f"DROP VIEW IF EXISTS {view_name} CASCADE;")) + +# # 🔥 Buat VIEW baru yang punya FID unik +# create_view_query = f""" +# CREATE OR REPLACE VIEW {view_name} AS +# SELECT +# row_number() OVER() AS fid, -- FID unik untuk QGIS +# p.id, +# p.user_id, +# p.metadata_id, +# p.geom +# {columns_sql}, +# m.title, +# m.year, +# m.description +# FROM {base_table} p +# JOIN dataset_metadata m ON m.id = p.metadata_id +# WHERE p.metadata_id = {metadata_id}; +# """ +# await conn.execute(text(create_view_query)) + +# # Register geometry untuk QGIS +# await conn.execute(text(f"DELETE FROM geometry_columns WHERE f_table_name = '{view_name}';")) +# await conn.execute(text(f""" +# INSERT INTO geometry_columns +# (f_table_schema, f_table_name, f_geometry_column, coord_dimension, srid, type) +# VALUES ('public', '{view_name}', 'geom', 2, 4326, 'GEOMETRY'); +# """)) + +# print(f"[INFO] VIEW {view_name} dibuat dengan FID unik dan kompatibel dengan QGIS.") + + +# async def handle_to_postgis(payload, engine, user_id: int = 3): +# """ +# Menangani upload data spasial ke PostGIS (dengan partition per user). +# - Jika partisi belum ada, akan dibuat otomatis +# - Metadata dataset disimpan di tabel dataset_metadata +# - Data spasial dimasukkan ke tabel partisi (test_partition_user_{id}) +# - VIEW otomatis dibuat untuk QGIS +# """ + +# try: +# df = pd.DataFrame(payload.rows) +# print(f"[INFO] Diterima {len(df)} baris data dari frontend.") + +# # --- Validasi kolom geometry --- +# if "geometry" not in df.columns: +# raise errorRes(status_code=400, message="Kolom 'geometry' tidak ditemukan dalam data.") + +# # --- Parsing geometry ke objek shapely --- +# df["geometry"] = df["geometry"].apply( +# lambda g: wkt.loads(g) +# if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES) +# else None +# ) + +# # --- Buat GeoDataFrame --- +# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326") + +# # --- Metadata info dari payload --- +# # dataset_title = getattr(payload, "dataset_title", None) +# # dataset_year = getattr(payload, "dataset_year", None) +# # dataset_desc = getattr(payload, "dataset_description", None) +# dataset_title = "hujan 2045" +# dataset_year = 2045 +# dataset_desc = "test metadata" + +# if not dataset_title: +# raise errorRes(status_code=400, detail="Field 'dataset_title' wajib ada untuk metadata.") + +# async with engine.begin() as conn: +# fields = [col for col in df.columns if col != "geometry"] +# # 💾 1️⃣ Simpan Metadata Dataset +# print("[INFO] Menyimpan metadata dataset...") +# result = await conn.execute( +# text(""" +# INSERT INTO dataset_metadata (user_id, title, year, description, fields, created_at) +# VALUES (:user_id, :title, :year, :desc, :fields, :created_at) +# RETURNING id; +# """), +# { +# "user_id": user_id, +# "title": dataset_title, +# "year": dataset_year, +# "desc": dataset_desc, +# "fields": json.dumps(fields), +# "created_at": datetime.utcnow(), +# }, +# ) +# metadata_id = result.scalar_one() +# print(f"[INFO] Metadata disimpan dengan ID {metadata_id}") + +# # ⚙️ 2️⃣ Auto-create Partisi Jika Belum Ada +# print(f"[INFO] Memastikan partisi test_partition_user_{user_id} tersedia...") +# await conn.execute( +# text(f""" +# DO $$ +# BEGIN +# IF NOT EXISTS ( +# SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}' +# ) THEN +# EXECUTE format(' +# CREATE TABLE test_partition_user_%s +# PARTITION OF test_partition +# FOR VALUES IN (%s); +# ', {user_id}, {user_id}); +# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_geom ON test_partition_user_%s USING GIST (geom);', {user_id}, {user_id}); +# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_metadata ON test_partition_user_%s (metadata_id);', {user_id}, {user_id}); +# END IF; +# END +# $$; +# """) +# ) + +# # 🧩 3️⃣ Insert Data Spasial ke Partisi +# print(f"[INFO] Memasukkan data ke test_partition_user_{user_id} ...") +# insert_count = 0 +# for _, row in gdf.iterrows(): +# geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None +# attributes = row.drop(labels=["geometry"]).to_dict() + +# await conn.execute( +# text(""" +# INSERT INTO test_partition (user_id, metadata_id, geom, attributes, created_at) +# VALUES (:user_id, :metadata_id, ST_Force2D(ST_GeomFromText(:geom, 4326)), +# CAST(:attr AS jsonb), :created_at); +# """), +# { +# "user_id": user_id, +# "metadata_id": metadata_id, +# "geom": geom_wkt, +# "attr": json.dumps(attributes), +# "created_at": datetime.utcnow(), +# }, +# ) +# insert_count += 1 + +# # 🧩 4️⃣ Membuat VIEW untuk dataset baru di QGIS +# await create_dataset_view_from_metadata(conn, metadata_id, user_id, dataset_title) + +# print(f"[INFO] ✅ Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id} (metadata_id={metadata_id}).") + +# return { +# "status": "success", +# "user_id": user_id, +# "metadata_id": metadata_id, +# "dataset_title": dataset_title, +# "inserted_rows": insert_count, +# "geometry_type": list(gdf.geom_type.unique()), +# } + +# except Exception as e: +# print(f"[ERROR] Gagal upload ke PostGIS partition: {e}") +# raise errorRes(status_code=500, message="Gagal upload ke PostGIS partition", details=str(e)) + diff --git a/services/upload_file/utils/geometry_detector.py b/services/upload_file/utils/geometry_detector.py index ac00b31..80b0278 100644 --- a/services/upload_file/utils/geometry_detector.py +++ b/services/upload_file/utils/geometry_detector.py @@ -182,7 +182,14 @@ def detect_and_build_geometry(df: pd.DataFrame, master_polygons: gpd.GeoDataFram """ if isinstance(df, gpd.GeoDataFrame): - if "geometry" in df.columns and df.geometry.notna().any(): + geom_cols = [ + c for c in df.columns + if re.match(r'^(geometry|geom|the_geom|wkb_geometry)$', c, re.IGNORECASE) + or c.lower().startswith("geom") + or c.lower().endswith("geometry") + ] + # if "geometry" in df.columns and df.geometry.notna().any(): + if geom_cols: geom_count = df.geometry.notna().sum() geom_type = list(df.geom_type.unique()) print(f"[INFO] Detected existing geometry in GeoDataFrame ({geom_count} features, {geom_type}).")