74 lines
2.6 KiB
Python
74 lines
2.6 KiB
Python
import pandas as pd
|
|
import re
|
|
import csv
|
|
import os
|
|
|
|
def detect_header_line(path, max_rows=10):
|
|
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
lines = [next(f) for _ in range(max_rows)]
|
|
|
|
header_line_idx = 0
|
|
best_score = -1
|
|
|
|
for i, line in enumerate(lines):
|
|
cells = re.split(r'[;,|\t]', line.strip())
|
|
alpha_ratio = sum(bool(re.search(r'[A-Za-z]', c)) for c in cells) / max(len(cells), 1)
|
|
digit_ratio = sum(bool(re.search(r'\d', c)) for c in cells) / max(len(cells), 1)
|
|
score = alpha_ratio - digit_ratio
|
|
|
|
if score > best_score:
|
|
best_score = score
|
|
header_line_idx = i
|
|
|
|
return header_line_idx
|
|
|
|
|
|
def detect_delimiter(path, sample_size=2048):
|
|
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
sample = f.read(sample_size)
|
|
sniffer = csv.Sniffer()
|
|
try:
|
|
dialect = sniffer.sniff(sample)
|
|
return dialect.delimiter
|
|
except Exception:
|
|
for delim in [',', ';', '\t', '|']:
|
|
if delim in sample:
|
|
return delim
|
|
return ','
|
|
|
|
|
|
def read_csv(path: str):
|
|
ext = os.path.splitext(path)[1].lower() # ambil ekstensi file
|
|
|
|
try:
|
|
if ext in ['.csv', '.txt']:
|
|
# === Baca file CSV ===
|
|
header_line = detect_header_line(path)
|
|
delimiter = detect_delimiter(path)
|
|
print(f"[INFO] Detected header line: {header_line + 1}, delimiter: '{delimiter}'")
|
|
|
|
df = pd.read_csv(path, header=header_line, sep=delimiter, encoding='utf-8', low_memory=False, thousands=',')
|
|
|
|
elif ext in ['.xlsx', '.xls']:
|
|
# === Baca file Excel ===
|
|
print(f"[INFO] Membaca file Excel: {os.path.basename(path)}")
|
|
pre_df = pd.read_excel(path, header=0, dtype=str) # baca semua sebagai string
|
|
df = pre_df.copy()
|
|
for col in df.columns:
|
|
if df[col].str.replace(',', '', regex=False).str.match(r'^-?\d+(\.\d+)?$').any():
|
|
df[col] = df[col].str.replace(',', '', regex=False)
|
|
df[col] = pd.to_numeric(df[col], errors='ignore')
|
|
|
|
else:
|
|
raise ValueError("Format file tidak dikenali (hanya .csv, .txt, .xlsx, .xls)")
|
|
|
|
except Exception as e:
|
|
print(f"[WARN] Gagal membaca file ({e}), fallback ke default")
|
|
df = pd.read_csv(path, encoding='utf-8', low_memory=False, thousands=',')
|
|
|
|
# Bersihkan kolom dan baris kosong
|
|
df = df.loc[:, ~df.columns.astype(str).str.contains('^Unnamed')]
|
|
df.columns = [str(c).strip() for c in df.columns]
|
|
df = df.dropna(how='all')
|
|
|
|
return df |