Fix saving of the data model

This commit is contained in:
DmsAnhr 2025-11-24 08:57:43 +07:00
parent 17b296d0b4
commit d81d18dcc3
14 changed files with 276 additions and 353 deletions

View File

@ -1,11 +1,82 @@
from fastapi import APIRouter
from core.config import engine
from sqlalchemy import text
from database.connection import engine
from services.datasets.delete import delete_dataset_from_partition # import the function above
from response import successRes, errorRes
router = APIRouter()
@router.delete("/dataset/{user_id}/{metadata_id}")
def serialize_row(row_dict):
new_dict = {}
for key, value in row_dict.items():
if hasattr(value, "isoformat"):
new_dict[key] = value.isoformat()
else:
new_dict[key] = value
return new_dict
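# Editor's sketch (not part of this commit): what serialize_row does to a row
# mapping -- any value exposing isoformat() (date/datetime) becomes an
# ISO-8601 string, everything else passes through unchanged, e.g.
#   serialize_row({"id": 1, "created_at": datetime(2025, 11, 24)})
#   -> {"id": 1, "created_at": "2025-11-24T00:00:00"}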
@router.get("/metadata")
async def get_author_metadata(
# user = Depends(get_current_user)
):
"""
Fetch author_metadata:
- Admin: all rows
- User: only their own rows
"""
try:
async with engine.begin() as conn:
# if user.role == "admin":
# query = text("""
# SELECT *
# FROM backend.author_metadata
# ORDER BY created_at DESC
# """)
# result = await conn.execute(query)
# else:
# query = text("""
# SELECT *
# FROM backend.author_metadata
# WHERE user_id = :uid
# ORDER BY created_at DESC
# """)
# result = await conn.execute(query, {"uid": user.id})
# rows = result.fetchall()
query = text("""
SELECT *
FROM backend.author_metadata
ORDER BY created_at DESC
""")
result = await conn.execute(query)
rows = result.fetchall()
# data = [dict(row._mapping) for row in rows]
data = [serialize_row(dict(row._mapping)) for row in rows]
return successRes(
message="Berhasil mengambil data author metadata",
data=data
)
except Exception as e:
print(f"[ERROR] Gagal ambil author_metadata: {e}")
raise errorRes(
status_code=500,
message="Gagal mengambil data author_metadata",
details=str(e)
)
@router.delete("/delete/{user_id}/{metadata_id}")
async def delete_dataset(user_id: int, metadata_id: int, title: str):
"""
Delete a specific dataset (by user_id and metadata_id)

View File

@ -1,7 +1,7 @@
from fastapi import APIRouter, File, Form, UploadFile, Depends
from pydantic import BaseModel
from typing import List, Optional
from typing import Any, Dict, List, Optional
from services.upload_file.upload import handle_upload_file, handle_process_pdf, handle_to_postgis
from api.deps.role_dependency import require_role
from database.connection import engine
@ -30,7 +30,9 @@ class UploadRequest(BaseModel):
title: str
rows: List[dict]
columns: List[str]
author: Dict[str, Any]
@router.post("/to-postgis")
async def upload_to_postgis(payload: UploadRequest):
return await handle_to_postgis(payload, engine)
# return await handle_to_postgis(payload, engine)
return await handle_to_postgis(payload)
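# Editor's sketch of a request body this endpoint accepts, based on the
# UploadRequest fields above (all values invented for illustration):
#   {"title": "Longsor 2020",
#    "rows": [{"geometry": "POINT(112.6 -7.9)", "desa": "X"}],
#    "columns": ["geometry", "desa"],
#    "author": {"title": "Longsor 2020", "dateCreated": "2020-06-01"}}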

View File

@ -7,6 +7,7 @@ load_dotenv()
API_VERSION = "2.1.3"
POSTGIS_URL = os.getenv("POSTGIS_URL")
POSTGIS_SYNC_URL = os.getenv("SYNC_URL")
UPLOAD_FOLDER = Path(os.getenv("UPLOAD_FOLDER", "./uploads"))
MAX_FILE_MB = int(os.getenv("MAX_FILE_MB", 30))
@ -18,6 +19,7 @@ ALLOWED_ORIGINS = [
"192.168.60.24:5173",
"http://labai.polinema.ac.id:666",
"https://kkqc31ns-5173.asse.devtunnels.ms"
]
REFERENCE_DB_URL = os.getenv("REFERENCE_DB_URL")

View File

@ -2,8 +2,10 @@ from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from core.config import POSTGIS_URL
from core.config import POSTGIS_URL, POSTGIS_SYNC_URL
engine = create_async_engine(POSTGIS_URL, pool_pre_ping=True)
# SessionLocal = sessionmaker(bind=engine)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
sync_engine = create_engine(POSTGIS_SYNC_URL)
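# Editor's sketch (assumed usage, not part of this commit): the async factory
# above is typically consumed per request, e.g. as a FastAPI dependency; the
# name get_session is hypothetical.
from typing import AsyncIterator

async def get_session() -> AsyncIterator[AsyncSession]:
    # async_sessionmaker opens a session and closes it when the block exits
    async with SessionLocal() as session:
        yield session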

main.py
View File

@ -6,8 +6,9 @@ from database.models import Base
from api.routers.system_router import router as system_router
from api.routers.upload_file_router import router as upload_router
from api.routers.auth_router import router as auth_router
from contextlib import asynccontextmanager
from utils.qgis_init import init_qgis
from api.routers.datasets_router import router as dataset_router
# from contextlib import asynccontextmanager
# from utils.qgis_init import init_qgis
app = FastAPI(
title="ETL Geo Upload Service",
@ -26,35 +27,36 @@ app.add_middleware(
# Base.metadata.create_all(bind=engine)
# qgis setup
@asynccontextmanager
async def lifespan(app: FastAPI):
global qgs
qgs = init_qgis()
print("QGIS initialized")
# @asynccontextmanager
# async def lifespan(app: FastAPI):
# global qgs
# qgs = init_qgis()
# print("QGIS initialized")
yield
# yield
# SHUTDOWN (optional)
print("Shutting down...")
# # SHUTDOWN (optional)
# print("Shutting down...")
app = FastAPI(lifespan=lifespan)
# app = FastAPI(lifespan=lifespan)
@app.get("/qgis/status")
def qgis_status():
try:
version = QgsApplication.qgisVersion()
return {
"qgis_status": "connected",
"qgis_version": version
}
except Exception as e:
return {
"qgis_status": "error",
"error": str(e)
}
# @app.get("/qgis/status")
# def qgis_status():
# try:
# version = QgsApplication.qgisVersion()
# return {
# "qgis_status": "connected",
# "qgis_version": version
# }
# except Exception as e:
# return {
# "qgis_status": "error",
# "error": str(e)
# }
# Register routers
app.include_router(system_router, tags=["System"])
app.include_router(auth_router, prefix="/auth", tags=["Auth"])
app.include_router(upload_router, prefix="/upload", tags=["Upload"])
app.include_router(dataset_router, prefix="/dataset", tags=["Upload"])

View File

@ -16,14 +16,15 @@ from services.upload_file.readers.reader_mpk import read_mpk
from services.upload_file.readers.reader_pdf import convert_df, read_pdf
from services.upload_file.utils.geometry_detector import detect_and_build_geometry
from services.upload_file.utils.geometry_detector import attach_polygon_geometry_auto
from database.connection import engine
from database.connection import engine, sync_engine
from database.models import Base
from pydantic import BaseModel
from typing import List, Optional
from typing import Any, Dict, List, Optional
from shapely import wkt
from sqlalchemy import text
from datetime import datetime
from response import successRes, errorRes
from utils.logger_config import log_activity
# Base.metadata.create_all(bind=engine)
@ -119,7 +120,7 @@ def process_data(df: pd.DataFrame, ext: str):
else:
warning_examples = []
preview_data = result.head(10).to_dict(orient="records")
preview_data = result.head(15).to_dict(orient="records")
# preview_data = result.to_dict(orient="records")
preview_safe = [
@ -269,352 +270,162 @@ class UploadRequest(BaseModel):
title: str
rows: List[dict]
columns: List[str]
# import time
# def handle_to_postgis(payload: UploadRequest):
# # time.sleep(2)
# # return {
# # "table_name": 'test',
# # "status": "success",
# # "message": f"Tabel test berhasil diunggah ke PostGIS.",
# # "total_rows": 999,
# # "geometry_type": 'POINT'
# # }
# try:
# table_name = payload.title.lower().replace(" ", "_").replace("-","_")
# df = pd.DataFrame(payload.rows)
# print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
# if "geometry" in df.columns:
# df["geometry"] = df["geometry"].apply(
# lambda g: wkt.loads(g) if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES) else None
# )
# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
# else:
# raise HTTPException(status_code=400, detail="Kolom geometry tidak ditemukan dalam data.")
# with engine.begin() as conn:
# conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
# gdf.to_postgis(table_name, engine, if_exists="replace", index=False)
# with engine.begin() as conn:
# conn.execute(text(f'ALTER TABLE "{table_name}" ADD COLUMN _id SERIAL PRIMARY KEY;'))
# print(f"[INFO] Tabel '{table_name}' berhasil dibuat di PostGIS ({len(gdf)} baris).")
# return {
# "table_name": table_name,
# "status": "success",
# "message": f"Tabel '{table_name}' berhasil diunggah ke PostGIS.",
# "total_rows": len(gdf),
# "geometry_type": list(gdf.geom_type.unique())
# }
# except Exception as e:
# print(f"[ERROR] Gagal upload ke PostGIS: {e}")
# raise HTTPException(status_code=500, detail=str(e))
author: Dict[str, Any]
async def generate_unique_table_name(base_name: str):
base_name = base_name.lower().replace(" ", "_").replace("-", "_")
table_name = base_name
counter = 2
async with engine.connect() as conn:
while True:
result = await conn.execute(
text("SELECT to_regclass(:tname)"),
{"tname": table_name}
)
exists = result.scalar()
if not exists:
return table_name
table_name = f"{base_name}_{counter}"
counter += 1
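# Editor's note (illustration, not part of the commit): to_regclass(:tname)
# returns NULL when no relation with that name exists, so the loop stops at
# the first free name -- e.g. "Longsor 2020" -> "longsor_2020", or, if that
# table already exists, "longsor_2020_2", then "longsor_2020_3", and so on.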
# VALID_WKT_PREFIXES = ("POINT", "LINESTRING", "POLYGON", "MULTIPOLYGON", "MULTILINESTRING")
# def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
# try:
# table_name = "test_partition"
# df = pd.DataFrame(payload.rows)
# print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
# if "geometry" not in df.columns:
# raise HTTPException(status_code=400, detail="Kolom geometry tidak ditemukan dalam data.")
# df["geometry"] = df["geometry"].apply(
# lambda g: wkt.loads(g) if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES) else None
# )
# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
# insert_count = 0
# with engine.begin() as conn:
# # 💡 Add the auto-create partition block here
# conn.execute(text(f"""
# DO $$
# BEGIN
# IF NOT EXISTS (
# SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}'
# ) THEN
# EXECUTE format('
# CREATE TABLE test_partition_user_%s
# PARTITION OF test_partition
# FOR VALUES IN (%s);
# ', {user_id}, {user_id});
# EXECUTE format('CREATE INDEX ON test_partition_user_%s USING GIST (geom);', {user_id});
# EXECUTE format('CREATE INDEX ON test_partition_user_%s USING GIN (properties);', {user_id});
# END IF;
# END
# $$;
# """))
# # Then insert the data as usual
# for _, row in gdf.iterrows():
# geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None
# properties = row.drop(labels=["geometry"]).to_dict()
# conn.execute(
# text("""
# INSERT INTO test_partition (user_id, geom, properties, created_at)
# VALUES (:user_id, ST_Force2D(ST_GeomFromText(:geom, 4326)), CAST(:properties AS jsonb), :created_at)
# """),
# {
# "user_id": user_id,
# "geom": geom_wkt,
# "properties": json.dumps(properties),
# "created_at": datetime.utcnow()
# }
# )
# insert_count += 1
# print(f"[INFO] Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id}.")
# return {
# "table_name": table_name,
# "user_id": user_id,
# "status": "success",
# "message": f"Data berhasil ditambahkan ke partisi user_id={user_id}.",
# "total_rows": insert_count,
# "geometry_type": list(gdf.geom_type.unique())
# }
# except Exception as e:
# print(f"[ERROR] Gagal upload ke PostGIS partition: {e}")
# raise HTTPException(status_code=500, detail=str(e))
# List of valid WKT prefixes
VALID_WKT_PREFIXES = ("POINT", "LINESTRING", "POLYGON", "MULTIPOLYGON", "MULTILINESTRING")
def slugify(value: str) -> str:
"""Mengubah judul dataset jadi nama aman untuk VIEW"""
return re.sub(r'[^a-zA-Z0-9]+', '_', value.lower()).strip('_')
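# Editor's example: slugify("Longsor 2020!") == "longsor_2020"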
# async def create_dataset_view_from_metadata(conn, metadata_id: int, user_id: int, title: str):
# """Membuat VIEW PostgreSQL berdasarkan metadata dataset dan registrasi geometry untuk QGIS."""
# norm_title = slugify(title)
# view_name = f"v_user_{user_id}_{norm_title}"
# base_table = f"test_partition_user_{user_id}"
# # 1) Drop the old view if it exists
# drop_query = text(f"DROP VIEW IF EXISTS {view_name} CASCADE;")
# await conn.execute(drop_query)
# # 2) Create the new view
# create_view_query = text(f"""
# CREATE OR REPLACE VIEW {view_name} AS
# SELECT p.*, m.title, m.year, m.description
# FROM {base_table} p
# JOIN dataset_metadata m ON m.id = p.metadata_id
# WHERE p.metadata_id = {metadata_id};
# """)
# await conn.execute(create_view_query)
# # 3) Register the geometry column so QGIS recognizes this layer
# # (use Populate_Geometry_Columns on PostGIS >= 3)
# populate_query = text(f"SELECT Populate_Geometry_Columns('{view_name}'::regclass);")
# await conn.execute(populate_query)
# print(f"[INFO] VIEW {view_name} berhasil dibuat dan geometry terdaftar.")
async def create_dataset_view_from_metadata(conn, metadata_id: int, user_id: int, title: str):
"""Buat VIEW dinamis sesuai struktur atribut JSON pada dataset (hindari duplikasi nama kolom)."""
norm_title = slugify(title)
view_name = f"v_user_{user_id}_{norm_title}"
base_table = f"test_partition_user_{user_id}"
# Fetch the field list from the metadata
result = await conn.execute(text("SELECT fields FROM dataset_metadata WHERE id=:mid"), {"mid": metadata_id})
fields_json = result.scalar_one_or_none()
# --- built-in columns of the base table
base_columns = {"id", "user_id", "metadata_id", "geom"}
columns_sql = ""
field_list = []
if fields_json:
    try:
        # handle the case where the data is already a list or a JSON string
        if isinstance(fields_json, str):
            field_list = json.loads(fields_json)
        elif isinstance(fields_json, list):
            field_list = fields_json
        else:
            raise ValueError(f"Unrecognized fields_json type: {type(fields_json)}")
        for f in field_list:
            safe_col = slugify(f)
            # avoid clashing with the base table's own column names
            if safe_col in base_columns:
                alias_name = f"attr_{safe_col}"
            else:
                alias_name = safe_col
            columns_sql += f", p.attributes->>'{f}' AS {alias_name}"
    except Exception as e:
        print(f"[WARN] Failed to parse the metadata field list: {e}")
def str_to_date(raw_date: str):
    if raw_date:
        try:
            return datetime.strptime(raw_date, "%Y-%m-%d").date()
        except Exception as e:
            print("[WARNING] Could not parse dateCreated:", e)
            return None
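# Editor's example (not in the commit): str_to_date("2020-06-01") returns
# datetime.date(2020, 6, 1); empty or unparseable input yields None.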
# 1) Drop the old view
await conn.execute(text(f"DROP VIEW IF EXISTS {view_name} CASCADE;"))
# 2) Create the new dynamic view
create_view_query = f"""
CREATE OR REPLACE VIEW {view_name} AS
SELECT p.id, p.user_id, p.metadata_id, p.geom
{columns_sql},
m.title, m.year, m.description
FROM {base_table} p
JOIN dataset_metadata m ON m.id = p.metadata_id
WHERE p.metadata_id = {metadata_id};
"""
await conn.execute(text(create_view_query))
# 3) Register the geometry for QGIS
await conn.execute(text(f"SELECT Populate_Geometry_Columns('{view_name}'::regclass);"))
print(f"[INFO] VIEW {view_name} berhasil dibuat (kolom dinamis: {field_list if field_list else '(none)'}).")
async def handle_to_postgis(payload, engine, user_id: int = 3):
"""
Handle uploading spatial data to PostGIS (partitioned per user).
- If the partition does not exist yet, it is created automatically
- Dataset metadata is stored in the dataset_metadata table
- Spatial data is inserted into the partition table (test_partition_user_{id})
- A VIEW is created automatically for QGIS
"""
import asyncio
async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
try:
table_name = await generate_unique_table_name(payload.title)
df = pd.DataFrame(payload.rows)
print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
df.columns = [col.upper() for col in df.columns]
# --- Validate the geometry column ---
if "geometry" not in df.columns:
raise errorRes(status_code=400, message="Kolom 'geometry' tidak ditemukan dalam data.")
if "GEOMETRY" not in df.columns:
raise HTTPException(400, "Kolom GEOMETRY tidak ditemukan")
# --- Parse the geometry strings into shapely objects ---
df["geometry"] = df["geometry"].apply(
df["GEOMETRY"] = df["GEOMETRY"].apply(
lambda g: wkt.loads(g)
if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES)
else None
if isinstance(g, str) else None
)
# --- Build the GeoDataFrame ---
gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
gdf = gpd.GeoDataFrame(df, geometry="GEOMETRY", crs="EPSG:4326")
# --- Metadata info from the payload ---
# dataset_title = getattr(payload, "dataset_title", None)
# dataset_year = getattr(payload, "dataset_year", None)
# dataset_desc = getattr(payload, "dataset_description", None)
dataset_title = "longsor 2020"
dataset_year = 2020
dataset_desc = "test metadata"
# --- Required: use a sync engine WITHOUT asyncpg
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None,
lambda: gdf.to_postgis(
table_name,
sync_engine, # NOT the async engine
if_exists="replace",
index=False
)
)
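# Editor's note: gdf.to_postgis() is a blocking call and GeoPandas has no
# asyncpg support, which is why it runs on the default thread-pool executor
# against the separate sync_engine instead of the async engine.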
if not dataset_title:
raise errorRes(status_code=400, detail="Field 'dataset_title' is required for the metadata.")
# === STEP 4: add ID column ===
async with engine.begin() as conn:
await conn.execute(text(
f'ALTER TABLE "{table_name}" ADD COLUMN _ID SERIAL PRIMARY KEY;'
))
# === STEP 5: save author metadata ===
author = payload.author
async with engine.begin() as conn:
fields = [col for col in df.columns if col != "geometry"]
# 💾 1) Save the dataset metadata
print("[INFO] Saving dataset metadata...")
result = await conn.execute(
text("""
INSERT INTO dataset_metadata (user_id, title, year, description, fields, created_at)
VALUES (:user_id, :title, :year, :desc, :fields, :created_at)
RETURNING id;
"""),
{
"user_id": user_id,
"title": dataset_title,
"year": dataset_year,
"desc": dataset_desc,
"fields": json.dumps(fields),
"created_at": datetime.utcnow(),
},
await conn.execute(text("""
INSERT INTO backend.author_metadata (
table_title,
dataset_title,
dataset_abstract,
keywords,
topic_category,
date_created,
dataset_status,
organization_name,
contact_person_name,
contact_email,
contact_phone,
geom_type,
user_id
) VALUES (
:table_title,
:dataset_title,
:dataset_abstract,
:keywords,
:topic_category,
:date_created,
:dataset_status,
:organization_name,
:contact_person_name,
:contact_email,
:contact_phone,
:geom_type,
:user_id
)
metadata_id = result.scalar_one()
print(f"[INFO] Metadata disimpan dengan ID {metadata_id}")
"""), {
"table_title": table_name,
"dataset_title": author.get("title") or payload.title,
"dataset_abstract": author.get("abstract"),
"keywords": author.get("keywords"),
"topic_category": author.get("topicCategory"),
"date_created": str_to_date(author.get("dateCreated")),
"dataset_status": author.get("status"),
"organization_name": author.get("organization"),
"contact_person_name": author.get("contactName"),
"contact_email": author.get("contactEmail"),
"contact_phone": author.get("contactPhone"),
"geom_type": json.dumps(list(gdf.geom_type.unique())),
"user_id": user_id
})
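# Editor's sketch of the author payload shape that the .get() calls above
# assume (keys taken from the code; values are hypothetical):
#   {"title": "Longsor 2020", "abstract": "...", "keywords": "...",
#    "topicCategory": "...", "dateCreated": "2020-06-01", "status": "...",
#    "organization": "...", "contactName": "...", "contactEmail": "...",
#    "contactPhone": "..."}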
# ⚙️ 2) Auto-create the partition if it does not exist yet
print(f"[INFO] Ensuring partition test_partition_user_{user_id} exists...")
await conn.execute(
text(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}'
) THEN
EXECUTE format('
CREATE TABLE test_partition_user_%s
PARTITION OF test_partition
FOR VALUES IN (%s);
', {user_id}, {user_id});
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_geom ON test_partition_user_%s USING GIST (geom);', {user_id}, {user_id});
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_metadata ON test_partition_user_%s (metadata_id);', {user_id}, {user_id});
END IF;
END
$$;
""")
# === STEP 6: log success ===
await log_activity(
user_id=user_id,
action_type="UPLOAD",
action_title=f"Upload dataset {table_name}",
details={"table_name": table_name, "rows": len(gdf)}
)
# 🧩 3) Insert the spatial data into the partition
print(f"[INFO] Inserting data into test_partition_user_{user_id} ...")
insert_count = 0
for _, row in gdf.iterrows():
geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None
attributes = row.drop(labels=["geometry"]).to_dict()
await conn.execute(
text("""
INSERT INTO test_partition (user_id, metadata_id, geom, attributes, created_at)
VALUES (:user_id, :metadata_id, ST_Force2D(ST_GeomFromText(:geom, 4326)),
CAST(:attr AS jsonb), :created_at);
"""),
{
"user_id": user_id,
"metadata_id": metadata_id,
"geom": geom_wkt,
"attr": json.dumps(attributes),
"created_at": datetime.utcnow(),
},
)
insert_count += 1
# 🧩 4) Create the VIEW for the new dataset in QGIS
await create_dataset_view_from_metadata(conn, metadata_id, user_id, dataset_title)
print(f"[INFO] ✅ Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id} (metadata_id={metadata_id}).")
return {
res = {
"table_name": table_name,
"status": "success",
"user_id": user_id,
"metadata_id": metadata_id,
"dataset_title": dataset_title,
"inserted_rows": insert_count,
"message": f"Tabel '{table_name}' berhasil dibuat.",
"total_rows": len(gdf),
"geometry_type": list(gdf.geom_type.unique()),
}
return successRes(data=res)
except Exception as e:
print(f"[ERROR] Gagal upload ke PostGIS partition: {e}")
raise errorRes(status_code=500, message="Gagal upload ke PostGIS partition", details=str(e))
await log_activity(
user_id=user_id,
action_type="ERROR",
action_title="Upload gagal",
details={"error": str(e)}
)
print(f"error : {str(e)}")
raise HTTPException(status_code=500, detail=str(e))

View File

@ -1,5 +1,8 @@
import logging
import os
import json
from sqlalchemy import text
from database.connection import engine
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)
@ -30,3 +33,33 @@ def setup_logger(name: str):
logger.addHandler(console_handler)
return logger
async def log_activity(user_id, action_type, action_title, details=None):
payload = {
"user_id": user_id,
"action_type": action_type,
"action_title": action_title,
"action_details": json.dumps(details) if details else None
}
async with engine.begin() as conn:
await conn.execute(
text("""
INSERT INTO backend.system_logs (
user_id,
action_type,
action_title,
action_details
) VALUES (
:user_id,
:action_type,
:action_title,
:action_details
)
"""),
payload
)
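# Editor's sketch (hypothetical call, not part of this commit): how a caller
# records an upload event; mirrors the call made in handle_to_postgis above.
async def _demo_log_upload() -> None:
    await log_activity(
        user_id=3,
        action_type="UPLOAD",
        action_title="Upload dataset longsor_2020",
        details={"table_name": "longsor_2020", "rows": 120},
    )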