Initialize headless QGIS cleansing service

This commit is contained in:
DmsAnhr 2025-11-25 15:33:38 +07:00
commit b46b1bb4ba
8 changed files with 985 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__/

123
cleansing_service.py Normal file
View File

@ -0,0 +1,123 @@
from qgis.core import (
QgsVectorLayer,
QgsVectorLayerExporter,
QgsVectorFileWriter
)
import processing
from typing import Dict
from database import build_uri
def load_layer(table_name: str):
    """Open the PostGIS table *table_name* as a QGIS vector layer.

    Builds the provider URI via database.build_uri and logs whether the
    resulting layer is valid before returning it (valid or not).
    """
    connection_uri = build_uri(table_name)
    print('uri', connection_uri)
    vector_layer = QgsVectorLayer(connection_uri, table_name, "postgres")
    print("Layer valid:", vector_layer.isValid())
    return vector_layer
def cleansing_layer(layer: QgsVectorLayer) -> Dict:
    """Run the basic cleansing pipeline on *layer*.

    Steps: detect invalid geometries, repair them with native:fixgeometries,
    drop duplicate rows keyed on an auto-detected field, and trim whitespace
    from string attributes.

    Parameters:
        layer: a valid QgsVectorLayer (typically PostGIS-backed).

    Returns:
        dict with "summary" (per-step statistics) and "clean_layer"
        (the cleaned in-memory QgsVectorLayer).
    """
    summary = {
        "total_features_before": layer.featureCount(),
        "invalid_geometries_before": 0,
        "invalid_geometries_fixed": 0,
        "duplicates_removed": 0,
    }

    # -------------------------
    # 1. IDENTIFY INVALID GEOMETRY
    # -------------------------
    invalid_ids = [
        f.id() for f in layer.getFeatures() if not f.geometry().isGeosValid()
    ]
    summary["invalid_geometries_before"] = len(invalid_ids)

    # -------------------------
    # 2. FIX INVALID GEOMETRY
    # -------------------------
    fixed = processing.run(
        "native:fixgeometries",
        {
            "INPUT": layer,
            "OUTPUT": "memory:"
        }
    )["OUTPUT"]
    # BUG FIX: previously this reported len(invalid_ids), silently assuming
    # every invalid geometry was repaired. Re-validate the fixed layer and
    # report only the geometries that actually became valid.
    still_invalid = sum(
        1 for f in fixed.getFeatures() if not f.geometry().isGeosValid()
    )
    summary["invalid_geometries_fixed"] = max(len(invalid_ids) - still_invalid, 0)

    # -------------------------
    # 3. AUTO DETECT DUPLICATE KEY FIELD
    # -------------------------
    all_fields = [f.name() for f in fixed.fields()]
    print("Detecting key fields:", all_fields)

    key_fields = None
    # (1) Prefer an "id" column.
    if "id" in all_fields:
        key_fields = ["id"]
    # (2) Otherwise use the first integer column. "int4"/"int8" are included
    # because those are the type names the QGIS postgres provider reports
    # for integer/bigint columns.
    if key_fields is None:
        int_cols = [
            f.name() for f in fixed.fields()
            if f.typeName().lower() in ["int", "integer", "bigint", "int4", "int8"]
        ]
        if int_cols:
            key_fields = [int_cols[0]]
    # (3) Fall back to comparing on every column.
    if key_fields is None:
        key_fields = all_fields
    print("Using key field:", key_fields)

    # -------------------------
    # 4. REMOVE DUPLICATES BY ATTRIBUTE
    # -------------------------
    dedup = processing.run(
        "native:removeduplicatesbyattribute",
        {
            "INPUT": fixed,
            "FIELDS": key_fields,
            "METHOD": 0,
            "OUTPUT": "memory:"
        }
    )["OUTPUT"]
    summary["duplicates_removed"] = (
        fixed.featureCount() - dedup.featureCount()
    )

    # -------------------------
    # 5. TRIM STRING FIELDS
    # -------------------------
    # NOTE(review): the postgres provider may also report string columns as
    # "bpchar"/"char" — confirm against the actual schemas in use.
    trimmed = processing.run(
        "qgis:refactorfields",
        {
            "INPUT": dedup,
            "FIELDS_MAPPING": [
                {
                    "expression": f"trim(\"{field.name()}\")"
                    if field.typeName().lower() in ["text", "varchar"]
                    else f"\"{field.name()}\"",
                    "name": field.name(),
                    "type": field.type(),
                    "length": field.length(),
                    "precision": field.precision()
                }
                for field in dedup.fields()
            ],
            "OUTPUT": "memory:"
        }
    )["OUTPUT"]

    # -------------------------
    # RESULT
    # -------------------------
    return {
        "summary": summary,
        "clean_layer": trimmed
    }

34
data/exmpl.geojson Normal file
View File

@ -0,0 +1,34 @@
{ "type": "FeatureCollection",
"features": [
{ "type": "Feature",
"geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
"properties": {"prop0": "value0"}
},
{ "type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
]
},
"properties": {
"prop0": "value0",
"prop1": 0.0
}
},
{ "type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [
[ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
[100.0, 1.0], [100.0, 0.0] ]
]
},
"properties": {
"prop0": "value0",
"prop1": {"this": "that"}
}
}
]
}

21
database.py Normal file
View File

@ -0,0 +1,21 @@
import os

# Database connection settings for the PostGIS backend.
# SECURITY FIX: credentials were hard-coded in source; they can now be
# overridden via environment variables. The defaults below are the
# previously hard-coded values, so existing deployments keep working.
POSTGIS = {
    "host": os.environ.get("POSTGIS_HOST", "192.168.60.24"),
    "port": os.environ.get("POSTGIS_PORT", "5432"),
    "db": os.environ.get("POSTGIS_DB", "test_postgis"),
    "user": os.environ.get("POSTGIS_USER", "postgres"),
    "password": os.environ.get("POSTGIS_PASSWORD", "12345")
}


def build_uri(table_name: str) -> str:
    """Build a QGIS "postgres" provider data-source URI for *table_name*.

    The table is addressed in schema "public" and the primary-key column is
    fixed to '_id' (matching the project's table layout).

    Parameters:
        table_name: bare table name, without schema qualification.

    Returns:
        The provider URI string in the key='value' format QgsVectorLayer
        expects for the "postgres" provider.
    """
    return (
        f"dbname='{POSTGIS['db']}' "
        f"host='{POSTGIS['host']}' "
        f"port='{POSTGIS['port']}' "
        f"user='{POSTGIS['user']}' "
        f"password='{POSTGIS['password']}' "
        f"sslmode=disable "
        f"table=\"public\".\"{table_name}\" "
        f"key='_id'"
    )

597
full_cleansing_service.py Normal file
View File

@ -0,0 +1,597 @@
from qgis.core import (
QgsVectorLayer,
QgsVectorLayerExporter,
QgsVectorFileWriter
)
import processing
from typing import Dict
from database import build_uri
def load_layer(table_name: str):
    """Load *table_name* from PostGIS through the QGIS "postgres" provider.

    The layer is returned even when invalid; callers are expected to check
    isValid() themselves. Validity is logged here for debugging.
    """
    source_uri = build_uri(table_name)
    print('uri', source_uri)
    result_layer = QgsVectorLayer(source_uri, table_name, "postgres")
    print("Layer valid:", result_layer.isValid())
    return result_layer
# def cleansing_layer(layer: QgsVectorLayer) -> Dict:
# summary = {
# "total_features_before": layer.featureCount(),
# "invalid_geometries_before": 0,
# "invalid_geometries_fixed": 0,
# "duplicates_removed": 0,
# "sliver_removed": 0,
# "holes_removed": 0
# }
# # ========================================================
# # 1. IDENTIFY INVALID GEOMETRY
# # ========================================================
# invalid_ids = []
# for f in layer.getFeatures():
# if not f.geometry().isGeosValid():
# invalid_ids.append(f.id())
# summary["invalid_geometries_before"] = len(invalid_ids)
# # ========================================================
# # 2. FIX GEOMETRIES
# # ========================================================
# fixed = processing.run(
# "native:fixgeometries",
# {
# "INPUT": layer,
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# summary["invalid_geometries_fixed"] = len(invalid_ids)
# # ========================================================
# # 3. ENSURE MULTIPOLYGON
# # ========================================================
# multipolygon = processing.run(
# "native:collect",
# {
# "INPUT": fixed,
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# # ========================================================
# # 4. REMOVE DUPLICATE ROWS
# # ========================================================
# all_fields = [f.name() for f in multipolygon.fields()]
# print("Detecting key fields:", all_fields)
# key_fields = None
# # (1) Prefer 'id'
# if "id" in all_fields:
# key_fields = ["id"]
# # (2) Else pick first integer field
# if key_fields is None:
# int_cols = [
# f.name() for f in multipolygon.fields()
# if f.typeName().lower() in ["int", "integer", "bigint"]
# ]
# if int_cols:
# key_fields = [int_cols[0]]
# # (3) Else use all fields
# if key_fields is None:
# key_fields = all_fields
# print("Using key field:", key_fields)
# dedup = processing.run(
# "native:removeduplicatesbyattribute",
# {
# "INPUT": multipolygon,
# "FIELDS": key_fields,
# "METHOD": 0,
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# summary["duplicates_removed"] = (
# multipolygon.featureCount() - dedup.featureCount()
# )
# # ========================================================
# # 5. REMOVE DUPLICATE VERTICES
# # ========================================================
# no_dup_vertices = processing.run(
# "native:removeduplicatevertices",
# {
# "INPUT": dedup,
# "VERTICES": 0, # remove exact duplicates
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# # ========================================================
# # 6. FIX SRID (REPROJECT IF NEEDED)
# # ========================================================
# # Force SRID to 4326
# reprojected = processing.run(
# "native:reprojectlayer",
# {
# "INPUT": no_dup_vertices,
# "TARGET_CRS": "EPSG:4326",
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# # ========================================================
# # 7. REMOVE SLIVER POLYGONS (< 1 m²)
# # ========================================================
# # Filter polygons with area < 1 (threshold bisa kamu ubah)
# slivers = processing.run(
# "native:extractbyexpression",
# {
# "INPUT": reprojected,
# "EXPRESSION": "$area < 1",
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# summary["sliver_removed"] = slivers.featureCount()
# # Keep only polygons with area >= 1
# no_sliver = processing.run(
# "native:extractbyexpression",
# {
# "INPUT": reprojected,
# "EXPRESSION": "$area >= 1",
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# # ========================================================
# # 8. REMOVE TINY HOLES (< 1 m²)
# # ========================================================
# no_holes = processing.run(
# "native:deleteholes",
# {
# "INPUT": no_sliver,
# "MIN_AREA": 1, # minimum area of hole to keep
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# summary["holes_removed"] = 0 # can't count holes easily in PyQGIS
# # ========================================================
# # 9. TRIM STRING FIELDS (ATTRIBUTE CLEANSING)
# # ========================================================
# trimmed = processing.run(
# "qgis:refactorfields",
# {
# "INPUT": no_holes,
# "FIELDS_MAPPING": [
# {
# "expression": f"trim(\"{field.name()}\")"
# if field.typeName().lower() in ["text", "varchar"]
# else f"\"{field.name()}\"",
# "name": field.name(),
# "type": field.type(),
# "length": field.length(),
# "precision": field.precision()
# }
# for field in no_holes.fields()
# ],
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# # ========================================================
# # RETURN CLEANED LAYER
# # ========================================================
# return {
# "summary": summary,
# "clean_layer": trimmed
# }
# def cleansing_layer(layer: QgsVectorLayer) -> Dict:
# # ========================================================
# # INITIAL STATE
# # ========================================================
# print("\n========== START CLEANSING ==========")
# print("Step 0: Load Layer")
# print(" - Valid:", layer.isValid())
# print(" - Feature Count:", layer.featureCount())
# summary = {
# "step0_features": layer.featureCount(),
# "step1_invalid_before": 0,
# "step2_after_fix": 0,
# "step3_after_multipolygon": 0,
# "step4_duplicates_removed": 0,
# "step5_after_remove_vertices": 0,
# "step6_after_srid": 0,
# "step7_sliver_removed": 0,
# "step8_after_deleteholes": 0
# }
# # ========================================================
# # 1. VALIDATE GEOMETRY
# # ========================================================
# print("\nStep 1: Identify invalid geometries")
# invalid_ids = []
# for f in layer.getFeatures():
# if not f.geometry().isGeosValid():
# invalid_ids.append(f.id())
# summary["step1_invalid_before"] = len(invalid_ids)
# print(" - Invalid geometries found:", len(invalid_ids))
# # ========================================================
# # 2. FIX GEOMETRIES
# # ========================================================
# print("\nStep 2: Fix geometries")
# fixed = processing.run(
# "native:fixgeometries",
# {"INPUT": layer, "OUTPUT": "memory:"}
# )["OUTPUT"]
# print(" - Valid:", fixed.isValid())
# print(" - Features after fix:", fixed.featureCount())
# summary["step2_after_fix"] = fixed.featureCount()
# # ========================================================
# # 3. ENSURE MULTIPOLYGON
# # ========================================================
# print("\nStep 3: Ensure MULTIPOLYGON")
# multipolygon = processing.run(
# "native:collect",
# {"INPUT": fixed, "OUTPUT": "memory:"}
# )["OUTPUT"]
# print(" - Valid:", multipolygon.isValid())
# print(" - Features:", multipolygon.featureCount())
# summary["step3_after_multipolygon"] = multipolygon.featureCount()
# # ========================================================
# # 4. REMOVE DUPLICATE ROWS
# # ========================================================
# print("\nStep 4: Remove duplicate rows")
# all_fields = [f.name() for f in multipolygon.fields()]
# print(" - All fields:", all_fields)
# key_fields = None
# if "id" in all_fields:
# key_fields = ["id"]
# else:
# int_cols = [
# f.name() for f in multipolygon.fields()
# if f.typeName().lower() in ["int", "integer", "bigint"]
# ]
# if int_cols:
# key_fields = [int_cols[0]]
# else:
# key_fields = all_fields
# print(" - Using duplicate key:", key_fields)
# dedup = processing.run(
# "native:removeduplicatesbyattribute",
# {"INPUT": multipolygon, "FIELDS": key_fields, "METHOD": 0, "OUTPUT": "memory:"}
# )["OUTPUT"]
# duplicates_removed = multipolygon.featureCount() - dedup.featureCount()
# summary["step4_duplicates_removed"] = duplicates_removed
# print(" - Features before:", multipolygon.featureCount())
# print(" - Features after:", dedup.featureCount())
# print(" - Duplicates removed:", duplicates_removed)
# # ========================================================
# # 5. REMOVE DUPLICATE VERTICES
# # ========================================================
# print("\nStep 5: Remove duplicate vertices")
# no_dup_vertices = processing.run(
# "native:removeduplicatevertices",
# {"INPUT": dedup, "VERTICES": 0, "OUTPUT": "memory:"}
# )["OUTPUT"]
# print(" - Features:", no_dup_vertices.featureCount())
# summary["step5_after_remove_vertices"] = no_dup_vertices.featureCount()
# # ========================================================
# # 6. FIX SRID / REPROJECT
# # ========================================================
# print("\nStep 6: Reproject (Fix SRID to EPSG:4326)")
# reprojected = processing.run(
# "native:reprojectlayer",
# {"INPUT": no_dup_vertices, "TARGET_CRS": "EPSG:4326", "OUTPUT": "memory:"}
# )["OUTPUT"]
# print(" - Features:", reprojected.featureCount())
# summary["step6_after_srid"] = reprojected.featureCount()
# # ========================================================
# # 7. REMOVE SLIVER POLYGONS (< 1 m2)
# # ========================================================
# print("\nStep 7: Remove sliver polygons (<1 m²)")
# slivers = processing.run(
# "native:extractbyexpression",
# {"INPUT": reprojected, "EXPRESSION": "$area < 1", "OUTPUT": "memory:"}
# )["OUTPUT"]
# summary["step7_sliver_removed"] = slivers.featureCount()
# print(" - Slivers found:", slivers.featureCount())
# no_sliver = processing.run(
# "native:extractbyexpression",
# {"INPUT": reprojected, "EXPRESSION": "$area >= 1", "OUTPUT": "memory:"}
# )["OUTPUT"]
# print(" - Features left after removing slivers:", no_sliver.featureCount())
# # ========================================================
# # 8. REMOVE TINY HOLES (< 1 m2)
# # ========================================================
# print("\nStep 8: Remove tiny holes")
# no_holes = processing.run(
# "native:deleteholes",
# {"INPUT": no_sliver, "MIN_AREA": 1, "OUTPUT": "memory:"}
# )["OUTPUT"]
# print(" - Features:", no_holes.featureCount())
# summary["step8_after_deleteholes"] = no_holes.featureCount()
# # ========================================================
# # FINISH (TRIM ATTRIBUTES)
# # ========================================================
# print("\nFinal Step: Trim string fields")
# trimmed = processing.run(
# "qgis:refactorfields",
# {
# "INPUT": no_holes,
# "FIELDS_MAPPING": [
# {
# "expression": f"trim(\"{field.name()}\")"
# if field.typeName().lower() in ["text", "varchar"]
# else f"\"{field.name()}\"",
# "name": field.name(),
# "type": field.type(),
# "length": field.length(),
# "precision": field.precision()
# }
# for field in no_holes.fields()
# ],
# "OUTPUT": "memory:"
# }
# )["OUTPUT"]
# print(" - Final feature count:", trimmed.featureCount())
# print("========== CLEANSING DONE ==========\n")
# return {
# "summary": summary,
# "clean_layer": trimmed
# }
# Active version: extends the pipeline with a self-intersection check (step 1.5).
def cleansing_layer(layer: QgsVectorLayer) -> Dict:
    """Full cleansing pipeline for a (multi)polygon layer, with step logging.

    Steps: validity audit, self-intersection count, geometry fixing,
    promotion to multipart, duplicate-row removal, duplicate-vertex removal,
    reprojection to EPSG:4326, sliver-polygon removal, tiny-hole removal and
    string-attribute trimming.

    Parameters:
        layer: a valid QgsVectorLayer (typically PostGIS-backed).

    Returns:
        dict with "summary" (per-step statistics) and "clean_layer"
        (the cleaned in-memory QgsVectorLayer).
    """
    # ========================================================
    # INITIAL STATE
    # ========================================================
    print("\n========== START CLEANSING ==========")
    print("Step 0: Load Layer")
    print(" - Valid:", layer.isValid())
    print(" - Feature Count:", layer.featureCount())

    summary = {
        "step0_features": layer.featureCount(),
        "step1_invalid_before": 0,
        "step1_5_self_intersections": 0,
        "step2_after_fix": 0,
        "step3_after_multipolygon": 0,
        "step4_duplicates_removed": 0,
        "step5_after_remove_vertices": 0,
        "step6_after_srid": 0,
        "step7_sliver_removed": 0,
        "step8_after_deleteholes": 0
    }

    # ========================================================
    # 1. VALIDATE GEOMETRY
    # ========================================================
    print("\nStep 1: Identify invalid geometries")
    invalid_ids = [
        f.id() for f in layer.getFeatures() if not f.geometry().isGeosValid()
    ]
    summary["step1_invalid_before"] = len(invalid_ids)
    print(" - Invalid geometries found:", len(invalid_ids))

    # ========================================================
    # 1.5 CHECK SELF INTERSECTION
    # ========================================================
    # NOTE(review): the "check geometry" processing algorithms are only
    # shipped by recent QGIS releases; confirm this algorithm id and its
    # output parameter name against the installed QGIS version.
    print("\nStep 1.5: Check self-intersection")
    self_inter = processing.run(
        "native:checkgeometryselfintersection",
        {"INPUT": layer, "OUTPUT": "memory:"}
    )["OUTPUT"]
    self_inter_count = self_inter.featureCount()
    summary["step1_5_self_intersections"] = self_inter_count
    print(" - Features with self-intersection:", self_inter_count)

    # ========================================================
    # 2. FIX GEOMETRIES (INCLUDES SELF-INTERSECTION FIX)
    # ========================================================
    print("\nStep 2: Fix geometries (including self-intersections)")
    fixed = processing.run(
        "native:fixgeometries",
        {"INPUT": layer, "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Valid after fix:", fixed.isValid())
    print(" - Features after fix:", fixed.featureCount())
    summary["step2_after_fix"] = fixed.featureCount()

    # ========================================================
    # 3. ENSURE MULTIPOLYGON
    # ========================================================
    # BUG FIX: this step previously ran "native:collect", which (without a
    # FIELD parameter) merges ALL features into a single multipart feature,
    # destroying per-row attributes and making the later duplicate checks
    # meaningless. "native:promotetomulti" converts each feature's geometry
    # to its multipart equivalent while preserving the feature count, which
    # is what "ensure MULTIPOLYGON" intends.
    print("\nStep 3: Ensure MULTIPOLYGON")
    multipolygon = processing.run(
        "native:promotetomulti",
        {"INPUT": fixed, "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Valid:", multipolygon.isValid())
    print(" - Features:", multipolygon.featureCount())
    summary["step3_after_multipolygon"] = multipolygon.featureCount()

    # ========================================================
    # 4. REMOVE DUPLICATE ROWS
    # ========================================================
    print("\nStep 4: Remove duplicate rows")
    all_fields = [f.name() for f in multipolygon.fields()]
    print(" - All fields:", all_fields)
    if "id" in all_fields:
        key_fields = ["id"]
    else:
        # "int4"/"int8" included: those are the type names the QGIS postgres
        # provider reports for integer/bigint columns.
        int_cols = [
            f.name() for f in multipolygon.fields()
            if f.typeName().lower() in ["int", "integer", "bigint", "int4", "int8"]
        ]
        key_fields = [int_cols[0]] if int_cols else all_fields
    print(" - Using duplicate key:", key_fields)
    dedup = processing.run(
        "native:removeduplicatesbyattribute",
        {"INPUT": multipolygon, "FIELDS": key_fields, "METHOD": 0, "OUTPUT": "memory:"}
    )["OUTPUT"]
    duplicates_removed = multipolygon.featureCount() - dedup.featureCount()
    summary["step4_duplicates_removed"] = duplicates_removed
    print(" - Features before:", multipolygon.featureCount())
    print(" - Features after:", dedup.featureCount())
    print(" - Duplicates removed:", duplicates_removed)

    # ========================================================
    # 5. REMOVE DUPLICATE VERTICES
    # ========================================================
    # BUG FIX: "VERTICES" is not a parameter of native:removeduplicatevertices;
    # the algorithm takes TOLERANCE (vertices closer than this are merged).
    # A tiny tolerance keeps the original intent of removing exact duplicates.
    print("\nStep 5: Remove duplicate vertices")
    no_dup_vertices = processing.run(
        "native:removeduplicatevertices",
        {"INPUT": dedup, "TOLERANCE": 0.000001, "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Features:", no_dup_vertices.featureCount())
    summary["step5_after_remove_vertices"] = no_dup_vertices.featureCount()

    # ========================================================
    # 6. FIX SRID / REPROJECT
    # ========================================================
    print("\nStep 6: Reproject (Fix SRID to EPSG:4326)")
    reprojected = processing.run(
        "native:reprojectlayer",
        {"INPUT": no_dup_vertices, "TARGET_CRS": "EPSG:4326", "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Features:", reprojected.featureCount())
    summary["step6_after_srid"] = reprojected.featureCount()

    # ========================================================
    # 7. REMOVE SLIVER POLYGONS (< 1 m2)
    # ========================================================
    # NOTE(review): after reprojecting to EPSG:4326, $area may be evaluated
    # in square degrees unless an ellipsoid is configured in the processing
    # context, so "< 1" is not necessarily 1 m² — confirm the intended
    # threshold and units.
    print("\nStep 7: Remove sliver polygons (<1 m²)")
    slivers = processing.run(
        "native:extractbyexpression",
        {"INPUT": reprojected, "EXPRESSION": "$area < 1", "OUTPUT": "memory:"}
    )["OUTPUT"]
    summary["step7_sliver_removed"] = slivers.featureCount()
    print(" - Slivers found:", slivers.featureCount())
    no_sliver = processing.run(
        "native:extractbyexpression",
        {"INPUT": reprojected, "EXPRESSION": "$area >= 1", "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Features left after removing slivers:", no_sliver.featureCount())

    # ========================================================
    # 8. REMOVE TINY HOLES (< 1 m2)
    # ========================================================
    print("\nStep 8: Remove tiny holes")
    no_holes = processing.run(
        "native:deleteholes",
        {"INPUT": no_sliver, "MIN_AREA": 1, "OUTPUT": "memory:"}
    )["OUTPUT"]
    print(" - Features:", no_holes.featureCount())
    summary["step8_after_deleteholes"] = no_holes.featureCount()

    # ========================================================
    # FINAL: TRIM STRING FIELDS
    # ========================================================
    print("\nFinal Step: Trim string fields")
    trimmed = processing.run(
        "qgis:refactorfields",
        {
            "INPUT": no_holes,
            "FIELDS_MAPPING": [
                {
                    "expression": f"trim(\"{field.name()}\")"
                    if field.typeName().lower() in ["text", "varchar"]
                    else f"\"{field.name()}\"",
                    "name": field.name(),
                    "type": field.type(),
                    "length": field.length(),
                    "precision": field.precision()
                }
                for field in no_holes.fields()
            ],
            "OUTPUT": "memory:"
        }
    )["OUTPUT"]
    print(" - Final feature count:", trimmed.featureCount())
    print("========== CLEANSING DONE ==========\n")

    return {
        "summary": summary,
        "clean_layer": trimmed
    }

94
main.py Normal file
View File

@ -0,0 +1,94 @@
# from fastapi import FastAPI
# from qgis.core import QgsVectorLayer
# from qgis_bootstrap import start_qgis
# app = FastAPI()
# # Start QGIS headless
# qgs = start_qgis()
# @app.get("/")
# def root():
# return {"status": "QGIS API Ready"}
# @app.get("/extent")
# def extent():
# layer = QgsVectorLayer("data/exmpl.geojson", "jalan", "ogr")
# if not layer.isValid():
# return {"error": "Layer tidak valid"}
# ext = layer.extent()
# return {
# "xmin": ext.xMinimum(),
# "ymin": ext.yMinimum(),
# "xmax": ext.xMaximum(),
# "ymax": ext.yMaximum(),
# }
from fastapi import FastAPI, BackgroundTasks
from qgis_bootstrap import start_qgis
# from cleansing_service import load_layer, cleansing_layer
from full_cleansing_service import load_layer, cleansing_layer

app = FastAPI()
# Start headless QGIS once at module import; the returned application object
# must stay referenced for the lifetime of the API process so that the
# processing framework keeps working for every request.
qgs = start_qgis()
@app.get("/")
def root():
    """Liveness endpoint: confirms the cleansing API is up."""
    payload = {"status": "QGIS Cleansing API Running"}
    return payload
@app.get("/clean/{table_name}")
def clean_table(table_name: str):
    """Synchronously cleanse *table_name* and return the per-step summary.

    Responds with an error payload when the table cannot be loaded as a
    valid layer.
    """
    source = load_layer(table_name)
    if not source.isValid():
        return {"error": f"Table '{table_name}' tidak valid atau tidak ditemukan."}
    print(source)
    cleaned = cleansing_layer(source)
    return {
        "table": table_name,
        "summary": cleaned["summary"],
        "message": "Cleansing selesai"
    }
@app.post("/process/{table_name}")
def process_table(table_name: str, background: BackgroundTasks):
    """Queue cleansing of *table_name* as a background task; return at once."""
    background.add_task(run_clean_table, table_name)
    accepted = {"status": "ACCEPTED", "table": table_name}
    return accepted
def run_clean_table(table_name: str):
    """Background worker: load, cleanse and report on *table_name* via stdout.

    Logs an error and returns early when the layer cannot be loaded.
    """
    print(f"\n=== Mulai cleansing untuk tabel: {table_name} ===")
    source = load_layer(table_name)
    if not source.isValid():
        print(f"[ERROR] Table '{table_name}' tidak valid atau tidak ditemukan.")
        return
    print("[OK] Layer valid, mulai cleansing...")
    outcome = cleansing_layer(source)
    clean_layer = outcome["clean_layer"]
    print("\n=== RINGKASAN CLEANSING ===")
    for k, v in outcome["summary"].items():
        print(f"{k}: {v}")
    # TODO: write the cleaned layer back to PostGIS
    # save_to_postgis(clean_layer, table_name)
    print(f"=== Cleansing selesai untuk tabel: {table_name} ===\n")

73
qgis_bootstrap.py Normal file
View File

@ -0,0 +1,73 @@
# import os
# import sys
# QGIS_APP = "/Applications/QGIS-LTR.app/Contents"
# QGIS_PREFIX = f"{QGIS_APP}/Resources"
# # ==== FIX VERY IMPORTANT ====
# os.environ["QGIS_PREFIX_PATH"] = QGIS_PREFIX
# os.environ["PROJ_LIB"] = f"{QGIS_PREFIX}/proj"
# os.environ["GDAL_DATA"] = f"{QGIS_PREFIX}/gdal"
# os.environ["QT_PLUGIN_PATH"] = f"{QGIS_PREFIX}/plugins"
# # =============================
# os.environ["QT_QPA_PLATFORM"] = "offscreen"
# # Python path
# sys.path.append(f"{QGIS_PREFIX}/python")
# sys.path.append(f"{QGIS_PREFIX}/python/plugins")
# from qgis.core import QgsApplication
# from qgis.analysis import QgsNativeAlgorithms
# import processing
# from processing.core.Processing import Processing
# def start_qgis():
# qgs = QgsApplication([], False)
# qgs.initQgis()
# # === WAJIB: initialize processing ===
# Processing.initialize()
# qgs.processingRegistry().addProvider(QgsNativeAlgorithms())
# return qgs
import os
import sys

# ==== Linux QGIS installation prefix ====
# These environment variables must be exported BEFORE the qgis modules below
# are imported, otherwise PROJ/GDAL resources are not found in headless mode.
# NOTE(review): the paths are Debian/Ubuntu-style defaults — confirm for the
# target distribution. The earlier macOS variant also set
# QT_QPA_PLATFORM=offscreen; verify whether that is needed here too.
QGIS_PREFIX = "/usr"
os.environ["QGIS_PREFIX_PATH"] = QGIS_PREFIX
os.environ["PROJ_LIB"] = "/usr/share/proj"
os.environ["GDAL_DATA"] = "/usr/share/gdal"
os.environ["QT_PLUGIN_PATH"] = "/usr/lib/qt/plugins"

# Python path: make the system-installed QGIS Python bindings importable.
sys.path.append("/usr/lib/python3/dist-packages/qgis")
sys.path.append("/usr/lib/python3/dist-packages/qgis/plugins")

from qgis.core import QgsApplication
from qgis.analysis import QgsNativeAlgorithms
import processing
from processing.core.Processing import Processing
def start_qgis():
    """Boot a headless QgsApplication and register native processing algorithms.

    Returns the running QgsApplication; the caller must keep a reference to
    it for as long as QGIS functionality is needed.
    """
    application = QgsApplication([], False)
    application.initQgis()
    # Processing must be initialised before any processing.run(...) call.
    Processing.initialize()
    application.processingRegistry().addProvider(QgsNativeAlgorithms())
    return application

42
test_pg.py Normal file
View File

@ -0,0 +1,42 @@
"""Manual connectivity check: boot QGIS (macOS bundle paths) and open a
PostGIS table, printing environment diagnostics along the way."""
import glob
import os
import sys

print("---- ENV CHECK ----")
print("QGIS_PREFIX_PATH:", os.environ.get("QGIS_PREFIX_PATH"))
print("PROJ_LIB:", os.environ.get("PROJ_LIB"))
print("GDAL_DATA:", os.environ.get("GDAL_DATA"))
print("QT_PLUGIN_PATH:", os.environ.get("QT_PLUGIN_PATH"))

print("\n--- SEARCH proj.db in QGIS PREFIX ---")
print(glob.glob("/Applications/QGIS-LTR.app/**/proj.db", recursive=True))

print("\n--- CHECK FILE EXISTS ---")
# BUG FIX: os.environ["PROJ_LIB"] raised KeyError when the variable was not
# set, even though the ENV CHECK above deliberately tolerates that case.
proj_lib = os.environ.get("PROJ_LIB")
if proj_lib:
    print("proj.db exists? ", os.path.isfile(os.path.join(proj_lib, "proj.db")))
else:
    print("proj.db exists?  (PROJ_LIB not set)")

# macOS QGIS-LTR bundle layout; prefix must be set before importing qgis.
QGIS_APP = "/Applications/QGIS-LTR.app/Contents"
QGIS_PREFIX = f"{QGIS_APP}/Resources"
os.environ["QGIS_PREFIX_PATH"] = QGIS_PREFIX
sys.path.append(f"{QGIS_PREFIX}/python")
sys.path.append(f"{QGIS_PREFIX}/python/plugins")

from qgis.core import QgsApplication, QgsVectorLayer

qgs = QgsApplication([], False)
qgs.initQgis()

# NOTE(review): test credentials differ from database.py on purpose — this
# script targets a local throwaway database.
uri = (
    "dbname='gisdb' host=localhost port=5432 user='postgres' password='postgres' "
    "table=\"public\".\"dataset_metadata\" key='id' srid=4326"
)
layer = QgsVectorLayer(uri, "test", "postgres")
print("Valid:", layer.isValid())
print("Provider:", layer.providerType())
print("Error summary:", layer.error().summary())
qgs.exitQgis()