Files
rag_jrxml/md_chunker.py
T
panda 0787901acc feat: 添加Markdown分块器与统一批量分块入口,支持增量向量化与导入
- 新增 md_chunker.py: Markdown语义分块引擎,支持标题/代码块/表格智能拆分
- 新增 batch_chunker.py: 统一批量分块入口,支持JRXML+Markdown混合处理
- 新增 requirements.txt: 整理项目依赖
- embed_chunks.py: 新增 --incremental 增量模式,追加新向量到已有数据
- import_to_chroma.py: 新增 --incremental 增量模式,不再每次清空数据库
- 更新 README.md 与 docs/file_guide.md 反映最新架构

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 11:10:25 +08:00

358 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
md_chunker.py
Markdown 语义分块器
支持标题层级、代码块、表格等元素的智能分块
"""
import json
import os
import re
from typing import List, Dict, Tuple
from pathlib import Path
from dataclasses import dataclass, field, asdict
@dataclass
class MDChunk:
    """Container for a single chunk produced from a Markdown document.

    Instances are converted to plain dicts with ``dataclasses.asdict`` before
    they leave the chunker, so every field should stay JSON-serializable.
    """

    # Sequential id assigned by the chunker that created this chunk.
    chunk_id: int
    # Category label, e.g. 'code', 'section_h1', or '<type>_part' for a
    # piece of an oversized segment.
    chunk_type: str
    # Short human-readable summary of the chunk.
    human_description: str
    # The chunk text itself.
    raw_content: str
    # Where the chunk comes from (the document title in this module).
    context: str
    # Free-form extra information (heading, language, part number, ...).
    # A factory is used so instances never share one dict.
    metadata: Dict = field(default_factory=dict)
class MarkdownSemanticChunker:
    """
    Semantic chunker for Markdown documents (v1.0).

    Chunking strategy:
      1. Fenced code blocks are extracted first; each one becomes its own
         ``code`` segment.
      2. The text between code blocks is split at ATX headings
         (``#`` .. ``######``). The body under a heading becomes one segment;
         the heading line itself is stored in the segment metadata, not in
         the segment content.
      3. Segments longer than ``max_chunk_size`` characters are split again:
         on blank lines first, then on line breaks, then on fixed-width
         character windows.

    Tables and lists are NOT extracted as separate chunks. The patterns
    ``CODE_BLOCK_PATTERN``, ``INLINE_CODE_PATTERN``, ``TABLE_PATTERN`` and
    ``LIST_PATTERN`` are kept as class attributes but are not used by this
    class.
    """

    # Heading patterns
    HEADING_PATTERN = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)
    # Code block pattern (fenced); retained unchanged, not used internally
    CODE_BLOCK_PATTERN = re.compile(r'```(\w*)\n([\s\S]*?)```', re.MULTILINE)
    # Inline code pattern
    INLINE_CODE_PATTERN = re.compile(r'`([^`]+)`')
    # Table pattern
    TABLE_PATTERN = re.compile(r'\|.+\|\n\|[-| :]+\|\n((?:\|.+\|\n)*)', re.MULTILINE)
    # List pattern
    LIST_PATTERN = re.compile(r'^(\s*[-*+]\s+.+)+', re.MULTILINE)

    # Fenced block used for splitting. The info string (group 1) may hold any
    # characters except a newline or backtick, so openers such as "```c++" or
    # "```python title=x" are recognised as openers instead of being skipped.
    _FENCE_PATTERN = re.compile(r'```([^\n`]*)\n[\s\S]*?```')
    # First-level heading used as the document title.
    _TITLE_PATTERN = re.compile(r'^#[ \t]+(.+)$', re.MULTILINE)
    # Keyword table for level-2 headings; checked in order, first hit wins.
    _H2_KEYWORDS = (
        ('section_installation', ('install', '安装', 'setup', '部署')),
        ('section_configuration', ('config', '配置', 'setting')),
        ('section_api', ('api', '接口')),
        ('section_example', ('example', '示例', 'usage', '使用')),
        ('section_faq', ('faq', 'question', '问题', '常见')),
        ('section_changelog', ('changelog', '更新', 'release')),
    )

    def __init__(self, max_chunk_size: int = 2000):
        """Create a chunker.

        Args:
            max_chunk_size: Maximum chunk length in characters before a
                segment is split into several ``*_part`` chunks.
        """
        self.max_chunk_size = max_chunk_size

    def chunk_file(self, file_path: str) -> List[Dict]:
        """Chunk a single Markdown file.

        Args:
            file_path: Path of the Markdown file to read (UTF-8).

        Returns:
            A list of chunk dicts (``MDChunk`` converted with ``asdict``).
            ``chunk_id`` starts at 0 for every file.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        # 'utf-8-sig' reads plain UTF-8 too, but drops a leading BOM that
        # would otherwise hide a heading on the very first line.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()

        doc_title = self._extract_title(content, Path(file_path).stem)
        chunks: List[Dict] = []
        next_id = 0
        for segment in self._split_by_structure(content):
            seg_type = segment['type']
            seg_content = segment['content']
            if not seg_content.strip():
                continue
            seg_meta = segment.get('metadata', {})

            if len(seg_content) > self.max_chunk_size:
                # Too long: split again, keeping the segment metadata.
                parts = self._split_large_chunk(
                    seg_content, seg_type, doc_title, next_id, metadata=seg_meta
                )
                chunks.extend(asdict(part) for part in parts)
                next_id += len(parts)
                continue

            description = self._build_description(
                seg_type, seg_content, doc_title,
                heading=seg_meta.get('heading', '')
            )
            chunks.append(asdict(MDChunk(
                chunk_id=next_id,
                chunk_type=seg_type,
                human_description=description,
                raw_content=seg_content.strip(),
                context=f"{doc_title}",
                metadata=seg_meta
            )))
            next_id += 1
        return chunks

    def _extract_title(self, content: str, fallback: str) -> str:
        """Return the text of the first H1 outside fenced code, else ``fallback``.

        Fenced blocks are removed before searching so that a ``# comment``
        line inside a shell or Python snippet is not taken for the title.
        """
        prose_only = self._FENCE_PATTERN.sub('', content)
        found = self._TITLE_PATTERN.search(prose_only)
        if found is None:
            return fallback
        return found.group(1).strip() or fallback

    @staticmethod
    def _info_language(info: str) -> str:
        """Return the first word of a fence info string ('' when empty)."""
        words = info.split()
        return words[0] if words else ''

    def _split_by_structure(self, content: str) -> List[Dict]:
        """Split Markdown into code segments and heading-delimited text segments.

        Returns:
            A list of ``{'type': ..., 'content': ..., 'metadata': {...}}``
            dicts in document order. ``type`` is ``'code'`` or one of the
            ``section_*`` labels from ``_get_section_type``. Code segments
            carry ``language`` plus the ``heading`` / ``heading_level`` of
            the section they appear in.
        """
        segments: List[Dict] = []
        # The current heading is carried across code blocks so that text
        # following a block still belongs to the section it was written in.
        heading, heading_level = '', 0
        cursor = 0
        for fence in self._FENCE_PATTERN.finditer(content):
            text_before = content[cursor:fence.start()]
            if text_before.strip():
                found, heading, heading_level = self._scan_text_section(
                    text_before, heading, heading_level
                )
                segments.extend(found)
            segments.append({
                'type': 'code',
                'content': fence.group(0),
                'metadata': {
                    'language': self._info_language(fence.group(1)),
                    'heading': heading,
                    'heading_level': heading_level
                }
            })
            cursor = fence.end()

        remaining = content[cursor:]
        if remaining.strip():
            found, heading, heading_level = self._scan_text_section(
                remaining, heading, heading_level
            )
            segments.extend(found)
        return segments

    def _process_text_section(self, text: str, heading: str = '',
                              heading_level: int = 0) -> List[Dict]:
        """Split plain text (no fenced code) into heading-delimited segments.

        Args:
            text: Markdown text to split.
            heading: Heading already in effect when ``text`` starts.
            heading_level: Level (1-6) of that heading, 0 for none.

        Returns:
            One segment dict per non-empty section body.
        """
        return self._scan_text_section(text, heading, heading_level)[0]

    def _scan_text_section(self, text: str, heading: str,
                           heading_level: int) -> Tuple[List[Dict], str, int]:
        """Like ``_process_text_section`` but also report the final heading.

        Returns:
            ``(segments, heading, heading_level)`` where the last two values
            describe the heading in effect at the end of ``text``.
        """
        segments: List[Dict] = []
        body: List[str] = []
        for line in text.split('\n'):
            found = self.HEADING_PATTERN.match(line)
            if found is None:
                body.append(line)
                continue
            # A new heading closes the section collected so far.
            segment = self._section_segment(body, heading, heading_level)
            if segment is not None:
                segments.append(segment)
            body = []
            heading_level = len(found.group(1))
            heading = found.group(2).strip()

        segment = self._section_segment(body, heading, heading_level)
        if segment is not None:
            segments.append(segment)
        return segments, heading, heading_level

    def _section_segment(self, body_lines: List[str], heading: str,
                         heading_level: int):
        """Build one section segment, or None when the body is blank."""
        section_text = '\n'.join(body_lines).strip()
        if not section_text:
            return None
        return {
            'type': self._get_section_type(heading_level, heading),
            'content': section_text,
            'metadata': {
                'heading': heading,
                'heading_level': heading_level
            }
        }

    def _get_section_type(self, level: int, heading: str) -> str:
        """Map a heading level and heading text to a section type label.

        Level 1 and 3 map to ``section_h1`` / ``section_h3``. Level 2 maps to
        a topic label when the heading contains a known keyword, otherwise
        ``section_h2``. Every other level (including 0, no heading) maps to
        ``section_other``.
        """
        if level == 1:
            return 'section_h1'
        if level == 2:
            heading_lower = heading.lower()
            for label, keywords in self._H2_KEYWORDS:
                if any(kw in heading_lower for kw in keywords):
                    return label
            return 'section_h2'
        if level == 3:
            return 'section_h3'
        return 'section_other'

    def _build_description(self, chunk_type: str, content: str,
                           doc_title: str, heading: str = '') -> str:
        """Build a human-readable description for a chunk.

        Args:
            chunk_type: Segment type (``'code'``, ``'section_*'``, other).
            content: Segment text.
            doc_title: Title of the document the segment belongs to.
            heading: Heading of the section. Section content does not
                include its heading line, so callers should pass it here;
                when empty, the first content line is used instead.

        Returns:
            A one-line description containing a preview of up to 150
            characters taken from the first five lines of ``content``.
        """
        lines = content.split('\n')[:5]
        preview = ' '.join(line.strip() for line in lines if line.strip())[:150]

        if chunk_type == 'code':
            lang = ''
            opener = re.match(r'```([^\n`]*)', content)
            if opener:
                lang = self._info_language(opener.group(1)) or 'text'
            return f"Code block (language: {lang}) in {doc_title}. Preview: {preview}"

        if chunk_type.startswith('section_'):
            if not heading:
                heading = content.split('\n')[0] if '\n' in content else content[:50]
            heading_clean = re.sub(r'^#+\s+', '', heading)
            type_hint = chunk_type.replace('section_', '')
            return f"[{type_hint.upper()}] {heading_clean}. Content: {preview}"

        return f"Document section in {doc_title}. Content: {preview}"

    def _split_large_chunk(self, content: str, chunk_type: str,
                           doc_title: str, start_id: int,
                           metadata: Dict = None) -> List["MDChunk"]:
        """Split an oversized segment into several ``<chunk_type>_part`` chunks.

        Args:
            content: Segment text to split.
            chunk_type: Type of the original segment.
            doc_title: Title of the document the segment belongs to.
            start_id: ``chunk_id`` given to the first part; later parts count up.
            metadata: Metadata of the original segment; copied into every
                part next to ``part`` (1-based) and ``original_type``.

        Returns:
            The parts in order. Empty when ``content`` is blank.

        Note:
            Code segments are split with the same rules as text, so a part
            of a code block may lack its opening or closing fence.
        """
        inherited = dict(metadata) if metadata else {}
        parts = []
        for offset, text in enumerate(self._split_text(content)):
            part_meta = dict(inherited)
            part_meta['part'] = offset + 1
            part_meta['original_type'] = chunk_type
            parts.append(MDChunk(
                chunk_id=start_id + offset,
                chunk_type=f"{chunk_type}_part",
                human_description=f"Part of {doc_title} ({chunk_type}): {text[:100]}...",
                raw_content=text,
                context=f"{doc_title} (continued)",
                metadata=part_meta
            ))
        return parts

    def _split_text(self, content: str) -> List[str]:
        """Cut ``content`` into non-blank pieces of at most ``max_chunk_size`` chars.

        Paragraphs (separated by blank lines) are packed together greedily.
        A paragraph that is too long on its own is packed line by line, and
        a line that is still too long is sliced into fixed-width windows.
        """
        paragraphs = [p for p in re.split(r'\n\n+', content) if p.strip()]
        pieces = self._pack_units(paragraphs, '\n\n', self._split_paragraph)
        return [piece for piece in pieces if piece.strip()]

    def _split_paragraph(self, paragraph: str) -> List[str]:
        """Pack the lines of one oversized paragraph into pieces."""
        return self._pack_units(paragraph.split('\n'), '\n', self._slice_line)

    def _slice_line(self, line: str) -> List[str]:
        """Slice one oversized line into fixed-width character windows."""
        # Guard against a non-positive limit, which would make range() fail.
        width = max(1, self.max_chunk_size)
        return [line[i:i + width] for i in range(0, len(line), width)]

    def _pack_units(self, units: List[str], separator: str,
                    split_oversized) -> List[str]:
        """Greedily join ``units`` with ``separator`` without exceeding the limit.

        A unit longer than ``max_chunk_size`` is never joined with its
        neighbours; it is handed to ``split_oversized`` and the pieces that
        call returns are emitted in its place.
        """
        limit = self.max_chunk_size
        pieces: List[str] = []
        pending: List[str] = []
        pending_len = 0  # length of separator.join(pending)
        for unit in units:
            if len(unit) > limit:
                if pending:
                    pieces.append(separator.join(pending))
                    pending, pending_len = [], 0
                pieces.extend(split_oversized(unit))
                continue
            extra = len(unit) + (len(separator) if pending else 0)
            if pending and pending_len + extra > limit:
                pieces.append(separator.join(pending))
                pending, pending_len = [], 0
                extra = len(unit)
            pending.append(unit)
            pending_len += extra
        if pending:
            pieces.append(separator.join(pending))
        return pieces

    def chunk_directory(self, dir_path: str, extensions: tuple = ('.md', '.markdown')) -> List[Dict]:
        """Chunk every Markdown file below ``dir_path`` (recursive).

        Files are visited in sorted order so repeated runs produce the chunks
        in the same order. A file that fails to chunk is reported on stdout
        and skipped; it does not abort the run.

        Args:
            dir_path: Root directory to walk.
            extensions: File name suffixes (lower-case) that count as Markdown.

        Returns:
            All chunk dicts of all files. ``chunk_id`` restarts at 0 for each
            file, so ids are not unique across the returned list.
        """
        # str.endswith needs a str or a tuple; accept any iterable of suffixes.
        suffixes = extensions if isinstance(extensions, str) else tuple(extensions)
        all_chunks = []
        file_count = 0
        for root, dirs, files in os.walk(dir_path):
            dirs.sort()  # in-place: makes os.walk descend in a stable order
            for file in sorted(files):
                if not file.lower().endswith(suffixes):
                    continue
                file_path = os.path.join(root, file)
                try:
                    chunks = self.chunk_file(file_path)
                except Exception as e:
                    # Best effort: report the file and go on with the rest.
                    print(f"FAIL {file_path}: {e}")
                else:
                    all_chunks.extend(chunks)
                    file_count += 1
                    print(f"OK {file_path}: {len(chunks)} chunks")
        print(f"\nTotal: {file_count} files, {len(all_chunks)} chunks")
        return all_chunks
def save_chunks_to_json(chunks: List[Dict], output_path: str):
    """Write the chunk list to ``output_path`` as pretty-printed UTF-8 JSON.

    Non-ASCII text is written as-is (not escaped), with a two-space indent.
    A confirmation line with the chunk count is printed afterwards.
    """
    total = len(chunks)
    with Path(output_path).open('w', encoding='utf-8') as sink:
        json.dump(chunks, sink, ensure_ascii=False, indent=2)
    print(f"Saved {total} chunks to {output_path}")
def print_chunk_summary(chunks: List[Dict]):
    """Print how many chunks exist per ``chunk_type``, most frequent first.

    Types with the same count are listed in the order they first appear.
    """
    tally = {}
    for entry in chunks:
        kind = entry["chunk_type"]
        if kind in tally:
            tally[kind] += 1
        else:
            tally[kind] = 1

    print("\nChunk Type Summary:")
    # reverse=True keeps the sort stable, so ties stay in insertion order.
    ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
    for chunk_type, count in ranked:
        print(f" {chunk_type}: {count}")
def chunks_output_path(md_path: str) -> str:
    """Return the JSON output path for a single input file.

    Only the final extension is replaced, so ``docs/a.md`` becomes
    ``docs/a_chunks.json``. Inputs without a Markdown suffix (``README``,
    ``notes.txt``) also get a distinct name, which keeps the JSON from
    being written over the input file itself.
    """
    stem, _ext = os.path.splitext(md_path)
    return f"{stem}_chunks.json"


if __name__ == "__main__":
    import sys

    chunker = MarkdownSemanticChunker(max_chunk_size=2000)
    if len(sys.argv) > 1:
        path = sys.argv[1]
        if os.path.isdir(path):
            # Directory mode: write <dirname>_md_chunks.json next to the directory.
            all_chunks = chunker.chunk_directory(path)
            trimmed = path.rstrip("/\\")
            output_path = os.path.join(
                os.path.dirname(trimmed) or ".",
                os.path.basename(trimmed) + "_md_chunks.json",
            )
            save_chunks_to_json(all_chunks, output_path)
            print_chunk_summary(all_chunks)
        else:
            # Single-file mode: write <name>_chunks.json next to the input.
            chunks = chunker.chunk_file(path)
            output_path = chunks_output_path(path)
            save_chunks_to_json(chunks, output_path)

            print(f"\n{'='*60}")
            print("Chunking Results Preview")
            print(f"{'='*60}")
            # Show at most the first ten chunks to keep the output readable.
            for chunk in chunks[:10]:
                print(f"\n[Chunk {chunk['chunk_id']}] Type: {chunk['chunk_type']}")
                print(f"Description: {chunk['human_description'][:120]}...")
                print(f"Content length: {len(chunk['raw_content'])} chars")
            if len(chunks) > 10:
                print(f"\n... and {len(chunks) - 10} more chunks")
            print_chunk_summary(chunks)
    else:
        print("=" * 60)
        print("Markdown Semantic Chunking v1.0")
        print("=" * 60)
        print("\nUsage: python md_chunker.py <md_file_or_directory>")