1c1c68a214
- 移除 Gemini 和 Ollama 独立适配,统一使用 ChatOpenAI + base_url - config.ini 简化为 BASE_URL / API_KEY / MODEL / TEMPERATURE / MAX_RETRIES - 新增 config.example.ini 示例配置 - 移除 langchain-google-genai / langchain-ollama / pymupdf 依赖 - main.py 新增断点续跑:跳过已有 index.md / index_refined.md - LLM 请求支持 max_retries 自动重试(默认 3 次) - 优化 README
230 lines
7.1 KiB
Python
Executable File
230 lines
7.1 KiB
Python
Executable File
import re
|
|
import base64
|
|
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
|
|
from docling.datamodel.base_models import InputFormat
|
|
from docling.datamodel.pipeline_options import (
|
|
PdfPipelineOptions,
|
|
)
|
|
from docling_core.types.io import DocumentStream
|
|
from docling.datamodel.settings import settings
|
|
from docling.document_converter import DocumentConverter, PdfFormatOption
|
|
from docling_core.types.doc.base import ImageRefMode
|
|
from langchain_core.messages import HumanMessage, SystemMessage
|
|
from langchain_openai import ChatOpenAI
|
|
from llm import set_api_key, get_model_name, get_temperature, get_base_url, get_max_retries
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
import base64
|
|
|
|
|
|
def save_md_images(
|
|
output: str | Path,
|
|
md_content: str,
|
|
images: dict[str, bytes],
|
|
md_name: str = "index.md",
|
|
images_dirname: str = "images",
|
|
):
|
|
output = Path(output)
|
|
md_path = output.joinpath(md_name)
|
|
md_path.parent.mkdir(exist_ok=True, parents=True)
|
|
images_dir = output.joinpath(images_dirname)
|
|
images_dir.mkdir(exist_ok=True, parents=True)
|
|
for image_name in images.keys():
|
|
image_path = images_dir.joinpath(Path(image_name).name)
|
|
with open(image_path, "wb") as image_file:
|
|
image_file.write(images[image_name])
|
|
md_content = md_content.replace(
|
|
f"]({image_name})",
|
|
f"]({image_path.relative_to(md_path.parent, walk_up=True)})",
|
|
)
|
|
with open(md_path, "w") as md_file:
|
|
md_file.write(md_content)
|
|
|
|
|
|
def load_md_file(md_path: str | Path) -> tuple[str, dict[str, bytes]]:
|
|
md_path = Path(md_path)
|
|
with open(md_path, "r") as md_file:
|
|
md = md_file.read()
|
|
images: list[str] = re.findall(r"!\[.*?\]\((.*?)\)", md)
|
|
image_dict: dict[str, bytes] = dict()
|
|
for i in range(len(images)):
|
|
image_path = images[i]
|
|
if image_path.startswith("data:image/png;base64,"):
|
|
image_dict[f"{i}.png"] = image_path.removeprefix(
|
|
"data:image/png;base64,"
|
|
).encode("UTF-8")
|
|
else:
|
|
with open(
|
|
Path(md_path.parent).joinpath(image_path), "rb"
|
|
) as image_file:
|
|
image_dict[image_path] = image_file.read()
|
|
return (md, image_dict)
|
|
|
|
|
|
def convert_pdf_to_markdown(pdf: bytes) -> tuple[str, dict[str, bytes]]:
    """Convert a PDF (raw bytes) to Markdown plus its extracted images.

    Returns:
        Tuple of (markdown text, mapping ``"<index>.png"`` -> PNG bytes).
        Embedded base64 data URIs in the export are replaced by these names.
    """
    # CUDA-accelerated docling pipeline with OCR, table-structure detection
    # and page/picture image generation enabled.
    accel = AcceleratorOptions(num_threads=16, device=AcceleratorDevice.CUDA)

    opts = PdfPipelineOptions()
    opts.accelerator_options = accel
    opts.do_ocr = True
    opts.do_table_structure = True
    opts.table_structure_options.do_cell_matching = True
    opts.generate_page_images = True
    opts.generate_picture_images = True

    converter = DocumentConverter(
        format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=opts)}
    )

    # Enable profiling so time spent per pipeline stage can be inspected.
    settings.debug.profile_pipeline_timings = True

    # Convert the in-memory PDF.
    result = converter.convert(source=DocumentStream(name="", stream=BytesIO(pdf)))
    doc = result.document

    # NOTE(review): bare attribute access — presumably forces lazy loading of
    # picture items before export; confirm against docling documentation.
    doc.pictures

    md = doc.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)

    # Pull each embedded base64 PNG out of the Markdown and swap its data URI
    # for a simple "<index>.png" reference.
    image_dict: dict[str, bytes] = {}
    for idx, uri in enumerate(re.findall(r"!\[Image\]\((.*?)\)", md)):
        image_dict[f"{idx}.png"] = base64.b64decode(
            uri.removeprefix("data:image/png;base64,")
        )
        md = md.replace(uri, f"{idx}.png")

    return (md, image_dict)
|
|
|
|
|
|
def refine_content(md: str, images: dict[str, bytes], pdf: bytes) -> str:
    """Refine converted Markdown with a multimodal LLM.

    Sends the system prompt from ``pdf_convertor_prompt.md`` together with
    the Markdown text, every extracted image and the original PDF, then
    post-processes the model's answer with :func:`fix_output`.

    Args:
        md: Markdown produced by the PDF conversion.
        images: Mapping of image name -> raw PNG bytes referenced by ``md``.
        pdf: The original PDF document bytes, attached for layout context.

    Returns:
        The refined Markdown text.

    Raises:
        RuntimeError: If the LLM cannot be initialized, the request fails,
            or the model returns an empty response.
    """
    set_api_key()
    llm = _init_llm()

    with open("pdf_convertor_prompt.md", "r", encoding="utf-8") as f:
        prompt = f.read()

    message_content = [
        SystemMessage(content=prompt),
        HumanMessage(content=_build_human_parts(md, images, pdf)),  # type: ignore
    ]

    print(
        "Sending request to LLM with the PDF, Markdown and referenced images... This may take a moment."
    )
    try:
        response = llm.invoke(message_content)
        refined_content = response.content
    except Exception as e:
        # RuntimeError (not BaseException) so callers can catch it with a
        # plain `except Exception`; chain the cause for debugging.
        raise RuntimeError(f"An error occurred while invoking the LLM: {e}") from e

    if str(refined_content) == "":
        raise RuntimeError("Response of LLM is empty")

    return fix_output(str(refined_content))


def _init_llm() -> ChatOpenAI:
    """Build the ChatOpenAI client from the project configuration."""
    try:
        kwargs = {
            "model": get_model_name(),
            "temperature": get_temperature(),
        }
        base_url = get_base_url()
        if base_url:
            kwargs["base_url"] = base_url
        kwargs["max_retries"] = get_max_retries()
        return ChatOpenAI(**kwargs)
    except Exception as e:
        raise RuntimeError(
            f"Error initializing LLM. Make sure your configuration is correct. Error: {e}"
        ) from e


def _build_human_parts(md: str, images: dict[str, bytes], pdf: bytes) -> list[dict]:
    """Assemble the multimodal human-message parts: intro text, the Markdown,
    each image bracketed by START/END markers, then the PDF attachment."""
    parts: list[dict] = [
        {
            "type": "text",
            "text": "I will provide you with: \n1. Original Markdown text (with image references)\n2. Individual images with their exact filenames\n3. Original PDF for reference\n---\nOriginal Markdown text:\n",
        },
        {
            "type": "text",
            "text": md,
        },
        {
            "type": "text",
            "text": "\n\n---\n\nIndividual images (use ONLY these to describe the corresponding Markdown image references):\n\n",
        },
    ]

    # Bracket every image with explicit markers so the model can tie each
    # picture back to its exact filename in the Markdown.
    for image_name, image_bytes in images.items():
        parts.append(
            {
                "type": "text",
                "text": f"=== START OF IMAGE: '{image_name}' ===\n",
            }
        )
        parts.append(
            {
                "type": "image",
                "base64": base64.b64encode(image_bytes).decode("utf-8"),
                "mime_type": "image/png",
            }
        )
        parts.append(
            {
                "type": "text",
                "text": f"=== END OF IMAGE: '{image_name}' ===\n\n",
            }
        )

    parts.append(
        {
            "type": "text",
            "text": "\n---\n\nOriginal PDF (for overall layout and context reference only):\n\n",
        }
    )
    parts.append(
        {
            "type": "file",
            "base64": base64.b64encode(pdf).decode("utf-8"),
            "mime_type": "application/pdf",
            "filename": "origin.pdf",
        }
    )
    return parts
|
|
|
|
|
|
def fix_output(md: str) -> str:
    """Undo accidental Python-list stringification of LLM output.

    Some models return their answer looking like ``"['part1', 'part2']"``.
    When the text is wrapped that way, strip the brackets/quotes, unescape
    ``\\n`` and turn the element separators into blank lines; any other
    input is returned untouched.
    """
    wrapped = md.startswith("['") and md.endswith("']")
    if not wrapped:
        return md

    inner = md.removeprefix("['").removesuffix("']")
    inner = inner.replace("\\n", "\n")
    # List elements may be quoted with ' or " in any combination.
    for separator in ("\", '", "', \"", "', '"):
        inner = inner.replace(separator, "\n\n")
    return inner
|