file_table_reader/services/upload_file/utils/pdf_cleaner.py
2026-02-24 08:47:34 +07:00

209 lines
5.6 KiB
Python
Executable File

import re
import itertools
geo_admin_keywords = [
'lat', 'lon', 'long', 'latitude', 'longitude', 'koordinat', 'geometry', 'geometri',
'desa', 'kelurahan', 'kel', 'kecamatan', 'kabupaten', 'kab', 'kota', 'provinsi',
'lokasi', 'region', 'area', 'zone', 'boundary', 'batas'
]
def normalize_text(text):
text = text.lower()
text = re.sub(r'[^a-z0-9/ ]+', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def generate_combined_patterns(keywords):
combos = list(itertools.combinations(keywords, 2))
patterns = []
for a, b in combos:
patterns.append(rf'{a}\s*/\s*{b}')
patterns.append(rf'{b}\s*/\s*{a}')
return patterns
combined_patterns = generate_combined_patterns(geo_admin_keywords)
def contains_geo_admin_keywords(text):
text_clean = normalize_text(text)
if len(text_clean) < 3:
return False
for pattern in combined_patterns:
if re.search(pattern, text_clean):
return True
for kw in geo_admin_keywords:
if re.search(rf'(^|[\s/_-]){kw}([\s/_-]|$)', text_clean):
return True
return False
def filter_geo_admin_column(tables):
filtered = []
for table in tables:
found = any(contains_geo_admin_keywords(col) for col in table['columns'])
if found:
filtered.append(table)
return filtered
NUMBER_HEADER_KEYWORDS = [
"no","no.","nomor","nomor urut","no urut","No","Nomor","No Urut","Index",
"ID","Sr No","S/N","SN","Sl No"
]
def has_number_header(header):
header_text = header
return any(keyword in header_text for keyword in NUMBER_HEADER_KEYWORDS)
def is_numbering_column(col_values):
numeric_like = 0
total = 0
for v in col_values:
if not v or not isinstance(v, str):
continue
total += 1
if re.fullmatch(r"0*\d{1,3}", v.strip()):
numeric_like += 1
return total > 0 and (numeric_like / total) > 0.6
def is_numeric_value(v):
if v is None:
return False
if isinstance(v, (int, float)):
return True
if isinstance(v, str) and re.fullmatch(r"0*\d{1,3}", v.strip()):
return True
return False
def cleaning_column(headers, bodies):
cleaned_bodies = []
for header, body in zip(headers, bodies):
if not body:
cleaned_bodies.append(body)
continue
header_has_number = has_number_header(header)
first_col = [row[0] for row in body if row and len(row) > 0]
first_col_is_numbering = is_numbering_column(first_col)
if not header_has_number and first_col_is_numbering:
new_body = []
for row in body:
if not row:
continue
first_val = row[0]
if is_numeric_value(first_val) and len(row) > 1:
new_body.append(row[1:])
else:
new_body.append(row)
body = new_body
header_len = len(headers)
filtered_body = [row for row in body if len(row) == header_len]
cleaned_bodies.append(filtered_body)
return cleaned_bodies
def parse_page_selection(selectedPage: str, total_pages: int):
if not selectedPage:
return list(range(1, total_pages + 1))
pages = set()
parts = re.split(r'[,\s]+', selectedPage.strip())
for part in parts:
if '-' in part:
try:
start, end = map(int, part.split('-'))
pages.update(range(start, end + 1))
except ValueError:
continue
else:
try:
pages.add(int(part))
except ValueError:
continue
valid_pages = [p for p in sorted(pages) if 1 <= p <= total_pages]
return valid_pages
def is_number(s):
if s is None:
return False
s = str(s).strip().replace(',', '').replace('.', '')
return s.isdigit()
def row_ratio(row):
non_empty = [c for c in row if c not in (None, '', ' ')]
if not non_empty:
return 0
num_count = sum(is_number(c) for c in non_empty)
return num_count / len(non_empty)
def has_mixed_text_and_numbers(row):
non_empty = [c for c in row if c not in (None, '', ' ')]
has_text = any(isinstance(c, str) and re.search(r'[A-Za-z]', str(c)) for c in non_empty)
has_num = any(is_number(c) for c in non_empty)
return has_text and has_num
def is_short_text_row(row):
"""Deteksi baris teks pendek (1-2 kolom teks pendek)."""
non_empty = [str(c).strip() for c in row if c not in (None, '', ' ')]
if not non_empty:
return False
text_only = all(not is_number(c) for c in non_empty)
joined = " ".join(non_empty)
return text_only and len(non_empty) <= 2 and len(joined) < 20
def get_number_column_index(columns):
for i, col in enumerate(columns):
if has_number_header(col):
return i
return None
def get_start_end_number(rows, idx):
try:
start_no = int(rows[0][idx])
end_no = int(rows[-1][idx])
return start_no, end_no
except:
return None, None
def normalize_number_column(table):
columns = table["columns"]
rows = table["rows"]
num_idx = get_number_column_index(columns)
if num_idx is None:
return table
current = None
for row in rows:
try:
val = int(row[num_idx])
except:
continue
if current is None:
current = val
else:
if val <= current:
current += 1
else:
current = val
row[num_idx] = str(current)
return table