From 4784b86cf05193d484830e612730daaa8257e409 Mon Sep 17 00:00:00 2001 From: DmsAnhr Date: Tue, 27 Jan 2026 09:52:02 +0700 Subject: [PATCH] initial commit upload microservice --- .DS_Store | Bin 0 -> 8196 bytes .gitignore | 22 + README.md | 136 +++ api/deps/auth_dependency.py | 34 + api/deps/role_dependency.py | 20 + api/routers/auth_router.py | 14 + api/routers/datasets_router.py | 273 +++++ api/routers/system_router.py | 110 ++ api/routers/upload_file_router.py | 41 + api/routers/ws/manager.py | 23 + api/routers/ws/upload_progress_ws.py | 25 + core/config.py | 78 ++ database/connection.py | 11 + database/models.py | 42 + main.py | 64 ++ requirements.txt | 35 + response.py | 22 + services/.DS_Store | Bin 0 -> 6148 bytes services/auth/login.py | 49 + services/datasets/delete.py | 33 + services/datasets/metadata.py | 28 + services/datasets/pub.py | 679 +++++++++++++ services/datasets/publish_geonetwork.py | 696 +++++++++++++ services/datasets/publish_geoserver.py | 285 ++++++ services/upload_file/.DS_Store | Bin 0 -> 6148 bytes services/upload_file/ai_generate.py | 49 + services/upload_file/readers/reader_csv.py | 116 +++ services/upload_file/readers/reader_gdb.py | 75 ++ services/upload_file/readers/reader_mpk.py | 72 ++ services/upload_file/readers/reader_pdf.py | 288 ++++++ services/upload_file/readers/reader_shp.py | 60 ++ services/upload_file/upload.py | 941 ++++++++++++++++++ services/upload_file/upload_exceptions.py | 9 + services/upload_file/upload_ws.py | 27 + .../upload_file/utils/geometry_detector.py | 466 +++++++++ services/upload_file/utils/pdf_cleaner.py | 208 ++++ utils/logger_config.py | 65 ++ utils/qgis_init.py | 30 + 38 files changed, 5126 insertions(+) create mode 100644 .DS_Store create mode 100644 .gitignore create mode 100644 README.md create mode 100644 api/deps/auth_dependency.py create mode 100644 api/deps/role_dependency.py create mode 100644 api/routers/auth_router.py create mode 100644 api/routers/datasets_router.py create mode 100644 
api/routers/system_router.py create mode 100644 api/routers/upload_file_router.py create mode 100644 api/routers/ws/manager.py create mode 100644 api/routers/ws/upload_progress_ws.py create mode 100644 core/config.py create mode 100644 database/connection.py create mode 100644 database/models.py create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 response.py create mode 100644 services/.DS_Store create mode 100644 services/auth/login.py create mode 100644 services/datasets/delete.py create mode 100644 services/datasets/metadata.py create mode 100644 services/datasets/pub.py create mode 100644 services/datasets/publish_geonetwork.py create mode 100644 services/datasets/publish_geoserver.py create mode 100644 services/upload_file/.DS_Store create mode 100644 services/upload_file/ai_generate.py create mode 100644 services/upload_file/readers/reader_csv.py create mode 100644 services/upload_file/readers/reader_gdb.py create mode 100644 services/upload_file/readers/reader_mpk.py create mode 100644 services/upload_file/readers/reader_pdf.py create mode 100644 services/upload_file/readers/reader_shp.py create mode 100644 services/upload_file/upload.py create mode 100644 services/upload_file/upload_exceptions.py create mode 100644 services/upload_file/upload_ws.py create mode 100644 services/upload_file/utils/geometry_detector.py create mode 100644 services/upload_file/utils/pdf_cleaner.py create mode 100644 utils/logger_config.py create mode 100644 utils/qgis_init.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..4ec72143a265cb525586a69bbc0893a89b2dd785 GIT binary patch literal 8196 zcmeHM%Wl&^6upzAjne>%ct~AFWm`gDKoOfz+C>)#sX?&d5u7+}O$`hGulad7Rd=kBnJ;sm2eXE*qYoZU=$cvU=EK8RHF|0bdWl~uc5B?O(sd4 z0d9v}ig4`!A5f1vI!ADODko1ib;(VkKBQ1n)CR6iyXp3F{F?Xf9d~S6; zH;fh*zKC4DFg0DYidNZr(bx?JjmVDr;hNoh#&?f{z-yhB?bZ{wGiX+>J_!8CcKuFI z5?rSP$n&SJ?*xOjpzk}OaCSKC-c?gyGsl6)s@>TCnqKA 
z+Vz|Fwwin1z8}1kx{*j0ExH)rkMj2@DI+>)^n5Syckq-Mm!XJzIkw-L;e(j;ZO={31jOR7lK&?I^!Lg#YQh+7_S_*ZXj1(1Dq?9v zA}%T8zc4~KQR={xFpDSkReLEbya1j#Su9q2tT)!c9w}I(L$s8dO;%9jl`xtBJoc{! zdq|N!E>=rpqB8Kv1UFzn&6jI z;H{!}S+YWS$MJyJ$<&Q!No7p~*#?u>l&*T&p)tZ~fo-Gpf7_|f=z@DVloSfpBC6}4u7is%T)cpR*EYWy;yRZbBM zj+2OPz@PLA%#oFimg|h}kBllE-Ny{@d&m)-rO{8&e|z0tUgfoVdG$Bf?IL1d(IB;U zPob9gNpDOYjsR7i0wQ$6?wfn5!~mATGfw%qy;(m0Z%qFE|BQPz%^3xZ0_RBqQCM%R z;}b=GbY?k*&jooLf0b17$LSCkEEhRr2wQm^haLT4h;lWlt0#6CTht)^@gD-T|0w0o VX7+!gA72Y*|Npgf+dsble+M9Y-;)3U literal 0 HcmV?d00001 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..69a4676 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +.env +main_old.py +main_mess.py +sijalinmaja.json +geonetwork_ISO.json +metadata.xml +notes.txt + +venv/ +pdf/ +data_cache/ +service_tmp/ +testing/ +test-ai/ +uploads/ +scrapp/ +logs/ +style_temp/ +services/styles/ + + +cleansing_func.sql \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..11d43d3 --- /dev/null +++ b/README.md @@ -0,0 +1,136 @@ +# πŸ›°οΈ FastAPI Backend β€” Sistem Referensi & Validasi Data Geospasial + +Proyek ini adalah backend berbasis **FastAPI** yang menangani proses **pembacaan data spasial (Shapefile, GeoJSON)**, **ekstraksi PDF**, serta **validasi dan sinkronisasi data** terhadap referensi basis data menggunakan **PostgreSQL/PostGIS** dan **RapidFuzz** untuk pencocokan string. 
+ +--- + +## βš™οΈ Fitur Utama + +βœ… Upload dan ekstrak file `.zip` berisi `.shp` atau `.gdb` +βœ… Parsing PDF menggunakan `pdfplumber` +βœ… Konversi dan validasi geometri (Shapely + GeoPandas) +βœ… Pencocokan fuzzy string terhadap referensi DB (`RapidFuzz`) +βœ… Integrasi PostgreSQL / PostGIS melalui SQLAlchemy +βœ… Middleware CORS untuk komunikasi dengan frontend +βœ… Dukungan konfigurasi `.env` (via `python-dotenv`) + +--- + +## 🧱 Struktur Proyek + +``` +project-root/ +β”‚ +β”œβ”€β”€ core/ +β”‚ β”œβ”€β”€ config.py # Konfigurasi environment & DB URL +β”‚ └── utils/ # Fungsi tambahan (opsional) +β”‚ +β”œβ”€β”€ routes/ +β”‚ └── upload_routes.py # Endpoint untuk upload & validasi +β”‚ +β”œβ”€β”€ services/ +β”‚ └── pdf_service.py # Parser PDF +β”‚ └── shapefile_service.py # Pembaca dan validator shapefile +β”‚ +β”œβ”€β”€ main.py # Entry point FastAPI +β”œβ”€β”€ requirements.txt # Daftar dependensi +β”œβ”€β”€ .env # File konfigurasi (DB_URL, schema, dll) +└── README.md # Dokumentasi proyek ini +``` + +--- + +## πŸ”§ Instalasi dan Setup + +### 1️⃣ Clone Repository +```bash +git clone https://git.labmu.ac.id/username/nama-proyek.git +cd nama-proyek +``` + +### 2️⃣ Buat Virtual Environment +```bash +python -m venv venv +source venv/bin/activate # (Linux/Mac) +venv\Scripts\activate # (Windows) +``` + +### 3️⃣ Instal Dependensi +```bash +pip install -r requirements.txt +``` + +### 4️⃣ Konfigurasi File `.env` +Buat file `.env` di root proyek: + +```env +REFERENCE_DB_URL=postgresql+psycopg2://user:password@localhost:5432/nama_db +REFERENCE_SCHEMA=public +``` + +--- + +## πŸš€ Menjalankan Server + +Jalankan server FastAPI menggunakan **Uvicorn**: + +```bash +uvicorn main:app --reload +``` + +Server akan berjalan di: +πŸ‘‰ http://127.0.0.1:8000 + +--- + +## 🧠 Contoh Endpoint + +| Method | Endpoint | Deskripsi | +|--------|-----------|-----------| +| `POST` | `/upload/shapefile` | Upload file `.zip` berisi `.shp` | +| `POST` | `/upload/pdf` | Ekstrak tabel dari file PDF | +| 
`GET` | `/reference/check` | Validasi data terhadap referensi DB | + +--- + +## 🧩 Teknologi yang Digunakan + +| Kategori | Library | +|-----------|----------| +| Framework | FastAPI, Starlette | +| Database | SQLAlchemy, psycopg2, PostgreSQL/PostGIS | +| Data & Geo | Pandas, GeoPandas, Shapely, Fiona, PyProj | +| Parsing | pdfplumber | +| Matching | RapidFuzz | +| Utilitas | python-dotenv, pathlib, zipfile | +| Server | Uvicorn | + +--- + +## πŸ§ͺ Testing + +Jalankan server dan uji dengan **Swagger UI**: +``` +http://127.0.0.1:8000/docs +``` + +Atau gunakan **cURL / Postman** untuk pengujian manual. + +--- + +## 🧰 Tips Penggunaan + +- Pastikan `GDAL`, `GEOS`, dan `PROJ` sudah terinstal di sistem jika menggunakan `GeoPandas` / `Fiona`. +- Gunakan `pip freeze > requirements.txt` untuk memperbarui dependensi. +- Gunakan `.gitignore` agar file sensitif seperti `.env` tidak ikut ter-push. + +--- + +## πŸ‘¨β€πŸ’» Pengembang +**Nama:** Dimas Anhar + +--- + +## πŸ“„ Lisensi +Proyek ini dikembangkan untuk keperluan penelitian dan pengembangan internal. +Lisensi dapat disesuaikan sesuai kebijakan lab atau institusi. 
diff --git a/api/deps/auth_dependency.py b/api/deps/auth_dependency.py new file mode 100644 index 0000000..fbb1d0d --- /dev/null +++ b/api/deps/auth_dependency.py @@ -0,0 +1,34 @@ +from fastapi import Depends, Header +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from datetime import datetime +from response import errorRes + +from database.connection import SessionLocal +from database.models import User + +async def get_db(): + async with SessionLocal() as session: + yield session + + +async def get_current_user( + authorization: str = Header(None), + db: AsyncSession = Depends(get_db) +): + if not authorization or not authorization.startswith("Bearer "): + raise errorRes(status_code=401, message="Missing or invalid token") + + token = authorization.split(" ")[1] + result = await db.execute(select(User).where(User.active_token == token)) + user = result.scalar_one_or_none() + + # Case 1: Token not found β†’ maybe replaced by new login + if not user: + raise errorRes(status_code=401, message="Token invalid or used by another login") + + # Case 2: Token expired + if user.token_expired_at and user.token_expired_at < datetime.utcnow(): + raise errorRes(status_code=401, message="Token expired") + + return user diff --git a/api/deps/role_dependency.py b/api/deps/role_dependency.py new file mode 100644 index 0000000..c45069a --- /dev/null +++ b/api/deps/role_dependency.py @@ -0,0 +1,20 @@ +from fastapi import Depends, status +from api.deps.auth_dependency import get_current_user +from response import errorRes + +def require_role(required_role: str): + """ + Return a dependency function that ensures the current user has a specific role. 
+ Example usage: + @router.get("/admin", dependencies=[Depends(require_role("admin"))]) + """ + async def role_checker(user = Depends(get_current_user)): + if user.role != required_role: + raise errorRes( + status_code=status.HTTP_403_FORBIDDEN, + message="Access denied", + detail=f"Access denied: requires role '{required_role}'", + ) + return user + + return role_checker diff --git a/api/routers/auth_router.py b/api/routers/auth_router.py new file mode 100644 index 0000000..8e8140e --- /dev/null +++ b/api/routers/auth_router.py @@ -0,0 +1,14 @@ +from fastapi import APIRouter, Depends +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession +from services.auth.login import loginService, get_db + +router = APIRouter() + +class LoginRequest(BaseModel): + username: str + password: str + +@router.post("/login") +async def login(request: LoginRequest, db: AsyncSession = Depends(get_db)): + return await loginService(request.username, request.password, db) diff --git a/api/routers/datasets_router.py b/api/routers/datasets_router.py new file mode 100644 index 0000000..531125d --- /dev/null +++ b/api/routers/datasets_router.py @@ -0,0 +1,273 @@ +import asyncio +from uuid import uuid4 +from fastapi import APIRouter, HTTPException +import requests +from sqlalchemy import text +from sqlalchemy.exc import SQLAlchemyError +from database.connection import engine +from services.datasets.delete import delete_dataset_from_partition # import fungsi di atas +from response import successRes, errorRes +from services.datasets.publish_geonetwork import publish_metadata +from services.datasets.publish_geoserver import publish_layer_to_geoserver +from services.datasets.metadata import update_job_status +from services.upload_file.upload_ws import report_progress +from core.config import GEOSERVER_URL, GEOSERVER_USER, GEOSERVER_PASS, QGIS_URL + +router = APIRouter() + +def serialize_row(row_dict): + new_dict = {} + for key, value in row_dict.items(): + if 
hasattr(value, "isoformat"): + new_dict[key] = value.isoformat() + else: + new_dict[key] = value + return new_dict + + + + +@router.get("/metadata") +async def get_author_metadata( + # user = Depends(get_current_user) +): + """ + Mengambil data author_metadata: + - Admin β†’ semua data + - User β†’ hanya data miliknya + """ + + try: + async with engine.begin() as conn: + + query = text(""" + SELECT * + FROM backend.author_metadata + ORDER BY CASE process + WHEN 'CLEANSING' THEN 1 + WHEN 'ERROR' THEN 2 + WHEN 'FINISHED' THEN 3 + WHEN 'TESTING' THEN 4 + END; + """) + result = await conn.execute(query) + rows = result.fetchall() + + + # data = [dict(row._mapping) for row in rows] + data = [serialize_row(dict(row._mapping)) for row in rows] + + return successRes( + message="Berhasil mengambil data author metadata", + data=data + ) + + except Exception as e: + print(f"[ERROR] Gagal ambil author_metadata: {e}") + raise errorRes( + status_code=500, + message="Gagal mengambil data author_metadata", + details=str(e) + ) + + + +@router.delete("/delete/{user_id}/{metadata_id}") +async def delete_dataset(user_id: int, metadata_id: int, title: str): + """ + Hapus dataset tertentu (berdasarkan user_id dan metadata_id) + """ + try: + async with engine.begin() as conn: + await delete_dataset_from_partition(conn, user_id, metadata_id, title) + return successRes(message=f"Dataset {title} berhasil dihapus.", data="") + + except Exception as e: + print(f"[ERROR] Gagal hapus dataset: {e}") + raise errorRes(status_code=500, details=str(e), message="Gagal hapus dataset") + + + +# @router.post("/cleansing/{table_name}") +def cleansing_data(table_name: str, job_id: str): + payload = { + "table_name": table_name, + "job_id": job_id + } + print("cleansing_data runn") + # response = requests.post( + # f"{QGIS_URL}/process/{table_name}", + # ) + response = requests.post( + f"{QGIS_URL}/process", + json=payload, + ) + return response + + +@router.post("/jobs/callback") +async def 
job_callback(payload: dict): + table = payload["table"] + job_id = payload["job_id"] + # await asyncio.sleep(10) + + await report_progress(job_id, "cleansing", 50, "Cleansing data selesai") + # await asyncio.sleep(5) + + geos_link = publish_layer_to_geoserver(table, job_id) + await report_progress(job_id, "publish_geoserver", 80, "Publish GeoServer selesai") + # await asyncio.sleep(3) + + uuid = publish_metadata(table_name=table, geoserver_links=geos_link) + await report_progress(job_id, "done", 100, "Publish GeoNetwork selesai") + + update_job_status(table, "FINISHED", job_id) + return { + "ok": True, + "uuid": uuid + } + + + +@router.get("/styles") +def get_style_list(workspace: str = None): + """ + Mengambil daftar style yang ada di GeoServer. + - Jika workspace = None β†’ ambil style global + - Jika workspace diisi β†’ ambil style milik workspace tersebut + """ + + # Tentukan URL sesuai workspace + if workspace: + url = f"{GEOSERVER_URL}/rest/workspaces/{workspace}/styles" + else: + url = f"{GEOSERVER_URL}/rest/styles" + + headers = {"Accept": "application/json"} + + try: + response = requests.get( + url, + auth=(GEOSERVER_USER, GEOSERVER_PASS), + headers=headers, + timeout=15 + ) + + if response.status_code == 200: + data = response.json() + + styles = data.get("styles", {}).get("style", []) + + return { + "status": "success", + "workspace": workspace, + "count": len(styles), + "styles": styles + } + + else: + raise HTTPException(status_code=response.status_code, detail=response.text) + + except requests.exceptions.RequestException as e: + raise HTTPException(status_code=500, detail=f"Request error: {str(e)}") + + + +@router.get("/styles/{style_name}") +def get_style(style_name: str, workspace: str = None): + """ + Mengambil file SLD style dari GeoServer. 
+ - Jika workspace tidak diisi β†’ ambil style global + - Jika workspace diisi β†’ ambil dari workspace + """ + + # Tentukan endpoint sesuai workspace + url = f"{GEOSERVER_URL}/rest/styles/{style_name}.sld" + + try: + response = requests.get( + url, + auth=(GEOSERVER_USER, GEOSERVER_PASS), + timeout=15 + ) + + if response.status_code == 200: + # Return isi SLD sebagai text + return { + "status": "success", + "style_name": style_name, + "workspace": workspace, + "sld": response.text + } + + elif response.status_code == 404: + raise HTTPException(status_code=404, detail="Style tidak ditemukan di GeoServer") + + else: + raise HTTPException(status_code=500, detail=f"GeoServer error: {response.text}") + + except requests.exceptions.RequestException as e: + raise HTTPException(status_code=500, detail=f"Request error: {str(e)}") + + +# ============================================================= +# cleansing query +# ============================================================= + +async def query_cleansing_data( + table_name: str +): + try: + async with engine.begin() as conn: + await conn.execute( + text("SELECT public.fn_cleansing_satupeta_polygon(:table_name)"), + {"table_name": table_name} + ) + return "done" + + except SQLAlchemyError as e: + raise RuntimeError(f"Fix geometry failed: {str(e)}") + +async def publish_layer(table_name: str, job_id: str): + # await asyncio.sleep(10) + + await report_progress(job_id, "cleansing", 50, "Cleansing data selesai") + # await asyncio.sleep(5) + + geos_link = publish_layer_to_geoserver(table_name, job_id) + await report_progress(job_id, "publish_geoserver", 80, "Publish GeoServer selesai") + # await asyncio.sleep(3) + + uuid = publish_metadata(table_name=table_name, geoserver_links=geos_link) + await report_progress(job_id, "done", 100, "Publish GeoNetwork selesai") + + update_job_status(table_name, "FINISHED", job_id) + return { + "ok": True, + "uuid": uuid + } + + + +async def publish_layer(table_name: str, job_id: str): + # 
await asyncio.sleep(10) + try: + await report_progress(job_id, "cleansing", 50, "Cleansing data selesai") + # await asyncio.sleep(5) + + geos_link = publish_layer_to_geoserver(table_name, job_id) + await report_progress(job_id, "publish_geoserver", 80, "Publish GeoServer selesai") + # await asyncio.sleep(3) + + uuid = publish_metadata( + table_name=table_name, + geoserver_links=geos_link + ) + await report_progress(job_id, "done", 100, "Publish GeoNetwork selesai") + + update_job_status(table_name, "FINISHED", job_id) + return uuid + + except Exception as e: + update_job_status(table_name, "FAILED", job_id) + raise RuntimeError(f"Publish layer gagal: {e}") from e diff --git a/api/routers/system_router.py b/api/routers/system_router.py new file mode 100644 index 0000000..d950bec --- /dev/null +++ b/api/routers/system_router.py @@ -0,0 +1,110 @@ +import httpx +from fastapi import APIRouter +from datetime import datetime, timedelta +import requests +from core.config import API_VERSION, GEOSERVER_URL, GEOSERVER_USER, GEOSERVER_PASS, GEONETWORK_URL, GEONETWORK_USER, GEONETWORK_PASS + +router = APIRouter() + +@router.get("/status") +async def server_status(): + utc_time = datetime.utcnow() + wib_time = utc_time + timedelta(hours=7) + formatted_time = wib_time.strftime("%d-%m-%Y %H:%M:%S") + + return { + "status": "success", + "message": "Server is running smoothly βœ…", + "data": { + "service": "upload_automation", + "timestamp": f"{formatted_time} WIB" + }, + "meta": {"version": API_VERSION, "environment": "deployment"} + } + + +@router.get("/status/geoserver") +async def check_geoserver_auth(): + url = f"{GEOSERVER_URL}/rest/about/version." 
+ auth = (GEOSERVER_USER, GEOSERVER_PASS) + + try: + async with httpx.AsyncClient() as client: + response = await client.get(url, auth=auth, timeout=5) + return { + "status": "OK" if response.status_code == 200 else "ERROR", + "code": response.status_code, + "response": response.text + } + except Exception as e: + return {"status": "FAILED", "error": str(e)} + + +@router.get("/status/geonetwork") +def test_geonetwork_connection(): + + url = f"{GEONETWORK_URL}/srv/api/site" + + headers = { + "Accept": "application/json", + "X-Requested-With": "XMLHttpRequest" + } + + try: + response = requests.get( + url, + auth=(GEONETWORK_USER, GEONETWORK_PASS), + headers=headers, + timeout=10 + ) + + if response.status_code == 401: + return { + "status": "ERROR", + "message": "Unauthorized β€” cek username/password GeoNetwork." + } + + if response.status_code == 403: + return { + "status": "ERROR", + "message": "Forbidden β€” akun tidak punya akses ke API." + } + + if response.status_code != 200: + return { + "status": "ERROR", + "message": "GeoNetwork merespon dengan error.", + "code": response.status_code, + "detail": response.text + } + + return { + "status": "OK", + "code": response.status_code, + "message": "Terhubung ke GeoNetwork.", + "geonetwork_info": response.json() + } + + except requests.exceptions.ConnectionError: + return { + "status": "ERROR", + "message": "Tidak dapat terhubung ke GeoNetwork (server offline / URL salah)" + } + + except requests.exceptions.Timeout: + return { + "status": "ERROR", + "message": "Timeout menghubungi GeoNetwork." 
+ } + + except Exception as e: + return { + "status": "ERROR", + "message": "Unexpected error", + "detail": str(e) + } + + + + + diff --git a/api/routers/upload_file_router.py b/api/routers/upload_file_router.py new file mode 100644 index 0000000..e7b8ede --- /dev/null +++ b/api/routers/upload_file_router.py @@ -0,0 +1,41 @@ + +from fastapi import APIRouter, File, Form, UploadFile, Depends +from pydantic import BaseModel +from typing import Any, Dict, List, Optional +from services.upload_file.upload import handle_upload_file, handle_process_pdf, handle_to_postgis +from api.deps.role_dependency import require_role +from database.connection import engine + +router = APIRouter() + +@router.post("/file") +# async def upload_file(file: UploadFile = File(...), page: Optional[str] = Form(""), sheet: Optional[str] = Form(""), user = Depends(require_role("admin"))): +async def upload_file(file: UploadFile = File(...), page: Optional[str] = Form(""), sheet: Optional[str] = Form(""), file_desc: Optional[str] = Form("")): + return await handle_upload_file(file, page, sheet, file_desc) + + + +class PdfRequest(BaseModel): + title: str + columns: List[str] + rows: List[List] + fileName: str + fileDesc: str + +@router.post("/process-pdf") +async def upload_file(payload: PdfRequest): + return await handle_process_pdf(payload) + + + +class UploadRequest(BaseModel): + title: str + rows: List[dict] + columns: List[str] + author: Dict[str, Any] + style: str + +@router.post("/to-postgis") +async def upload_to_postgis(payload: UploadRequest): + # return await handle_to_postgis(payload, engine) + return await handle_to_postgis(payload) \ No newline at end of file diff --git a/api/routers/ws/manager.py b/api/routers/ws/manager.py new file mode 100644 index 0000000..5833059 --- /dev/null +++ b/api/routers/ws/manager.py @@ -0,0 +1,23 @@ +from typing import Dict, List +from fastapi import WebSocket + +class JobWSManager: + def __init__(self): + self.connections: Dict[str, List[WebSocket]] = 
{} + + async def connect(self, job_id: str, ws: WebSocket): + await ws.accept() + self.connections.setdefault(job_id, []).append(ws) + + def disconnect(self, job_id: str, ws: WebSocket): + if job_id in self.connections: + self.connections[job_id].remove(ws) + if not self.connections[job_id]: + del self.connections[job_id] + + async def send(self, job_id: str, data: dict): + for ws in self.connections.get(job_id, []): + await ws.send_json(data) + + +manager = JobWSManager() diff --git a/api/routers/ws/upload_progress_ws.py b/api/routers/ws/upload_progress_ws.py new file mode 100644 index 0000000..746472d --- /dev/null +++ b/api/routers/ws/upload_progress_ws.py @@ -0,0 +1,25 @@ +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from services.upload_file.upload_ws import job_state +from api.routers.ws.manager import manager + +router = APIRouter() + +@router.websocket("/ws/test") +async def ws_test(ws: WebSocket): + await ws.accept() + await ws.send_text("OK") + + +@router.websocket("/ws/job/{job_id}") +async def ws_job(job_id: str, ws: WebSocket): + await manager.connect(job_id, ws) + + # kirim progress terakhir (jika reconnect) + if job_id in job_state: + await ws.send_json(job_state[job_id]) + + try: + while True: + await ws.receive_text() # keep alive + except WebSocketDisconnect: + manager.disconnect(job_id, ws) diff --git a/core/config.py b/core/config.py new file mode 100644 index 0000000..3de9c18 --- /dev/null +++ b/core/config.py @@ -0,0 +1,78 @@ +from pathlib import Path +from dotenv import load_dotenv +import os + +load_dotenv() + +API_VERSION = "2.1.3" + +POSTGIS_URL = os.getenv("POSTGIS_URL") +POSTGIS_SYNC_URL = os.getenv("SYNC_URL") + +QGIS_URL = os.getenv("QGIS_API_URL") + +GEN_AI_URL = os.getenv("GEN_AI_URL") + +GEOSERVER_URL = os.getenv("GEOSERVER_PATH") +GEOSERVER_USER = os.getenv("GEOSERVER_UNAME") +GEOSERVER_PASS = os.getenv("GEOSERVER_PASS") +GEOSERVER_WORKSPACE = os.getenv("GEOSERVER_WORKSPACE") + 
+GEONETWORK_URL=os.getenv("GEONETWORK_URL") +GEONETWORK_USER=os.getenv("GEONETWORK_USER") +GEONETWORK_PASS=os.getenv("GEONETWORK_PASS") + +UPLOAD_FOLDER = Path(os.getenv("UPLOAD_FOLDER", "./uploads")) +MAX_FILE_MB = int(os.getenv("MAX_FILE_MB", 30)) + +ALLOWED_ORIGINS = [ + "http://localhost:4000", + "http://localhost:3000", + "http://127.0.0.1:3000", + "http://localhost:5173", + "http://127.0.0.1:5173", + + "192.168.60.24:5173", + "http://labai.polinema.ac.id:666", + "https://kkqc31ns-5173.asse.devtunnels.ms" +] + +REFERENCE_DB_URL = os.getenv("REFERENCE_DB_URL") +REFERENCE_SCHEMA = os.getenv("REFERENCE_SCHEMA", "batas_wilayah") +DESA_REF = "WADMKD" +KEC_REF = "WADMKC" +KAB_REF = "WADMKK" + +CACHE_FOLDER = Path(os.getenv("CACHE_FOLDER", "./cache")) + + +VALID_WKT_PREFIXES = ( + "POINT", + "POINT Z", + "POINT M", + "POINT ZM", + "MULTIPOINT", + "MULTIPOINT Z", + "MULTIPOINT M", + "MULTIPOINT ZM", + "LINESTRING", + "LINESTRING Z", + "LINESTRING M", + "LINESTRING ZM", + "MULTILINESTRING", + "MULTILINESTRING Z", + "MULTILINESTRING M", + "MULTILINESTRING ZM", + "POLYGON", + "POLYGON Z", + "POLYGON M", + "POLYGON ZM", + "MULTIPOLYGON", + "MULTIPOLYGON Z", + "MULTIPOLYGON M", + "MULTIPOLYGON ZM", + "GEOMETRYCOLLECTION", + "GEOMETRYCOLLECTION Z", + "GEOMETRYCOLLECTION M", + "GEOMETRYCOLLECTION ZM", +) \ No newline at end of file diff --git a/database/connection.py b/database/connection.py new file mode 100644 index 0000000..710396f --- /dev/null +++ b/database/connection.py @@ -0,0 +1,11 @@ +from sqlalchemy import create_engine +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from core.config import POSTGIS_URL, POSTGIS_SYNC_URL + +engine = create_async_engine(POSTGIS_URL, pool_pre_ping=True) +# SessionLocal = sessionmaker(bind=engine) +SessionLocal = async_sessionmaker(engine, expire_on_commit=False) + +sync_engine = create_engine(POSTGIS_SYNC_URL) diff 
--git a/database/models.py b/database/models.py new file mode 100644 index 0000000..27b297d --- /dev/null +++ b/database/models.py @@ -0,0 +1,42 @@ +from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, TIMESTAMP +from sqlalchemy.orm import relationship +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.sql import func + +Base = declarative_base() + +class UploadLog(Base): + __tablename__ = "upload_logs" + id = Column(Integer, primary_key=True, index=True) + filename = Column(String, nullable=False) + table_name = Column(String, nullable=False) + file_type = Column(String, nullable=False) + rows_count = Column(Integer) + uploaded_at = Column(TIMESTAMP, server_default=func.now()) + status = Column(String) + message = Column(Text) + + +class Organization(Base): + __tablename__ = "organizations" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String(100), unique=True, nullable=False) + address = Column(String(200), nullable=True) + + users = relationship("User", back_populates="organization") + + +class User(Base): + __tablename__ = "users" + + id = Column(Integer, primary_key=True, index=True) + username = Column(String(50), unique=True, nullable=False) + password_hash = Column(String(255), nullable=False) + role = Column(String(50), nullable=False, default="user") # <── Added role + organization_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) + active_token = Column(String(255), nullable=True) + token_expired_at = Column(DateTime, nullable=True) + last_login = Column(DateTime, nullable=True) + + organization = relationship("Organization", back_populates="users") \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..3dbf0f5 --- /dev/null +++ b/main.py @@ -0,0 +1,64 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from core.config import API_VERSION, ALLOWED_ORIGINS +from database.connection import engine 
+from database.models import Base +from api.routers.system_router import router as system_router +from api.routers.upload_file_router import router as upload_router +from api.routers.auth_router import router as auth_router +from api.routers.datasets_router import router as dataset_router +from api.routers.ws.upload_progress_ws import router as ws_router +# from contextlib import asynccontextmanager +# from utils.qgis_init import init_qgis + +app = FastAPI( + title="ETL Geo Upload Service", + version=API_VERSION, + description="Upload Automation API" +) + +app.add_middleware( + CORSMiddleware, + allow_origins=ALLOWED_ORIGINS, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Base.metadata.create_all(bind=engine) + +# qgis setup +# @asynccontextmanager +# async def lifespan(app: FastAPI): +# global qgs +# qgs = init_qgis() +# print("QGIS initialized") + +# yield + +# # SHUTDOWN (optional) +# print("Shutting down...") + +# app = FastAPI(lifespan=lifespan) + + +# @app.get("/qgis/status") +# def qgis_status(): +# try: +# version = QgsApplication.qgisVersion() +# return { +# "qgis_status": "connected", +# "qgis_version": version +# } +# except Exception as e: +# return { +# "qgis_status": "error", +# "error": str(e) +# } + +# Register routers +app.include_router(ws_router) +app.include_router(system_router, tags=["System"]) +app.include_router(auth_router, prefix="/auth", tags=["Auth"]) +app.include_router(upload_router, prefix="/upload", tags=["Upload"]) +app.include_router(dataset_router, prefix="/dataset", tags=["Upload"]) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..72d185a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,35 @@ +fastapi +uvicorn[standard] +pandas +numpy +geopandas +shapely +fiona +pyproj +SQLAlchemy +sqlalchemy +psycopg2-binary +rapidfuzz +pdfplumber +zipfile36 +python-dotenv +pydantic +python-multipart +aiofiles +starlette +openpyxl +requests +pathlib 
+pyarrow +geoalchemy2 +asyncpg +py7zr +bcrypt==4.0.1 +passlib == 1.7.4 +urllib3<2 +osgeo +websockets + + +# --- jika menggunakan ai --- +groq diff --git a/response.py b/response.py new file mode 100644 index 0000000..3804dc3 --- /dev/null +++ b/response.py @@ -0,0 +1,22 @@ +from fastapi import HTTPException +from fastapi.responses import JSONResponse + +def successRes(data=None, message="Success", status_code=200): + return JSONResponse( + status_code=status_code, + content={ + "status": "success", + "message": message, + "data": data, + } + ) + +def errorRes(message="Error", status_code=400, details=None): + return HTTPException( + status_code=status_code, + detail={ + "status": "error", + "message": message, + "details": details + } + ) diff --git a/services/.DS_Store b/services/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..9a030516c40377590eff6ebbb90728008ea5fc3b GIT binary patch literal 6148 zcmeHK!Ab)$5S_HuZYe?!3OxqA7Ob@j;$^Az2VBvEO5J5wU0gS%+uB1Z>{)-vFY$Yv zNwQ+8J$Ml*Gcb9R$;^hlESUrVL~j!A12h1@K_x8Ju=zn~oODS_) zvJ}mZ|HuICT^%Mc#2r)cegC3hnhk?Q1sL%j43jt;x7#10RIY4nR-LL-cW%8$HTA~+ zc#`$}(KYohluCla_JhkfnhqMm2>zM(k=F|p_ z=4^J@J#5KN_h8Z1c2x<1l)g^&bo zdP@*Wi>}4oAda92lZt3kg?(ZOla79A<6MimL6Z(b&y3%(GYk7d5qfs?OC1ivHOMV9 zzzi%hP&VBL)&H~A@BhUl?lA+*z*;dNDt)it!zJ0;y0kc|wG#CXm4xDQgI_6V=u(Wa eREnFZTF@`aKy)qU2GN7U7XeKJH_X7FGVlri&Q4 str: + """ + Escape karakter berbahaya di dalam URL agar valid dalam XML. + Khususnya mengganti '&' menjadi '&' kecuali jika sudah '&'. + """ + # Ganti semua & yang bukan bagian dari & + url = re.sub(r'&(?!amp;)', '&', url) + return url + + +def fix_xml_urls(xml: str) -> str: + """ + Temukan semua ... dalam XML dan escape URL-nya. + """ + def replacer(match): + original = match.group(1).strip() + fixed = escape_url_params(original) + return f"{fixed}" + + # Replace semua ... 
+ xml_fixed = re.sub( + r"(.*?)", + replacer, + xml, + flags=re.DOTALL + ) + + return xml_fixed + + + +def get_extent(table_name: str): + + sql = f""" + SELECT + ST_XMin(extent), ST_YMin(extent), + ST_XMax(extent), ST_YMax(extent) + FROM ( + SELECT ST_Extent(geom) AS extent + FROM public.{table_name} + ) AS box; + """ + + conn = engine.connect() + try: + row = conn.execute(text(sql)).fetchone() + finally: + conn.close() + + if not row or row[0] is None: + return None + + # return { + # "xmin": float(row[0]), + # "ymin": float(row[1]), + # "xmax": float(row[2]), + # "ymax": float(row[3]) + # } + + return { + "xmin": 110.1372, # west + "ymin": -9.3029, # south + "xmax": 114.5287, # east + "ymax": -5.4819 # north + } + +def get_author_metadata(table_name: str): + + sql = """ + SELECT am.table_title, am.dataset_title, am.dataset_abstract, am.keywords, am.date_created, + am.organization_name, am.contact_person_name, am.created_at, + am.contact_email, am.contact_phone, am.geom_type, + u.organization_id, + o.address AS organization_address, + o.email AS organization_email, + o.phone_number AS organization_phone + FROM backend.author_metadata AS am + LEFT JOIN backend.users u ON am.user_id = u.id + LEFT JOIN backend.organizations o ON u.organization_id = o.id + WHERE am.table_title = :table + LIMIT 1 + """ + + conn = engine.connect() + try: + row = conn.execute(text(sql), {"table": table_name}).fetchone() + finally: + conn.close() + + if not row: + raise Exception(f"Tidak ada metadata untuk tabel: {table_name}") + + return dict(row._mapping) + + +def map_geom_type(gtype): + + if gtype is None: + return "surface" + + # Jika LIST β†’ ambil elemen pertama + if isinstance(gtype, list): + if len(gtype) > 0: + gtype = gtype[0] + else: + return "surface" + + # Setelah pasti string + gtype = str(gtype).lower() + + if "polygon" in gtype or "multi" in gtype: + return "surface" + if "line" in gtype: + return "curve" + if "point" in gtype: + return "point" + + return "surface" + + 
+def generate_metadata_xml(table_name, meta, extent, geoserver_links): + + keywords_xml = "".join([ + f""" + {kw.strip()} + """ for kw in meta["keywords"].split(",") + ]) + + geom_type_code = map_geom_type(meta["geom_type"]) + print('type', geom_type_code) + uuid = str(uuid4()) + + return f""" + + + {uuid} + + + + + + + + + + + + + + {meta['contact_person_name']} + + + {meta['organization_name']} + + + + + + + {meta['organization_phone']} + + + {meta['organization_phone']} + + + + + + + {meta['organization_address']} + + + Surabaya + + + Jawa Timur + + + Indonesia + + + {meta['organization_email']} + + + + + 08.00-16.00 + + + + + + + + + + {datetime.utcnow().isoformat()}+07:00 + + + ISO 19115:2003/19139 + + + 1.0 + + + + + + + + + + 38 + + + + + + + + + + + 4326 + + + EPSG + + + + + + + + + + + {meta['dataset_title']} + + + + + {meta['created_at'].isoformat()}+07:00 + + + + + + + + {meta['date_created'].year} + + + + + {meta['contact_person_name']} + + + {meta['organization_name']} + + + + + + + {meta['organization_phone']} + + + {meta['organization_phone']} + + + + + + + {meta['organization_address']} + + + Surabaya + + + Indonesia + + + {meta['organization_email']} + + + + + 08.00-16.00 + + + + + + + + + + Timezone: UTC+7 (Asia/Jakarta) + + + + + {meta['dataset_abstract']} + + + {meta['dataset_abstract']} + + + + + + + + Dinas Tenaga Kerja dan Transmigrasi Provinsi Jawa Timur + + + Dinas Tenaga Kerja dan Transmigrasi Provinsi Jawa Timur + + + + + + + + {meta['organization_phone']} + + + {meta['organization_phone']} + + + + + + + {meta['organization_address']} + + + Surabaya + + + Jawa Timur + + + Indonesia + + + {meta['organization_email']} + + + + + + + + + + + + + + + + + + + + {keywords_xml} + + + + + + + + + + + + Penggunaan data harus mencantumkan sumber: {meta['organization_name']}. 
+ + + + + + + + + + + + 25000 + + + + + + + + + + + + + + + + {extent['xmin']} + {extent['xmax']} + {extent['ymin']} + {extent['ymax']} + + + + + + + + + + true + + + + + + {meta['dataset_title']} + + + + + {meta['created_at'].isoformat()}+07:00 + + + + + + + + {meta['date_created'].year} + + + + + + + + + + + + + {geoserver_links["wms_url"]} + + + DB:POSTGIS + + + {meta["dataset_title"]} + + + {meta["dataset_title"]} + + + + + + + {geoserver_links["wms_url"]} + + + WWW:LINK-1.0-http--link + + + {meta["dataset_title"]} + + + {meta["dataset_title"]} + + + + + + + {geoserver_links["wms_url"]} + + + OGC:WMS + + + {meta["dataset_title"]} + + + + + + + + {geoserver_links["wfs_url"]} + + + OGC:WFS + + + {meta["dataset_title"]} + + + + + + + + + + + + + + + + + + + + Data dihasilkan dari digitasi peta dasar skala 1:25000 menggunakan QGIS. + + + + + + +""" + + +# Geonetwork version 4.4.9.0 +def upload_metadata_to_geonetwork(xml_metadata: str): + # session = requests.Session() + # session.auth = (GEONETWORK_USER, GEONETWORK_PASS) + + # # 1. Get XSRF token + # try: + # info_url = f"{GEONETWORK_URL}/srv/eng/info?type=me" + # session.get(info_url) + # except requests.exceptions.RequestException as e: + # raise HTTPException(status_code=503, detail=f"Failed to connect to GeoNetwork: {e}") + + # xsrf_token = session.cookies.get('XSRF-TOKEN') + # if not xsrf_token: + # raise HTTPException(status_code=500, detail="Could not retrieve XSRF-TOKEN from GeoNetwork.") + + session, xsrf_token = create_gn_session() + + headers = { + 'X-XSRF-TOKEN': xsrf_token, + 'Accept': 'application/json' + } + + GN_API_RECORDS_URL = f"{GEONETWORK_URL}/srv/api/records" + + # 2. 
GeoNetwork requires a multipart/form-data upload + files = { + 'file': ('metadata.xml', xml_metadata, 'application/xml') + } + + params = { + "ownerGroup": 1, # all + "ownerUser": 1 # admin + } + + response = session.post( + GN_API_RECORDS_URL, + params=params, + files=files, + headers=headers, + cookies=session.cookies.get_dict() + ) + + metadata_infos = response.json().get("metadataInfos", {}) + uuid = None + for records in metadata_infos.values(): + if records and isinstance(records, list): + uuid = records[0].get("uuid") + break + if not uuid: + raise ValueError("UUID not found in GeoNetwork response") + + record = publish_record(session, uuid) + print('[record]', record) + + # print("response", response.json()) + return response.json() + + + +def publish_metadata(table_name: str, geoserver_links: dict): + + extent = get_extent(table_name) + meta = get_author_metadata(table_name) + xml = generate_metadata_xml( + table_name=meta["dataset_title"], + meta=meta, + extent=extent, + geoserver_links=geoserver_links + ) + + xml_clean = fix_xml_urls(xml) + response = upload_metadata_to_geonetwork(xml_clean) + + uuid = response.get("uuid") + print(f"[GeoNetwork] Metadata uploaded. 
UUID = {uuid}") + + return uuid + + + +def publish_record(session, uuid): + print('[uuid]', uuid) + xsrf_token = session.cookies.get('XSRF-TOKEN') + + headers = { + "X-XSRF-TOKEN": xsrf_token, + "Accept": "application/json", + "Content-Type": "application/json" + } + + url = f"{GEONETWORK_URL}/srv/api/records/{uuid}/sharing" + + payload = { + "clear": True, + "privileges": [ + { + "group": 1, + "operations": { + "view": True + } + } + ] + } + + response = session.put(url, json=payload, headers=headers) + response.raise_for_status() + return response.json() + diff --git a/services/datasets/publish_geonetwork.py b/services/datasets/publish_geonetwork.py new file mode 100644 index 0000000..decca62 --- /dev/null +++ b/services/datasets/publish_geonetwork.py @@ -0,0 +1,696 @@ +from fastapi import HTTPException +import requests +from sqlalchemy import text +from core.config import GEONETWORK_PASS, GEONETWORK_URL, GEONETWORK_USER +from database.connection import sync_engine as engine +from datetime import datetime +from uuid import uuid4 +import re + + + +def create_gn_session(): + session = requests.Session() + session.auth = (GEONETWORK_USER, GEONETWORK_PASS) + + session.get(f"{GEONETWORK_URL}/srv/eng/info?type=me") + xsrf_token = session.cookies.get("XSRF-TOKEN") + + if not xsrf_token: + raise Exception("XSRF token missing") + + return session, xsrf_token + + + +def escape_url_params(url: str) -> str: + """ + Escape karakter berbahaya di dalam URL agar valid dalam XML. + Khususnya mengganti '&' menjadi '&' kecuali jika sudah '&'. + """ + # Ganti semua & yang bukan bagian dari & + url = re.sub(r'&(?!amp;)', '&', url) + return url + + +def fix_xml_urls(xml: str) -> str: + """ + Temukan semua ... dalam XML dan escape URL-nya. + """ + def replacer(match): + original = match.group(1).strip() + fixed = escape_url_params(original) + return f"{fixed}" + + # Replace semua ... 
+ xml_fixed = re.sub( + r"(.*?)", + replacer, + xml, + flags=re.DOTALL + ) + + return xml_fixed + + + +def get_extent(table_name: str): + + sql = f""" + SELECT + ST_XMin(extent), ST_YMin(extent), + ST_XMax(extent), ST_YMax(extent) + FROM ( + SELECT ST_Extent(geom) AS extent + FROM public.{table_name} + ) AS box; + """ + + conn = engine.connect() + try: + row = conn.execute(text(sql)).fetchone() + finally: + conn.close() + + if not row or row[0] is None: + return None + + # return { + # "xmin": float(row[0]), + # "ymin": float(row[1]), + # "xmax": float(row[2]), + # "ymax": float(row[3]) + # } + + return { + "xmin": 110.1372, # west + "ymin": -9.3029, # south + "xmax": 114.5287, # east + "ymax": -5.4819 # north + } + +def get_author_metadata(table_name: str): + + sql = """ + SELECT am.table_title, am.dataset_title, am.dataset_abstract, am.keywords, am.date_created, + am.organization_name, am.contact_person_name, am.created_at, + am.contact_email, am.contact_phone, am.geom_type, + u.organization_id, + o.address AS organization_address, + o.email AS organization_email, + o.phone_number AS organization_phone + FROM backend.author_metadata AS am + LEFT JOIN backend.users u ON am.user_id = u.id + LEFT JOIN backend.organizations o ON u.organization_id = o.id + WHERE am.table_title = :table + LIMIT 1 + """ + + conn = engine.connect() + try: + row = conn.execute(text(sql), {"table": table_name}).fetchone() + finally: + conn.close() + + if not row: + raise Exception(f"Tidak ada metadata untuk tabel: {table_name}") + + return dict(row._mapping) + + +def map_geom_type(gtype): + + if gtype is None: + return "surface" + + # Jika LIST β†’ ambil elemen pertama + if isinstance(gtype, list): + if len(gtype) > 0: + gtype = gtype[0] + else: + return "surface" + + # Setelah pasti string + gtype = str(gtype).lower() + + if "polygon" in gtype or "multi" in gtype: + return "surface" + if "line" in gtype: + return "curve" + if "point" in gtype: + return "point" + + return "surface" + + 
+def generate_metadata_xml(table_name, meta, extent, geoserver_links): + + keywords_xml = "".join([ + f""" + {kw.strip()} + """ for kw in meta["keywords"].split(",") + ]) + + geom_type_code = map_geom_type(meta["geom_type"]) + print('type', geom_type_code) + uuid = str(uuid4()) + + return f""" + + + {uuid} + + + + + + + + + + + + + + {meta['contact_person_name']} + + + {meta['organization_name']} + + + + + + + {meta['organization_phone']} + + + {meta['organization_phone']} + + + + + + + {meta['organization_address']} + + + Surabaya + + + Jawa Timur + + + Indonesia + + + {meta['organization_email']} + + + + + 08.00-16.00 + + + + + + + + + + {datetime.utcnow().isoformat()}+07:00 + + + ISO 19115:2003/19139 + + + 1.0 + + + + + + + + + + 38 + + + + + + + + + + + 4326 + + + EPSG + + + + + + + + + + + {meta['dataset_title']} + + + + + {meta['created_at'].isoformat()}+07:00 + + + + + + + + {meta['date_created'].year} + + + + + {meta['contact_person_name']} + + + {meta['organization_name']} + + + + + + + {meta['organization_phone']} + + + {meta['organization_phone']} + + + + + + + {meta['organization_address']} + + + Surabaya + + + Indonesia + + + {meta['organization_email']} + + + + + 08.00-16.00 + + + + + + + + + + Timezone: UTC+7 (Asia/Jakarta) + + + + + {meta['dataset_abstract']} + + + {meta['dataset_abstract']} + + + + + + + + Dinas Tenaga Kerja dan Transmigrasi Provinsi Jawa Timur + + + Dinas Tenaga Kerja dan Transmigrasi Provinsi Jawa Timur + + + + + + + + {meta['organization_phone']} + + + {meta['organization_phone']} + + + + + + + {meta['organization_address']} + + + Surabaya + + + Jawa Timur + + + Indonesia + + + {meta['organization_email']} + + + + + + + + + + + + + + + + + + + + {keywords_xml} + + + + + + + + + + + + Penggunaan data harus mencantumkan sumber: {meta['organization_name']}. 
+ + + + + + + + + + + + 25000 + + + + + + + + + + + + + + + + {extent['xmin']} + {extent['xmax']} + {extent['ymin']} + {extent['ymax']} + + + + + + + + + + true + + + + + + {meta['dataset_title']} + + + + + {meta['created_at'].isoformat()}+07:00 + + + + + + + + {meta['date_created'].year} + + + + + + + + + + + + + {geoserver_links["wms_url"]} + + + DB:POSTGIS + + + {meta["dataset_title"]} + + + {meta["dataset_title"]} + + + + + + + {geoserver_links["wms_url"]} + + + WWW:LINK-1.0-http--link + + + {meta["dataset_title"]} + + + {meta["dataset_title"]} + + + + + + + {geoserver_links["wms_url"]} + + + OGC:WMS + + + {meta["dataset_title"]} + + + + + + + + {geoserver_links["wfs_url"]} + + + OGC:WFS + + + {meta["dataset_title"]} + + + + + + + + + + + + + + + + + + + + Data dihasilkan dari digitasi peta dasar skala 1:25000 menggunakan QGIS. + + + + + + +""" + + +# Geonetwork version 4.4.9.0 +def upload_metadata_to_geonetwork(xml_metadata: str): + # session = requests.Session() + # session.auth = (GEONETWORK_USER, GEONETWORK_PASS) + + # # 1. Get XSRF token + # try: + # info_url = f"{GEONETWORK_URL}/srv/eng/info?type=me" + # session.get(info_url) + # except requests.exceptions.RequestException as e: + # raise HTTPException(status_code=503, detail=f"Failed to connect to GeoNetwork: {e}") + + # xsrf_token = session.cookies.get('XSRF-TOKEN') + # if not xsrf_token: + # raise HTTPException(status_code=500, detail="Could not retrieve XSRF-TOKEN from GeoNetwork.") + + session, xsrf_token = create_gn_session() + headers = { + 'X-XSRF-TOKEN': xsrf_token, + 'Accept': 'application/json' + } + + GN_API_RECORDS_URL = f"{GEONETWORK_URL}/srv/api/records" + + # 2. 
GeoNetwork requires a multipart/form-data upload + files = { + 'file': ('metadata.xml', xml_metadata, 'application/xml') + } + + params = { + "ownerGroup": 1, # all + "ownerUser": 1 # admin + } + + response = session.post( + GN_API_RECORDS_URL, + params=params, + files=files, + headers=headers, + cookies=session.cookies.get_dict() + ) + + metadata_infos = response.json().get("metadataInfos", {}) + uuid = None + for records in metadata_infos.values(): + if records and isinstance(records, list): + uuid = records[0].get("uuid") + break + if not uuid: + raise ValueError("UUID not found in GeoNetwork response") + + publish_record(session, uuid) + + # print("response", response.json()) + return uuid + + + +def publish_metadata(table_name: str, geoserver_links: dict): + + extent = get_extent(table_name) + meta = get_author_metadata(table_name) + xml = generate_metadata_xml( + table_name=meta["dataset_title"], + meta=meta, + extent=extent, + geoserver_links=geoserver_links + ) + + xml_clean = fix_xml_urls(xml) + uuid = upload_metadata_to_geonetwork(xml_clean) + + print(f"[GeoNetwork] Metadata uploaded. 
UUID = {uuid}") + + return uuid + + + +def publish_record(session, uuid): + print('[uuid]', uuid) + xsrf_token = session.cookies.get('XSRF-TOKEN') + + headers = { + "X-XSRF-TOKEN": xsrf_token, + "Accept": "application/json", + "Content-Type": "application/json" + } + + url = f"{GEONETWORK_URL}/srv/api/records/{uuid}/sharing" + + payload = { + "clear": True, + "privileges": [ + { + "group": 1, + "operations": { + "view": True + } + } + ] + } + + response = session.put(url, json=payload, headers=headers) + response.raise_for_status() + + +# single stand func +# def publish_record(uuid): +# session, xsrf_token = create_gn_session() + +# headers = { +# "X-XSRF-TOKEN": xsrf_token, +# "Content-Type": "application/json" +# } + +# url = f"{GEONETWORK_URL}/srv/api/records/{uuid}/sharing" + +# payload = { +# "clear": True, +# "privileges": [ +# {"group": 1, "operations": {"view": True}} +# ] +# } + +# resp = session.put(url, json=payload, headers=headers) +# resp.raise_for_status() diff --git a/services/datasets/publish_geoserver.py b/services/datasets/publish_geoserver.py new file mode 100644 index 0000000..833d8f3 --- /dev/null +++ b/services/datasets/publish_geoserver.py @@ -0,0 +1,285 @@ +import requests +import json +import os +from core.config import GEOSERVER_URL, GEOSERVER_USER, GEOSERVER_PASS, GEOSERVER_WORKSPACE + +# DATASTORE = "postgis" #per OPD +DATASTORE = "server_lokal" +# SLD_DIR = "./styles" + +# BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +# SLD_DIR = os.path.join(BASE_DIR, "styles") + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +MAIN_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", "..")) +SLD_DIR = os.path.join(MAIN_DIR, "style_temp") + + +def publish_layer_to_geoserver(table: str, job_id: str): + print(f"[GeoServer] Publish layer + upload SLD: {table}") + + # ========================== + # 1. 
Publish Feature Type + # ========================== + # ft_url = f"{GEOSERVER_URL}/rest/workspaces/{GEOSERVER_WORKSPACE}/datastores/{DATASTORE}/featuretypes" + ft_url = f"{GEOSERVER_URL}/rest/workspaces/{GEOSERVER_WORKSPACE}/datastores/{DATASTORE}/featuretypes?computeDefault=true" + + payload = { + "featureType": { + "name": table, + "nativeName": table, + "enabled": True + } + } + + requests.post( + ft_url, + auth=(GEOSERVER_USER, GEOSERVER_PASS), + headers={"Content-Type": "application/json"}, + data=json.dumps(payload) + ) + + print(f"[GeoServer] FeatureType published for: {table}") + + # ========================================== + # 2. Upload SLD file to GeoServer + # ========================================== + + sld_file = f"{SLD_DIR}/{job_id}.sld" + style_name = table # style name sama dengan table + + if not os.path.exists(sld_file): + print(f"[WARNING] SLD file tidak ditemukan: {sld_file}") + else: + print(f"[GeoServer] Upload SLD {sld_file}") + + #old + # style_url = f"{GEOSERVER_URL}/rest/styles" + + # with open(sld_file, "rb") as sld: + # requests.post( + # f"{style_url}?name={style_name}&workspace={GEOSERVER_WORKSPACE}", + # auth=(GEOSERVER_USER, GEOSERVER_PASS), + # headers={"Content-Type": "application/vnd.ogc.sld+xml"}, + # data=sld.read() + # ) + + # print(f"[GeoServer] SLD uploaded: {style_name}") + + + + #new + style_url = ( + f"{GEOSERVER_URL}/rest/workspaces/" + f"{GEOSERVER_WORKSPACE}/styles" + ) + + with open(sld_file, "r", encoding="utf-8") as f: + sld_content = f.read() + + # πŸ”₯ INI BARIS PENTINGNYA + sld_content = sld_content.lstrip("\ufeff \t\r\n") + + resp = requests.post( + f"{style_url}?name={style_name}", + auth=(GEOSERVER_USER, GEOSERVER_PASS), + headers={"Content-Type": "application/vnd.ogc.sld+xml"}, + data=sld_content.encode("utf-8") + ) + + + if resp.status_code not in (200, 201): + raise Exception( + f"Upload SLD gagal ({resp.status_code}): {resp.text}" + ) + + print(f"[GeoServer] SLD uploaded: {style_name}") + + + + + # 
========================================== + # 3. Apply SLD to the layer + # ========================================== + + layer_url = f"{GEOSERVER_URL}/rest/layers/{GEOSERVER_WORKSPACE}:{table}" + + payload = { + "layer": { + "defaultStyle": { + "name": style_name, + "workspace": GEOSERVER_WORKSPACE + }, + "enabled": True + } + } + + requests.put( + layer_url, + auth=(GEOSERVER_USER, GEOSERVER_PASS), + headers={"Content-Type": "application/json"}, + data=json.dumps(payload) + ) + + print(f"[GeoServer] SLD applied as default style for {table}") + + # ========================================== + # 4. Delete SLD file from local folder + # ========================================== + + os.remove(sld_file) + print(f"[CLEANUP] SLD file removed: {sld_file}") + + # ============================================== + # 5. Reload GeoServer (optional but recommended) + # ============================================== + requests.post( + f"{GEOSERVER_URL}/rest/reload", + auth=(GEOSERVER_USER, GEOSERVER_PASS) + ) + + # ==================================================== + # 7. Generate GeoServer WMS/WFS link untuk GeoNetwork + # ==================================================== + + wms_link = ( + f"{GEOSERVER_URL}/{GEOSERVER_WORKSPACE}/wms?" + f"service=WMS&request=GetMap&layers={GEOSERVER_WORKSPACE}:{table}" + ) + wfs_link = ( + f"{GEOSERVER_URL}/{GEOSERVER_WORKSPACE}/wfs?" + f"service=WFS&request=GetFeature&typeName={GEOSERVER_WORKSPACE}:{table}" + ) + print(f"[GeoServer] WMS URL: {wms_link}") + print(f"[GeoServer] WFS URL: {wfs_link}") + print(f"[GeoServer] Reload completed. Layer {table} ready.") + return { + "table": table, + "style": style_name, + "wms_url": wms_link, + "wfs_url": wfs_link + } + + + + + +# use default style +# def publish_layer_to_geoserver(table: str): + +# print(f"[GeoServer] Publish layer: {table}") + +# # ========== 1. 
Publish Feature Type ========== +# ft_url = f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{DATASTORE}/featuretypes" + +# payload = { +# "featureType": { +# "name": table, +# "nativeName": table, +# "enabled": True +# } +# } + +# requests.post( +# ft_url, +# auth=(GEOSERVER_USER, GEOSERVER_PASS), +# headers={"Content-Type": "application/json"}, +# data=json.dumps(payload) +# ) + +# # =================================================== +# # 2. Tentukan SLD file (prioritas table.sld β†’ fallback default) +# # =================================================== +# table_sld = SLD_DIR / f"{table}.sld" +# default_sld = SLD_DIR / "default_style.sld" + +# if table_sld.exists(): +# chosen_sld = table_sld +# delete_after = True +# style_name = table # pakai nama style sama dengan layer +# print(f"[SLD] Menggunakan SLD khusus: {chosen_sld}") +# else: +# chosen_sld = default_sld +# delete_after = False +# style_name = "default_style" +# print(f"[SLD] Menggunakan default SLD: {chosen_sld}") + +# # ========================================== +# # 3. Upload SLD +# # ========================================== +# style_url = f"{GEOSERVER_URL}/rest/styles" + +# with open(chosen_sld, "rb") as sld: +# requests.post( +# f"{style_url}?name={style_name}&workspace={WORKSPACE}", +# auth=(GEOSERVER_USER, GEOSERVER_PASS), +# headers={"Content-Type": "application/vnd.ogc.sld+xml"}, +# data=sld.read() +# ) + +# print(f"[GeoServer] SLD uploaded: {style_name}") + +# # ========================================== +# # 4. 
Apply SLD ke layer +# # ========================================== +# layer_url = f"{GEOSERVER_URL}/rest/layers/{WORKSPACE}:{table}" + +# payload = { +# "layer": { +# "defaultStyle": { +# "name": style_name, +# "workspace": WORKSPACE +# }, +# "enabled": True +# } +# } + +# requests.put( +# layer_url, +# auth=(GEOSERVER_USER, GEOSERVER_PASS), +# headers={"Content-Type": "application/json"}, +# data=json.dumps(payload) +# ) + +# print(f"[GeoServer] Style '{style_name}' applied to layer '{table}'") + +# # ========================================== +# # 5. Delete table.sld jika ada +# # ========================================== +# if delete_after: +# table_sld.unlink() +# print(f"[CLEANUP] File SLD '{table}.sld' dihapus") + +# # ==================================================== +# # 6. Reload GeoServer (opsional tapi aman) +# # ==================================================== +# requests.post( +# f"{GEOSERVER_URL}/rest/reload", +# auth=(GEOSERVER_USER, GEOSERVER_PASS) +# ) + +# # ==================================================== +# # 7. Generate GeoServer WMS/WFS link untuk GeoNetwork +# # ==================================================== + +# wms_link = ( +# f"{GEOSERVER_URL}/{WORKSPACE}/wms?" +# f"service=WMS&request=GetMap&layers={WORKSPACE}:{table}" +# ) + +# wfs_link = ( +# f"{GEOSERVER_URL}/{WORKSPACE}/wfs?" 
+# f"service=WFS&request=GetFeature&typeName={WORKSPACE}:{table}" +# ) + +# print(f"[GeoServer] WMS URL: {wms_link}") +# print(f"[GeoServer] WFS URL: {wfs_link}") + +# return { +# "table": table, +# "style": style_name, +# "wms_url": wms_link, +# "wfs_url": wfs_link +# } + + diff --git a/services/upload_file/.DS_Store b/services/upload_file/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 Dict[str, Any]: + headers = { + "Content-Type": "application/json", + "API_KEY": "testsatupeta" + } + + try: + response = requests.post( + f"{URL}", + json=payload, + headers=headers, + ) + + # response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + return { + "success": False, + "error": str(e) + } + + +if __name__ == "__main__": + # Contoh payload + payload = { + "nama_file_peta": "peta bencana.pdf", + "nama_opd": "Badan Penanggulangan Bencana Daerah (BPBD)", + "tipe_data_spasial": "Multipolygon", + "struktur_atribut_data": {}, + "metadata": { + "judul": "", + "abstrak": "", + "tujuan": "", + "keyword": [], + "kategori": [], + "kategori_mapset": "" + } + } + + result = send_metadata(payload) + print(result) diff --git a/services/upload_file/readers/reader_csv.py b/services/upload_file/readers/reader_csv.py new file mode 100644 index 0000000..b958ee9 --- /dev/null +++ b/services/upload_file/readers/reader_csv.py @@ -0,0 +1,116 @@ +import pandas as pd +import re +import csv +import os + +def detect_header_line(path, max_rows=10): + with open(path, 'r', encoding='utf-8', errors='ignore') as f: + lines = [next(f) for _ in range(max_rows)] + header_line_idx = 0 + best_score = -1 + for i, line in enumerate(lines): + cells = re.split(r'[;,|\t]', line.strip()) + alpha_ratio = 
sum(bool(re.search(r'[A-Za-z]', c)) for c in cells) / max(len(cells), 1) + digit_ratio = sum(bool(re.search(r'\d', c)) for c in cells) / max(len(cells), 1) + score = alpha_ratio - digit_ratio + if score > best_score: + best_score = score + header_line_idx = i + return header_line_idx + +def detect_delimiter(path, sample_size=2048): + with open(path, 'r', encoding='utf-8', errors='ignore') as f: + sample = f.read(sample_size) + sniffer = csv.Sniffer() + try: + dialect = sniffer.sniff(sample) + return dialect.delimiter + except Exception: + for delim in [',', ';', '\t', '|']: + if delim in sample: + return delim + return ',' + + +def read_csv(path: str, sheet: str = None): + ext = os.path.splitext(path)[1].lower() + + try: + if ext in ['.csv']: + header_line = detect_header_line(path) + delimiter = detect_delimiter(path) + print(f"[INFO] Detected header line: {header_line + 1}, delimiter: '{delimiter}'") + + df = pd.read_csv( + path, + header=header_line, + sep=delimiter, + encoding='utf-8', + low_memory=False, + thousands=',' + ) + + elif ext in ['.xlsx', '.xls']: + print(f"[INFO] Membaca file Excel: {os.path.basename(path)}") + xls = pd.ExcelFile(path) + print(f"[INFO] Ditemukan {len(xls.sheet_names)} sheet: {xls.sheet_names}") + + if sheet: + if sheet not in xls.sheet_names: + raise ValueError(f"Sheet '{sheet}' tidak ditemukan dalam file {os.path.basename(path)}") + print(f"[INFO] Membaca sheet yang ditentukan: '{sheet}'") + df = pd.read_excel(xls, sheet_name=sheet, header=0, dtype=str) + df = df.dropna(how='all').dropna(axis=1, how='all') + + else: + print("[INFO] Tidak ada sheet yang ditentukan, mencari sheet paling relevan...") + best_sheet = None + best_score = -1 + best_df = None + + for sheet_name in xls.sheet_names: + try: + temp_df = pd.read_excel(xls, sheet_name=sheet_name, header=0, dtype=str) + temp_df = temp_df.dropna(how='all').dropna(axis=1, how='all') + + if len(temp_df) == 0 or len(temp_df.columns) < 2: + continue + + # hitung skor relevansi + 
text_ratio = temp_df.applymap(lambda x: isinstance(x, str)).sum().sum() / (temp_df.size or 1) + row_score = len(temp_df) + score = (row_score * 0.7) + (text_ratio * 100) + + if score > best_score: + best_score = score + best_sheet = sheet_name + best_df = temp_df + + except Exception as e: + print(f"[WARN] Gagal membaca sheet {sheet_name}: {e}") + continue + + if best_df is not None: + print(f"[INFO] Sheet terpilih: '{best_sheet}' dengan skor {best_score:.2f}") + df = best_df + else: + raise ValueError("Tidak ada sheet valid yang dapat dibaca.") + + for col in df.columns: + if df[col].astype(str).str.replace(',', '', regex=False).str.match(r'^-?\d+(\.\d+)?$').any(): + df[col] = df[col].astype(str).str.replace(',', '', regex=False) + df[col] = pd.to_numeric(df[col], errors='ignore') + + else: + raise ValueError("Format file tidak dikenali (hanya .csv, .xlsx, .xls)") + + except Exception as e: + print(f"[WARN] Gagal membaca file ({e}), fallback ke default reader.") + df = pd.read_csv(path, encoding='utf-8', low_memory=False, thousands=',') + + df = df.loc[:, ~df.columns.astype(str).str.contains('^Unnamed')] + df.columns = [str(c).strip() for c in df.columns] + df = df.dropna(how='all') + + return df + diff --git a/services/upload_file/readers/reader_gdb.py b/services/upload_file/readers/reader_gdb.py new file mode 100644 index 0000000..843f2d5 --- /dev/null +++ b/services/upload_file/readers/reader_gdb.py @@ -0,0 +1,75 @@ +import geopandas as gpd +import fiona +import zipfile +import tempfile +import os +import shutil + +def read_gdb(zip_path: str, layer: str = None): + if not zip_path.lower().endswith(".zip"): + raise ValueError("File GDB harus berupa ZIP yang berisi folder .gdb atau file .gdbtable") + + tmpdir = tempfile.mkdtemp() + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(tmpdir) + + macosx_path = os.path.join(tmpdir, "__MACOSX") + if os.path.exists(macosx_path): + shutil.rmtree(macosx_path) + + gdb_folders = [] + for root, dirs, _ in 
os.walk(tmpdir): + for d in dirs: + if d.lower().endswith(".gdb"): + gdb_folders.append(os.path.join(root, d)) + + if not gdb_folders: + gdbtable_files = [] + for root, _, files in os.walk(tmpdir): + for f in files: + if f.lower().endswith(".gdbtable"): + gdbtable_files.append(os.path.join(root, f)) + + if gdbtable_files: + first_folder = os.path.dirname(gdbtable_files[0]) + base_name = os.path.basename(first_folder) + gdb_folder_path = os.path.join(tmpdir, f"{base_name}.gdb") + + os.makedirs(gdb_folder_path, exist_ok=True) + + for fpath in os.listdir(first_folder): + if ".gdb" in fpath.lower(): + shutil.move(os.path.join(first_folder, fpath), os.path.join(gdb_folder_path, fpath)) + + gdb_folders.append(gdb_folder_path) + # print(f"[INFO] Rebuilt GDB folder from nested structure: {gdb_folder_path}") + else: + # print("[DEBUG] Isi ZIP:", os.listdir(tmpdir)) + shutil.rmtree(tmpdir) + raise ValueError("Tidak ditemukan folder .gdb atau file .gdbtable di dalam ZIP") + + gdb_path = gdb_folders[0] + + layers = fiona.listlayers(gdb_path) + # print(f"[INFO] Layer tersedia: {layers}") + + chosen_layer = layer or (layers[0] if layers else None) + if not chosen_layer: + shutil.rmtree(tmpdir) + raise ValueError("Tidak ada layer GDB yang bisa dibaca.") + + print(f"[DEBUG] Membaca layer: {chosen_layer}") + + try: + gdf = gpd.read_file(gdb_path, layer=chosen_layer) + except Exception as e: + shutil.rmtree(tmpdir) + raise ValueError(f"Gagal membaca layer dari GDB: {e}") + + if gdf.crs is None: + # print("[WARN] CRS tidak terdeteksi, diasumsikan EPSG:4326") + gdf.set_crs("EPSG:4326", inplace=True) + + + shutil.rmtree(tmpdir) + return gdf diff --git a/services/upload_file/readers/reader_mpk.py b/services/upload_file/readers/reader_mpk.py new file mode 100644 index 0000000..a466e58 --- /dev/null +++ b/services/upload_file/readers/reader_mpk.py @@ -0,0 +1,72 @@ +import os +import tempfile +import json +from io import BytesIO +import geopandas as gpd +from py7zr import SevenZipFile 
+import pyogrio + + +def find_data_source(extract_dir: str): + """ + Cari data sumber (.gdb atau .shp) di dalam folder hasil ekstrak. + """ + for root, dirs, _ in os.walk(extract_dir): + for d in dirs: + if d.lower().endswith(".gdb"): + return os.path.join(root, d) + + for root, _, files in os.walk(extract_dir): + for f in files: + if f.lower().endswith(".shp"): + return os.path.join(root, f) + + raise ValueError("Tidak ditemukan data source yang didukung (.gdb atau .shp).") + + +def get_main_layer(gdb_path: str): + """ + Ambil nama layer utama dari geodatabase (.gdb). + """ + try: + layers = pyogrio.list_layers(gdb_path) + for layer in layers: + if not layer[0].lower().endswith("__attach"): + return layer[0] + if layers: + return layers[0][0] + raise ValueError(f"Tidak ada layer utama yang valid di {gdb_path}") + except Exception as e: + raise ValueError(f"Gagal membaca daftar layer GDB: {e}") + + +def read_mpk(path: str): + mpk_bytes = None + with open(path, "rb") as f: + mpk_bytes = f.read() + + if not mpk_bytes: + raise ValueError("File MPK kosong atau tidak valid.") + + with tempfile.TemporaryDirectory() as tempdir: + try: + with SevenZipFile(BytesIO(mpk_bytes), mode="r") as z: + z.extractall(path=tempdir) + except Exception as e: + raise ValueError(f"File MPK rusak atau tidak valid: {e}") + + src_path = find_data_source(tempdir) + + if src_path.lower().endswith(".gdb"): + layer_name = get_main_layer(src_path) + gdf = gpd.read_file(src_path, layer=layer_name) + else: + gdf = gpd.read_file(src_path) + + if gdf.crs is None: + raise ValueError("CRS tidak terdeteksi. 
def detect_header_rows(rows):
    """Split raw table rows into header rows and body rows.

    The first body row is detected heuristically, in priority order:
    a row mixing text and numbers, a numeric-cell ratio above 0.3, a cell
    that is a pure row number, or a 0 -> >0 ratio transition. Header
    candidates that are short text rows are kept only when the following
    candidate is fully textual (ratio 0), otherwise they are dropped as
    stray body data.

    Args:
        rows: Raw table rows as extracted by pdfplumber.

    Returns:
        tuple[list, list]: (header rows, body rows); both empty for no input.
    """
    if not rows:
        # Fix: callers unpack two values (`head, body = ...`); the original
        # returned a single empty list here, raising ValueError upstream.
        return [], []

    ratios = [row_ratio(r) for r in rows]
    body_start_index = None

    for i in range(1, len(rows)):
        row = rows[i]
        if has_mixed_text_and_numbers(row):
            body_start_index = i
            break
        if ratios[i] > 0.3:
            body_start_index = i
            break
        if any(isinstance(c, str) and re.match(r'^\d+$', c.strip()) for c in row):
            body_start_index = i
            break
        if ratios[i - 1] == 0 and ratios[i] > 0:
            body_start_index = i
            break

    if body_start_index is None:
        body_start_index = len(rows)

    potential_headers = rows[:body_start_index]
    body_filtered = rows[body_start_index:]
    header_filtered = []
    for idx, row in enumerate(potential_headers):
        if is_short_text_row(row):
            # Keep a short text row only when the next candidate is fully
            # textual; otherwise discard it.
            if idx + 1 < len(potential_headers) and ratios[idx + 1] == 0:
                header_filtered.append(row)
        else:
            header_filtered.append(row)

    return header_filtered, body_filtered


def merge_multiline_header(header_rows):
    """Collapse stacked header rows into one flat list of column names.

    For each column the bottom-most non-empty cell wins; embedded newlines
    become spaces and columns that end up empty are dropped.
    """
    final_header = []
    for col in zip(*header_rows):
        val = next((v for v in reversed(col) if v and str(v).strip()), '')
        final_header.append(str(val).replace('\n', ' ').strip())
    return [v for v in final_header if v not in ['', None]]
def merge_parsed_table(tables):
    """Re-join table fragments that were split across PDF pages.

    A table whose running-number column starts at 1 is a "root"; any other
    numbered table is a fragment. A fragment is appended to the first root
    with identical columns whose last row number immediately precedes the
    fragment's first. Tables without a number column are kept as roots
    unchanged. Roots are mutated in place and returned.
    """
    roots, fragments = [], []

    # Step 1: classify each table as root or fragment.
    for table in tables:
        num_idx = get_number_column_index(table["columns"])
        if num_idx is None:
            roots.append(table)
            continue
        start_no, _ = get_start_end_number(table["rows"], num_idx)
        (roots if start_no == 1 else fragments).append(table)

    # Step 2: attach each fragment to at most one matching root.
    for frag in fragments:
        frag_idx = get_number_column_index(frag["columns"])
        f_start, _ = get_start_end_number(frag["rows"], frag_idx)
        for root in roots:
            if root["columns"] != frag["columns"]:
                continue
            root_idx = get_number_column_index(root["columns"])
            _, r_end = get_start_end_number(root["rows"], root_idx)
            if f_start == r_end + 1:
                root["rows"].extend(frag["rows"])
                break  # a fragment may join only one root

    return roots


def read_pdf(path: str, page: str):
    """Extract tables from a PDF file using pdfplumber.

    Pipeline: open PDF -> select pages (e.g. "1,3-5") -> detect tables per
    page -> split header/body rows -> merge multi-line headers -> clean body
    columns -> filter geo/admin columns -> merge page-spanning fragments ->
    normalize the running-number column.

    Args:
        path: PDF file location.
        page: Page selection string; empty selects page 1.

    Returns:
        list[dict]: Tables shaped as {"title", "columns", "rows"}.

    Raises:
        PDFReadError: On any read/parse failure (HTTP 422 semantics).
    """
    try:
        selected = page if page else "1"
        tables_data = []

        with pdfplumber.open(path) as pdf:
            total_pages = len(pdf.pages)
            selected_pages = parse_page_selection(selected, total_pages)

            logger.info(f"[INFO] Total Halaman PDF: {total_pages}")
            logger.info(f"[INFO] Total Halaman yang dipilih: {len(selected_pages)}")
            logger.info(f"[INFO] Halaman yang dipilih untuk dibaca: {selected_pages}")

            for page_num in selected_pages:
                pdf_page = pdf.pages[page_num - 1]
                tables = pdf_page.find_tables()
                logger.info(f"\n\n[INFO] Halaman {page_num}: {len(tables)} tabel terdeteksi")

                # NOTE(review): title extraction via extract_text_lines was
                # removed upstream because it misreads landscape pages.
                for i, t in enumerate(tables, start=1):
                    table = t.extract()
                    # Tables with <= 2 rows are header-only noise; skip.
                    if len(table) > 2:
                        logger.info(f"[TBL] tabel : {i} - halaman {page_num}")
                        tables_data.append({"page": f"halaman {page_num} - {i}", "table": table})

        logger.info(f"\nTotal tabel terbaca: {len(tables_data)}\n")

        header_only, body_only, page_info = [], [], []
        for tbl in tables_data:
            head, body = detect_header_rows(tbl["table"])
            header_only.append(head)
            body_only.append(body)
            page_info.append(tbl["page"])

        clean_header = [merge_multiline_header(h) for h in header_only]
        clean_body = []
        for i, raw_body in enumerate(body_only):
            con_body = [[cell for cell in row if cell not in (None, '')] for row in raw_body]
            cleaned = cleaning_column(clean_header[i], [con_body])
            clean_body.append(cleaned[0])

        parsed = [
            {"title": page_label, "columns": cols, "rows": rows}
            for cols, rows, page_label in zip(clean_header, clean_body, page_info)
        ]

        clean_parsed = filter_geo_admin_column(parsed)
        merge_parsed = merge_parsed_table(clean_parsed)
        logger.info(f"\nTotal tabel valid: {len(merge_parsed)}\n")

        return [normalize_number_column(t) for t in merge_parsed]

    except Exception as e:
        raise PDFReadError(f"Gagal membaca PDF: {e}", code=422)


def convert_df(payload):
    """Convert a parsed-table payload into a pandas DataFrame.

    Expects ``payload`` with list keys 'columns' and 'rows' (each row having
    exactly one cell per column); an optional 'title' is preserved in
    ``df.attrs``.

    Raises:
        PDFReadError: On a malformed payload (HTTP 400 semantics).
    """
    try:
        if "columns" not in payload or "rows" not in payload:
            raise ValueError("Payload tidak memiliki key 'columns' atau 'rows'.")
        if not isinstance(payload["columns"], list):
            raise TypeError("'columns' harus berupa list.")
        if not isinstance(payload["rows"], list):
            raise TypeError("'rows' harus berupa list.")

        for i, row in enumerate(payload["rows"]):
            if len(row) != len(payload["columns"]):
                raise ValueError(f"Jumlah elemen di baris ke-{i} tidak sesuai jumlah kolom.")

        df = pd.DataFrame(payload["rows"], columns=payload["columns"])
        if "title" in payload:
            df.attrs["title"] = payload["title"]
        return df

    except Exception as e:
        raise PDFReadError(f"Gagal konversi payload ke DataFrame: {e}", code=400)
def read_shp(path: str):
    """Read a shapefile (or a ZIP containing one) into a GeoDataFrame.

    For ZIP input the archive is extracted to a temporary directory that is
    always removed again, even on failure. If the shapefile's geometry
    column is missing or entirely empty, features are rebuilt from the raw
    fiona records; a missing CRS is assumed to be EPSG:4326.

    Args:
        path: Path to a .shp file or a .zip archive containing one.

    Returns:
        geopandas.GeoDataFrame.

    Raises:
        ValueError: Empty path, no .shp inside the ZIP, or read failure.
    """
    if not path:
        raise ValueError("Path shapefile tidak boleh kosong.")

    tmpdir = None
    try:
        if path.lower().endswith(".zip"):
            tmpdir = tempfile.mkdtemp()
            with zipfile.ZipFile(path, "r") as zip_ref:
                zip_ref.extractall(tmpdir)

            shp_files = [
                os.path.join(root, f)
                for root, _, files in os.walk(tmpdir)
                for f in files
                if f.lower().endswith(".shp")
            ]
            if not shp_files:
                raise ValueError("Tidak ditemukan file .shp di dalam ZIP.")
            shp_path = shp_files[0]
            print(f"[DEBUG] Membaca shapefile: {os.path.basename(shp_path)}")
        else:
            shp_path = path

        try:
            gdf = gpd.read_file(shp_path)
        except Exception as e:
            raise ValueError(f"Gagal membaca shapefile: {e}")

        if "geometry" not in gdf.columns or gdf.geometry.is_empty.all():
            print("[WARN] Geometry kosong. Mencoba membangun ulang dari fitur mentah...")
            with fiona.open(shp_path) as src:
                features = []
                for feat in src:
                    props = feat["properties"]
                    props["geometry"] = shape(feat["geometry"]) if feat["geometry"] else None
                    features.append(props)
                gdf = gpd.GeoDataFrame(features, geometry="geometry", crs=src.crs)

        if gdf.crs is None:
            # Assumption used across the readers: undeclared CRS -> WGS84.
            gdf.set_crs("EPSG:4326", inplace=True)

        return gdf
    finally:
        # Fix: the original leaked tmpdir on every error path (missing .shp,
        # unreadable file); clean up unconditionally.
        if tmpdir and os.path.exists(tmpdir):
            shutil.rmtree(tmpdir)
def safe_json(value):
    """Coerce numpy/pandas/shapely scalars into JSON-serializable values.

    numpy ints/floats -> Python int/float, pandas Timestamp -> ISO string,
    geometry objects (anything exposing ``.wkt``) -> WKT string, NaN/None
    -> None; everything else passes through unchanged.
    """
    # np.integer/np.floating cover every width (int32/int64, float32/64, ...),
    # generalizing the original int64/int32/float64/float32 checks.
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    if hasattr(value, "wkt"):
        # Duck-typed geometry check (shapely geometries expose .wkt and
        # stringify to WKT); equivalent to the original isinstance test.
        return str(value)
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        # Fix: pd.isna on lists/arrays is element-wise and made the original
        # `if` ambiguous; treat such values as plain pass-through data.
        pass
    return value


def detect_zip_type(zip_path: str) -> str:
    """Classify a ZIP archive as 'gdb', 'shp', or 'unknown'.

    A FileGDB is recognized by a *.gdb/ directory entry or by loose GDB
    component files; a shapefile by any *.shp member.
    """
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        names = [f.lower() for f in zip_ref.namelist()]

    # `".gdb/" in f` subsumes the original redundant endswith(".gdb/") test.
    if any(".gdb/" in f for f in names):
        return "gdb"

    gdb_exts = (".gdbtable", ".gdbtablx", ".gdbindexes", ".spx")
    if any(f.endswith(gdb_exts) for f in names):
        return "gdb"

    if any(f.endswith(".shp") for f in names):
        return "shp"

    return "unknown"
def process_data(df: pd.DataFrame, ext: str, filename: str, fileDesc: str):
    """Attach and validate geometry on an uploaded table and build the
    preview payload returned to the frontend.

    Steps: detect & build geometry, fall back to automatic polygon matching,
    compute geometry statistics, serialize rows JSON-safely, and attach AI
    metadata suggestions.

    Args:
        df: Raw table read by one of the file readers.
        ext: Source file extension (".csv", ".pdf", ...).
        filename: Original upload file name (echoed in the response).
        fileDesc: User-supplied description, passed to the AI context.

    Returns:
        dict preview payload on success, or an ``errorRes`` response when no
        geometry could be matched at all (HTTP 422).
    """

    def _normalize_geom_type(geom_type):
        # Report the base type ("Polygon" for "MultiPolygon", etc.).
        return geom_type.replace("Multi", "") if geom_type.startswith("Multi") else geom_type

    result = detect_and_build_geometry(df, master_polygons=None)
    if not hasattr(result, "geometry") or result.geometry.isna().all():
        result = attach_polygon_geometry_auto(result)

    if isinstance(result, gpd.GeoDataFrame) and "geometry" in result.columns:
        geom_types = (
            result.geometry
            .dropna()
            .geom_type
            .apply(_normalize_geom_type)
            .unique()
        )
        geom_type = geom_types[0] if len(geom_types) > 0 else "None"
        null_geom = result.geometry.isna().sum()
        print(f"[INFO] Tipe Geometry: {geom_type}")
        print(f"[INFO] Jumlah geometry kosong: {null_geom}")
    else:
        res = {
            "message": "Tidak menemukan tabel yang relevan.",
            "file_type": ext,
            "rows": 0,
            "columns": 0,
            "geometry_valid": 0,
            "geometry_empty": 0,
            "geometry_valid_percent": 0,
            "warnings": [],
            "warning_examples": [],
            "preview": []
        }
        return errorRes(message="Tidak berhasil mencocokan geometry pada tabel.", details=res, status_code=422)

    result = result.replace([pd.NA, float('inf'), float('-inf')], None)
    if isinstance(result, gpd.GeoDataFrame) and 'geometry' in result.columns:
        # Serialize geometries to WKT strings for the JSON response.
        result['geometry'] = result['geometry'].apply(
            lambda g: g.wkt if g is not None else None
        )

    total_rows = len(result)
    empty_count = result['geometry'].apply(is_geom_empty).sum()
    valid_count = total_rows - empty_count
    # Fix: guard the empty-frame case (division by zero in the original).
    match_percentage = (valid_count / total_rows) * 100 if total_rows else 0.0

    warnings = []
    if empty_count > 0:
        warnings.append(
            f"{empty_count} dari {total_rows} baris tidak memiliki geometry yang valid "
            f"({100 - match_percentage:.2f}% data gagal cocok)."
        )
        warning_examples = result[result['geometry'].apply(is_geom_empty)].head(500).to_dict(orient="records")
    else:
        warning_examples = []

    preview_data = result.to_dict(orient="records")
    preview_safe = [{k: safe_json(v) for k, v in row.items()} for row in preview_data]
    warning_safe = [{k: safe_json(v) for k, v in row.items()} for row in warning_examples]

    ai_context = {
        "nama_file_peta": filename,
        "nama_opd": "Badan Penanggulangan Bencana Daerah (BPBD) Provinsi Jatim",
        "tipe_data_spasial": geom_type,
        "deskripsi_singkat": fileDesc,
        "struktur_atribut_data": {},
    }
    # NOTE(review): send_metadata(ai_context) is bypassed with a hard-coded
    # stub — presumably for testing; restore the live call before release.
    ai_suggest = {'judul': 'Peta Risiko Letusan Gunung Arjuna di Provinsi Jawa Timur', 'abstrak': 'Peta ini menggambarkan wilayah berisiko letusan Gunung Arjuna yang berada di Provinsi Jawa Timur. Data disajikan dalam bentuk poligon yang menunjukkan zona risiko berdasarkan analisis potensi aktivitas vulkanik.', 'tujuan': 'Data dapat digunakan untuk perencanaan mitigasi bencana dan pengambilan keputusan di wilayah Jawa Timur.', 'keyword': ['Risiko letusan', 'Gunung Arjuna', 'Bencana alam', 'Provinsi Jawa Timur', 'Geologi'], 'kategori': ['Geoscientific information', 'Environment'], 'kategori_mapset': 'Lingkungan Hidup'}

    return {
        "message": "File berhasil dibaca dan dianalisis.",
        "file_name": filename,
        "file_type": ext,
        "rows": int(total_rows),
        "columns": list(map(str, result.columns)),
        "geometry_valid": int(valid_count),
        "geometry_empty": int(empty_count),
        "geometry_valid_percent": float(round(match_percentage, 2)),
        "geometry_type": geom_type,
        "warnings": warnings,
        "warning_rows": warning_safe,
        "preview": preview_safe,
        "metadata_suggest": ai_suggest
    }
async def handle_upload_file(file: UploadFile = File(...), page: Optional[str] = Form(""), sheet: Optional[str] = Form(""), fileDesc: Optional[str] = Form("")):
    """Receive an uploaded file, dispatch to the reader matching its
    extension, and return the analyzed preview payload.

    Supported types: .csv, .xlsx, .mpk, .pdf, .zip (containing SHP or
    FileGDB). PDFs with several detected tables are returned as-is so the
    user can pick one (processed later via ``handle_process_pdf``).

    Returns:
        ``errorRes`` (413 too large, 400 unsupported type, 500 internal) or
        a ``successRes`` with the processed data.
    """
    fname = file.filename
    ext = os.path.splitext(fname)[1].lower()
    contents = await file.read()
    size_mb = len(contents) / (1024 * 1024)
    if size_mb > MAX_FILE_MB:
        # Fix: errorRes builds a response object — return it, don't raise it.
        return errorRes(status_code=413, message="Ukuran File Terlalu Besar")

    tmp_path = UPLOAD_FOLDER / fname
    with open(tmp_path, "wb") as f:
        f.write(contents)

    try:
        df = None
        print('ext', ext)

        if ext == ".csv":
            df = read_csv(str(tmp_path))
        elif ext == ".xlsx":
            df = read_csv(str(tmp_path), sheet)
        elif ext == ".mpk":
            df = read_mpk(str(tmp_path))
        elif ext == ".pdf":
            tbl = read_pdf(str(tmp_path), page)
            if len(tbl) == 0:
                res = {
                    "message": "Tidak ditemukan tabel valid pada halaman yang dipilih",
                    "tables": {},
                    "file_type": ext
                }
                return successRes(message="Tidak ditemukan tabel valid pada halaman yang dipilih", data=res)
            elif len(tbl) > 1:
                # Multiple tables: hand them back for user selection.
                res = {
                    "message": "File berhasil dibaca dan dianalisis.",
                    "tables": tbl,
                    "file_type": ext
                }
                return successRes(data=res, message="File berhasil dibaca dan dianalisis.")
            else:
                df = convert_df(tbl[0])
        elif ext == ".zip":
            zip_type = detect_zip_type(str(tmp_path))
            if zip_type == "shp":
                print("[INFO] ZIP terdeteksi sebagai Shapefile.")
                df = read_shp(str(tmp_path))
            elif zip_type == "gdb":
                print("[INFO] ZIP terdeteksi sebagai Geodatabase (GDB).")
                df = read_gdb(str(tmp_path))
            else:
                return successRes(message="ZIP file tidak mengandung SHP / GDB valid.")
        else:
            # Fix: was `raise errorRes(...)` — raising a non-exception object.
            return errorRes(status_code=400, message="Unsupported file type")

        if df is None or (hasattr(df, "empty") and df.empty):
            return successRes(message="File berhasil dibaca, Tetapi tidak ditemukan tabel valid")

        res = process_data(df, ext, fname, fileDesc)
        return successRes(data=res)

    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(
            message="Internal Server Error",
            details=str(e),
            status_code=500
        )
    finally:
        # Fix: delete the temp upload on every path, not only on success.
        tmp_path.unlink(missing_ok=True)


class PdfRequest(BaseModel):
    """Payload of one user-selected PDF table plus file metadata."""
    title: str
    columns: List[str]
    rows: List[List]
    fileName: str
    fileDesc: str


async def handle_process_pdf(payload: PdfRequest):
    """Process a single PDF table (already extracted and selected by the
    user) into the standard preview payload.

    Returns:
        ``successRes`` with the processed data, or ``errorRes`` on failure.
    """
    try:
        df = convert_df(payload.model_dump())
        if df is None or (hasattr(df, "empty") and df.empty):
            return errorRes(message="Tidak ada tabel")

        res = process_data(df, '.pdf', payload.fileName, payload.fileDesc)
        return successRes(data=res)

    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(message="Internal Server Error", details=str(e), status_code=500)
def str_to_date(raw_date: str):
    """Parse a 'YYYY-MM-DD' string into a ``date``.

    Returns None for empty or unparsable input (a warning is printed for
    the unparsable case).
    """
    if raw_date:
        try:
            return datetime.strptime(raw_date, "%Y-%m-%d").date()
        except Exception as e:
            print("[WARNING] Tidak bisa parse dateCreated:", e)
    return None


def generate_job_id(user_id: str) -> str:
    """Build a job id of the form '<user_id>_<YYYYmmddHHMMSS>'."""
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return f"{user_id}_{timestamp}"


def save_xml_to_sld(xml_string, filename):
    """Persist an SLD style XML under style_temp/<filename>.sld.

    Args:
        xml_string: The SLD XML document to write.
        filename: Base name for the .sld file (the caller passes the job id).

    Returns:
        The path of the written file.
    """
    folder_path = 'style_temp'
    os.makedirs(folder_path, exist_ok=True)

    # Fix: the original ignored `filename` and wrote a constant file name,
    # so concurrent jobs overwrote each other's style.
    file_path = os.path.join(folder_path, f"{filename}.sld")

    with open(file_path, "w", encoding="utf-8") as f:
        f.write(xml_string)

    return file_path
async def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
    """Stub upload endpoint: returns a fixed success payload.

    NOTE(review): the real PostGIS implementation is kept commented out
    above; this stub only fabricates a job result — presumably for frontend
    testing. Restore the real implementation before release.
    """
    try:
        job_id = generate_job_id(str(user_id))
        result = {
            "job_id": job_id,
            "job_status": "done",
            "table_name": "just for test",
            "status": "success",
            "message": "Tabel test berhasil dibuat.",
            "total_rows": 10,
            "geometry_type": "Polygon",
            "crs": "EPSG 4326",
            "metadata_uuid": "-"
        }
        return successRes(data=result)

    except Exception as e:
        # Fix: the original printed "gtw" and implicitly returned None,
        # which FastAPI would serve as an empty 200 response. Report the
        # failure like every other handler in this module.
        print(f"[ERROR] {e}")
        return errorRes(message="Internal Server Error", details=str(e), status_code=500)


def slugify(value: str) -> str:
    """Normalize a dataset title into a safe identifier: lowercase, runs of
    non-alphanumerics collapsed to single underscores, edges trimmed."""
    return re.sub(r'[^a-zA-Z0-9]+', '_', value.lower()).strip('_')
conn.execute(text(create_view_query)) + +# # Register geometry untuk QGIS +# await conn.execute(text(f"DELETE FROM geometry_columns WHERE f_table_name = '{view_name}';")) +# await conn.execute(text(f""" +# INSERT INTO geometry_columns +# (f_table_schema, f_table_name, f_geometry_column, coord_dimension, srid, type) +# VALUES ('public', '{view_name}', 'geom', 2, 4326, 'GEOMETRY'); +# """)) + +# print(f"[INFO] VIEW {view_name} dibuat dengan FID unik dan kompatibel dengan QGIS.") + + +# async def handle_to_postgis(payload, engine, user_id: int = 3): +# """ +# Menangani upload data spasial ke PostGIS (dengan partition per user). +# - Jika partisi belum ada, akan dibuat otomatis +# - Metadata dataset disimpan di tabel dataset_metadata +# - Data spasial dimasukkan ke tabel partisi (test_partition_user_{id}) +# - VIEW otomatis dibuat untuk QGIS +# """ + +# try: +# df = pd.DataFrame(payload.rows) +# print(f"[INFO] Diterima {len(df)} baris data dari frontend.") + +# # --- Validasi kolom geometry --- +# if "geometry" not in df.columns: +# raise errorRes(status_code=400, message="Kolom 'geometry' tidak ditemukan dalam data.") + +# # --- Parsing geometry ke objek shapely --- +# df["geometry"] = df["geometry"].apply( +# lambda g: wkt.loads(g) +# if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES) +# else None +# ) + +# # --- Buat GeoDataFrame --- +# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326") + +# # --- Metadata info dari payload --- +# # dataset_title = getattr(payload, "dataset_title", None) +# # dataset_year = getattr(payload, "dataset_year", None) +# # dataset_desc = getattr(payload, "dataset_description", None) +# dataset_title = "hujan 2045" +# dataset_year = 2045 +# dataset_desc = "test metadata" + +# if not dataset_title: +# raise errorRes(status_code=400, detail="Field 'dataset_title' wajib ada untuk metadata.") + +# async with engine.begin() as conn: +# fields = [col for col in df.columns if col != "geometry"] +# # πŸ’Ύ 1️⃣ 
Simpan Metadata Dataset +# print("[INFO] Menyimpan metadata dataset...") +# result = await conn.execute( +# text(""" +# INSERT INTO dataset_metadata (user_id, title, year, description, fields, created_at) +# VALUES (:user_id, :title, :year, :desc, :fields, :created_at) +# RETURNING id; +# """), +# { +# "user_id": user_id, +# "title": dataset_title, +# "year": dataset_year, +# "desc": dataset_desc, +# "fields": json.dumps(fields), +# "created_at": datetime.utcnow(), +# }, +# ) +# metadata_id = result.scalar_one() +# print(f"[INFO] Metadata disimpan dengan ID {metadata_id}") + +# # βš™οΈ 2️⃣ Auto-create Partisi Jika Belum Ada +# print(f"[INFO] Memastikan partisi test_partition_user_{user_id} tersedia...") +# await conn.execute( +# text(f""" +# DO $$ +# BEGIN +# IF NOT EXISTS ( +# SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}' +# ) THEN +# EXECUTE format(' +# CREATE TABLE test_partition_user_%s +# PARTITION OF test_partition +# FOR VALUES IN (%s); +# ', {user_id}, {user_id}); +# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_geom ON test_partition_user_%s USING GIST (geom);', {user_id}, {user_id}); +# EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_metadata ON test_partition_user_%s (metadata_id);', {user_id}, {user_id}); +# END IF; +# END +# $$; +# """) +# ) + +# # 🧩 3️⃣ Insert Data Spasial ke Partisi +# print(f"[INFO] Memasukkan data ke test_partition_user_{user_id} ...") +# insert_count = 0 +# for _, row in gdf.iterrows(): +# geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None +# attributes = row.drop(labels=["geometry"]).to_dict() + +# await conn.execute( +# text(""" +# INSERT INTO test_partition (user_id, metadata_id, geom, attributes, created_at) +# VALUES (:user_id, :metadata_id, ST_Force2D(ST_GeomFromText(:geom, 4326)), +# CAST(:attr AS jsonb), :created_at); +# """), +# { +# "user_id": user_id, +# "metadata_id": metadata_id, +# "geom": geom_wkt, +# "attr": 
json.dumps(attributes), +# "created_at": datetime.utcnow(), +# }, +# ) +# insert_count += 1 + +# # 🧩 4️⃣ Membuat VIEW untuk dataset baru di QGIS +# await create_dataset_view_from_metadata(conn, metadata_id, user_id, dataset_title) + +# print(f"[INFO] βœ… Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id} (metadata_id={metadata_id}).") + +# return { +# "status": "success", +# "user_id": user_id, +# "metadata_id": metadata_id, +# "dataset_title": dataset_title, +# "inserted_rows": insert_count, +# "geometry_type": list(gdf.geom_type.unique()), +# } + +# except Exception as e: +# print(f"[ERROR] Gagal upload ke PostGIS partition: {e}") +# raise errorRes(status_code=500, message="Gagal upload ke PostGIS partition", details=str(e)) + diff --git a/services/upload_file/upload_exceptions.py b/services/upload_file/upload_exceptions.py new file mode 100644 index 0000000..ae496ce --- /dev/null +++ b/services/upload_file/upload_exceptions.py @@ -0,0 +1,9 @@ +class PDFReadError(Exception): + """Exception khusus untuk kesalahan saat membaca file PDF.""" + def __init__(self, message: str, code: int = 400): + super().__init__(message) + self.message = message + self.code = code + + def to_dict(self): + return {"error": self.message, "code": self.code} diff --git a/services/upload_file/upload_ws.py b/services/upload_file/upload_ws.py new file mode 100644 index 0000000..4c3727e --- /dev/null +++ b/services/upload_file/upload_ws.py @@ -0,0 +1,27 @@ +# app/jobs/progress.py +from typing import Dict +from api.routers.ws.manager import manager + +# state job (in-memory dulu) +job_state: Dict[str, dict] = {} + + +async def report_progress( + job_id: str, + step: str, + progress: int, + message: str +): + """ + Fungsi tunggal untuk update & broadcast progress job + """ + + job_state[job_id] = { + "job_id": job_id, + "step": step, + "progress": progress, + "message": message, + } + + # push ke websocket + await manager.send(job_id, job_state[job_id]) diff --git 
a/services/upload_file/utils/geometry_detector.py b/services/upload_file/utils/geometry_detector.py new file mode 100644 index 0000000..80b0278 --- /dev/null +++ b/services/upload_file/utils/geometry_detector.py @@ -0,0 +1,466 @@ +import geopandas as gpd +from shapely.geometry import Point, LineString +import pandas as pd +import numpy as np +import re +import os +from shapely import wkt +from rapidfuzz import process, fuzz +from sqlalchemy import create_engine +from shapely.geometry.base import BaseGeometry +from core.config import REFERENCE_DB_URL, REFERENCE_SCHEMA, DESA_REF, KEC_REF, KAB_REF + +# ============================================================ +# KONFIGURASI DAN KONSTANTA +# ============================================================ + +COLUMN_ALIASES = { + 'desa': ['desa', 'kelurahan', 'desa_kelurahan', 'desa/kelurahan', 'nama_desa', 'nama_kelurahan', 'Desa/Kel'], + 'kecamatan': ['kec', 'kecamatan', 'nama_kec', 'nama_kecamatan'], + 'kabupaten': ['kab', 'kabupaten', 'kota', 'kabupaten_kota', 'kota_kabupaten', 'kab/kota', 'kota/kabupaten', 'kota/kab'] +} + +# ============================================================ +# FUNGSI BANTU ADMINISTRATIF +# ============================================================ + +def find_admin_column(df, aliases): + """Mencari kolom yang paling cocok untuk tiap level admin (desa/kec/kab)""" + matched = {} + for level, alias_list in aliases.items(): + for col in df.columns: + col_norm = col.strip().lower().replace(' ', '_').replace('/', '_') + if any(alias in col_norm for alias in alias_list): + matched[level] = col + break + return matched + + +def detect_smallest_admin_level(df): + """Mendeteksi level administratif terkecil yang ada di DataFrame""" + cols = [c.lower() for c in df.columns] + if any('desa' in c or 'kelurahan' in c for c in cols): + return 'desa' + elif any('kecamatan' in c for c in cols): + return 'kecamatan' + elif any('kab' in c or 'kota' in c for c in cols): + return 'kabupaten' + return None + 
+ +def fuzzy_merge(df, master, left_key, right_key, threshold=85): + """Melakukan fuzzy matching antar nama wilayah""" + matches = df[left_key].apply( + lambda x: process.extractOne(str(x), master[right_key], score_cutoff=threshold) + ) + df['match'] = matches.apply(lambda m: m[0] if m else None) + merged = df.merge(master, left_on='match', right_on=right_key, how='left') + return merged + + + + + +def normalize_name(name: str, level: str = None): + if not isinstance(name, str): + return None + + name = name.strip() + if not name: + return None + + name = re.sub(r'\s*\([^)]*\)\s*', '', name) + + raw = name.lower() + raw = re.sub(r'^(desa|kelurahan|kel|dusun|kampung)\s+', '', raw) + raw = re.sub(r'^(kecamatan|kec)\s+', '', raw) + raw = re.sub(r'^(kabupaten|kab\.?|kab)\s+', '', raw) + + if level in ["kabupaten", "kota"]: + raw = re.sub(r'^(kota\s+)', '', raw) + + raw = re.sub(r'[^a-z\s]', '', raw) + raw = re.sub(r'\s+', ' ', raw).strip() + + tokens = raw.split() + + merged_tokens = [] + i = 0 + while i < len(tokens): + if i < len(tokens) - 1: + sim = fuzz.ratio(tokens[i], tokens[i + 1]) + if sim > 75: + merged_tokens.append(tokens[i] + tokens[i + 1]) + i += 2 + continue + merged_tokens.append(tokens[i]) + i += 1 + + cleaned_tokens = [] + prev = None + for tok in merged_tokens: + if prev and fuzz.ratio(prev, tok) > 95: + continue + cleaned_tokens.append(tok) + prev = tok + + raw = " ".join(cleaned_tokens) + formatted = raw.title() + + if level in ["kabupaten", "kota"]: + if "kota" in name.lower(): + if not formatted.startswith("Kota "): + formatted = f"Kota {formatted}" + else: + formatted = formatted.replace("Kota ", "") + + return formatted + + + + +def is_geom_empty(g): + """True jika geometry None, NaN, atau geometry Shapely kosong.""" + if g is None: + return True + if isinstance(g, float) and pd.isna(g): + return True + if isinstance(g, BaseGeometry): + return g.is_empty + return False + + + + + +import math + +def normalize_lon(val, is_lat=False): + if 
pd.isna(val): + return None + try: + v = float(val) + except: + return None + + av = abs(v) + if av == 0: + return v + + if (-180 <= v <= 180 and not is_lat) or (-90 <= v <= 90 and is_lat): + return v + + for factor in [1, 10, 100, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9]: + nv = v / factor + if (not is_lat and -180 <= nv <= 180) or (is_lat and -90 <= nv <= 90): + return nv + + return None + + + +def normalize_lat(val): + if pd.isna(val): + return None + v = float(val) + av = abs(v) + if av > 1e9: # contoh: -8167413802 (10 digit) + return v / 1e9 + elif av > 1e8: # fallback jika ada variasi + return v / 1e8 + else: + return v + + +# ============================================================ +# FUNGSI UTAMA GEOMETRY DETECTION (LAT/LON / PATH) +# ============================================================ +def detect_and_build_geometry(df: pd.DataFrame, master_polygons: gpd.GeoDataFrame = None): + """ + Mendeteksi dan membentuk geometry dari DataFrame. + Bisa dari lat/lon, WKT, atau join ke master polygon (jika disediakan). 
+ """ + + if isinstance(df, gpd.GeoDataFrame): + geom_cols = [ + c for c in df.columns + if re.match(r'^(geometry|geom|the_geom|wkb_geometry)$', c, re.IGNORECASE) + or c.lower().startswith("geom") + or c.lower().endswith("geometry") + ] + # if "geometry" in df.columns and df.geometry.notna().any(): + if geom_cols: + geom_count = df.geometry.notna().sum() + geom_type = list(df.geom_type.unique()) + print(f"[INFO] Detected existing geometry in GeoDataFrame ({geom_count} features, {geom_type}).") + return df + + lat_col = next((c for c in df.columns if re.search(r'\b(lat|latitude|y[_\s]*coord|y$)\b', c.lower())), None) + lon_col = next((c for c in df.columns if re.search(r'\b(lon|long|longitude|x[_\s]*coord|x$)\b', c.lower())), None) + + if lat_col and lon_col: + df[lat_col] = pd.to_numeric(df[lat_col], errors='coerce') + df[lon_col] = pd.to_numeric(df[lon_col], errors='coerce') + + df[lon_col] = df[lon_col].apply(lambda x: normalize_lon(x, is_lat=False)) + df[lat_col] = df[lat_col].apply(normalize_lat) + + gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs="EPSG:4326") + print("[INFO] Geometry dibangun dari kolom lat/lon.") + return gdf + + coord_col = next( + (c for c in df.columns if re.search(r'(geom|geometry|wkt|shp|shape|path|coord)', c.lower())), None + ) + + if coord_col and df[coord_col].notnull().any(): + sample_val = str(df[coord_col].dropna().iloc[0]).strip() + + if sample_val.startswith('['): + def parse_geom(val): + try: + pts = eval(val) + return LineString(pts) + except Exception: + return None + df['geometry'] = df[coord_col].apply(parse_geom) + gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326") + print("[INFO] Geometry dibangun dari kolom koordinat/path (list of points).") + return gdf + + elif any(x in sample_val.upper() for x in ["POINT", "LINESTRING", "POLYGON"]): + try: + df['geometry'] = df[coord_col].apply( + lambda g: wkt.loads(g) if isinstance(g, str) and any( + x in g.upper() for x in 
["POINT", "LINESTRING", "POLYGON"] + ) else None + ) + gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326") + print("[INFO] Geometry dibangun dari kolom WKT (Point/Line/Polygon/MultiPolygon).") + return gdf + except Exception as e: + print(f"[WARN] Gagal parsing kolom geometry sebagai WKT: {e}") + + + + if master_polygons is not None: + df.columns = df.columns.str.lower().str.strip().str.replace(' ', '_').str.replace('/', '_') + matches = find_admin_column(df, COLUMN_ALIASES) + + if 'desa' in matches: + admin_col = matches['desa'] + merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_desa', how='left') + if merged['geometry'].isna().sum() > 0: + merged = fuzzy_merge(df, master_polygons, admin_col, 'nama_desa') + gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs) + return gdf + + elif 'kecamatan' in matches: + admin_col = matches['kecamatan'] + merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kecamatan', how='left') + gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs) + return gdf + + elif 'kabupaten' in matches: + admin_col = matches['kabupaten'] + merged = df.merge(master_polygons, left_on=admin_col, right_on='nama_kabupaten', how='left') + gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs=master_polygons.crs) + return gdf + + print("[WARN] Tidak ditemukan geometry (lat/lon, path, atau master).") + return df + + +# def get_reference_polygons(level): +# """Mengambil data batas wilayah (MultiPolygon) dari DB referensi""" +# table_map = { +# 'desa': f"{REFERENCE_SCHEMA}.administrasi_ar_keldesa_jatim", +# 'kecamatan': f"{REFERENCE_SCHEMA}.administrasi_ar_kec_jatim", +# 'kabupaten': f"{REFERENCE_SCHEMA}.administrasi_ar_kabkot_jatim" +# } + +# table_name = table_map.get(level) +# if not table_name: +# raise ValueError(f"Tidak ada tabel referensi untuk level '{level}'.") + +# engine = create_engine(REFERENCE_DB_URL) +# query = f"SELECT *, ST_Multi(geom) AS geometry 
from functools import lru_cache

@lru_cache(maxsize=3)
def get_reference_polygons(level):
    """Load reference admin boundaries for `level` ('desa'|'kecamatan'|'kabupaten').

    Two-tier caching: a local parquet file under cache/, plus in-process
    lru_cache.  Raises ValueError for an unknown level (restored from the
    earlier prototype; previously an unknown level produced a broken
    "... FROM None" query).
    """
    local_path = f"cache/{level}_ref.parquet"
    if os.path.exists(local_path):
        print(f"[CACHE] Memuat referensi '{level}' dari file lokal.")
        return gpd.read_parquet(local_path)

    print(f"[DB] Mengambil data referensi '{level}' dari database...")
    table_map = {
        "desa": f"{REFERENCE_SCHEMA}.administrasi_ar_keldesa_jatim",
        "kecamatan": f"{REFERENCE_SCHEMA}.administrasi_ar_kec_jatim",
        "kabupaten": f"{REFERENCE_SCHEMA}.administrasi_ar_kabkot_jatim"
    }
    table_name = table_map.get(level)
    if not table_name:
        raise ValueError(f"Tidak ada tabel referensi untuk level '{level}'.")

    engine = create_engine(REFERENCE_DB_URL)
    query = f"SELECT *, ST_Multi(geom) AS geometry FROM {table_name}"
    gdf = gpd.read_postgis(query, engine, geom_col="geometry")
    gdf.to_parquet(local_path)
    print(f"[CACHE] Disimpan ke {local_path}")
    return gdf


# ============================================================
# Optimized join key
# ============================================================
def build_join_key(df, cols):
    """Build one 'a|b|c' join key per row from `cols`; NaN renders as ''.

    Bug fix: the previous np.char.add.reduce chain is fragile (np.char.add
    is not a reducible ufunc on several numpy versions); this uses the
    pandas agg("|".join) form and returns a plain ndarray of strings.
    """
    parts = df[cols].astype(str).replace("nan", "")
    return parts.agg("|".join, axis=1).to_numpy()


# ============================================================
# AUTO-ATTACH POLYGONS TO A NON-SPATIAL DATAFRAME
# ============================================================
def attach_polygon_geometry_auto(df: pd.DataFrame):
    """Attach a MultiPolygon geometry column based on admin-name columns.

    Uses the finest available combination (desa + kecamatan + kab/kota),
    normalizes names on both sides, joins on an exact composite key, and
    falls back to fuzzy matching for leftover rows.  Returns a GeoDataFrame
    in EPSG:4326, or the input df unchanged when the admin columns are
    missing/incomplete.
    """
    level = detect_smallest_admin_level(df)
    if not level:
        print("[WARN] Tidak ditemukan kolom administratif (desa/kecamatan/kabupaten).")
        return df

    print(f"[INFO] Detected smallest admin level: {level}")
    # Bug fix: lru_cache hands back a shared object and this function mutates
    # it (normalized name columns, _join_key); copy to keep the cache clean.
    ref_gdf = get_reference_polygons(level).copy()

    desa_col = next((c for c in df.columns if any(x in c.lower() for x in ['desa', 'kelurahan'])), None)
    kec_col = next((c for c in df.columns if 'kec' in c.lower()), None)
    kab_col = next((c for c in df.columns if any(x in c.lower() for x in ['kab', 'kota'])), None)

    # Validate combinations: a finer level requires all coarser levels.
    if desa_col and (not kec_col or not kab_col):
        print("[ERROR] Kolom 'Desa' ditemukan tetapi kolom 'Kecamatan' dan/atau 'Kabupaten' tidak lengkap.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df

    elif not desa_col and kec_col and not kab_col:
        print("[ERROR] Kolom 'Kecamatan' ditemukan tetapi kolom 'Kabupaten/Kota' tidak ditemukan.")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")
        return df

    elif kab_col and not desa_col and not kec_col:
        print("[INFO] Struktur kolom administratif valid (minimal Kabupaten/Kota ditemukan).")
        print(f"[DEBUG] Ditemukan: Desa={desa_col}, Kec={kec_col}, Kab={kab_col}")

    elif not desa_col and not kec_col and not kab_col:
        print("[WARN] Tidak ditemukan kolom administratif apapun (Desa/Kecamatan/Kabupaten).")
        print(f"[DEBUG] Kolom CSV: {list(df.columns)}")
        return df

    # reference-side column names (from core.config)
    desa_ref = DESA_REF
    kec_ref = KEC_REF
    kab_ref = KAB_REF

    # normalize names on both sides so exact join keys can match
    if desa_col is not None:
        df[desa_col] = df[desa_col].astype(str).apply(lambda x: normalize_name(x, "desa"))
    if kec_col is not None:
        df[kec_col] = df[kec_col].astype(str).apply(lambda x: normalize_name(x, "kecamatan"))
    if kab_col is not None:
        df[kab_col] = df[kab_col].astype(str).apply(lambda x: normalize_name(x, "kabupaten"))

    if desa_ref is not None:
        ref_gdf[desa_ref] = ref_gdf[desa_ref].astype(str).apply(lambda x: normalize_name(x, "desa"))
    if kec_ref is not None:
        ref_gdf[kec_ref] = ref_gdf[kec_ref].astype(str).apply(lambda x: normalize_name(x, "kecamatan"))
    if kab_ref is not None:
        ref_gdf[kab_ref] = ref_gdf[kab_ref].astype(str).apply(lambda x: normalize_name(x, "kabupaten"))

    join_cols = [col for col in [desa_col, kec_col, kab_col] if col]

    if not join_cols:
        print("[ERROR] Tidak ada kolom administratif yang bisa digunakan untuk join key.")
    else:
        join_cols_df = [col for col in [desa_col, kec_col, kab_col] if col]
        join_cols_ref = [col for col in [desa_ref, kec_ref, kab_ref] if col]

        # align key depth: both sides keep only the finest common levels
        common_depth = min(len(join_cols_df), len(join_cols_ref))
        join_cols_df = join_cols_df[-common_depth:]
        join_cols_ref = join_cols_ref[-common_depth:]

        df["_join_key"] = build_join_key(df, join_cols_df)
        ref_gdf["_join_key"] = build_join_key(ref_gdf, join_cols_ref)

        ref_lookup = ref_gdf[["_join_key", "geometry"]].drop_duplicates(subset=["_join_key"])
        df = df.merge(ref_lookup, how="left", on="_join_key")
        matched = df["geometry"].notna().sum()

        if matched < len(df):
            # fuzzy fallback for rows the exact join missed
            ref_dict = dict(zip(ref_lookup["_join_key"], ref_lookup["geometry"]))

            def find_fuzzy_geom(row):
                key = row["_join_key"]
                if not isinstance(key, str):
                    return None
                # NOTE(review): score_cutoff=80 but acceptance is >= 85; the
                # tighter bound wins — confirm the 80 isn't the intent.
                match = process.extractOne(
                    key, list(ref_dict.keys()),
                    scorer=fuzz.token_set_ratio, score_cutoff=80
                )
                if match and match[1] >= 85:
                    return ref_dict[match[0]]
                return None

            df.loc[df["geometry"].isna(), "geometry"] = df[df["geometry"].isna()].apply(find_fuzzy_geom, axis=1)

        df = df.drop(columns=["_join_key"], errors="ignore")

    return gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")


# --- services/upload_file/utils/pdf_cleaner.py --------------------------------
import re
import itertools

# Keywords indicating a geographic/administrative column in a PDF table.
geo_admin_keywords = [
    'lat', 'lon', 'long', 'latitude', 'longitude', 'koordinat', 'geometry', 'geometri',
    'desa', 'kelurahan', 'kel', 'kecamatan', 'kabupaten', 'kab', 'kota', 'provinsi',
    'lokasi', 'region', 'area', 'zone', 'boundary', 'batas'
]

def normalize_text(text):
    """Lowercase, keep only [a-z0-9/ ], collapse whitespace."""
    text = text.lower()
    text = re.sub(r'[^a-z0-9/ ]+', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def generate_combined_patterns(keywords):
    """Build regexes for composite 'a / b' headers, both orders."""
    combos = list(itertools.combinations(keywords, 2))
    patterns = []
    for a, b in combos:
        patterns.append(rf'{a}\s*/\s*{b}')
        patterns.append(rf'{b}\s*/\s*{a}')
    return patterns

# precomputed once at import time
combined_patterns = generate_combined_patterns(geo_admin_keywords)

def contains_geo_admin_keywords(text):
    """True when a header cell mentions a geo/admin keyword (or a pair)."""
    text_clean = normalize_text(text)
    if len(text_clean) < 3:
        return False

    for pattern in combined_patterns:
        if re.search(pattern, text_clean):
            return True

    # keyword must stand alone (word boundary via space / '/' / '_' / '-')
    for kw in geo_admin_keywords:
        if re.search(rf'(^|[\s/_-]){kw}([\s/_-]|$)', text_clean):
            return True

    return False

def filter_geo_admin_column(tables):
    """Keep only tables whose columns mention a geo/admin keyword."""
    filtered = []
    for table in tables:
        found = any(contains_geo_admin_keywords(col) for col in table['columns'])
        if found:
            filtered.append(table)
    return filtered


# Header labels that denote a row-number column.
NUMBER_HEADER_KEYWORDS = [
    "no","no.","nomor","nomor urut","no urut","No","Nomor","No Urut","Index",
    "ID","Sr No","S/N","SN","Sl No"
]

def has_number_header(header):
    """True if the header declares a numbering column.

    NOTE(review): `keyword in header` is membership when `header` is a list
    of cells and a substring test when it is a string — both call sites here
    pass it as received from the extractor.
    """
    header_text = header
    return any(keyword in header_text for keyword in NUMBER_HEADER_KEYWORDS)

def is_numbering_column(col_values):
    """True if >60% of the non-empty string cells look like 1-3 digit numbers."""
    numeric_like = 0
    total = 0
    for v in col_values:
        if not v or not isinstance(v, str):
            continue
        total += 1
        if re.fullmatch(r"0*\d{1,3}", v.strip()):
            numeric_like += 1
    return total > 0 and (numeric_like / total) > 0.6

def is_numeric_value(v):
    """True for ints/floats or strings that look like 1-3 digit numbers."""
    if v is None:
        return False
    if isinstance(v, (int, float)):
        return True
    if isinstance(v, str) and re.fullmatch(r"0*\d{1,3}", v.strip()):
        return True
    return False

def cleaning_column(headers, bodies):
    """Per-table cleanup of extracted PDF bodies.

    For each (header, body) pair: if the header does NOT declare a numbering
    column but the first body column looks like one, drop that column; then
    drop rows whose cell count does not match the header's column count.
    Returns the cleaned bodies, parallel to `headers`.
    """
    cleaned_bodies = []

    for header, body in zip(headers, bodies):
        if not body:
            cleaned_bodies.append(body)
            continue

        header_has_number = has_number_header(header)
        first_col = [row[0] for row in body if row and len(row) > 0]
        first_col_is_numbering = is_numbering_column(first_col)

        if not header_has_number and first_col_is_numbering:
            new_body = []
            for row in body:
                if not row:
                    continue
                first_val = row[0]
                if is_numeric_value(first_val) and len(row) > 1:
                    new_body.append(row[1:])
                else:
                    new_body.append(row)
            body = new_body

        # Bug fix: was len(headers) — the number of TABLES — which silently
        # discarded nearly every row; rows must match this header's width.
        header_len = len(header)
        filtered_body = [row for row in body if len(row) == header_len]

        cleaned_bodies.append(filtered_body)

    return cleaned_bodies

def parse_page_selection(selectedPage: str, total_pages: int):
    """Parse '1-3, 5' style page selections into a sorted list of valid pages.

    Empty/None selects all pages; malformed parts are skipped; out-of-range
    pages are dropped.
    """
    if not selectedPage:
        return list(range(1, total_pages + 1))

    pages = set()
    parts = re.split(r'[,\s]+', selectedPage.strip())

    for part in parts:
        if '-' in part:
            try:
                start, end = map(int, part.split('-'))
                pages.update(range(start, end + 1))
            except ValueError:
                continue
        else:
            try:
                pages.add(int(part))
            except ValueError:
                continue

    valid_pages = [p for p in sorted(pages) if 1 <= p <= total_pages]
    return valid_pages

def is_number(s):
    """True if s is all digits once ',' and '.' separators are removed."""
    if s is None:
        return False
    s = str(s).strip().replace(',', '').replace('.', '')
    return s.isdigit()

def row_ratio(row):
    """Fraction of non-empty cells in the row that are numeric (0 when empty)."""
    non_empty = [c for c in row if c not in (None, '', ' ')]
    if not non_empty:
        return 0
    num_count = sum(is_number(c) for c in non_empty)
    return num_count / len(non_empty)

def has_mixed_text_and_numbers(row):
    """True when the row has at least one textual and one numeric cell."""
    non_empty = [c for c in row if c not in (None, '', ' ')]
    has_text = any(isinstance(c, str) and re.search(r'[A-Za-z]', str(c)) for c in non_empty)
    has_num = any(is_number(c) for c in non_empty)
    return has_text and has_num

def is_short_text_row(row):
    """Detect a short text-only row (<=2 cells, <20 chars total)."""
    non_empty = [str(c).strip() for c in row if c not in (None, '', ' ')]
    if not non_empty:
        return False
    text_only = all(not is_number(c) for c in non_empty)
    joined = " ".join(non_empty)
    return text_only and len(non_empty) <= 2 and len(joined) < 20


def get_number_column_index(columns):
    """Index of the first numbering column in `columns`, or None."""
    for i, col in enumerate(columns):
        if has_number_header(col):
            return i
    return None

def get_start_end_number(rows, idx):
    """(first, last) value of column `idx` as ints, or (None, None) on failure."""
    try:
        start_no = int(rows[0][idx])
        end_no = int(rows[-1][idx])
        return start_no, end_no
    except (TypeError, ValueError, IndexError):  # narrowed from bare except
        return None, None

def normalize_number_column(table):
    """Repair a table's numbering column in place (and return the table).

    Walks the rows keeping a running counter: a value that does not increase
    is treated as a duplicate/reset and bumped by one; a larger value resets
    the counter.  Non-numeric cells are left untouched.
    """
    columns = table["columns"]
    rows = table["rows"]

    num_idx = get_number_column_index(columns)
    if num_idx is None:
        return table

    current = None

    for row in rows:
        try:
            val = int(row[num_idx])
        except (TypeError, ValueError, IndexError):  # narrowed from bare except
            continue

        if current is None:
            current = val
        else:
            if val <= current:
                current += 1
            else:
                current = val

        row[num_idx] = str(current)

    return table


# --- utils/logger_config.py ---------------------------------------------------
import logging
import os
import json

LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)

def setup_logger(name: str):
    """Standard service logger: '[LEVEL] [name] message' to console + logs/app.log."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Attach handlers only once per logger name (repeat calls would duplicate
    # output).  Fix: the original opened the log file BEFORE this check, so
    # every call leaked a FileHandler that was then discarded.
    if not logger.handlers:
        formatter = logging.Formatter('[%(levelname)s] [%(name)s] %(message)s')

        file_handler = logging.FileHandler(os.path.join(LOG_DIR, "app.log"))
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)

        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)

    return logger


async def log_activity(user_id, action_type, action_title, details=None):
    """Insert one audit row into backend.system_logs (details JSON-encoded)."""
    # Deferred imports: lets the pure-stdlib helpers in this module be used
    # without a configured database stack being importable.
    from sqlalchemy import text
    from database.connection import engine

    payload = {
        "user_id": user_id,
        "action_type": action_type,
        "action_title": action_title,
        "action_details": json.dumps(details) if details else None
    }

    async with engine.begin() as conn:
        await conn.execute(
            text("""
                INSERT INTO backend.system_logs (
                    user_id,
                    action_type,
                    action_title,
                    action_details
                ) VALUES (
                    :user_id,
                    :action_type,
                    :action_title,
                    :action_details
                )
            """),
            payload
        )


# --- utils/qgis_init.py -------------------------------------------------------
import os
import sys

# QGIS install prefix on Linux (Ubuntu / Debian)
QGIS_PREFIX = "/usr"

# Make the QGIS python modules importable
sys.path.append("/usr/share/qgis/python")

#
Environment variable agar QGIS dapat berjalan headless (tanpa GUI) +os.environ["QGIS_PREFIX_PATH"] = QGIS_PREFIX +os.environ["QT_QPA_PLATFORM"] = "offscreen" + +from qgis.core import QgsApplication +from qgis.analysis import QgsNativeAlgorithms +import processing +from processing.core.Processing import Processing + + +def init_qgis(): + qgs = QgsApplication([], False) + qgs.initQgis() + + # Register QGIS processing provider + Processing.initialize() + QgsApplication.processingRegistry().addProvider(QgsNativeAlgorithms()) + + print("QGIS initialized successfully") + return qgs