import re import base64 from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions from docling.datamodel.base_models import InputFormat from docling.datamodel.pipeline_options import ( PdfPipelineOptions, ) from docling_core.types.io import DocumentStream from docling.datamodel.settings import settings from docling.document_converter import DocumentConverter, PdfFormatOption from docling_core.types.doc.base import ImageRefMode from langchain_core.messages import HumanMessage, SystemMessage from langchain_google_genai import ChatGoogleGenerativeAI from llm import set_gemini_api_key, get_model_name from io import BytesIO from pathlib import Path def save_md_images( output: str | Path, md_content: str, images: dict[str, bytes], md_name: str = "index.md", images_dirname: str = "images", ): output = Path(output) md_path = output.joinpath(md_name) md_path.parent.mkdir(exist_ok=True, parents=True) images_dir = output.joinpath(images_dirname) images_dir.mkdir(exist_ok=True, parents=True) for image_name in images.keys(): image_path = images_dir.joinpath(Path(image_name).name) with open(image_path, "wb") as image_file: image_file.write(images[image_name]) md_content = md_content.replace( f"]({image_name})", f"]({image_path.relative_to(md_path.parent, walk_up=True)})", ) with open(md_path, "w") as md_file: md_file.write(md_content) def load_md_file(md_path: str | Path) -> tuple[str, dict[str, bytes]]: md_path = Path(md_path) with open(md_path, "r") as md_file: md = md_file.read() images: list[str] = re.findall(r"!\[Image\]\((.*?)\)", md) image_dict: dict[str, bytes] = dict() for i in range(len(images)): image_path = images[i] if image_path.startswith("data:image/png;base64,"): image_dict[f"{i}.png"] = image_path.removeprefix( "data:image/png;base64," ).encode("UTF-8") else: with open( Path(md_path.parent).joinpath(image_path), "rb" ) as image_file: image_dict[image_path] = image_file.read() return (md, image_dict) def convert_pdf_to_markdown(pdf: bytes) -> tuple[str, dict[str, bytes]]: """Converts a PDF document to Markdown format.""" accelerator_options = AcceleratorOptions( num_threads=16, device=AcceleratorDevice.CUDA ) pipeline_options = PdfPipelineOptions() pipeline_options.accelerator_options = accelerator_options pipeline_options.do_ocr = True pipeline_options.do_table_structure = True pipeline_options.table_structure_options.do_cell_matching = True pipeline_options.generate_page_images = True pipeline_options.generate_picture_images = True converter = DocumentConverter( format_options={ InputFormat.PDF: PdfFormatOption( pipeline_options=pipeline_options, ) } ) # Enable the profiling to measure the time spent settings.debug.profile_pipeline_timings = True # Convert the document conversion_result = converter.convert( source=DocumentStream(name="", stream=BytesIO(pdf)) ) doc = conversion_result.document doc.pictures md = doc.export_to_markdown( image_mode=ImageRefMode.EMBEDDED, ) images: list[str] = re.findall(r"!\[Image\]\((.*?)\)", md) image_dict: dict[str, bytes] = dict() for i in range(len(images)): data = images[i].removeprefix("data:image/png;base64,") img_data = base64.b64decode(data) image_dict[f"{i}.png"] = img_data md = md.replace(images[i], f"{i}.png") return (md, image_dict) def refine_content(md: str, images: dict[str, bytes], pdf: bytes) -> str: """Refines the Markdown content using an LLM.""" set_gemini_api_key() try: llm = ChatGoogleGenerativeAI(model=get_model_name(), temperature=0) except Exception as e: raise BaseException( f"Error initializing LLM. Make sure your Google API key is set correctly. Error: {e}" ) prompt = """ You are a professional technical document editor. Your task is to polish a Markdown text automatically converted from an accompanying PDF document. Please use the original PDF as the source of truth for layout, images, and context. Please perform the following operations based on the provided Markdown and PDF: 1. **Clean up extraneous characters**: Review the Markdown text and remove any conversion artifacts or strange formatting that do not exist in the original PDF. 2. **Explain image content**: Refer to charts, diagrams, and images in the PDF, and add descriptions after image citations so that complete information can be obtained through text descriptions even without the images. 3. **Correct list formatting**: The conversion may have flattened nested lists. Analyze the list structure in the PDF and restore the correct multi-level indentation in the Markdown. 4. **Correct mathematical formulas and symbols**: Convert plain text formulas into proper formula notation, for example, `Kmin` should be `$K_{min}`, and `E = hc/λ` should be `$E = \\frac{hc}{\\lambda}`. 5. **Adjust headings**: Rename headings of the same level that have the same name according to the different content within the subsections to avoid duplicate same-level headings and ensure the outline is clear. 6. **Translate**: Translate the content into Simplified Chinese. Proper nouns should retain their original expression during translation, for example, `Magnetic resonance imaging` should be translated as `磁共振成像(Magnetic resonance imaging, MRI)`. Only output the adjusted Markdown text, without any other text content. """ human_message_parts = [ { "type": "media", "mime_type": "text/markdown", "data": base64.b64encode(md.encode("UTF-8")).decode("utf-8"), }, ] for image_name in images.keys(): human_message_parts.append( { "type": "text", "text": f"This is image: '{image_name}':\n", } ) human_message_parts.append( { "type": "media", "mime_type": "image/png", "data": base64.b64encode(images[image_name]).decode("utf-8"), } ) human_message_parts.extend( [ { "type": "text", "text": "This is original PDF file:\n", }, { "type": "media", "mime_type": "application/pdf", "data": base64.b64encode(pdf).decode("utf-8"), }, ] ) message_content = [ SystemMessage(content=prompt), HumanMessage(content=human_message_parts), # type: ignore ] print( "Sending request to Gemini with the PDF, Markdown and referenced images... This may take a moment." ) try: response = llm.invoke(message_content) refined_content = response.content except Exception as e: raise BaseException(f"An error occurred while invoking the LLM: {e}") return str(refined_content)