From 3eef042111ef10af26f380900e715244d7b222f8 Mon Sep 17 00:00:00 2001
From: nite
Date: Mon, 27 Oct 2025 20:02:02 +1100
Subject: [PATCH] refactor(app): Extract PDF conversion logic into a separate module

The main.py script was becoming monolithic, containing all the logic
for PDF conversion, image path simplification, and content refinement.
This change extracts these core functionalities into a new
`pdf_convertor` module.

This refactoring improves the project structure by:
- Enhancing modularity and separation of concerns.
- Making the main.py script a cleaner, high-level orchestrator.
- Improving code readability and maintainability.

The functions `convert_pdf_to_markdown`, `save_md_images`, and
`refine_content` are now imported from the `pdf_convertor` module and
called from the main execution block.
---
 .gitignore       |   0
 .python-version  |   0
 README.md        |   0
 llm.py           |  23 +++++
 main.py          | 255 ++++------------------------------------------
 pdf_convertor.py | 190 +++++++++++++++++++++++++++++++++++
 pyproject.toml   |   0
 uv.lock          |   0
 8 files changed, 235 insertions(+), 233 deletions(-)
 mode change 100644 => 100755 .gitignore
 mode change 100644 => 100755 .python-version
 mode change 100644 => 100755 README.md
 create mode 100755 llm.py
 mode change 100644 => 100755 main.py
 create mode 100755 pdf_convertor.py
 mode change 100644 => 100755 pyproject.toml
 mode change 100644 => 100755 uv.lock

diff --git a/.gitignore b/.gitignore
old mode 100644
new mode 100755
diff --git a/.python-version b/.python-version
old mode 100644
new mode 100755
diff --git a/README.md b/README.md
old mode 100644
new mode 100755
diff --git a/llm.py b/llm.py
new file mode 100755
index 0000000..58108ab
--- /dev/null
+++ b/llm.py
@@ -0,0 +1,23 @@
+import configparser
+import os
+
+
+def set_gemini_api_key() -> None:
+    config = configparser.ConfigParser()
+    config.read("config.ini")
+    google_api_key = config.get("llm", "GOOGLE_API_KEY", fallback=None)
+
+    if not os.environ.get("GOOGLE_API_KEY"):
+        if google_api_key:
+            os.environ["GOOGLE_API_KEY"] = google_api_key
+        else:
+            raise ValueError(
+                "Error: GOOGLE_API_KEY not found in config.ini or environment variables"
+            )
+    return
+
+
+def get_model_name() -> str:
+    config = configparser.ConfigParser()
+    config.read("config.ini")
+    return config.get("llm", "MODEL_NAME", fallback="gemini-2.5-flash")
diff --git a/main.py b/main.py
old mode 100644
new mode 100755
index 0eece82..7536c00
--- a/main.py
+++ b/main.py
@@ -1,247 +1,36 @@
-import base64
 import os
-import re
-import configparser
-import sys
-from pathlib import Path
-from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
-from docling.datamodel.base_models import InputFormat
-from docling.datamodel.pipeline_options import (
-    PdfPipelineOptions,
+from pdf_convertor import (
+    convert_pdf_to_markdown,
+    save_md_images,
+    refine_content,
 )
-from docling.datamodel.settings import settings
-from docling.document_converter import DocumentConverter, PdfFormatOption
-from docling_core.types.doc.base import ImageRefMode
-from langchain_core.messages import HumanMessage, SystemMessage
-from langchain_google_genai import ChatGoogleGenerativeAI
-
-
-def convert_pdf_to_markdown(input_doc_path, output_md_path):
-    """Converts a PDF document to Markdown format."""
-    accelerator_options = AcceleratorOptions(
-        num_threads=8, device=AcceleratorDevice.CUDA
-    )
-
-    pipeline_options = PdfPipelineOptions()
-    pipeline_options.accelerator_options = accelerator_options
-    pipeline_options.do_ocr = True
-    pipeline_options.do_table_structure = True
-    pipeline_options.table_structure_options.do_cell_matching = True
-    pipeline_options.generate_page_images = True
-    pipeline_options.generate_picture_images = True
-
-    converter = DocumentConverter(
-        format_options={
-            InputFormat.PDF: PdfFormatOption(
-                pipeline_options=pipeline_options,
-            )
-        }
-    )
-
-    # Enable the profiling to measure the time spent
-    settings.debug.profile_pipeline_timings = True
-
-    # Convert the document
-    print(f"Converting {input_doc_path} to Markdown...")
-    conversion_result = converter.convert(input_doc_path)
-    doc = conversion_result.document
-
-    # List with total time per document
-    doc_conversion_secs = conversion_result.timings["pipeline_total"].times
-
-    doc.save_as_markdown(
-        filename=Path(output_md_path),
-        artifacts_dir=Path(
-            os.path.join(os.path.splitext(os.path.basename(output_md_path))[0], "image")
-        ),
-        image_mode=ImageRefMode.REFERENCED,
-    )
-    print(f"Conversion took: {doc_conversion_secs} seconds")
-    print(f"Markdown file saved to: {output_md_path}")
-
-
-def simplify_image_references_in_markdown(markdown_path):
-    """Simplifies image names in the markdown file and renames the image files."""
-    print(f"Simplifying image references in {markdown_path}...")
-    with open(markdown_path, "r+", encoding="utf-8") as f:
-        content = f.read()
-
-        # Find all unique image paths
-        image_paths = set(re.findall(r"\((\S*?image_\d{6}_[a-f0-9]+\.png)\)", content))
-
-        for old_path in image_paths:
-            old_path_prefix = os.path.join("output", old_path)
-            if not os.path.exists(path=old_path_prefix):
-                continue
-
-            directory = os.path.dirname(old_path_prefix)
-            old_filename = os.path.basename(old_path_prefix)
-
-            # Create new filename, e.g., image_000000.png
-            parts = old_filename.split("_")
-            new_filename = f"{parts[0]}_{parts[1]}.png"
-            new_path = os.path.join(directory, new_filename)
-
-            # Rename the physical file
-            if not os.path.exists(new_path):
-                os.rename(old_path_prefix, new_path)
-
-            # Replace the path in the markdown content
-            new_path_in_markdown = new_path.replace(f"output{os.sep}", "")
-            content = content.replace(old_path, new_path_in_markdown)
-
-        # Go back to the beginning of the file and write the modified content
-        f.seek(0)
-        f.write(content)
-        f.truncate()
-    print("Image references simplified.")
-
-
-def refine_and_translate_content(markdown_path, pdf_path):
-    """Refines and translates the Markdown content using an LLM."""
-    print("Starting content refinement and translation...")
-
-    config = configparser.ConfigParser()
-    config.read("config.ini")
-    google_api_key = config.get("api_keys", "GOOGLE_API_KEY", fallback=None)
-
-    if not google_api_key:
-        print("Error: GOOGLE_API_KEY not found in config.ini")
-        return
-
-    os.environ["GOOGLE_API_KEY"] = google_api_key
-    try:
-        llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)
-    except Exception as e:
-        print(
-            f"Error initializing LLM. Make sure your Google API key is set correctly. Error: {e}"
-        )
-        return
-
-    try:
-        with open(markdown_path, "r", encoding="utf-8") as f:
-            markdown_text = f.read()
-            markdown_content = markdown_text.encode("utf-8")
-
-        with open(pdf_path, "rb") as pdf_file:
-            pdf_bytes = pdf_file.read()
-
-    except FileNotFoundError as e:
-        print(f"Error reading files: {e}")
-        return
-
-    prompt = """
-    您是一名专业的科技文档编辑和翻译。您的任务是润色一份从随附 PDF 文档自动转换而来的 Markdown 文本。请以原始 PDF 作为布局、图像和上下文的真实依据。
-
-    请根据提供的 Markdown 和 PDF 执行以下四项操作：
-
-    1. **清理多余字符**：查看 Markdown 文本，删除原始 PDF 中不存在的任何转换伪影或奇怪格式。
-    2. **解释图像内容**：参考 PDF 中的图表、示意图和图像，在图像引用后添加清晰的解释。
-    3. **更正列表格式**：转换可能使嵌套列表扁平化。分析 PDF 中的列表结构，并在 Markdown 中恢复正确的多级缩进。
-    4. **更正数学公式和符号**：将纯文字公式转换为正确的公式表达，例如 `Kmin` 应使用 `$K_{min}`，`E = hc/λ`，应使用 `$E = \\frac{hc}{\\lambda}$`
-    5. **调整标题**：将相同层级的同名标题按照小节内的不同内容重新命名，避免同层级同名标题出现，并且确保大纲的清晰性。
-    6. **翻译成中文**：将整个清理和更正后的文档翻译成简体中文。当您遇到专业或技术术语时，您必须在其译文旁边保留原始英文术语并用括号括起来。
-
-    只需要输出调整翻译后的 markdown 文本，不需要任何其他的文字内容。
-    """
-
-    human_message_parts = [
-        {
-            "type": "media",
-            "mime_type": "text/markdown",
-            "data": base64.b64encode(markdown_content).decode("utf-8"),
-        },
-    ]
-
-    # Find all image references in the markdown content
-    image_paths = re.findall(r"!\[.*?\]\((.*?)\)", markdown_text)
-    markdown_dir = os.path.dirname(markdown_path)
-
-    if image_paths:
-        print(f"Found {len(image_paths)} image references in the markdown file.")
-        for image_path in image_paths:
-            # Construct the full path to the image file
-            full_image_path = os.path.join(markdown_dir, image_path)
-            if os.path.exists(full_image_path):
-                with open(full_image_path, "rb") as f:
-                    image_data = f.read()
-
-                human_message_parts.append(
-                    {
-                        "type": "text",
-                        "text": f"这是图片 '{os.path.basename(image_path)}':\n",
-                    }
-                )
-                human_message_parts.append(
-                    {
-                        "type": "media",
-                        "mime_type": "image/png",
-                        "data": base64.b64encode(image_data).decode("utf-8"),
-                    }
-                )
-            else:
-                print(f"Warning: Image file not found at {full_image_path}")
-
-    human_message_parts.extend(
-        [
-            {
-                "type": "text",
-                "text": "这是原始的PDF文件:\n",
-            },
-            {
-                "type": "media",
-                "mime_type": "application/pdf",
-                "data": base64.b64encode(pdf_bytes).decode("utf-8"),
-            },
-        ]
-    )
-
-    message_content = [
-        SystemMessage(prompt),
-        HumanMessage(human_message_parts),
-    ]
-
-    print(
-        "Sending request to Gemini with the PDF, Markdown and referenced images... This may take a moment."
-    )
-    try:
-        response = llm.invoke(message_content)
-        refined_content = response.content
-    except Exception as e:
-        print(f"An error occurred while invoking the LLM: {e}")
-        return
-
-    refined_output_path = os.path.splitext(markdown_path)[0] + "_refined_zh.md"
-    with open(refined_output_path, "w", encoding="utf-8") as f:
-        f.write(str(refined_content))
-
-    print(f"Task complete! Refined and translated file saved to: {refined_output_path}")
+from pathlib import Path


 def main():
-    input_dir = "input"
-    output_dir = "output"
-    os.makedirs(output_dir, exist_ok=True)
+    input_dir = Path("input")
+    output_dir = Path("output")

-    pdf_files = [f for f in os.listdir(input_dir) if f.endswith(".pdf")]
+    output_dir.mkdir(parents=True, exist_ok=True)

-    if not pdf_files:
-        print(f"Error: No PDF files found in the '{input_dir}' directory.")
-        sys.exit(1)
+    for filename in os.listdir(input_dir):
+        if not filename.endswith(".pdf"):
+            continue

-    for fileName in pdf_files:
-        print(f"\nProcessing file: {fileName}")
-        input_doc_path = os.path.join(input_dir, fileName)
-        output_md_path = os.path.join(output_dir, fileName.replace(".pdf", ".md"))
+        pdf_path = input_dir.joinpath(filename)
+        current_output_dir = output_dir.joinpath(
+            pdf_path.name.removesuffix(pdf_path.suffix)
+        )

-        # Step 1: Convert PDF to Markdown
-        convert_pdf_to_markdown(input_doc_path, output_md_path)
+        current_output_dir.mkdir(parents=True, exist_ok=True)

-        # Step 2: Simplify image references
-        simplify_image_references_in_markdown(output_md_path)
+        print(f"Processing {pdf_path} -> {current_output_dir}")

-        # # Step 3: Refine and translate the content
-        refine_and_translate_content(output_md_path, input_doc_path)
+        with open(pdf_path, "rb") as pdf_file:
+            pdf_content = pdf_file.read()

+        md, images = convert_pdf_to_markdown(pdf_content)
+        md = refine_content(md, images, pdf_content)
+        save_md_images(current_output_dir, md, images)


 if __name__ == "__main__":
diff --git a/pdf_convertor.py b/pdf_convertor.py
new file mode 100755
index 0000000..f91d662
--- /dev/null
+++ b/pdf_convertor.py
@@ -0,0 +1,190 @@
+import re
+import base64
+from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
+from docling.datamodel.base_models import InputFormat
+from docling.datamodel.pipeline_options import (
+    PdfPipelineOptions,
+)
+from docling_core.types.io import DocumentStream
+from docling.datamodel.settings import settings
+from docling.document_converter import DocumentConverter, PdfFormatOption
+from docling_core.types.doc.base import ImageRefMode
+from langchain_core.messages import HumanMessage, SystemMessage
+from langchain_google_genai import ChatGoogleGenerativeAI
+from llm import set_gemini_api_key, get_model_name
+from io import BytesIO
+from pathlib import Path
+
+
+def save_md_images(
+    output: str | Path,
+    md_content: str,
+    images: dict[str, bytes],
+    md_name: str = "index.md",
+    images_dirname: str = "images",
+):
+    output = Path(output)
+    md_path = output.joinpath(md_name)
+    md_path.parent.mkdir(exist_ok=True, parents=True)
+    images_dir = output.joinpath(images_dirname)
+    images_dir.mkdir(exist_ok=True, parents=True)
+    for image_name in images.keys():
+        image_path = images_dir.joinpath(Path(image_name).name)
+        with open(image_path, "wb") as image_file:
+            image_file.write(images[image_name])
+        md_content = md_content.replace(
+            f"]({image_name})",
+            f"]({image_path.relative_to(md_path.parent, walk_up=True)})",
+        )
+    with open(md_path, "w", encoding="utf-8") as md_file:
+        md_file.write(md_content)
+
+
+def load_md_file(md_path: str | Path) -> tuple[str, dict[str, bytes]]:
+    md_path = Path(md_path)
+    with open(md_path, "r", encoding="utf-8") as md_file:
+        md = md_file.read()
+    images: list[str] = re.findall(r"!\[Image\]\((.*?)\)", md)
+    image_dict: dict[str, bytes] = dict()
+    for i in range(len(images)):
+        image_path = images[i]
+        if image_path.startswith("data:image/png;base64,"):
+            image_dict[f"{i}.png"] = base64.b64decode(
+                image_path.removeprefix("data:image/png;base64,")
+            )
+        else:
+            with open(
+                Path(md_path.parent).joinpath(image_path), "rb"
+            ) as image_file:
+                image_dict[image_path] = image_file.read()
+    return (md, image_dict)
+
+
+def convert_pdf_to_markdown(pdf: bytes) -> tuple[str, dict[str, bytes]]:
+    """Converts a PDF document to Markdown format."""
+
+    accelerator_options = AcceleratorOptions(
+        num_threads=16, device=AcceleratorDevice.CUDA
+    )
+
+    pipeline_options = PdfPipelineOptions()
+    pipeline_options.accelerator_options = accelerator_options
+    pipeline_options.do_ocr = True
+    pipeline_options.do_table_structure = True
+    pipeline_options.table_structure_options.do_cell_matching = True
+    pipeline_options.generate_page_images = True
+    pipeline_options.generate_picture_images = True
+
+    converter = DocumentConverter(
+        format_options={
+            InputFormat.PDF: PdfFormatOption(
+                pipeline_options=pipeline_options,
+            )
+        }
+    )
+
+    # Enable the profiling to measure the time spent
+    settings.debug.profile_pipeline_timings = True
+
+    # Convert the document
+    conversion_result = converter.convert(
+        source=DocumentStream(name="", stream=BytesIO(pdf))
+    )
+    doc = conversion_result.document
+
+    # Picture items generated by the pipeline are embedded into the Markdown export below.
+
+    md = doc.export_to_markdown(
+        image_mode=ImageRefMode.EMBEDDED,
+    )
+
+    images: list[str] = re.findall(r"!\[Image\]\((.*?)\)", md)
+    image_dict: dict[str, bytes] = dict()
+    for i in range(len(images)):
+        data = images[i].removeprefix("data:image/png;base64,")
+        img_data = base64.b64decode(data)
+        image_dict[f"{i}.png"] = img_data
+        md = md.replace(images[i], f"{i}.png")
+
+    return (md, image_dict)
+
+
+def refine_content(md: str, images: dict[str, bytes], pdf: bytes) -> str:
+    """Refines the Markdown content using an LLM."""
+
+    set_gemini_api_key()
+
+    try:
+        llm = ChatGoogleGenerativeAI(model=get_model_name(), temperature=0)
+    except Exception as e:
+        raise RuntimeError(
+            f"Error initializing LLM. Make sure your Google API key is set correctly. Error: {e}"
+        ) from e
+
+    prompt = """
+    You are a professional technical document editor. Your task is to polish a Markdown text automatically converted from an accompanying PDF document. Please use the original PDF as the source of truth for layout, images, and context.
+
+    Please perform the following operations based on the provided Markdown and PDF:
+
+    1. **Clean up extraneous characters**: Review the Markdown text and remove any conversion artifacts or strange formatting that do not exist in the original PDF.
+    2. **Explain image content**: Refer to charts, diagrams, and images in the PDF, and add descriptions after image citations so that complete information can be obtained through text descriptions even without the images.
+    3. **Correct list formatting**: The conversion may have flattened nested lists. Analyze the list structure in the PDF and restore the correct multi-level indentation in the Markdown.
+    4. **Correct mathematical formulas and symbols**: Convert plain text formulas into proper formula notation, for example, `Kmin` should be `$K_{min}$`, and `E = hc/λ` should be `$E = \\frac{hc}{\\lambda}$`.
+    5. **Adjust headings**: Rename headings of the same level that have the same name according to the different content within the subsections to avoid duplicate same-level headings and ensure the outline is clear.
+    6. **Translate**: Translate the content into Simplified Chinese. Proper nouns should retain their original expression during translation, for example, `Magnetic resonance imaging` should be translated as `磁共振成像(Magnetic resonance imaging, MRI)`.
+
+    Only output the adjusted Markdown text, without any other text content.
+    """
+
+    human_message_parts = [
+        {
+            "type": "media",
+            "mime_type": "text/markdown",
+            "data": base64.b64encode(md.encode("UTF-8")).decode("utf-8"),
+        },
+    ]
+
+    for image_name in images.keys():
+        human_message_parts.append(
+            {
+                "type": "text",
+                "text": f"This is image '{image_name}':\n",
+            }
+        )
+        human_message_parts.append(
+            {
+                "type": "media",
+                "mime_type": "image/png",
+                "data": base64.b64encode(images[image_name]).decode("utf-8"),
+            }
+        )
+
+    human_message_parts.extend(
+        [
+            {
+                "type": "text",
+                "text": "This is the original PDF file:\n",
+            },
+            {
+                "type": "media",
+                "mime_type": "application/pdf",
+                "data": base64.b64encode(pdf).decode("utf-8"),
+            },
+        ]
+    )
+
+    message_content = [
+        SystemMessage(content=prompt),
+        HumanMessage(content=human_message_parts),  # type: ignore
+    ]
+
+    print(
+        "Sending request to Gemini with the PDF, Markdown and referenced images... This may take a moment."
+    )
+    try:
+        response = llm.invoke(message_content)
+        refined_content = response.content
+    except Exception as e:
+        raise RuntimeError(f"An error occurred while invoking the LLM: {e}") from e
+
+    return str(refined_content)
diff --git a/pyproject.toml b/pyproject.toml
old mode 100644
new mode 100755
diff --git a/uv.lock b/uv.lock
old mode 100644
new mode 100755
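
For reference, a minimal usage sketch (not part of this patch) of the extracted module, showing how load_md_file, refine_content, and save_md_images could be combined to re-run refinement on a document that main.py has already converted. The file names, directory layout, and the presence of a config.ini with an [llm] section are assumptions for illustration only.

# Hypothetical re-refinement script; not included in this patch.
from pathlib import Path

from pdf_convertor import load_md_file, refine_content, save_md_images

# Assumed layout: the source PDF under input/ and the directory main.py wrote under output/.
pdf_path = Path("input/example.pdf")
converted_dir = Path("output/example")

# Reload the previously saved Markdown together with the image files it references.
md, images = load_md_file(converted_dir / "index.md")

# Ask Gemini to polish and translate the Markdown again, grounded in the original PDF bytes.
refined = refine_content(md, images, pdf_path.read_bytes())

# Write the refined Markdown and its images into a sibling directory.
save_md_images(converted_dir.with_name("example_refined"), refined, images)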