"""ETL Geo Upload Service: read tabular/geo uploads, detect geometry, push to PostGIS."""

import math
import os
import pathlib
import time
import zipfile
from datetime import date, datetime, timezone
from typing import List

import geopandas as gpd
import numpy as np
import pandas as pd
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from shapely import wkt
from shapely.geometry import base as shapely_base
from shapely.geometry.base import BaseGeometry
from sqlalchemy import text

# Project imports are kept in their original relative order.
from core.config import UPLOAD_FOLDER, MAX_FILE_MB
from services.reader_csv import read_csv
from services.reader_shp import read_shp
from services.reader_gdb import read_gdb

# NOTE(review): the PDF helpers are imported from the `testing` package; the
# production reader is currently disabled:
# from services.reader_pdf import convert_df, read_pdf
from testing.test_pdf_multi import convert_df, read_pdf
from services.geometry_detector import detect_and_build_geometry
from services.geometry_detector import attach_polygon_geometry_auto
from database.connection import engine
from database.models import Base

UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)

apiVersion = "1.1.0"

app = FastAPI(
    title="ETL Geo Upload Service",
    version=apiVersion,
    description="Upload Automation API",
)

# Browser origins allowed to call this API. Each entry must include the scheme:
# the CORS middleware compares them verbatim with the request's Origin header.
origins = [
    "http://localhost:3000",
    "http://127.0.0.1:3000",
    "http://localhost:5173",
    "http://127.0.0.1:5173",
    "http://192.168.60.24:5173",
    "http://labai.polinema.ac.id:666",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create the tables declared on Base (e.g. upload_logs) if they do not exist.
Base.metadata.create_all(bind=engine)

# File extensions accepted by POST /upload.
_SUPPORTED_UPLOAD_EXTENSIONS = (".csv", ".xlsx", ".pdf", ".zip")

# Member-file suffixes that mark a zipped File Geodatabase.
_GDB_MEMBER_SUFFIXES = (".gdbtable", ".gdbtablx", ".gdbindexes", ".spx")


def _safe_identifier(name: str) -> str:
    """Replace every character that is not alphanumeric or ``_`` with ``_``."""
    return "".join(c if c.isalnum() or c == "_" else "_" for c in name)


def generate_table_name(filename: str, prefix: str = "data"):
    """Return ``{prefix}_{sanitised file stem}_{YYYYmmddHHMMSS}`` for *filename*."""
    name = pathlib.Path(filename).stem
    ts = time.strftime("%Y%m%d%H%M%S")
    return f"{prefix}_{_safe_identifier(name)}_{ts}"


def is_geom_empty(g):
    """Return True when *g* is missing (None / float NaN) or an empty shapely geometry."""
    if g is None:
        return True
    if isinstance(g, float) and pd.isna(g):
        return True
    if isinstance(g, BaseGeometry):
        return g.is_empty
    return False


def safe_json(value):
    """Convert a numpy/pandas/shapely value into something JSON-serializable.

    Geometries become WKT strings, numpy scalars become Python scalars,
    timestamps/dates become ISO-8601 strings, and missing or non-finite
    numbers become ``None``. Any other value is returned unchanged.
    """
    if isinstance(value, shapely_base.BaseGeometry):
        return str(value)  # WKT text
    # pd.isna on a list/array cell returns an array, so only test scalars.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, (np.floating, float)):
        value = float(value)
        # Starlette's JSONResponse serializes with allow_nan=False, so +/-inf must go.
        return value if math.isfinite(value) else None
    if isinstance(value, np.datetime64):
        return pd.Timestamp(value).isoformat()
    if isinstance(value, (datetime, date)):  # pd.Timestamp is a datetime subclass
        return value.isoformat()
    return value


def _json_records(records):
    """Apply :func:`safe_json` to every value of every record dict."""
    return [{k: safe_json(v) for k, v in row.items()} for row in records]


def detect_zip_type(zip_path: str) -> str:
    """Classify a ZIP archive as ``"gdb"``, ``"shp"`` or ``"unknown"`` from its member names.

    A File Geodatabase is checked first: any member inside a ``*.gdb/`` folder
    or ending in a GDB table/index suffix. Otherwise any ``.shp`` member means
    shapefile.
    """
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        names = [n.lower() for n in zip_ref.namelist()]
    if any(".gdb/" in n or n.endswith(_GDB_MEMBER_SUFFIXES) for n in names):
        return "gdb"
    if any(n.endswith(".shp") for n in names):
        return "shp"
    return "unknown"


def _no_geometry_payload(file_type: str) -> dict:
    """Zeroed analysis payload returned when no usable geometry table was produced."""
    return {
        "message": "Tidak menemukan tabel yang relevan.",
        "file_type": file_type,
        "rows": 0,
        "columns": 0,
        "geometry_valid": 0,
        "geometry_empty": 0,
        "geometry_valid_percent": 0,
        "warnings": [],
        "warning_examples": [],
        "preview": [],
    }


def _analyze_dataframe(df, file_type: str) -> JSONResponse:
    """Detect/attach geometry on *df* and build the analysis response.

    Shared by ``/upload`` and ``/process-pdf``. Returns:
    - a 400 JSON error when *df* is None or empty;
    - a zeroed "no relevant table" payload when the result is not a
      GeoDataFrame with a ``geometry`` column;
    - otherwise row/column info, geometry validity statistics, up to 500
      example rows without geometry, and every row (geometry as WKT) under
      ``"preview"``.
    """
    if df is None or (hasattr(df, "empty") and df.empty):
        return JSONResponse({"error": "No valid table detected"}, status_code=400)

    result = detect_and_build_geometry(df, master_polygons=None)
    if not hasattr(result, "geometry") or result.geometry.isna().all():
        print("[INFO] Mencoba menambahkan geometry (MultiPolygon) berdasarkan nama wilayah...")
        result = attach_polygon_geometry_auto(result)

    if not (isinstance(result, gpd.GeoDataFrame) and "geometry" in result.columns):
        print("[WARN] Object bukan GeoDataFrame atau tidak punya kolom geometry.")
        print(f"[DEBUG] Kolom saat ini: {list(getattr(result, 'columns', []))}")
        return JSONResponse(content=_no_geometry_payload(file_type))

    if result.empty:
        geom_type = "None"
    else:
        geom_type = ", ".join(g for g in result.geometry.geom_type.unique() if g)
    null_geom = result.geometry.isna().sum()
    print(f"[INFO] Tipe Geometry: {geom_type}")
    print(f"[INFO] Jumlah geometry kosong: {null_geom}")

    result = result.replace([pd.NA, float("inf"), float("-inf")], None)

    # Classify emptiness on the shapely objects *before* WKT conversion:
    # afterwards an empty geometry is the non-empty string "... EMPTY".
    geometry = result["geometry"]
    empty_mask = geometry.apply(is_geom_empty).to_numpy(dtype=bool)

    # Plain DataFrame so the geometry column can hold WKT strings.
    table = pd.DataFrame(result)
    table["geometry"] = geometry.apply(
        lambda g: g.wkt if isinstance(g, BaseGeometry) else None
    )

    total = len(table)
    empty_count = int(empty_mask.sum())
    valid_count = total - empty_count
    match_percentage = (valid_count / total) * 100 if total else 0.0

    warning_messages = []
    warning_examples = []
    if empty_count > 0:
        warning_messages.append(
            f"{empty_count} dari {total} baris tidak memiliki geometry yang valid "
            f"({100 - match_percentage:.2f}% data gagal cocok)."
        )
        warning_examples = table[empty_mask].head(500).to_dict(orient="records")

    response = {
        "message": "File berhasil dibaca dan dianalisis.",
        "rows": int(total),
        "columns": list(map(str, table.columns)),
        "geometry_valid": int(valid_count),
        "geometry_empty": int(empty_count),
        "geometry_valid_percent": float(round(match_percentage, 2)),
        "warnings": warning_messages,
        "warning_examples": _json_records(warning_examples),
        "preview": _json_records(table.to_dict(orient="records")),
    }
    return JSONResponse(content=response)


@app.get("/status", tags=["System"])
async def server_status():
    """Health check: service name, current UTC timestamp and API version."""
    # datetime.utcnow() is deprecated; render aware UTC with the same trailing "Z".
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    response = {
        "status": "success",
        "message": "Server is running smoothly ✅",
        "data": {
            "service": "upload_automation",
            "status_code": 200,
            "timestamp": timestamp,
        },
        "meta": {
            "version": apiVersion,
            "environment": "deployment",
        },
    }
    return response


@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded CSV/XLSX/PDF/ZIP temporarily, read it, and analyse its geometry.

    A PDF with several tables returns the raw tables for the client to choose
    from. Everything else goes through the shared geometry analysis. The
    temporary file is always removed afterwards.

    Raises:
        HTTPException 400: unsupported extension, or a ZIP without SHP/GDB.
        HTTPException 413: file larger than ``MAX_FILE_MB``.
    """
    # Keep only the final path component so a crafted name like "../../x.csv"
    # cannot write outside UPLOAD_FOLDER; filename may also be None.
    fname = pathlib.Path(file.filename or "").name
    ext = os.path.splitext(fname)[1].lower()
    if ext not in _SUPPORTED_UPLOAD_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Unsupported file type")

    contents = await file.read()
    size_mb = len(contents) / (1024 * 1024)
    if size_mb > MAX_FILE_MB:
        raise HTTPException(status_code=413, detail="File too large")

    tmp_path = UPLOAD_FOLDER / fname
    tmp_path.write_bytes(contents)

    try:
        df = None
        print("ext", ext)
        if ext in (".csv", ".xlsx"):
            # NOTE(review): .xlsx is routed through read_csv as before; confirm
            # that reader really handles Excel workbooks.
            df = read_csv(str(tmp_path))
        elif ext == ".pdf":
            tables = read_pdf(tmp_path)
            if not tables:
                return JSONResponse({"error": "No valid table detected"}, status_code=400)
            if len(tables) > 1:
                # Several tables: return them all; presumably the client picks one
                # and submits it to /process-pdf.
                response = {
                    "message": "File berhasil dibaca dan dianalisis.",
                    "tables": tables,
                    "file_type": ext,
                }
                return JSONResponse(content=response)
            df = convert_df(tables[0])
        elif ext == ".zip":
            zip_type = detect_zip_type(str(tmp_path))
            if zip_type == "shp":
                print("[INFO] ZIP terdeteksi sebagai Shapefile.")
                df = read_shp(str(tmp_path))
            elif zip_type == "gdb":
                print("[INFO] ZIP terdeteksi sebagai Geodatabase (GDB).")
                df = read_gdb(str(tmp_path))
            else:
                raise HTTPException(
                    status_code=400,
                    detail="ZIP file tidak mengandung SHP atau GDB yang valid.",
                )

        return _analyze_dataframe(df, file_type=ext)
    except HTTPException:
        # Deliberate client errors keep their status code instead of becoming 500.
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        return JSONResponse({"error": str(e)}, status_code=500)
    finally:
        tmp_path.unlink(missing_ok=True)


class PdfRequest(BaseModel):
    """One PDF table selected by the client: title, column names and row values."""

    title: str
    columns: List[str]
    rows: List[List]


@app.post("/process-pdf")
async def process_pdf(payload: PdfRequest):
    """Convert a single client-submitted PDF table and run the shared geometry analysis."""
    try:
        df = convert_df(payload.model_dump())
        return _analyze_dataframe(df, file_type=".pdf")
    except HTTPException:
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        return JSONResponse({"error": str(e)}, status_code=500)


# WKT type keywords accepted by /upload_to_postgis; other strings become NULL geometry.
VALID_WKT_PREFIXES = (
    "POINT", "POINT Z", "POINT M", "POINT ZM",
    "MULTIPOINT", "MULTIPOINT Z", "MULTIPOINT M", "MULTIPOINT ZM",
    "LINESTRING", "LINESTRING Z", "LINESTRING M", "LINESTRING ZM",
    "MULTILINESTRING", "MULTILINESTRING Z", "MULTILINESTRING M", "MULTILINESTRING ZM",
    "POLYGON", "POLYGON Z", "POLYGON M", "POLYGON ZM",
    "MULTIPOLYGON", "MULTIPOLYGON Z", "MULTIPOLYGON M", "MULTIPOLYGON ZM",
    "GEOMETRYCOLLECTION", "GEOMETRYCOLLECTION Z", "GEOMETRYCOLLECTION M", "GEOMETRYCOLLECTION ZM",
)


class UploadRequest(BaseModel):
    """Rows (dicts, geometry as WKT text) to be written to a PostGIS table named after ``title``."""

    title: str
    rows: List[dict]
    columns: List[str]


def _parse_wkt(value):
    """Parse *value* as WKT when it is a string with a known geometry keyword, else None."""
    if isinstance(value, str) and value.strip().upper().startswith(VALID_WKT_PREFIXES):
        return wkt.loads(value)
    return None


@app.post("/upload_to_postgis")
def upload_to_postgis(payload: UploadRequest):
    """(Re)create a PostGIS table from the submitted rows and add a serial ``_id`` primary key.

    The table name is the lower-cased title with every non-alphanumeric
    character replaced by ``_``, which keeps client input out of the SQL text.

    Raises:
        HTTPException 400: invalid title or no ``geometry`` column.
        HTTPException 500: any other failure.
    """
    try:
        table_name = _safe_identifier(payload.title.lower())
        if not table_name:
            raise HTTPException(status_code=400, detail="Nama tabel tidak valid.")

        df = pd.DataFrame(payload.rows)
        print(f"[INFO] Diterima {len(df)} baris data dari frontend.")

        if "geometry" not in df.columns:
            raise HTTPException(status_code=400, detail="Kolom geometry tidak ditemukan dalam data.")
        df["geometry"] = df["geometry"].apply(_parse_wkt)
        gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")

        # table_name is sanitised above; quote it consistently in both statements.
        with engine.begin() as conn:
            conn.execute(text(f'DROP TABLE IF EXISTS "{table_name}"'))

        gdf.to_postgis(table_name, engine, if_exists="replace", index=False)

        with engine.begin() as conn:
            conn.execute(text(f'ALTER TABLE "{table_name}" ADD COLUMN _id SERIAL PRIMARY KEY;'))

        print(f"[INFO] Tabel '{table_name}' berhasil dibuat di PostGIS ({len(gdf)} baris).")

        return {
            "table_name": table_name,
            "status": "success",
            "message": f"Tabel '{table_name}' berhasil diunggah ke PostGIS.",
            "total_rows": len(gdf),
            "geometry_type": list(gdf.geom_type.unique()),
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"[ERROR] Gagal upload ke PostGIS: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e