import asyncio from uuid import uuid4 from fastapi import APIRouter, HTTPException import httpx import requests from sqlalchemy import text from sqlalchemy.exc import SQLAlchemyError from database.connection import engine from services.datasets.delete import delete_dataset_from_partition # import fungsi di atas from response import successRes, errorRes from services.datasets.publish_geonetwork import publish_metadata from services.datasets.publish_geoserver import publish_layer_to_geoserver from services.datasets.metadata import update_job_status from services.upload_file.upload_ws import report_progress from core.config import GEOSERVER_URL, GEOSERVER_USER, GEOSERVER_PASS, QGIS_URL, MAIN_API_URL, SERVICE_KEY router = APIRouter() def serialize_row(row_dict): new_dict = {} for key, value in row_dict.items(): if hasattr(value, "isoformat"): new_dict[key] = value.isoformat() else: new_dict[key] = value return new_dict @router.get("/metadata") async def get_author_metadata( # user = Depends(get_current_user) ): """ Mengambil data author_metadata: - Admin → semua data - User → hanya data miliknya """ try: async with engine.begin() as conn: query = text(""" SELECT * FROM backend.author_metadata ORDER BY CASE process WHEN 'CLEANSING' THEN 1 WHEN 'ERROR' THEN 2 WHEN 'FINISHED' THEN 3 WHEN 'TESTING' THEN 4 END; """) result = await conn.execute(query) rows = result.fetchall() # data = [dict(row._mapping) for row in rows] data = [serialize_row(dict(row._mapping)) for row in rows] return successRes( message="Berhasil mengambil data author metadata", data=data ) except Exception as e: print(f"[ERROR] Gagal ambil author_metadata: {e}") raise errorRes( status_code=500, message="Gagal mengambil data author_metadata", details=str(e) ) @router.delete("/delete/{user_id}/{metadata_id}") async def delete_dataset(user_id: int, metadata_id: int, title: str): """ Hapus dataset tertentu (berdasarkan user_id dan metadata_id) """ try: async with engine.begin() as conn: await delete_dataset_from_partition(conn, user_id, metadata_id, title) return successRes(message=f"Dataset {title} berhasil dihapus.", data="") except Exception as e: print(f"[ERROR] Gagal hapus dataset: {e}") raise errorRes(status_code=500, details=str(e), message="Gagal hapus dataset") # @router.post("/cleansing/{table_name}") def cleansing_data(table_name: str, job_id: str): payload = { "table_name": table_name, "job_id": job_id } print("cleansing_data runn") # response = requests.post( # f"{QGIS_URL}/process/{table_name}", # ) response = requests.post( f"{QGIS_URL}/process", json=payload, ) return response @router.post("/jobs/callback") async def job_callback(payload: dict): table = payload["table"] job_id = payload["job_id"] # await asyncio.sleep(10) await report_progress(job_id, "cleansing", 50, "Cleansing data selesai") # await asyncio.sleep(5) geos_link = publish_layer_to_geoserver(table, job_id) await report_progress(job_id, "publish_geoserver", 80, "Publish GeoServer selesai") # await asyncio.sleep(3) uuid = publish_metadata(table_name=table, geoserver_links=geos_link) await report_progress(job_id, "done", 100, "Publish GeoNetwork selesai") update_job_status(table, "FINISHED", job_id) return { "ok": True, "uuid": uuid } @router.get("/styles") def get_style_list(workspace: str = None): """ Mengambil daftar style yang ada di GeoServer. - Jika workspace = None → ambil style global - Jika workspace diisi → ambil style milik workspace tersebut """ # Tentukan URL sesuai workspace if workspace: url = f"{GEOSERVER_URL}/rest/workspaces/{workspace}/styles" else: url = f"{GEOSERVER_URL}/rest/styles" headers = {"Accept": "application/json"} try: response = requests.get( url, auth=(GEOSERVER_USER, GEOSERVER_PASS), headers=headers, timeout=15 ) if response.status_code == 200: data = response.json() styles = data.get("styles", {}).get("style", []) return { "status": "success", "workspace": workspace, "count": len(styles), "styles": styles } else: raise HTTPException(status_code=response.status_code, detail=response.text) except requests.exceptions.RequestException as e: raise HTTPException(status_code=500, detail=f"Request error: {str(e)}") @router.get("/styles/{style_name}") def get_style(style_name: str, workspace: str = None): """ Mengambil file SLD style dari GeoServer. - Jika workspace tidak diisi → ambil style global - Jika workspace diisi → ambil dari workspace """ # Tentukan endpoint sesuai workspace url = f"{GEOSERVER_URL}/rest/styles/{style_name}.sld" try: response = requests.get( url, auth=(GEOSERVER_USER, GEOSERVER_PASS), timeout=15 ) if response.status_code == 200: # Return isi SLD sebagai text return { "status": "success", "style_name": style_name, "workspace": workspace, "sld": response.text } elif response.status_code == 404: raise HTTPException(status_code=404, detail="Style tidak ditemukan di GeoServer") else: raise HTTPException(status_code=500, detail=f"GeoServer error: {response.text}") except requests.exceptions.RequestException as e: raise HTTPException(status_code=500, detail=f"Request error: {str(e)}") # ============================================================= # cleansing query # ============================================================= # async def query_cleansing_data_fn( # table_name: str # ): # try: # async with engine.begin() as conn: # await conn.execute( # text("SELECT public.fn_cleansing_satupeta_polygon(:table_name)"), # {"table_name": table_name} # ) # return "done" # except SQLAlchemyError as e: # raise RuntimeError(f"Fix geometry failed: {str(e)}") async def query_cleansing_data( table_name: str ): try: async with engine.begin() as conn: await conn.execute( text("CALL pr_cleansing_satupeta_polygon(:table_name, NULL);"), {"table_name": table_name} ) return "done" except SQLAlchemyError as e: raise RuntimeError(f"Fix geometry failed: {str(e)}") async def publish_layer(table_name: str, job_id: str): # await asyncio.sleep(10) try: await report_progress(job_id, "cleansing", 50, "Cleansing data selesai") # await asyncio.sleep(5) geos_link = publish_layer_to_geoserver(table_name, job_id) await report_progress(job_id, "publish_geoserver", 80, "Publish GeoServer selesai") # await asyncio.sleep(3) uuid = publish_metadata( table_name=table_name, geoserver_links=geos_link ) await report_progress(job_id, "done", 100, "Publish GeoNetwork selesai") update_job_status(table_name, "FINISHED", job_id) # return uuid return { "geos_link": geos_link["layer_url"], "uuid": uuid } except Exception as e: update_job_status(table_name, "FAILED", job_id) raise RuntimeError(f"Publish layer gagal: {e}") from e async def upload_to_main(payload: dict): try: async with httpx.AsyncClient(timeout=10) as client: response = await client.post( MAIN_API_URL+"/api/internal/mapsets", json=payload, headers={ "X-SERVICE-KEY": SERVICE_KEY } ) response.raise_for_status() print("GOOOOO") return { "status": "success", "data": response.json() } except httpx.RequestError as e: # error koneksi, DNS, timeout, dll raise HTTPException( status_code=500, detail=f"Gagal connect ke MAIN_API: {str(e)}" ) except httpx.HTTPStatusError as e: # API tujuan balas tapi error (4xx / 5xx) raise HTTPException( status_code=e.response.status_code, detail=f"MAIN_API error: {e.response.text}" ) except Exception as e: # fallback kalau ada error lain raise HTTPException( status_code=500, detail=f"Unexpected error: {str(e)}" )