update geometry saver

This commit is contained in:
DmsAnhr 2025-12-01 09:22:43 +07:00
parent d81d18dcc3
commit 9eb1b71786
6 changed files with 438 additions and 25 deletions

View File

@ -89,3 +89,10 @@ async def delete_dataset(user_id: int, metadata_id: int, title: str):
except Exception as e:
print(f"[ERROR] Gagal hapus dataset: {e}")
raise errorRes(status_code=500, details=str(e), message="Gagal hapus dataset")
# @router.get("/upload/geoserver")
# async def

View File

@ -1,6 +1,7 @@
import httpx
from fastapi import APIRouter
from datetime import datetime, timedelta
from core.config import API_VERSION
from core.config import API_VERSION, GEOSERVER_URL
router = APIRouter()
@ -21,6 +22,21 @@ async def server_status():
}
@router.get("/status/geoserver")
async def check_geoserver_auth():
    """Health-check endpoint: probe the GeoServer REST "about/version" resource.

    Always returns a small status dict (never raises), so callers get a JSON
    body whether GeoServer is reachable or not:
      - {"status": "OK"|"ERROR", "code": <http status>, "response": <body text>}
      - {"status": "FAILED", "error": <exception text>} on connection failure.
    """
    # NOTE(review): the trailing "." looks like a truncated format suffix
    # (e.g. "version.json" / "version.xml") — confirm against the GeoServer
    # REST API; a bare trailing dot may 404.
    url = f"{GEOSERVER_URL}/geoserver/rest/about/version."
    # SECURITY: hard-coded default GeoServer credentials; these should come
    # from configuration/env, not be embedded in source.
    auth = ("admin", "geoserver")
    try:
        async with httpx.AsyncClient() as client:
            # 5-second timeout keeps this health check from hanging the API.
            response = await client.get(url, auth=auth, timeout=5)
            return {
                # "OK" only on HTTP 200; any other status code maps to "ERROR".
                "status": "OK" if response.status_code == 200 else "ERROR",
                "code": response.status_code,
                "response": response.text
            }
    except Exception as e:
        # Connection/timeout errors are reported in-band rather than raised.
        return {"status": "FAILED", "error": str(e)}

View File

@ -7,6 +7,7 @@ load_dotenv()
API_VERSION = "2.1.3"
POSTGIS_URL = os.getenv("POSTGIS_URL")
GEOSERVER_URL = os.getenv("GEOSERVER_PATH")
POSTGIS_SYNC_URL = os.getenv("SYNC_URL")
UPLOAD_FOLDER = Path(os.getenv("UPLOAD_FOLDER", "./uploads"))
MAX_FILE_MB = int(os.getenv("MAX_FILE_MB", 30))

View File

@ -26,6 +26,8 @@ asyncpg
py7zr
bcrypt==4.0.1
passlib == 1.7.4
urllib3<2
# --- jika menggunakan ai ---
groq

View File

@ -20,7 +20,7 @@ from database.connection import engine, sync_engine
from database.models import Base
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
from shapely import wkt
from shapely import MultiLineString, MultiPolygon, wkt
from sqlalchemy import text
from datetime import datetime
from response import successRes, errorRes
@ -120,8 +120,8 @@ def process_data(df: pd.DataFrame, ext: str):
else:
warning_examples = []
preview_data = result.head(15).to_dict(orient="records")
# preview_data = result.to_dict(orient="records")
# preview_data = result.head(15).to_dict(orient="records")
preview_data = result.to_dict(orient="records")
preview_safe = [
{k: safe_json(v) for k, v in row.items()} for row in preview_data
@ -272,7 +272,7 @@ class UploadRequest(BaseModel):
columns: List[str]
author: Dict[str, Any]
# generate _2 if exist
async def generate_unique_table_name(base_name: str):
base_name = base_name.lower().replace(" ", "_").replace("-", "_")
table_name = base_name
@ -302,44 +302,222 @@ def str_to_date(raw_date: str):
import asyncio
# async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
# try:
# table_name = await generate_unique_table_name(payload.title)
# df = pd.DataFrame(payload.rows)
# df.columns = [col.upper() for col in df.columns]
# if "GEOMETRY" not in df.columns:
# raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan")
# df["GEOMETRY"] = df["GEOMETRY"].apply(
# lambda g: wkt.loads(g)
# if isinstance(g, str) else None
# )
# # ======================================================
# # RENAME kolom GEOMETRY → geom (WAJIB)
# # ======================================================
# df = df.rename(columns={"GEOMETRY": "geom"})
# gdf = gpd.GeoDataFrame(df, geometry="geom", crs="EPSG:4326")
# # --- Wajib: gunakan engine sync TANPA asyncpg
# loop = asyncio.get_running_loop()
# await loop.run_in_executor(
# None,
# lambda: gdf.to_postgis(
# table_name,
# sync_engine,
# if_exists="replace",
# index=False
# )
# )
# # === STEP 4: add ID column ===
# async with engine.begin() as conn:
# await conn.execute(text(
# f'ALTER TABLE "{table_name}" ADD COLUMN _ID SERIAL PRIMARY KEY;'
# ))
# # === STEP 5: save author metadata ===
# author = payload.author
# async with engine.begin() as conn:
# await conn.execute(text("""
# INSERT INTO backend.author_metadata (
# table_title,
# dataset_title,
# dataset_abstract,
# keywords,
# topic_category,
# date_created,
# dataset_status,
# organization_name,
# contact_person_name,
# contact_email,
# contact_phone,
# geom_type,
# user_id
# ) VALUES (
# :table_title,
# :dataset_title,
# :dataset_abstract,
# :keywords,
# :topic_category,
# :date_created,
# :dataset_status,
# :organization_name,
# :contact_person_name,
# :contact_email,
# :contact_phone,
# :geom_type,
# :user_id
# )
# """), {
# "table_title": table_name,
# "dataset_title": author.get("title") or payload.title,
# "dataset_abstract": author.get("abstract"),
# "keywords": author.get("keywords"),
# "topic_category": author.get("topicCategory"),
# "date_created": str_to_date(author.get("dateCreated")),
# "dataset_status": author.get("status"),
# "organization_name": author.get("organization"),
# "contact_person_name": author.get("contactName"),
# "contact_email": author.get("contactEmail"),
# "contact_phone": author.get("contactPhone"),
# "geom_type": json.dumps(list(gdf.geom_type.unique())),
# "user_id": user_id
# })
# # === STEP 6: log success ===
# await log_activity(
# user_id=user_id,
# action_type="UPLOAD",
# action_title=f"Upload dataset {table_name}",
# details={"table_name": table_name, "rows": len(gdf)}
# )
# res = {
# "table_name": table_name,
# "status": "success",
# "message": f"Tabel '{table_name}' berhasil dibuat.",
# "total_rows": len(gdf),
# "geometry_type": list(gdf.geom_type.unique()),
# }
# return successRes(data=res)
# except Exception as e:
# await log_activity(
# user_id=user_id,
# action_type="ERROR",
# action_title="Upload gagal",
# details={"error": str(e)}
# )
# print(f"error : {str(e)}")
# raise HTTPException(status_code=500, detail=str(e))
async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
try:
table_name = await generate_unique_table_name(payload.title)
# DataFrame
df = pd.DataFrame(payload.rows)
df.columns = [col.upper() for col in df.columns]
if "GEOMETRY" not in df.columns:
raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan")
df["GEOMETRY"] = df["GEOMETRY"].apply(
lambda g: wkt.loads(g)
if isinstance(g, str) else None
)
# =====================================================================
# 1. LOAD WKT → SHAPELY
# =====================================================================
def safe_load_wkt(g):
if not isinstance(g, str):
return None
try:
geom = wkt.loads(g)
return geom
except:
return None
gdf = gpd.GeoDataFrame(df, geometry="GEOMETRY", crs="EPSG:4326")
df["GEOMETRY"] = df["GEOMETRY"].apply(safe_load_wkt)
df = df.rename(columns={"GEOMETRY": "geom"})
# --- Wajib: gunakan engine sync TANPA asyncpg
# =====================================================================
# 2. DROP ROW geometry NULL
# =====================================================================
df = df[df["geom"].notnull()]
if df.empty:
raise HTTPException(400, "Semua geometry invalid atau NULL")
# =====================================================================
# 3. VALIDATE geometry (very important)
# =====================================================================
df["geom"] = df["geom"].apply(lambda g: g if g.is_valid else g.buffer(0))
# =====================================================================
# 4. SERAGAMKAN TIPE GEOMETRY (Polygon→MultiPolygon, Line→MultiLine)
# =====================================================================
def unify_geometry_type(g):
gtype = g.geom_type.upper()
if gtype == "POLYGON":
return MultiPolygon([g])
if gtype == "LINESTRING":
return MultiLineString([g])
return g # sudah MULTI atau POINT
df["geom"] = df["geom"].apply(unify_geometry_type)
# =====================================================================
# 5. DETEKSI CRS DARI METADATA / INPUT / DEFAULT
# =====================================================================
detected_crs = payload.author.get("crs")
detected = payload.author.get("crs")
print('crs', detected)
if not detected_crs:
detected_crs = "EPSG:4326"
# Buat GeoDataFrame
gdf = gpd.GeoDataFrame(df, geometry="geom", crs=detected_crs)
# =====================================================================
# 6. VERIFY CRS (SRID) VALID di PROJ / PostGIS
# =====================================================================
try:
_ = gdf.to_crs(gdf.crs) # test CRS valid
except:
raise HTTPException(400, f"CRS {detected_crs} tidak valid")
# =====================================================================
# 7. SIMPAN KE POSTGIS (synchronous)
# =====================================================================
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None,
lambda: gdf.to_postgis(
table_name,
sync_engine, # JANGAN ENGINE ASYNC
sync_engine,
if_exists="replace",
index=False
)
)
# === STEP 4: add ID column ===
# =====================================================================
# 8. ADD PRIMARY KEY (wajib untuk QGIS API)
# =====================================================================
async with engine.begin() as conn:
await conn.execute(text(
f'ALTER TABLE "{table_name}" ADD COLUMN _ID SERIAL PRIMARY KEY;'
))
# === STEP 5: save author metadata ===
# =====================================================================
# 9. SIMPAN METADATA (geom_type, author metadata)
# =====================================================================
unified_geom_type = list(gdf.geom_type.unique())
author = payload.author
async with engine.begin() as conn:
await conn.execute(text("""
INSERT INTO backend.author_metadata (
@ -383,11 +561,14 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
"contact_person_name": author.get("contactName"),
"contact_email": author.get("contactEmail"),
"contact_phone": author.get("contactPhone"),
"geom_type": json.dumps(list(gdf.geom_type.unique())),
"geom_type": json.dumps(unified_geom_type),
"user_id": user_id
})
# === STEP 6: log success ===
# =====================================================================
# 10. LOGGING
# =====================================================================
await log_activity(
user_id=user_id,
action_type="UPLOAD",
@ -395,15 +576,15 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
details={"table_name": table_name, "rows": len(gdf)}
)
res = {
result = {
"table_name": table_name,
"status": "success",
"message": f"Tabel '{table_name}' berhasil dibuat.",
"total_rows": len(gdf),
"geometry_type": list(gdf.geom_type.unique()),
"geometry_type": unified_geom_type,
"crs": detected_crs,
}
return successRes(data=res)
return successRes(data=result)
except Exception as e:
await log_activity(
@ -412,7 +593,6 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
action_title="Upload gagal",
details={"error": str(e)}
)
print(f"error : {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@ -429,3 +609,203 @@ async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
# ===================================
# partition +VIEW
# ===================================
# Daftar prefix WKT yang valid
# VALID_WKT_PREFIXES = ("POINT", "LINESTRING", "POLYGON", "MULTIPOLYGON", "MULTILINESTRING")
def slugify(value: str) -> str:
    """Normalize a dataset title into a safe lowercase name for a VIEW.

    Lowercases the input, collapses every run of non-alphanumeric characters
    into a single underscore, and trims leading/trailing underscores.
    """
    collapsed = re.sub(r'[^a-zA-Z0-9]+', '_', value.lower())
    return collapsed.strip('_')
# Partition + VIEW
# async def create_dataset_view_from_metadata(conn, metadata_id: int, user_id: int, title: str):
# norm_title = slugify(title)
# view_name = f"v_user_{user_id}_{norm_title}"
# base_table = f"test_partition_user_{user_id}"
# # Ambil daftar field
# result = await conn.execute(text("SELECT fields FROM dataset_metadata WHERE id=:mid"), {"mid": metadata_id})
# fields_json = result.scalar_one_or_none()
# base_columns = {"id", "user_id", "metadata_id", "geom"}
# columns_sql = ""
# field_list = []
# if fields_json:
# fields = json.loads(fields_json) if isinstance(fields_json, str) else fields_json
# field_list = fields
# for f in field_list:
# safe_col = slugify(f)
# alias_name = safe_col if safe_col not in base_columns else f"attr_{safe_col}"
# # CAST otomatis
# if safe_col in ["longitude", "latitude", "lon", "lat"]:
# columns_sql += f", (p.attributes->>'{f}')::float AS {alias_name}"
# else:
# columns_sql += f", p.attributes->>'{f}' AS {alias_name}"
# # Drop view lama
# await conn.execute(text(f"DROP VIEW IF EXISTS {view_name} CASCADE;"))
# # 🔥 Buat VIEW baru yang punya FID unik
# create_view_query = f"""
# CREATE OR REPLACE VIEW {view_name} AS
# SELECT
# row_number() OVER() AS fid, -- FID unik untuk QGIS
# p.id,
# p.user_id,
# p.metadata_id,
# p.geom
# {columns_sql},
# m.title,
# m.year,
# m.description
# FROM {base_table} p
# JOIN dataset_metadata m ON m.id = p.metadata_id
# WHERE p.metadata_id = {metadata_id};
# """
# await conn.execute(text(create_view_query))
# # Register geometry untuk QGIS
# await conn.execute(text(f"DELETE FROM geometry_columns WHERE f_table_name = '{view_name}';"))
# await conn.execute(text(f"""
# INSERT INTO geometry_columns
# (f_table_schema, f_table_name, f_geometry_column, coord_dimension, srid, type)
# VALUES ('public', '{view_name}', 'geom', 2, 4326, 'GEOMETRY');
# """))
# print(f"[INFO] VIEW {view_name} dibuat dengan FID unik dan kompatibel dengan QGIS.")
# async def handle_to_postgis(payload, engine, user_id: int = 3):
# """
# Menangani upload data spasial ke PostGIS (dengan partition per user).
# - Jika partisi belum ada, akan dibuat otomatis
# - Metadata dataset disimpan di tabel dataset_metadata
# - Data spasial dimasukkan ke tabel partisi (test_partition_user_{id})
# - VIEW otomatis dibuat untuk QGIS
# """
# try:
# df = pd.DataFrame(payload.rows)
# print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
# # --- Validasi kolom geometry ---
# if "geometry" not in df.columns:
# raise errorRes(status_code=400, message="Kolom 'geometry' tidak ditemukan dalam data.")
# # --- Parsing geometry ke objek shapely ---
# df["geometry"] = df["geometry"].apply(
# lambda g: wkt.loads(g)
# if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES)
# else None
# )
# # --- Buat GeoDataFrame ---
# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
# # --- Metadata info dari payload ---
# # dataset_title = getattr(payload, "dataset_title", None)
# # dataset_year = getattr(payload, "dataset_year", None)
# # dataset_desc = getattr(payload, "dataset_description", None)
# dataset_title = "hujan 2045"
# dataset_year = 2045
# dataset_desc = "test metadata"
# if not dataset_title:
# raise errorRes(status_code=400, detail="Field 'dataset_title' wajib ada untuk metadata.")
# async with engine.begin() as conn:
# fields = [col for col in df.columns if col != "geometry"]
# # 💾 1⃣ Simpan Metadata Dataset
# print("[INFO] Menyimpan metadata dataset...")
# result = await conn.execute(
# text("""
# INSERT INTO dataset_metadata (user_id, title, year, description, fields, created_at)
# VALUES (:user_id, :title, :year, :desc, :fields, :created_at)
# RETURNING id;
# """),
# {
# "user_id": user_id,
# "title": dataset_title,
# "year": dataset_year,
# "desc": dataset_desc,
# "fields": json.dumps(fields),
# "created_at": datetime.utcnow(),
# },
# )
# metadata_id = result.scalar_one()
# print(f"[INFO] Metadata disimpan dengan ID {metadata_id}")
# # ⚙️ 2⃣ Auto-create Partisi Jika Belum Ada
# print(f"[INFO] Memastikan partisi test_partition_user_{user_id} tersedia...")
# await conn.execute(
# text(f"""
# DO $$
# BEGIN
# IF NOT EXISTS (
# SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}'
# ) THEN
# EXECUTE format('
# CREATE TABLE test_partition_user_%s
# PARTITION OF test_partition
# FOR VALUES IN (%s);
# ', {user_id}, {user_id});
# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_geom ON test_partition_user_%s USING GIST (geom);', {user_id}, {user_id});
# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_metadata ON test_partition_user_%s (metadata_id);', {user_id}, {user_id});
# END IF;
# END
# $$;
# """)
# )
# # 🧩 3⃣ Insert Data Spasial ke Partisi
# print(f"[INFO] Memasukkan data ke test_partition_user_{user_id} ...")
# insert_count = 0
# for _, row in gdf.iterrows():
# geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None
# attributes = row.drop(labels=["geometry"]).to_dict()
# await conn.execute(
# text("""
# INSERT INTO test_partition (user_id, metadata_id, geom, attributes, created_at)
# VALUES (:user_id, :metadata_id, ST_Force2D(ST_GeomFromText(:geom, 4326)),
# CAST(:attr AS jsonb), :created_at);
# """),
# {
# "user_id": user_id,
# "metadata_id": metadata_id,
# "geom": geom_wkt,
# "attr": json.dumps(attributes),
# "created_at": datetime.utcnow(),
# },
# )
# insert_count += 1
# # 🧩 4⃣ Membuat VIEW untuk dataset baru di QGIS
# await create_dataset_view_from_metadata(conn, metadata_id, user_id, dataset_title)
# print(f"[INFO] ✅ Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id} (metadata_id={metadata_id}).")
# return {
# "status": "success",
# "user_id": user_id,
# "metadata_id": metadata_id,
# "dataset_title": dataset_title,
# "inserted_rows": insert_count,
# "geometry_type": list(gdf.geom_type.unique()),
# }
# except Exception as e:
# print(f"[ERROR] Gagal upload ke PostGIS partition: {e}")
# raise errorRes(status_code=500, message="Gagal upload ke PostGIS partition", details=str(e))

View File

@ -182,7 +182,14 @@ def detect_and_build_geometry(df: pd.DataFrame, master_polygons: gpd.GeoDataFram
"""
if isinstance(df, gpd.GeoDataFrame):
if "geometry" in df.columns and df.geometry.notna().any():
geom_cols = [
c for c in df.columns
if re.match(r'^(geometry|geom|the_geom|wkb_geometry)$', c, re.IGNORECASE)
or c.lower().startswith("geom")
or c.lower().endswith("geometry")
]
# if "geometry" in df.columns and df.geometry.notna().any():
if geom_cols:
geom_count = df.geometry.notna().sum()
geom_type = list(df.geom_type.unique())
print(f"[INFO] Detected existing geometry in GeoDataFrame ({geom_count} features, {geom_type}).")