satupeta-main/services/upload_file/readers/reader_csv.py
2026-02-10 08:54:35 +07:00

229 lines
9.5 KiB
Python

import pandas as pd
import re
import csv
import os
def detect_header_line(path, max_rows=10):
    """Guess which of the first lines of a delimited text file is the header.

    Each of the first ``max_rows`` lines is split on ``;``, ``,``, ``|`` or
    tab.  A line scores ``(share of cells containing a letter) - (share of
    cells containing a digit)``; the first line with the highest score wins.

    Args:
        path: Path of the text file to inspect.
        max_rows: Maximum number of leading lines to examine.

    Returns:
        Zero-based index of the most header-like line, or ``0`` when the file
        is empty.
    """
    with open(path, 'r', encoding='utf-8', errors='ignore') as f:
        # zip() stops at whichever side runs out first, so files shorter than
        # max_rows are read completely instead of raising StopIteration.
        lines = [line for _, line in zip(range(max_rows), f)]

    header_line_idx = 0
    best_score = -1
    for i, line in enumerate(lines):
        cells = re.split(r'[;,|\t]', line.strip())
        cell_count = max(len(cells), 1)
        alpha_ratio = sum(bool(re.search(r'[A-Za-z]', c)) for c in cells) / cell_count
        digit_ratio = sum(bool(re.search(r'\d', c)) for c in cells) / cell_count
        score = alpha_ratio - digit_ratio
        # Strict comparison keeps the earliest line when scores tie.
        if score > best_score:
            best_score = score
            header_line_idx = i
    return header_line_idx
def detect_delimiter(path, sample_size=2048):
    """Guess the field delimiter of a delimited text file.

    The first ``sample_size`` characters are handed to ``csv.Sniffer``,
    restricted to the delimiters this module supports (``,`` ``;`` tab ``|``).
    Without that restriction the sniffer may pick an unrelated character that
    merely occurs equally often on every line (for example a space inside a
    text column of a pipe-delimited file).

    Args:
        path: Path of the text file to inspect.
        sample_size: Number of characters read from the start of the file.

    Returns:
        The detected delimiter.  When sniffing fails, the first supported
        delimiter present in the sample is returned, or ``','`` if none is.
    """
    candidates = [',', ';', '\t', '|']
    with open(path, 'r', encoding='utf-8', errors='ignore') as f:
        sample = f.read(sample_size)

    try:
        dialect = csv.Sniffer().sniff(sample, delimiters=''.join(candidates))
        return dialect.delimiter
    except csv.Error:
        # The sniffer could not decide (empty file, single column, ...):
        # fall back to a plain presence check in priority order.
        for delim in candidates:
            if delim in sample:
                return delim
        return ','
# def read_csv(path: str, sheet: str = None):
# ext = os.path.splitext(path)[1].lower()
# try:
# if ext in ['.csv']:
# header_line = detect_header_line(path)
# delimiter = detect_delimiter(path)
# print(f"[INFO] Detected header line: {header_line + 1}, delimiter: '{delimiter}'")
# df = pd.read_csv(
# path,
# header=header_line,
# sep=delimiter,
# encoding='utf-8',
# low_memory=False,
# thousands=','
# )
# elif ext in ['.xlsx', '.xls']:
# print(f"[INFO] Membaca file Excel: {os.path.basename(path)}")
# xls = pd.ExcelFile(path)
# print(f"[INFO] Ditemukan {len(xls.sheet_names)} sheet: {xls.sheet_names}")
# if sheet:
# if sheet not in xls.sheet_names:
# raise ValueError(f"Sheet '{sheet}' tidak ditemukan dalam file {os.path.basename(path)}")
# print(f"[INFO] Membaca sheet yang ditentukan: '{sheet}'")
# df = pd.read_excel(xls, sheet_name=sheet, header=0, dtype=str)
# df = df.dropna(how='all').dropna(axis=1, how='all')
# else:
# print("[INFO] Tidak ada sheet yang ditentukan, mencari sheet paling relevan...")
# best_sheet = None
# best_score = -1
# best_df = None
# for sheet_name in xls.sheet_names:
# try:
# temp_df = pd.read_excel(xls, sheet_name=sheet_name, header=0, dtype=str)
# temp_df = temp_df.dropna(how='all').dropna(axis=1, how='all')
# if len(temp_df) == 0 or len(temp_df.columns) < 2:
# continue
# # hitung skor relevansi
# text_ratio = temp_df.applymap(lambda x: isinstance(x, str)).sum().sum() / (temp_df.size or 1)
# row_score = len(temp_df)
# score = (row_score * 0.7) + (text_ratio * 100)
# if score > best_score:
# best_score = score
# best_sheet = sheet_name
# best_df = temp_df
# except Exception as e:
# print(f"[WARN] Gagal membaca sheet {sheet_name}: {e}")
# continue
# if best_df is not None:
# print(f"[INFO] Sheet terpilih: '{best_sheet}' dengan skor {best_score:.2f}")
# df = best_df
# else:
# raise ValueError("Tidak ada sheet valid yang dapat dibaca.")
# for col in df.columns:
# if df[col].astype(str).str.replace(',', '', regex=False).str.match(r'^-?\d+(\.\d+)?$').any():
# df[col] = df[col].astype(str).str.replace(',', '', regex=False)
# df[col] = pd.to_numeric(df[col], errors='ignore')
# else:
# raise ValueError("Format file tidak dikenali (hanya .csv, .xlsx, .xls)")
# except Exception as e:
# print(f"[WARN] Gagal membaca file ({e}), fallback ke default reader.")
# df = pd.read_csv(path, encoding='utf-8', low_memory=False, thousands=',')
# df = df.loc[:, ~df.columns.astype(str).str.contains('^Unnamed')]
# df.columns = [str(c).strip() for c in df.columns]
# df = df.dropna(how='all')
# return df
# Excel error literals: treated as missing values during numeric detection so
# that an otherwise numeric column containing e.g. '#REF!' is still converted.
_EXCEL_ERROR_TOKENS = frozenset(
    {'#REF!', '#N/A', '#DIV/0!', '#VALUE!', '#NAME?', '#NUM!', '#NULL!'}
)
_NUMERIC_TEXT_PATTERN = r'^-?\d+(\.\d+)?$'


def _pick_best_sheet(xls):
    """Return the most data-like sheet of an open ``pd.ExcelFile``.

    Sheets with no rows or fewer than two columns are skipped.  The rest are
    scored as ``rows * 0.7 + (share of string cells) * 100`` and the first
    sheet with the highest score is returned.  Raises ``ValueError`` when no
    sheet qualifies.
    """
    best_sheet = None
    best_score = -1
    best_df = None
    for sheet_name in xls.sheet_names:
        try:
            candidate = pd.read_excel(xls, sheet_name=sheet_name, header=0, dtype=str)
            candidate = candidate.dropna(how='all').dropna(axis=1, how='all')
            if len(candidate) == 0 or len(candidate.columns) < 2:
                continue
            # Count string cells directly; DataFrame.applymap is deprecated.
            text_cells = sum(
                isinstance(value, str) for value in candidate.to_numpy().ravel()
            )
            text_ratio = text_cells / (candidate.size or 1)
            score = (len(candidate) * 0.7) + (text_ratio * 100)
            if score > best_score:
                best_score = score
                best_sheet = sheet_name
                best_df = candidate
        except Exception as e:
            # One unreadable sheet must not prevent trying the others.
            print(f"[WARN] Gagal membaca sheet {sheet_name}: {e}")
            continue
    if best_df is None:
        raise ValueError("Tidak ada sheet valid yang dapat dibaca.")
    print(f"[INFO] Sheet terpilih: '{best_sheet}' dengan skor {best_score:.2f}")
    return best_df


def _read_excel_frame(path, ext, sheet):
    """Read one sheet of an Excel workbook as an all-string DataFrame.

    ``.xlsx`` keeps using openpyxl; for ``.xls`` pandas chooses the engine
    itself because openpyxl cannot open the legacy format.  The workbook
    handle is closed when reading finishes.
    """
    engine = 'openpyxl' if ext == '.xlsx' else None
    print(f"[INFO] Membaca file Excel: {os.path.basename(path)}")
    with pd.ExcelFile(path, engine=engine) as xls:
        print(f"[INFO] Ditemukan {len(xls.sheet_names)} sheet: {xls.sheet_names}")
        if sheet:
            if sheet not in xls.sheet_names:
                raise ValueError(f"Sheet '{sheet}' tidak ditemukan.")
            print(f"[INFO] Membaca sheet yang ditentukan: '{sheet}'")
            frame = pd.read_excel(xls, sheet_name=sheet, header=0, dtype=str)
            return frame.dropna(how='all').dropna(axis=1, how='all')
        print("[INFO] Tidak ada sheet yang ditentukan, mencari sheet paling relevan...")
        return _pick_best_sheet(xls)


def _coerce_numeric_column(series):
    """Convert a text column to numbers only when no real text would be lost.

    Thousands separators (``,``) are removed first.  The column is converted
    when at least one value looks numeric and every other value is missing,
    blank or an Excel error literal; those become NaN.  Any other column is
    returned unchanged.
    """
    if pd.api.types.is_numeric_dtype(series):
        return series
    text = series.astype(str).str.replace(',', '', regex=False)
    looks_numeric = text.str.match(_NUMERIC_TEXT_PATTERN)
    if not looks_numeric.any():
        return series
    stripped = text.str.strip()
    ignorable = series.isna() | (stripped == '') | stripped.isin(_EXCEL_ERROR_TOKENS)
    if not (looks_numeric | ignorable).all():
        # Genuine text is mixed in: coercing would silently turn it into NaN.
        return series
    return pd.to_numeric(text, errors='coerce')


def _clean_dataframe(df):
    """Drop ``Unnamed`` columns and empty rows, trim headers, convert numbers."""
    if df.empty:
        return df
    df = df.loc[:, ~df.columns.astype(str).str.contains('^Unnamed')]
    df.columns = [str(c).strip() for c in df.columns]
    df = df.dropna(how='all')
    for col in df.columns:
        try:
            df[col] = _coerce_numeric_column(df[col])
        except Exception as ex:
            # A failed conversion leaves the column as-is; keep going.
            print(f"[WARN] Gagal konversi numerik pada kolom '{col}': {ex}")
    return df


def read_csv(path: str, sheet: str = None):
    """Read a CSV or Excel file into a cleaned DataFrame.

    ``.csv`` files are read with an auto-detected header line and delimiter.
    ``.xlsx``/``.xls`` files are read from ``sheet`` when given, otherwise
    from the sheet that scores as most data-like.  The result has ``Unnamed``
    columns and fully empty rows removed, header names stripped, and
    numeric-looking text columns converted to numbers.

    Args:
        path: Path of the file to read; the extension selects the reader.
        sheet: Optional Excel sheet name (ignored for CSV).

    Returns:
        The cleaned DataFrame.  If reading fails, ``.csv``/``.txt`` files are
        retried with pandas' default CSV settings; if that also fails, or for
        any other file type, an empty DataFrame is returned.  No exception is
        propagated to the caller.
    """
    ext = os.path.splitext(path)[1].lower()
    try:
        if ext == '.csv':
            header_line = detect_header_line(path)
            delimiter = detect_delimiter(path)
            print(f"[INFO] Detected header line: {header_line + 1}, delimiter: '{delimiter}'")
            df = pd.read_csv(
                path,
                header=header_line,
                sep=delimiter,
                encoding='utf-8',
                low_memory=False,
                thousands=',',
            )
        elif ext in ('.xlsx', '.xls'):
            df = _read_excel_frame(path, ext, sheet)
        else:
            raise ValueError("Format file tidak dikenali (hanya .csv, .xlsx, .xls)")
        return _clean_dataframe(df)
    except Exception as e:
        print(f"[WARN] Gagal membaca file utama ({e}).")
        # Only text formats are retried as CSV; never parse a workbook as CSV.
        if ext in ('.csv', '.txt'):
            print("[INFO] Mencoba fallback ke default CSV reader...")
            try:
                fallback = pd.read_csv(path, encoding='utf-8', low_memory=False, thousands=',')
                # Apply the same clean-up as the main path for a consistent shape.
                return _clean_dataframe(fallback)
            except Exception as e2:
                print(f"[ERROR] Fallback CSV juga gagal: {e2}")
        print("[ERROR] Tidak dapat memulihkan pembacaan file Excel.")
        return pd.DataFrame()