file_table_reader/services/upload_file/upload.py
2025-11-17 10:53:15 +07:00

620 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import pandas as pd
import geopandas as gpd
import numpy as np
import re
import zipfile
from shapely.geometry.base import BaseGeometry
from shapely.geometry import base as shapely_base
from fastapi import File, Form, UploadFile, HTTPException
from core.config import UPLOAD_FOLDER, MAX_FILE_MB, VALID_WKT_PREFIXES
from services.upload_file.readers.reader_csv import read_csv
from services.upload_file.readers.reader_shp import read_shp
from services.upload_file.readers.reader_gdb import read_gdb
from services.upload_file.readers.reader_mpk import read_mpk
from services.upload_file.readers.reader_pdf import convert_df, read_pdf
from services.upload_file.utils.geometry_detector import detect_and_build_geometry
from services.upload_file.utils.geometry_detector import attach_polygon_geometry_auto
from database.connection import engine
from database.models import Base
from pydantic import BaseModel
from typing import List, Optional
from shapely import wkt
from sqlalchemy import text
from datetime import datetime
from response import successRes, errorRes
# Base.metadata.create_all(bind=engine)
def is_geom_empty(g):
    """Return True when *g* is missing (None / NaN) or an empty shapely geometry."""
    # Missing markers first: None, or a float NaN (pandas' missing-cell filler).
    if g is None or (isinstance(g, float) and pd.isna(g)):
        return True
    # Shapely geometries report emptiness themselves; anything else counts as present.
    return g.is_empty if isinstance(g, BaseGeometry) else False
def safe_json(value):
    """Safely convert numpy/pandas/shapely values into JSON-serializable types.

    Returns plain int/float for numpy scalars, ISO-8601 strings for
    timestamps, WKT strings for shapely geometries, None for missing
    scalars, and the value unchanged otherwise.
    """
    # np.integer / np.floating cover every numpy scalar width (int8..int64,
    # uint*, float16..float64), not just the 32/64-bit variants.
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    if isinstance(value, shapely_base.BaseGeometry):
        return str(value)  # convert to WKT string
    # pd.isna() raises ("truth value ... is ambiguous") when handed a list,
    # ndarray or Series, so only treat a truthy scalar result as missing.
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        pass
    return value
def detect_zip_type(zip_path: str) -> str:
    """Classify a ZIP archive as 'gdb', 'shp', or 'unknown'.

    A ZIP is a file geodatabase when it contains a ``*.gdb/`` directory
    component or any Esri geodatabase payload file; a shapefile when it
    contains a ``*.shp`` entry.
    """
    gdb_exts = (".gdbtable", ".gdbtablx", ".gdbindexes", ".spx")
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # Lowercase each entry once instead of per predicate.
        names = [entry.lower() for entry in zip_ref.namelist()]
    # '.gdb/' substring test subsumes the old redundant endswith('.gdb/') check.
    if any(".gdb/" in name for name in names):
        return "gdb"
    if any(name.endswith(gdb_exts) for name in names):
        return "gdb"
    if any(name.endswith(".shp") for name in names):
        return "shp"
    return "unknown"
def process_data(df: pd.DataFrame, ext: str):
    """Detect/build geometry for *df* and summarize the result.

    Returns a dict with row/column counts, geometry statistics, warnings
    (plus up to 500 example rows that failed to match) and a 10-row
    preview; returns an errorRes (422) when no geometry could be matched.
    """
    result = detect_and_build_geometry(df, master_polygons=None)
    # Fallback: attempt automatic polygon matching when nothing was detected.
    if not hasattr(result, "geometry") or result.geometry.isna().all():
        result = attach_polygon_geometry_auto(result)
    if isinstance(result, gpd.GeoDataFrame) and "geometry" in result.columns:
        geom_type = ", ".join([g for g in result.geometry.geom_type.unique() if g]) \
            if not result.empty else "None"
        null_geom = result.geometry.isna().sum()
        print(f"[INFO] Tipe Geometry: {geom_type}")
        print(f"[INFO] Jumlah geometry kosong: {null_geom}")
    else:
        res = {
            "message": "Tidak menemukan tabel yang relevan.",
            "file_type": ext,
            "rows": 0,
            "columns": 0,
            "geometry_valid": 0,
            "geometry_empty": 0,
            "geometry_valid_percent": 0,
            "warnings": [],
            "warning_examples": [],
            "preview": []
        }
        return errorRes(message="Tidak berhasil mencocokan geometry pada tabel.", details=res, status_code=422)
    result = result.replace([pd.NA, float('inf'), float('-inf')], None)
    if isinstance(result, gpd.GeoDataFrame) and 'geometry' in result.columns:
        # Serialize shapely objects to WKT so the payload is JSON-safe.
        result['geometry'] = result['geometry'].apply(
            lambda g: g.wkt if g is not None else None
        )
    empty_count = result['geometry'].apply(is_geom_empty).sum()
    valid_count = len(result) - empty_count
    total = len(result)
    # Guard against an empty frame — the old unconditional division raised
    # ZeroDivisionError when no rows survived geometry detection.
    match_percentage = (valid_count / total) * 100 if total else 0.0
    warnings = []
    if empty_count > 0:
        warnings.append(
            f"{empty_count} dari {len(result)} baris tidak memiliki geometry yang valid "
            f"({100 - match_percentage:.2f}% data gagal cocok)."
        )
        # Cap the failure examples at 500 rows to keep the payload bounded.
        examples = result[result['geometry'].apply(is_geom_empty)].head(500)
        warning_examples = examples.to_dict(orient="records")
    else:
        warning_examples = []
    preview_data = result.head(10).to_dict(orient="records")
    preview_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in preview_data
    ]
    warning_safe = [
        {k: safe_json(v) for k, v in row.items()} for row in warning_examples
    ]
    response = {
        "message": "File berhasil dibaca dan dianalisis.",
        "file_type": ext,
        "rows": int(len(result)),
        "columns": list(map(str, result.columns)),
        "geometry_valid": int(valid_count),
        "geometry_empty": int(empty_count),
        "geometry_valid_percent": float(round(match_percentage, 2)),
        "warnings": warnings,
        "warning_examples": warning_safe,
        "preview": preview_safe
    }
    return response
async def handle_upload_file(file: UploadFile = File(...), page: Optional[str] = Form(""), sheet: Optional[str] = Form("")):
    """Receive an upload, persist it to a temp path, dispatch to the
    extension-specific reader, and return the analyzed table summary.

    Supported extensions: .csv, .xlsx (sheet selectable), .mpk, .pdf
    (page selectable; may return multiple candidate tables for the client
    to pick from), and .zip (auto-detected shapefile or geodatabase).
    """
    fname = file.filename
    ext = os.path.splitext(fname)[1].lower()
    contents = await file.read()
    size_mb = len(contents) / (1024 * 1024)
    if size_mb > MAX_FILE_MB:
        # NOTE(review): errorRes is *raised* here but *returned* elsewhere —
        # confirm errorRes is an Exception subclass, otherwise this raise fails.
        raise errorRes(status_code=413, message="Ukuran File Terlalu Besar")
    # basename() strips client-supplied directory components so an upload
    # named "../../x" cannot escape UPLOAD_FOLDER (path-traversal guard).
    tmp_path = UPLOAD_FOLDER / os.path.basename(fname)
    with open(tmp_path, "wb") as f:
        f.write(contents)
    try:
        df = None
        print('ext', ext)
        if ext == ".csv":
            df = read_csv(str(tmp_path))
        elif ext == ".xlsx":
            # read_csv also handles .xlsx when given a sheet name — presumably
            # it branches internally; verify against the reader implementation.
            df = read_csv(str(tmp_path), sheet)
        elif ext == ".mpk":
            df = read_mpk(str(tmp_path))
        elif ext == ".pdf":
            tbl = read_pdf(tmp_path, page)
            if len(tbl) == 0:
                res = {
                    "message": "Tidak ditemukan tabel valid",
                    "tables": {},
                    "file_type": ext
                }
                return successRes(message="Tidak ditemukan tabel valid", data=res)
            elif len(tbl) > 1:
                # Multiple candidate tables: return them all and let the
                # client resubmit the chosen one via handle_process_pdf.
                res = {
                    "message": "File berhasil dibaca dan dianalisis.",
                    "tables": tbl,
                    "file_type": ext
                }
                return successRes(data=res, message="File berhasil dibaca dan dianalisis.")
            else:
                df = convert_df(tbl[0])
        elif ext == ".zip":
            zip_type = detect_zip_type(str(tmp_path))
            if zip_type == "shp":
                print("[INFO] ZIP terdeteksi sebagai Shapefile.")
                df = read_shp(str(tmp_path))
            elif zip_type == "gdb":
                print("[INFO] ZIP terdeteksi sebagai Geodatabase (GDB).")
                df = read_gdb(str(tmp_path))
            else:
                # NOTE(review): this raise is caught by the broad except below
                # and surfaces as a 500, losing the 400 — confirm intent.
                raise errorRes(
                    status_code=400,
                    message="ZIP file tidak mengandung SHP atau GDB yang valid."
                )
        else:
            raise errorRes(status_code=400, message="Unsupported file type")
        if df is None or (hasattr(df, "empty") and df.empty):
            return successRes(message="File berhasil dibaca, Tetapi tidak ditemukan tabel valid")
        res = process_data(df, ext)
        return successRes(data=res)
    except Exception as e:
        print(f"[ERROR] {e}")
        return errorRes(
            message="Internal Server Error",
            details=str(e),
            status_code=500
        )
    finally:
        # Always remove the temp copy — previously unlink only ran on the
        # success path, leaking a file whenever a reader raised.
        tmp_path.unlink(missing_ok=True)
# finally:
# db_session.close()
class PdfRequest(BaseModel):
    """Request body for (re-)processing a single PDF-extracted table."""
    title: str          # table title
    columns: List[str]  # column header names
    rows: List[List]    # row-major cell values
async def handle_process_pdf(payload: PdfRequest):
    """Convert a client-submitted PDF table payload into a DataFrame and
    run the standard geometry analysis on it."""
    try:
        frame = convert_df(payload.model_dump())
        # convert_df may yield nothing usable; treat None or an empty frame alike.
        no_table = frame is None or (hasattr(frame, "empty") and frame.empty)
        if no_table:
            return errorRes(message="Tidak ada tabel")
        return successRes(data=process_data(frame, '.pdf'))
    except Exception as exc:
        print(f"[ERROR] {exc}")
        return errorRes(message="Internal Server Error", details=str(exc), status_code=500)
# finally:
# db_session.close()
class UploadRequest(BaseModel):
    """Request body for pushing an analyzed table to PostGIS."""
    title: str          # dataset title (used to derive table/view names)
    rows: List[dict]    # one dict per row; expected to include a 'geometry' WKT field
    columns: List[str]  # column names
# import time
# def handle_to_postgis(payload: UploadRequest):
# # time.sleep(2)
# # return {
# # "table_name": 'test',
# # "status": "success",
# # "message": f"Tabel test berhasil diunggah ke PostGIS.",
# # "total_rows": 999,
# # "geometry_type": 'POINT'
# # }
# try:
# table_name = payload.title.lower().replace(" ", "_").replace("-","_")
# df = pd.DataFrame(payload.rows)
# print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
# if "geometry" in df.columns:
# df["geometry"] = df["geometry"].apply(
# lambda g: wkt.loads(g) if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES) else None
# )
# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
# else:
# raise HTTPException(status_code=400, detail="Kolom geometry tidak ditemukan dalam data.")
# with engine.begin() as conn:
# conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
# gdf.to_postgis(table_name, engine, if_exists="replace", index=False)
# with engine.begin() as conn:
# conn.execute(text(f'ALTER TABLE "{table_name}" ADD COLUMN _id SERIAL PRIMARY KEY;'))
# print(f"[INFO] Tabel '{table_name}' berhasil dibuat di PostGIS ({len(gdf)} baris).")
# return {
# "table_name": table_name,
# "status": "success",
# "message": f"Tabel '{table_name}' berhasil diunggah ke PostGIS.",
# "total_rows": len(gdf),
# "geometry_type": list(gdf.geom_type.unique())
# }
# except Exception as e:
# print(f"[ERROR] Gagal upload ke PostGIS: {e}")
# raise HTTPException(status_code=500, detail=str(e))
# VALID_WKT_PREFIXES = ("POINT", "LINESTRING", "POLYGON", "MULTIPOLYGON", "MULTILINESTRING")
# def handle_to_postgis(payload: UploadRequest, user_id: int = 2):
# try:
# table_name = "test_partition"
# df = pd.DataFrame(payload.rows)
# print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
# if "geometry" not in df.columns:
# raise HTTPException(status_code=400, detail="Kolom geometry tidak ditemukan dalam data.")
# df["geometry"] = df["geometry"].apply(
# lambda g: wkt.loads(g) if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES) else None
# )
# gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
# insert_count = 0
# with engine.begin() as conn:
# # 💡 Tambahkan blok auto-create partisi di sini
# conn.execute(text(f"""
# DO $$
# BEGIN
# IF NOT EXISTS (
# SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}'
# ) THEN
# EXECUTE format('
# CREATE TABLE test_partition_user_%s
# PARTITION OF test_partition
# FOR VALUES IN (%s);
# ', {user_id}, {user_id});
# EXECUTE format('CREATE INDEX ON test_partition_user_%s USING GIST (geom);', {user_id});
# EXECUTE format('CREATE INDEX ON test_partition_user_%s USING GIN (properties);', {user_id});
# END IF;
# END
# $$;
# """))
# # Lanjut insert data seperti biasa
# for _, row in gdf.iterrows():
# geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None
# properties = row.drop(labels=["geometry"]).to_dict()
# conn.execute(
# text("""
# INSERT INTO test_partition (user_id, geom, properties, created_at)
# VALUES (:user_id, ST_Force2D(ST_GeomFromText(:geom, 4326)), CAST(:properties AS jsonb), :created_at)
# """),
# {
# "user_id": user_id,
# "geom": geom_wkt,
# "properties": json.dumps(properties),
# "created_at": datetime.utcnow()
# }
# )
# insert_count += 1
# print(f"[INFO] Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id}.")
# return {
# "table_name": table_name,
# "user_id": user_id,
# "status": "success",
# "message": f"Data berhasil ditambahkan ke partisi user_id={user_id}.",
# "total_rows": insert_count,
# "geometry_type": list(gdf.geom_type.unique())
# }
# except Exception as e:
# print(f"[ERROR] Gagal upload ke PostGIS partition: {e}")
# raise HTTPException(status_code=500, detail=str(e))
# List of valid WKT geometry prefixes accepted for parsing.
# NOTE(review): this redefinition shadows VALID_WKT_PREFIXES imported from
# core.config at the top of the file — confirm which definition should win.
VALID_WKT_PREFIXES = ("POINT", "LINESTRING", "POLYGON", "MULTIPOLYGON", "MULTILINESTRING")
def slugify(value: str) -> str:
    """Normalize a dataset title into a safe lowercase identifier for a VIEW name."""
    # Collapse every run of non-alphanumerics into one underscore, then
    # trim underscores left at either end.
    collapsed = re.sub(r'[^a-zA-Z0-9]+', '_', value.lower())
    return collapsed.strip('_')
# async def create_dataset_view_from_metadata(conn, metadata_id: int, user_id: int, title: str):
# """Membuat VIEW PostgreSQL berdasarkan metadata dataset dan registrasi geometry untuk QGIS."""
# norm_title = slugify(title)
# view_name = f"v_user_{user_id}_{norm_title}"
# base_table = f"test_partition_user_{user_id}"
# # 1⃣ Hapus view lama jika ada
# drop_query = text(f"DROP VIEW IF EXISTS {view_name} CASCADE;")
# await conn.execute(drop_query)
# # 2⃣ Buat view baru
# create_view_query = text(f"""
# CREATE OR REPLACE VIEW {view_name} AS
# SELECT p.*, m.title, m.year, m.description
# FROM {base_table} p
# JOIN dataset_metadata m ON m.id = p.metadata_id
# WHERE p.metadata_id = {metadata_id};
# """)
# await conn.execute(create_view_query)
# # 3⃣ Daftarkan geometry column agar QGIS mengenali layer ini
# # (gunakan Populate_Geometry_Columns jika PostGIS >= 3)
# populate_query = text(f"SELECT Populate_Geometry_Columns('{view_name}'::regclass);")
# await conn.execute(populate_query)
# print(f"[INFO] VIEW {view_name} berhasil dibuat dan geometry terdaftar.")
async def create_dataset_view_from_metadata(conn, metadata_id: int, user_id: int, title: str):
    """Create a dynamic VIEW matching the dataset's JSON attribute schema
    (avoiding duplicate column names), then register its geometry for QGIS.

    Args:
        conn: async SQLAlchemy connection (inside an open transaction).
        metadata_id: dataset_metadata row id the view filters on.
        user_id: owner id; selects the partition table and view name.
        title: dataset title, slugified into the view name.
    """
    # Coerce the ids to int so the f-string interpolations below cannot
    # carry SQL even if a caller passes strings.
    metadata_id = int(metadata_id)
    user_id = int(user_id)
    norm_title = slugify(title)
    view_name = f"v_user_{user_id}_{norm_title}"
    base_table = f"test_partition_user_{user_id}"
    # Fetch the attribute field list recorded for this dataset.
    result = await conn.execute(text("SELECT fields FROM dataset_metadata WHERE id=:mid"), {"mid": metadata_id})
    fields_json = result.scalar_one_or_none()
    # Built-in columns of the partition table that attribute aliases must not shadow.
    base_columns = {"id", "user_id", "metadata_id", "geom"}
    columns_sql = ""
    field_list = []
    if fields_json:
        try:
            # fields may arrive as a JSON string or an already-decoded list.
            if isinstance(fields_json, str):
                field_list = json.loads(fields_json)
            elif isinstance(fields_json, list):
                field_list = fields_json
            else:
                raise ValueError(f"Tipe data fields_json tidak dikenali: {type(fields_json)}")
            for f in field_list:
                safe_col = slugify(f)
                # Avoid clashing with the base table's own columns.
                alias_name = f"attr_{safe_col}" if safe_col in base_columns else safe_col
                # Escape single quotes: the raw field name is interpolated
                # into a SQL string literal (JSON key lookup), so an
                # unescaped quote would break out of it (injection risk).
                key = f.replace("'", "''")
                columns_sql += f", p.attributes->>'{key}' AS {alias_name}"
        except Exception as e:
            print(f"[WARN] Gagal parse field list metadata: {e}")
    # 1) Drop any previous view for this dataset.
    await conn.execute(text(f"DROP VIEW IF EXISTS {view_name} CASCADE;"))
    # 2) Create the new dynamic view.
    create_view_query = f"""
        CREATE OR REPLACE VIEW {view_name} AS
        SELECT p.id, p.user_id, p.metadata_id, p.geom
               {columns_sql},
               m.title, m.year, m.description
        FROM {base_table} p
        JOIN dataset_metadata m ON m.id = p.metadata_id
        WHERE p.metadata_id = {metadata_id};
    """
    await conn.execute(text(create_view_query))
    # 3) Register the geometry column so QGIS recognizes the view as a layer.
    await conn.execute(text(f"SELECT Populate_Geometry_Columns('{view_name}'::regclass);"))
    print(f"[INFO] VIEW {view_name} berhasil dibuat (kolom dinamis: {field_list if field_list else '(none)'}).")
async def handle_to_postgis(payload, engine, user_id: int = 3):
    """
    Upload spatial data to PostGIS (partitioned per user).

    - Creates the user's list-partition automatically when missing
    - Stores dataset metadata in the dataset_metadata table
    - Inserts rows into test_partition (routed to test_partition_user_{id})
    - Builds a VIEW so QGIS can load the dataset
    """
    try:
        df = pd.DataFrame(payload.rows)
        print(f"[INFO] Diterima {len(df)} baris data dari frontend.")
        # --- Validate the geometry column ---
        if "geometry" not in df.columns:
            # NOTE(review): errorRes is raised here but returned in the except
            # below — confirm errorRes is an Exception subclass.
            raise errorRes(status_code=400, message="Kolom 'geometry' tidak ditemukan dalam data.")
        # --- Parse WKT strings into shapely objects; anything else becomes None ---
        df["geometry"] = df["geometry"].apply(
            lambda g: wkt.loads(g)
            if isinstance(g, str) and g.strip().upper().startswith(VALID_WKT_PREFIXES)
            else None
        )
        gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
        # --- Dataset metadata ---
        # TODO(review): hard-coded stub values — restore the
        # getattr(payload, "dataset_title"/"dataset_year"/"dataset_description")
        # extraction once the frontend sends real metadata.
        dataset_title = "longsor 2020"
        dataset_year = 2020
        dataset_desc = "test metadata"
        if not dataset_title:
            # Fixed: was errorRes(..., detail=...) — every other call site uses message=.
            raise errorRes(status_code=400, message="Field 'dataset_title' wajib ada untuk metadata.")
        async with engine.begin() as conn:
            fields = [col for col in df.columns if col != "geometry"]
            # 1) Persist dataset metadata and get its id.
            print("[INFO] Menyimpan metadata dataset...")
            result = await conn.execute(
                text("""
                    INSERT INTO dataset_metadata (user_id, title, year, description, fields, created_at)
                    VALUES (:user_id, :title, :year, :desc, :fields, :created_at)
                    RETURNING id;
                """),
                {
                    "user_id": user_id,
                    "title": dataset_title,
                    "year": dataset_year,
                    "desc": dataset_desc,
                    "fields": json.dumps(fields),
                    # NOTE(review): datetime.utcnow() is deprecated in 3.12;
                    # consider datetime.now(timezone.utc) — check column tz-awareness first.
                    "created_at": datetime.utcnow(),
                },
            )
            metadata_id = result.scalar_one()
            print(f"[INFO] Metadata disimpan dengan ID {metadata_id}")
            # 2) Auto-create the user's partition if it does not exist yet.
            #    user_id is an int parameter, so the f-string interpolation is safe.
            print(f"[INFO] Memastikan partisi test_partition_user_{user_id} tersedia...")
            await conn.execute(
                text(f"""
                    DO $$
                    BEGIN
                        IF NOT EXISTS (
                            SELECT 1 FROM pg_tables WHERE tablename = 'test_partition_user_{user_id}'
                        ) THEN
                            EXECUTE format('
                                CREATE TABLE test_partition_user_%s
                                PARTITION OF test_partition
                                FOR VALUES IN (%s);
                            ', {user_id}, {user_id});
                            EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_geom ON test_partition_user_%s USING GIST (geom);', {user_id}, {user_id});
                            EXECUTE format('CREATE INDEX IF NOT EXISTS idx_partition_user_%s_metadata ON test_partition_user_%s (metadata_id);', {user_id}, {user_id});
                        END IF;
                    END
                    $$;
                """)
            )
            # 3) Insert the spatial rows; the parent table routes them to the partition.
            print(f"[INFO] Memasukkan data ke test_partition_user_{user_id} ...")
            insert_count = 0
            for _, row in gdf.iterrows():
                geom_wkt = row["geometry"].wkt if row["geometry"] is not None else None
                attributes = row.drop(labels=["geometry"]).to_dict()
                await conn.execute(
                    text("""
                        INSERT INTO test_partition (user_id, metadata_id, geom, attributes, created_at)
                        VALUES (:user_id, :metadata_id, ST_Force2D(ST_GeomFromText(:geom, 4326)),
                                CAST(:attr AS jsonb), :created_at);
                    """),
                    {
                        "user_id": user_id,
                        "metadata_id": metadata_id,
                        "geom": geom_wkt,
                        "attr": json.dumps(attributes),
                        "created_at": datetime.utcnow(),
                    },
                )
                insert_count += 1
            # 4) Create the per-dataset VIEW for QGIS.
            await create_dataset_view_from_metadata(conn, metadata_id, user_id, dataset_title)
            print(f"[INFO] ✅ Berhasil memasukkan {insert_count} baris ke partisi user_id={user_id} (metadata_id={metadata_id}).")
            return {
                "status": "success",
                "user_id": user_id,
                "metadata_id": metadata_id,
                "dataset_title": dataset_title,
                "inserted_rows": insert_count,
                "geometry_type": list(gdf.geom_type.unique()),
            }
    except Exception as e:
        print(f"[ERROR] Gagal upload ke PostGIS partition: {e}")
        raise errorRes(status_code=500, message="Gagal upload ke PostGIS partition", details=str(e))