file_table_reader/services/upload_file/readers/reader_pdf.py

169 lines
5.9 KiB
Python
Raw Normal View History

2025-11-17 03:53:15 +00:00
import re
import pdfplumber
import pandas as pd
from services.upload_file.utils.pdf_cleaner import row_ratio, has_mixed_text_and_numbers, is_short_text_row, parse_page_selection, filter_geo_admin_column, cleaning_column
from services.upload_file.upload_exceptions import PDFReadError
from utils.logger_config import setup_logger
logger = setup_logger(__name__)
def detect_header_rows(rows):
    """Split a raw extracted table into header rows and body rows.

    The first row is always treated as a header candidate. Scanning from the
    second row onward, the body is considered to start at the first row that
    satisfies any of these heuristics (checked in this order):

    1. ``has_mixed_text_and_numbers(row)`` is truthy.
    2. ``row_ratio(row)`` is greater than 0.3.
    3. Any string cell consists solely of digits (after stripping whitespace).
    4. The previous row's ratio is exactly 0 while this row's ratio is positive.

    NOTE(review): ``row_ratio`` is presumably the share of numeric cells in a
    row -- confirm against ``pdf_cleaner``.

    Among the header candidates, a row flagged by ``is_short_text_row`` is
    kept only when it is followed by another candidate whose ratio is 0;
    otherwise it is dropped.

    Args:
        rows: Table rows as a list of lists of cell values.

    Returns:
        tuple[list, list]: ``(header_rows, body_rows)``. Both lists are empty
        when ``rows`` is empty or ``None``.
    """
    if not rows:
        # Keep the return shape uniform so callers can always unpack two values.
        return [], []

    ratios = [row_ratio(r) for r in rows]

    def _starts_body(i):
        """Return a truthy value when row ``i`` looks like the first body row."""
        row = rows[i]
        return (
            has_mixed_text_and_numbers(row)
            or ratios[i] > 0.3
            or any(isinstance(c, str) and re.match(r'^\d+$', c.strip()) for c in row)
            or (ratios[i - 1] == 0 and ratios[i] > 0)
        )

    # If no row qualifies, every row is treated as a header candidate.
    body_start_index = next(
        (i for i in range(1, len(rows)) if _starts_body(i)),
        len(rows),
    )

    potential_headers = rows[:body_start_index]
    body_filtered = rows[body_start_index:]

    header_filtered = []
    for idx, row in enumerate(potential_headers):
        if is_short_text_row(row):
            followed_by_zero_ratio_row = (
                idx + 1 < len(potential_headers) and ratios[idx + 1] == 0
            )
            if not followed_by_zero_ratio_row:
                # Short text row not followed by a zero-ratio candidate: drop it.
                continue
        header_filtered.append(row)

    return header_filtered, body_filtered
def merge_multiline_header(header_rows):
    """Collapse several header rows into one list of column names.

    For each column position, the bottom-most cell that is truthy and not
    whitespace-only is chosen. Newlines inside the chosen cell are replaced
    by spaces and the result is stripped. Columns that end up with an empty
    name are omitted from the output.

    Columns are paired with ``zip``, so only positions present in every
    header row are considered.

    Args:
        header_rows: List of header rows (each a sequence of cell values).

    Returns:
        list[str]: The merged, non-empty column names in column order.
    """
    merged = []
    for column_cells in zip(*header_rows):
        label = ''
        # Walk upward from the last header row to find the first usable cell.
        for cell in reversed(column_cells):
            if cell and str(cell).strip():
                label = cell
                break
        label = str(label).replace('\n', ' ').strip()
        if label:
            merged.append(label)
    return merged
def read_pdf(path: str, page: str):
    """
    Read tables from a PDF file semi-automatically using `pdfplumber`.

    Processing steps:
    1. Open the PDF file with pdfplumber.
    2. Resolve the pages to read from `page` via `parse_page_selection()`
       (e.g. "1,3-5" for page 1 and pages 3-5). A falsy `page` defaults to "1".
    3. Detect the tables on each selected page and extract each one as a
       list of rows. Tables with two rows or fewer are discarded.
    4. Split every table into header rows and body rows with
       `detect_header_rows()`.
    5. Merge multi-row headers into a single row with
       `merge_multiline_header()`.
    6. Drop `None`/empty cells from each body row and pass the body through
       `cleaning_column()`. According to the original documentation that
       helper removes serial-number columns and aligns the column count with
       the header (helper not visible here -- confirm).
    7. Assemble one dict per table:
           {
               "title": <1-based table index as a string>,
               "columns": [...],
               "rows": [...]
           }
    8. Post-filter the result with `filter_geo_admin_column()`.

    Args:
        path (str): Location of the PDF file to read.
        page (str): Page number or page range, e.g. "1", "2-4", "1,3-5".

    Returns:
        The value returned by `filter_geo_admin_column()` for the list of
        table dicts described above.

    Raises:
        PDFReadError: If anything fails while reading or parsing the PDF
            (code 422). The original exception is attached as the cause.
    """
    try:
        page_spec = page or "1"
        tables_data = []

        with pdfplumber.open(path) as pdf:
            total_pages = len(pdf.pages)
            selected_pages = parse_page_selection(page_spec, total_pages)
            logger.info(f"[INFO] Total halaman PDF: {total_pages}")
            logger.info(f"[INFO] Halaman yang dipilih untuk dibaca: {selected_pages}")

            for page_num in selected_pages:
                # Page numbers are 1-based; pdf.pages is 0-based.
                pdf_page = pdf.pages[page_num - 1]
                tables = pdf_page.find_tables()
                logger.info(f"[INFO] Halaman {page_num}: {len(tables)} tabel terdeteksi")
                for found in tables:
                    table = found.extract()
                    # Only keep tables with more than two rows.
                    if len(table) > 2:
                        tables_data.append(table)

        logger.info(f"\nTotal tabel valid: {len(tables_data)}\n")

        # Pass 1: split each table into (header rows, body rows).
        splits = [detect_header_rows(tbl) for tbl in tables_data]

        # Pass 2: flatten multi-row headers into a single list of names.
        clean_header = [merge_multiline_header(head) for head, _ in splits]

        # Pass 3: drop empty cells and normalise each body against its header.
        clean_body = []
        for header, (_, raw_body) in zip(clean_header, splits):
            compact_body = [
                [cell for cell in row if cell not in (None, '')]
                for row in raw_body
            ]
            cleaned = cleaning_column(header, [compact_body])
            clean_body.append(cleaned[0])

        parsed = [
            {"title": str(number), "columns": cols, "rows": rows}
            for number, (cols, rows) in enumerate(zip(clean_header, clean_body), start=1)
        ]

        return filter_geo_admin_column(parsed)
    except Exception as e:
        # Chain explicitly so the root cause is preserved as __cause__.
        raise PDFReadError(f"Gagal membaca PDF: {e}", code=422) from e
def convert_df(payload):
    """Convert a table payload into a pandas DataFrame.

    The payload must contain the keys "columns" and "rows", both lists, and
    every row must have exactly as many elements as there are columns. When
    the payload also has a "title" key, its value is stored in
    ``df.attrs["title"]``.

    Args:
        payload: Mapping with "columns", "rows" and optionally "title".

    Returns:
        pandas.DataFrame: Frame built from the payload rows and columns.

    Raises:
        PDFReadError: If validation or DataFrame construction fails
            (code 400). The original exception is attached as the cause.
    """
    try:
        if "columns" not in payload or "rows" not in payload:
            raise ValueError("Payload tidak memiliki key 'columns' atau 'rows'.")

        columns = payload["columns"]
        rows = payload["rows"]

        if not isinstance(columns, list):
            raise TypeError("'columns' harus berupa list.")
        if not isinstance(rows, list):
            raise TypeError("'rows' harus berupa list.")

        expected_width = len(columns)
        for i, row in enumerate(rows):
            if len(row) != expected_width:
                raise ValueError(f"Jumlah elemen di baris ke-{i} tidak sesuai jumlah kolom.")

        df = pd.DataFrame(rows, columns=columns)
        if "title" in payload:
            df.attrs["title"] = payload["title"]
        return df
    except Exception as e:
        # Chain explicitly so the root cause is preserved as __cause__.
        raise PDFReadError(f"Gagal konversi payload ke DataFrame: {e}", code=400) from e