"""Upload handlers: read tabular/spatial files, analyse geometry, load into PostGIS."""

import os
import re
import zipfile
from typing import List, Optional

import geopandas as gpd
import numpy as np
import pandas as pd
from fastapi import File, Form, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from shapely import wkt
from shapely.geometry import base as shapely_base
from shapely.geometry.base import BaseGeometry
from sqlalchemy import text

from core.config import UPLOAD_FOLDER, MAX_FILE_MB, VALID_WKT_PREFIXES
from services.upload_file.read_csv.reader_csv import read_csv
from services.upload_file.read_shp.reader_shp import read_shp
from services.upload_file.read_gdb.reader_gdb import read_gdb
from services.upload_file.read_mpk.reader_mpk import read_mpk
from services.upload_file.read_pdf.reader_pdf import convert_df, read_pdf
from services.upload_file.geom_detector.geometry_detector import detect_and_build_geometry
from services.upload_file.geom_detector.geometry_detector import attach_polygon_geometry_auto
from database.connection import engine
from database.models import Base

Base.metadata.create_all(bind=engine)

# File suffixes that only occur inside an Esri file geodatabase directory.
_GDB_FILE_SUFFIXES = (".gdbtable", ".gdbtablx", ".gdbindexes", ".spx")


def is_geom_empty(g):
    """Return True when *g* is None, a NaN float, or an empty shapely geometry.

    Any other value (including a WKT string) is reported as not empty.
    """
    if g is None:
        return True
    if isinstance(g, float) and pd.isna(g):
        return True
    if isinstance(g, BaseGeometry):
        return g.is_empty
    return False


def safe_json(value):
    """Convert numpy / pandas / shapely values into JSON-serializable ones.

    Geometries become their string (WKT) form, missing scalars (None, NaN,
    NaT, pd.NA) become None, numpy scalars become plain Python scalars and
    timestamps become ISO-8601 strings. Anything else is returned unchanged.
    """
    if isinstance(value, shapely_base.BaseGeometry):
        return str(value)  # WKT string
    # Only scalars are tested for missingness: pd.isna() on a list/array
    # returns an array whose truth value is ambiguous and would raise.
    # This runs before the float conversion so NaN never reaches the JSON
    # encoder, which rejects it.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    return value


def detect_zip_type(zip_path: str) -> str:
    """Classify a ZIP archive by its member names.

    Returns:
        "gdb" if any member sits under a ``.gdb/`` directory or has a
        geodatabase file suffix, "shp" if any member ends with ``.shp``,
        otherwise "unknown".
    """
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # Lowercase once; tolerate archives written with backslash separators.
        names = [name.lower().replace("\\", "/") for name in zip_ref.namelist()]

    if any(".gdb/" in name for name in names):
        return "gdb"
    if any(name.endswith(_GDB_FILE_SUFFIXES) for name in names):
        return "gdb"
    if any(name.endswith(".shp") for name in names):
        return "shp"
    return "unknown"


def process_data(df: pd.DataFrame, ext: str):
    """Build geometry for *df* and summarise how many rows received one.

    Args:
        df: Table read from the uploaded file.
        ext: File extension, echoed back as ``file_type``.

    Returns:
        A dict with row/column info, geometry statistics, warnings and a
        JSON-safe preview of every row. When no GeoDataFrame with a
        ``geometry`` column can be produced, a ``JSONResponse`` carrying a
        "no relevant table" payload is returned instead, so callers must
        handle both return types.
    """
    result = detect_and_build_geometry(df, master_polygons=None)

    if not hasattr(result, "geometry") or result.geometry.isna().all():
        result = attach_polygon_geometry_auto(result)

    if not (isinstance(result, gpd.GeoDataFrame) and "geometry" in result.columns):
        response = {
            "message": "Tidak menemukan tabel yang relevan.",
            "file_type": ext,
            "rows": 0,
            "columns": 0,
            "geometry_valid": 0,
            "geometry_empty": 0,
            "geometry_valid_percent": 0,
            "warnings": [],
            "warning_examples": [],
            "preview": []
        }
        return JSONResponse(content=response)

    geom_type = ", ".join([g for g in result.geometry.geom_type.unique() if g]) \
        if not result.empty else "None"
    null_geom = result.geometry.isna().sum()
    print(f"[INFO] Tipe Geometry: {geom_type}")
    print(f"[INFO] Jumlah geometry kosong: {null_geom}")

    result = result.replace([pd.NA, float('inf'), float('-inf')], None)

    # Emptiness is evaluated BEFORE the WKT conversion: once geometries are
    # strings, is_geom_empty() can no longer see `.is_empty`, so empty
    # geometries would be counted as valid.
    empty_mask = pd.Series(
        [is_geom_empty(g) for g in result['geometry']],
        index=result.index,
        dtype=bool,
    )

    if isinstance(result, gpd.GeoDataFrame) and 'geometry' in result.columns:
        result['geometry'] = result['geometry'].apply(
            lambda g: g.wkt if isinstance(g, BaseGeometry) else None
        )

    total_rows = len(result)
    empty_count = int(empty_mask.sum())
    valid_count = total_rows - empty_count
    # Guard the zero-row case; 0/0 would yield NaN, which is not valid JSON.
    match_percentage = (valid_count / total_rows) * 100 if total_rows else 0.0

    warnings = []
    warning_examples = []
    if empty_count > 0:
        warnings.append(
            f"{empty_count} dari {total_rows} baris tidak memiliki geometry yang valid "
            f"({100 - match_percentage:.2f}% data gagal cocok)."
        )
        examples = result[empty_mask.to_numpy()].head(500)
        warning_examples = examples.to_dict(orient="records")

    preview_safe = [
        {k: safe_json(v) for k, v in row.items()}
        for row in result.to_dict(orient="records")
    ]
    warning_safe = [
        {k: safe_json(v) for k, v in row.items()}
        for row in warning_examples
    ]

    return {
        "message": "File berhasil dibaca dan dianalisis.",
        "file_type": ext,
        "rows": int(total_rows),
        "columns": list(map(str, result.columns)),
        "geometry_valid": int(valid_count),
        "geometry_empty": int(empty_count),
        "geometry_valid_percent": float(round(match_percentage, 2)),
        "warnings": warnings,
        "warning_examples": warning_safe,
        "preview": preview_safe
    }


def _as_json_response(res):
    """Wrap a process_data() result, passing through an existing JSONResponse."""
    return res if isinstance(res, JSONResponse) else JSONResponse(content=res)


async def handle_upload_file(file: UploadFile = File(...), page: Optional[str] = Form(""), sheet: Optional[str] = Form("")):
    """Store an uploaded file temporarily, read it, and return its analysis.

    Args:
        file: Uploaded file; the reader is chosen from its extension
            (.csv, .xlsx, .mpk, .pdf, .zip).
        page: Passed to the PDF reader.
        sheet: Passed to the reader for .xlsx files.

    Returns:
        A JSONResponse with the analysis, the PDF table list, or an error
        payload (400 when no table is detected, 500 on unexpected failure).

    Raises:
        HTTPException: 400 for an invalid file name, unsupported type or a
            ZIP without SHP/GDB; 413 when the file exceeds MAX_FILE_MB.
    """
    # Keep only the last path component so a crafted client filename such as
    # "../../x" cannot write outside UPLOAD_FOLDER.
    fname = os.path.basename((file.filename or "").replace("\\", "/"))
    if not fname or fname in (".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name")
    ext = os.path.splitext(fname)[1].lower()

    contents = await file.read()
    size_mb = len(contents) / (1024*1024)
    if size_mb > MAX_FILE_MB:
        raise HTTPException(status_code=413, detail="Ukuran File Terlalu Besar")

    tmp_path = UPLOAD_FOLDER / fname

    try:
        with open(tmp_path, "wb") as f:
            f.write(contents)

        df = None
        print('ext', ext)

        if ext == ".csv":
            df = read_csv(str(tmp_path))
        elif ext == ".xlsx":
            df = read_csv(str(tmp_path), sheet)
        elif ext == ".mpk":
            df = read_mpk(str(tmp_path))
        elif ext == ".pdf":
            tbl = read_pdf(tmp_path, page)
            if len(tbl) == 0:
                response = {
                    "message": "Tidak ditemukan tabel valid",
                    "tables": tbl,
                    "file_type": ext
                }
                return JSONResponse(content=response)
            elif len(tbl) > 1:
                response = {
                    "message": "File berhasil dibaca dan dianalisis.",
                    "tables": tbl,
                    "file_type": ext
                }
                return JSONResponse(content=response)
            else:
                df = convert_df(tbl[0])
        elif ext == ".zip":
            zip_type = detect_zip_type(str(tmp_path))
            if zip_type == "shp":
                print("[INFO] ZIP terdeteksi sebagai Shapefile.")
                df = read_shp(str(tmp_path))
            elif zip_type == "gdb":
                print("[INFO] ZIP terdeteksi sebagai Geodatabase (GDB).")
                df = read_gdb(str(tmp_path))
            else:
                raise HTTPException(
                    status_code=400,
                    detail="ZIP file tidak mengandung SHP atau GDB yang valid."
                )
        else:
            raise HTTPException(status_code=400, detail="Unsupported file type")

        if df is None or (hasattr(df, "empty") and df.empty):
            return JSONResponse({"error": "No valid table detected"}, status_code=400)

        res = process_data(df, ext)
        return _as_json_response(res)

    except HTTPException:
        # Deliberate client errors (400) must not be rewritten as 500.
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        return JSONResponse({"error": str(e)}, status_code=500)
    finally:
        # Remove the temp file on every exit path, not just on success.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError as cleanup_error:
            print(f"[ERROR] {cleanup_error}")


# Request body for handle_process_pdf: one table extracted from a PDF.
class PdfRequest(BaseModel):
    title: str
    columns: List[str]
    rows: List[List]


async def handle_process_pdf(payload: PdfRequest):
    """Analyse a single PDF table supplied by the client.

    Returns a JSONResponse with the analysis, a 400 payload when the table
    converts to nothing, or a 500 payload on unexpected failure.
    """
    try:
        df = convert_df(payload.model_dump())
        if df is None or (hasattr(df, "empty") and df.empty):
            return JSONResponse({"error": "No valid table detected"}, status_code=400)

        res = process_data(df, '.pdf')
        return _as_json_response(res)

    except HTTPException:
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        return JSONResponse({"error": str(e)}, status_code=500)


# Request body for handle_to_postgis: rows (with a WKT "geometry" key) to store.
class UploadRequest(BaseModel):
    title: str
    rows: List[dict]
    columns: List[str]


def _safe_table_name(title: str) -> str:
    """Derive a table name from *title* that is safe to embed in raw SQL.

    The title is lowercased, spaces and hyphens become underscores, and any
    remaining character that is not a letter, digit or underscore is replaced
    by an underscore, so quotes, semicolons and ``:bind`` markers cannot
    reach the statement.

    Raises:
        HTTPException: 400 when nothing usable remains.
    """
    name = title.lower().replace(" ", "_").replace("-", "_")
    name = re.sub(r"\W", "_", name)
    if not name.strip("_"):
        raise HTTPException(status_code=400, detail="Invalid table title")
    return name


def handle_to_postgis(payload: UploadRequest):
    """Write the payload rows to a PostGIS table named after the title.

    The table is replaced if it exists and gets a serial ``_id`` primary key.
    Geometry strings that do not start with one of VALID_WKT_PREFIXES are
    stored as null geometry. The CRS is set to EPSG:4326.

    Returns:
        A dict with the table name, status, message, row count and the
        distinct geometry types.

    Raises:
        HTTPException: 400 when the title is unusable or the rows have no
            ``geometry`` column; 500 on any other failure.
    """
    try:
        table_name = _safe_table_name(payload.title)

        df = pd.DataFrame(payload.rows)
        print(f"[INFO] Diterima {len(df)} baris data dari frontend.")

        if "geometry" not in df.columns:
            raise HTTPException(status_code=400, detail="Kolom geometry tidak ditemukan dalam data.")

        df["geometry"] = df["geometry"].apply(
            lambda g: wkt.loads(g)
            if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES)
            else None
        )
        gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")

        # table_name is restricted to word characters and always quoted, so it
        # cannot break out of the identifier.
        with engine.begin() as conn:
            conn.execute(text(f'DROP TABLE IF EXISTS "{table_name}"'))

        gdf.to_postgis(table_name, engine, if_exists="replace", index=False)

        with engine.begin() as conn:
            conn.execute(text(f'ALTER TABLE "{table_name}" ADD COLUMN _id SERIAL PRIMARY KEY;'))

        print(f"[INFO] Tabel '{table_name}' berhasil dibuat di PostGIS ({len(gdf)} baris).")

        return {
            "table_name": table_name,
            "status": "success",
            "message": f"Tabel '{table_name}' berhasil diunggah ke PostGIS.",
            "total_rows": len(gdf),
            "geometry_type": list(gdf.geom_type.unique())
        }

    except HTTPException:
        # Keep the intended status code (e.g. 400) instead of masking it as 500.
        raise
    except Exception as e:
        print(f"[ERROR] Gagal upload ke PostGIS: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e