file_table_reader/services/upload_file/readers/reader_pdf.py

289 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import pdfplumber
import pandas as pd
from services.upload_file.utils.pdf_cleaner import get_number_column_index, get_start_end_number, normalize_number_column, row_ratio, has_mixed_text_and_numbers, is_short_text_row, parse_page_selection, filter_geo_admin_column, cleaning_column
from services.upload_file.upload_exceptions import PDFReadError
from utils.logger_config import setup_logger
logger = setup_logger(__name__)
def detect_header_rows(rows):
    """Split raw table rows into header rows and body rows.

    The body is assumed to begin at the first row that looks like data:
    a row mixing text and numbers, a mostly-numeric row (ratio > 0.3),
    a row containing a pure-integer cell (a running row number), or the
    first row with any numbers immediately after an all-text row. Rows
    before that index are header candidates; short free-text rows (stray
    captions) are dropped from the header unless the next candidate row
    is fully textual.

    Args:
        rows (list[list]): Raw table rows as extracted by pdfplumber.

    Returns:
        tuple[list, list]: ``(header_rows, body_rows)``. Both lists are
        empty for empty input.
    """
    if not rows:
        # BUGFIX: the original returned a bare [], but callers unpack
        # `head, body = detect_header_rows(...)` — keep the tuple shape.
        return [], []
    ratios = [row_ratio(r) for r in rows]
    body_start_index = None
    for i in range(1, len(rows)):
        row = rows[i]
        # Data rows typically mix labels with numbers (e.g. "Jakarta", 123).
        if has_mixed_text_and_numbers(row):
            body_start_index = i
            break
        # Mostly-numeric row → body.
        if ratios[i] > 0.3:
            body_start_index = i
            break
        # A cell that is a pure integer string is likely a running row number.
        if any(isinstance(c, str) and re.match(r'^\d+$', c.strip()) for c in row):
            body_start_index = i
            break
        # First row containing numbers right after an all-text row.
        if ratios[i - 1] == 0 and ratios[i] > 0:
            body_start_index = i
            break
    if body_start_index is None:
        # No body detected: treat every row as part of the header.
        body_start_index = len(rows)
    potential_headers = rows[:body_start_index]
    body_filtered = rows[body_start_index:]
    header_filtered = []
    for idx, row in enumerate(potential_headers):
        if is_short_text_row(row):
            # Keep a short text row only when the next candidate is all text
            # (i.e. it is genuinely part of a multi-line header).
            if idx + 1 < len(potential_headers) and ratios[idx + 1] == 0:
                header_filtered.append(row)
        else:
            header_filtered.append(row)
    return header_filtered, body_filtered
def merge_multiline_header(header_rows):
    """Collapse stacked header rows into a single flat list of column names.

    For each column position, the lowest (last) non-blank cell wins;
    embedded newlines are flattened to spaces and blank results dropped.

    Args:
        header_rows (list[list]): Header rows, top to bottom.

    Returns:
        list[str]: One cleaned name per non-empty column.
    """
    merged = []
    for column_cells in zip(*header_rows):
        # Scan bottom-up so the most specific (lowest) header line wins.
        chosen = ''
        for cell in reversed(column_cells):
            if cell and str(cell).strip():
                chosen = cell
                break
        merged.append(str(chosen).replace('\n', ' ').strip())
    return [name for name in merged if name not in ['', None]]
def merge_parsed_table(tables):
    """Stitch continuation fragments back onto their root tables.

    A table whose number column starts at 1 (or that has no number column
    at all) is a *root*; anything else is a *fragment* split across pages.
    Each fragment is appended to the first root with identical columns
    whose numbering it continues. Fragments that match no root are dropped.

    Args:
        tables (list[dict]): Parsed tables with ``columns`` and ``rows``.

    Returns:
        list[dict]: The root tables, with matching fragments merged in.
    """
    roots, fragments = [], []
    # STEP 1: classification — roots start their numbering at 1.
    for tbl in tables:
        col_idx = get_number_column_index(tbl["columns"])
        if col_idx is None:
            roots.append(tbl)
            continue
        first_no, _ = get_start_end_number(tbl["rows"], col_idx)
        (roots if first_no == 1 else fragments).append(tbl)
    # STEP 2: attach each fragment to the root whose numbering it continues.
    for frag in fragments:
        frag_start, _ = get_start_end_number(
            frag["rows"], get_number_column_index(frag["columns"])
        )
        for root in roots:
            if root["columns"] != frag["columns"]:
                continue
            _, root_end = get_start_end_number(
                root["rows"], get_number_column_index(root["columns"])
            )
            if frag_start == root_end + 1:
                root["rows"].extend(frag["rows"])
                break  # a fragment may attach to at most one root
    return roots
def read_pdf(path: str, page: str):
    """
    Read tables from a PDF file semi-automatically using `pdfplumber`.

    Pipeline:
    1. Open the PDF file with pdfplumber.
    2. Select pages from the `page` spec (e.g. "1,3-5" for pages 1 and 3-5).
    3. Detect and extract raw tables (list of lists) on each selected page.
    4. Split header and body rows with `detect_header_rows()`.
    5. Merge multi-line headers with `merge_multiline_header()`.
    6. Clean table bodies with `cleaning_column()` (drops the running-number
       column and aligns row width with the header).
    7. Assemble each table as {"title": ..., "columns": [...], "rows": [...]}.
    8. Apply `filter_geo_admin_column()` (geospatial metadata filter) and
       merge cross-page fragments with `merge_parsed_table()`.
    9. Return the list of JSON-ready tables for the frontend API.

    Args:
        path (str): Location of the PDF file to read.
        page (str): Page number or range, e.g. "1", "2-4", "1,3-5".
            Falsy values default to page "1".

    Returns:
        list[dict]: Extracted tables with column and row structure.

    Raises:
        PDFReadError: If reading or parsing the PDF fails (code 422).
    """
    try:
        selected_page_spec = page if page else "1"  # default: first page only
        tables_data = []
        with pdfplumber.open(path) as pdf:
            total_pages = len(pdf.pages)
            selected_pages = parse_page_selection(selected_page_spec, total_pages)
            logger.info(f"[INFO] Total Halaman PDF: {total_pages}")
            logger.info(f"[INFO] Total Halaman yang dipilih: {len(selected_pages)}")
            logger.info(f"[INFO] Halaman yang dipilih untuk dibaca: {selected_pages}")
            for page_num in selected_pages:
                pdf_page = pdf.pages[page_num - 1]
                tables = pdf_page.find_tables()
                logger.info(f"\n\n[INFO] Halaman {page_num}: {len(tables)} tabel terdeteksi")
                # NOTE: title extraction via extract_text_lines() was removed —
                # it is not reliable for landscape pages.
                for i, t in enumerate(tables, start=1):
                    table = t.extract()
                    # Tables with <= 2 rows cannot hold a header plus body.
                    if len(table) > 2:
                        logger.info(f"[TBL] tabel : {i} - halaman {page_num}")
                        tables_data.append({"page": f"halaman {page_num} - {i}", "table": table})
        logger.info(f"\nTotal tabel terbaca: {len(tables_data)}\n")
        header_only, body_only, page_info = [], [], []
        for tbl in tables_data:
            head, body = detect_header_rows(tbl["table"])
            header_only.append(head)
            body_only.append(body)
            page_info.append(tbl["page"])
        clean_header = [merge_multiline_header(h) for h in header_only]
        clean_body = []
        for i, raw_body in enumerate(body_only):
            # Drop empty cells before column cleaning so widths line up.
            con_body = [[cell for cell in row if cell not in (None, '')] for row in raw_body]
            cleaned = cleaning_column(clean_header[i], [con_body])
            clean_body.append(cleaned[0])
        # Renamed loop variable (was `page`) to avoid shadowing the parameter.
        parsed = []
        for cols, rows, page_label in zip(clean_header, clean_body, page_info):
            parsed.append({
                "title": page_label,
                "columns": cols,
                "rows": rows
            })
        # =================================================================
        clean_parsed = filter_geo_admin_column(parsed)
        merge_parsed = merge_parsed_table(clean_parsed)
        logger.info(f"\nTotal tabel valid: {len(merge_parsed)}\n")
        return [normalize_number_column(t) for t in merge_parsed]
    except Exception as e:
        raise PDFReadError(f"Gagal membaca PDF: {e}", code=422)
def convert_df(payload):
    """Convert a parsed-table payload into a pandas DataFrame.

    Validates that the payload carries list-typed ``columns`` and ``rows``
    keys and that every row has exactly one cell per column, then builds
    the DataFrame. An optional ``title`` is carried over via ``df.attrs``.

    Args:
        payload (dict): Must contain ``columns`` (list) and ``rows``
            (list of rows); may contain ``title``.

    Returns:
        pandas.DataFrame: The constructed frame.

    Raises:
        PDFReadError: If validation or construction fails (code 400).
    """
    try:
        for key in ("columns", "rows"):
            if key not in payload:
                raise ValueError("Payload tidak memiliki key 'columns' atau 'rows'.")
        columns, rows = payload["columns"], payload["rows"]
        if not isinstance(columns, list):
            raise TypeError("'columns' harus berupa list.")
        if not isinstance(rows, list):
            raise TypeError("'rows' harus berupa list.")
        expected_width = len(columns)
        for i, row in enumerate(rows):
            if len(row) != expected_width:
                raise ValueError(f"Jumlah elemen di baris ke-{i} tidak sesuai jumlah kolom.")
        df = pd.DataFrame(rows, columns=columns)
        if "title" in payload:
            df.attrs["title"] = payload["title"]
        return df
    except Exception as e:
        raise PDFReadError(f"Gagal konversi payload ke DataFrame: {e}", code=400)