From 62bd02d660681fe924655fef5728586a4d7e9659 Mon Sep 17 00:00:00 2001 From: dmsanhrProject Date: Tue, 4 Nov 2025 13:33:17 +0700 Subject: [PATCH] update reader csv --- services/reader_csv.py | 144 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 130 insertions(+), 14 deletions(-) diff --git a/services/reader_csv.py b/services/reader_csv.py index 9eafa50..c5cecb9 100644 --- a/services/reader_csv.py +++ b/services/reader_csv.py @@ -1,3 +1,89 @@ +# import pandas as pd +# import re +# import csv +# import os + +# def detect_header_line(path, max_rows=10): +# with open(path, 'r', encoding='utf-8', errors='ignore') as f: +# lines = [next(f) for _ in range(max_rows)] + +# header_line_idx = 0 +# best_score = -1 + +# for i, line in enumerate(lines): +# cells = re.split(r'[;,|\t]', line.strip()) +# alpha_ratio = sum(bool(re.search(r'[A-Za-z]', c)) for c in cells) / max(len(cells), 1) +# digit_ratio = sum(bool(re.search(r'\d', c)) for c in cells) / max(len(cells), 1) +# score = alpha_ratio - digit_ratio + +# if score > best_score: +# best_score = score +# header_line_idx = i + +# return header_line_idx + + +# def detect_delimiter(path, sample_size=2048): +# with open(path, 'r', encoding='utf-8', errors='ignore') as f: +# sample = f.read(sample_size) +# sniffer = csv.Sniffer() +# try: +# dialect = sniffer.sniff(sample) +# return dialect.delimiter +# except Exception: +# for delim in [',', ';', '\t', '|']: +# if delim in sample: +# return delim +# return ',' + + +# def read_csv(path: str): +# ext = os.path.splitext(path)[1].lower() # ambil ekstensi file + +# try: +# if ext in ['.csv', '.txt']: +# # === Baca file CSV === +# header_line = detect_header_line(path) +# delimiter = detect_delimiter(path) +# print(f"[INFO] Detected header line: {header_line + 1}, delimiter: '{delimiter}'") + +# df = pd.read_csv(path, header=header_line, sep=delimiter, encoding='utf-8', low_memory=False, thousands=',') + +# elif ext in ['.xlsx', '.xls']: +# # === Baca file Excel === +# print(f"[INFO] Membaca file Excel: {os.path.basename(path)}") +# pre_df = pd.read_excel(path, header=0, dtype=str) # baca semua sebagai string +# df = pre_df.copy() +# for col in df.columns: +# if df[col].str.replace(',', '', regex=False).str.match(r'^-?\d+(\.\d+)?$').any(): +# df[col] = df[col].str.replace(',', '', regex=False) +# df[col] = pd.to_numeric(df[col], errors='ignore') + +# else: +# raise ValueError("Format file tidak dikenali (hanya .csv, .txt, .xlsx, .xls)") + +# except Exception as e: +# print(f"[WARN] Gagal membaca file ({e}), fallback ke default") +# df = pd.read_csv(path, encoding='utf-8', low_memory=False, thousands=',') + +# # Bersihkan kolom dan baris kosong +# df = df.loc[:, ~df.columns.astype(str).str.contains('^Unnamed')] +# df.columns = [str(c).strip() for c in df.columns] +# df = df.dropna(how='all') + +# return df + + + + + + + + + + + + import pandas as pd import re import csv @@ -6,23 +92,18 @@ import os def detect_header_line(path, max_rows=10): with open(path, 'r', encoding='utf-8', errors='ignore') as f: lines = [next(f) for _ in range(max_rows)] - header_line_idx = 0 best_score = -1 - for i, line in enumerate(lines): cells = re.split(r'[;,|\t]', line.strip()) alpha_ratio = sum(bool(re.search(r'[A-Za-z]', c)) for c in cells) / max(len(cells), 1) digit_ratio = sum(bool(re.search(r'\d', c)) for c in cells) / max(len(cells), 1) - score = alpha_ratio - digit_ratio - + score = alpha_ratio - digit_ratio if score > best_score: best_score = score header_line_idx = i - return header_line_idx - def detect_delimiter(path, sample_size=2048): with open(path, 'r', encoding='utf-8', errors='ignore') as f: sample = f.read(sample_size) @@ -36,9 +117,8 @@ def detect_delimiter(path, sample_size=2048): return delim return ',' - def read_csv(path: str): - ext = os.path.splitext(path)[1].lower() # ambil ekstensi file + ext = os.path.splitext(path)[1].lower() try: if ext in ['.csv', '.txt']: @@ -52,18 +132,54 @@ def read_csv(path: str): elif ext in ['.xlsx', '.xls']: # === Baca file Excel === print(f"[INFO] Membaca file Excel: {os.path.basename(path)}") - pre_df = pd.read_excel(path, header=0, dtype=str) # baca semua sebagai string - df = pre_df.copy() + xls = pd.ExcelFile(path) + + print(f"[INFO] Ditemukan {len(xls.sheet_names)} sheet: {xls.sheet_names}") + + # Evaluasi tiap sheet untuk mencari yang paling relevan + best_sheet = None + best_score = -1 + best_df = None + + for sheet_name in xls.sheet_names: + try: + df = pd.read_excel(xls, sheet_name=sheet_name, header=0, dtype=str) + df = df.dropna(how='all').dropna(axis=1, how='all') + + if len(df) == 0 or len(df.columns) < 2: + continue + + # hitung "skor relevansi" + text_ratio = df.applymap(lambda x: isinstance(x, str)).sum().sum() / (df.size or 1) + row_score = len(df) + score = (row_score * 0.7) + (text_ratio * 100) + + if score > best_score: + best_score = score + best_sheet = sheet_name + best_df = df + + except Exception as e: + print(f"[WARN] Gagal membaca sheet {sheet_name}: {e}") + continue + + if best_df is not None: + print(f"[INFO] Sheet terpilih: '{best_sheet}' dengan skor {best_score:.2f}") + df = best_df + else: + raise ValueError("Tidak ada sheet valid yang dapat dibaca.") + + # Konversi tipe numerik jika ada for col in df.columns: - if df[col].str.replace(',', '', regex=False).str.match(r'^-?\d+(\.\d+)?$').any(): - df[col] = df[col].str.replace(',', '', regex=False) + if df[col].astype(str).str.replace(',', '', regex=False).str.match(r'^-?\d+(\.\d+)?$').any(): + df[col] = df[col].astype(str).str.replace(',', '', regex=False) df[col] = pd.to_numeric(df[col], errors='ignore') else: raise ValueError("Format file tidak dikenali (hanya .csv, .txt, .xlsx, .xls)") except Exception as e: - print(f"[WARN] Gagal membaca file ({e}), fallback ke default") + print(f"[WARN] Gagal membaca file ({e}), fallback ke default reader.") df = pd.read_csv(path, encoding='utf-8', low_memory=False, thousands=',') # Bersihkan kolom dan baris kosong @@ -71,4 +187,4 @@ def read_csv(path: str): df.columns = [str(c).strip() for c in df.columns] df = df.dropna(how='all') - return df \ No newline at end of file + return df