技术拆解-LangExtract

以下内容基于LangExtract源码,由 Claude-4-Sonnet 生成。

项目概述

LangExtract 是由 Google 开发的一个 Python 库,专门用于使用大型语言模型(LLMs)从非结构化文本中提取结构化信息。该项目提供了一个完整的信息抽取流水线,具有高度的准确性和可扩展性。

核心特性

  1. 精确的源文本定位映射 - 将每个提取结果映射到源文本的确切位置
  2. 可靠的结构化输出 - 基于少样本示例强制执行一致的输出模式
  3. 长文档处理优化 - 通过文本分块、并行处理和多轮提取解决”大海捞针”问题
  4. 交互式可视化 - 生成独立的 HTML 文件来可视化提取结果
  5. 灵活的 LLM 支持 - 支持 Gemini、Ollama 和其他语言模型
  6. 领域适应性 - 通过少样本示例适应任何领域

技术架构

整体架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
                用户输入

┌─────────────┐
extract() │ ← 主入口函数
│ __init__ │
└─────────────┘

┌─────────────┐
│ Annotator │ ← 核心注释器
└─────────────┘

┌─────────────┐ ↓ ┌─────────────┐
│ Chunking │←──────→│ Prompting │
│ 分块器 │ │ 提示生成 │
└─────────────┘ └─────────────┘
↓ ↓
┌─────────────┐ ┌─────────────┐
│ Tokenizer │ │ Inference │
│ 标记化 │ │ LLM推理
└─────────────┘ └─────────────┘
↓ ↓
┌─────────────┐ ┌─────────────┐
│ Resolver │←───────│ Schema
│ 结果解析 │ │ 模式约束 │
└─────────────┘ └─────────────┘

┌─────────────┐
│Visualization│
│ 可视化 │
└─────────────┘

核心模块分析

1. 数据结构层 (data.py)

定义了整个系统的核心数据结构:

1
2
3
4
5
6
7
8
@dataclasses.dataclass
class Extraction:
"""表示从文本中提取的信息实体"""
extraction_class: str # 提取类别
extraction_text: str # 提取的文本
char_interval: CharInterval # 字符区间位置
alignment_status: AlignmentStatus # 对齐状态
attributes: dict # 属性字典

关键设计亮点:

  • 使用 CharIntervalTokenInterval 实现精确的位置追踪
  • 支持模糊对齐(MATCH_FUZZY)和精确对齐(MATCH_EXACT
  • 每个文档自动生成唯一标识符

2. 文本分块层 (chunking.py)

核心算法: 智能文本分块策略

1
2
3
4
5
6
7
class ChunkIterator:
"""将长文档分解为可处理的文本块"""

def _tokens_exceed_buffer(self, token_interval) -> bool:
"""检查标记区间是否超出最大缓冲区大小"""
char_interval = get_char_interval(self.tokenized_text, token_interval)
return (char_interval.end_pos - char_interval.start_pos) > self.max_char_buffer

分块策略:

  1. 句子优先: 优先保持句子完整性
  2. 换行感知: 在换行处优雅切分
  3. 标记边界: 避免在单词中间切分
  4. 单词保护: 超长单词单独成块

3. 提示工程层 (prompting.py)

设计模式: Few-shot Learning + 结构化提示

1
2
3
4
5
6
7
8
9
10
class QAPromptGenerator:
"""生成问答式提示"""

def render(self, question: str, additional_context: str = None) -> str:
"""生成完整的提示文本"""
# 模板结构:
# 1. 任务描述
# 2. 附加上下文 (可选)
# 3. 示例 (Few-shot)
# 4. 当前问题

提示结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
任务描述:Extract characters, emotions, and relationships...

示例:
Q: "ROMEO. But soft! What light through yonder window breaks?"
A: ```json
{
"extractions": [
{
"character": "ROMEO",
"character_attributes": {"emotional_state": "wonder"}
}
]
}

Q: [当前文本]
A:

1
2
3
4
5
6
7
8
9
10
11
12

#### 4. 推理层 (`inference.py`)

**多后端支持架构:**

```python
class BaseLanguageModel(abc.ABC):
"""抽象基类,支持多种LLM后端"""

@abc.abstractmethod
def infer(self, batch_prompts: Sequence[str]) -> Iterator[Sequence[ScoredOutput]]:
"""执行批量推理"""

具体实现:

  • GeminiLanguageModel: 支持结构化输出和并行处理
  • OllamaLanguageModel: 本地模型支持
  • LangFunLanguageModel: 通用 LangFun 接口

并行处理优化:

1
2
3
4
5
6
# Gemini 模型的并行推理实现
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_index = {
executor.submit(self._process_single_prompt, prompt, config.copy()): i
for i, prompt in enumerate(batch_prompts)
}

5. 解析对齐层 (resolver.py)

双重对齐算法:

  1. 精确对齐 (基于 difflib.SequenceMatcher)
  2. 模糊对齐 (滑动窗口 + 相似度计算)
1
2
3
4
5
6
def _fuzzy_align_extraction(self, extraction, source_tokens, ...):
"""模糊对齐算法"""
# 1. 标记化提取文本
# 2. 滑动窗口搜索最佳匹配
# 3. 使用 SequenceMatcher 计算相似度
# 4. 选择最高分匹配 (≥ threshold)

对齐状态机:

  • MATCH_EXACT: 完全匹配
  • MATCH_LESSER: 部分匹配(提取文本较长)
  • MATCH_FUZZY: 模糊匹配(相似度 ≥ 阈值)
  • None: 无法对齐

6. 模式约束层 (schema.py)

Gemini 结构化输出:

1
2
3
4
5
6
7
8
9
class GeminiSchema(BaseSchema):
"""为 Gemini 生成 JSON Schema 约束"""

@classmethod
def from_examples(cls, examples_data):
"""从示例数据自动生成模式"""
# 分析示例中的属性类型
# 生成对应的 JSON Schema
# 支持嵌套对象和数组

生成的 Schema 结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"type": "object",
"properties": {
"extractions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"character": { "type": "string" },
"character_attributes": {
"type": "object",
"properties": { "emotional_state": { "type": "string" } },
"nullable": true
}
}
}
}
},
"required": ["extractions"]
}

核心算法详解

1. 多轮提取算法 (annotation.py)

问题: 单轮提取可能遗漏信息
解决方案: 多轮独立提取 + 非重叠合并

1
2
3
4
5
6
7
8
9
10
11
12
13
def _annotate_documents_sequential_passes(self, documents, extraction_passes=3):
"""多轮提取算法"""
document_extractions_by_pass = {}

# 执行多轮独立提取
for pass_num in range(extraction_passes):
for annotated_doc in self._annotate_documents_single_pass(...):
document_extractions_by_pass[doc_id].append(annotated_doc.extractions)

# 合并非重叠结果(第一轮优先)
for doc_id, all_pass_extractions in document_extractions_by_pass.items():
merged_extractions = _merge_non_overlapping_extractions(all_pass_extractions)
yield AnnotatedDocument(extractions=merged_extractions)

合并策略:

  • 第一轮提取结果优先
  • 后续轮次只添加非重叠的新发现
  • 基于字符位置判断重叠

2. 智能分块算法

三层分块策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def __next__(self) -> TextChunk:
"""智能分块算法"""
sentence = next(self.sentence_iter)

# 策略1: 单个标记超限时独立成块
if self._tokens_exceed_buffer(single_token_chunk):
return single_token_chunk

# 策略2: 在句子内部分块,优先换行位置
for token_index in range(sentence.start_index, sentence.end_index):
if token.first_token_after_newline:
preferred_break_point = token_index

if chunk_would_exceed_buffer:
return chunk_ending_at_preferred_break_point

# 策略3: 跨句子合并(完整句子优先)
for next_sentence in self.sentence_iter:
if combined_chunk_exceeds_buffer:
break
else:
extend_current_chunk()

3. 模糊对齐算法

滑动窗口优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def _fuzzy_align_extraction(self, extraction, source_tokens, threshold=0.75):
"""高效模糊对齐算法"""
extraction_tokens = tokenize(extraction.extraction_text)
extraction_counts = Counter(extraction_tokens)
min_overlap = int(len(extraction_tokens) * threshold)

# 滑动窗口搜索
for window_size in range(len(extraction_tokens), len(source_tokens) + 1):
window_deque = deque(source_tokens[0:window_size])
window_counts = Counter(window_deque)

for start_idx in range(len(source_tokens) - window_size + 1):
# 快速预检:检查重叠标记数量
if (extraction_counts & window_counts).total() >= min_overlap:
# 精确计算相似度
ratio = sequence_matcher.ratio()
if ratio > best_ratio:
best_match = (start_idx, window_size, ratio)

# 滑动窗口
slide_window_right()

return best_match if best_ratio >= threshold else None

技术栈分析

核心依赖

依赖库 版本要求 用途 核心功能
google-genai ≥0.1.0 Gemini API 结构化输出、并行推理
langfun ≥0.1.0 通用 LLM 接口 多模型支持
pydantic ≥1.8.0 数据验证 配置解析、类型检查
aiohttp ≥3.8.0 异步 HTTP API 调用
difflib 内置 序列匹配 对齐算法核心
more-itertools ≥8.0.0 迭代工具 批处理优化

开发工具链

1
2
3
4
5
6
7
8
9
# pyproject.toml 开发依赖
[project.optional-dependencies]
dev = [
"black>=23.7.0", # 代码格式化
"pylint>=2.17.5", # 静态分析
"pytest>=7.4.0", # 测试框架
"pytype>=2024.10.11", # 类型检查
"tox>=4.0.0", # 多环境测试
]

测试配置

1
2
3
4
5
6
7
8
# tox.ini - 多版本测试
[tox]
envlist = py310, py311

[testenv]
commands =
pylint --rcfile=.pylintrc langextract tests
pytest -q

使用流程详解

1. 基础使用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import langextract as lx

# 第一步:定义提取任务
prompt = "Extract characters and emotions from the text."

# 第二步:提供示例
examples = [
lx.data.ExampleData(
text="ROMEO. But soft! What light breaks?",
extractions=[
lx.data.Extraction(
extraction_class="character",
extraction_text="ROMEO",
attributes={"emotional_state": "wonder"}
)
]
)
]

# 第三步:执行提取
result = lx.extract(
text_or_documents="Lady Juliet gazed longingly at the stars",
prompt_description=prompt,
examples=examples,
model_id="gemini-2.5-flash"
)

# 第四步:保存和可视化
lx.io.save_annotated_documents([result], "results.jsonl")
html_content = lx.visualize("results.jsonl")

2. 高级配置流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 长文档处理配置
result = lx.extract(
text_or_documents="https://example.com/long-document.txt",
prompt_description=prompt,
examples=examples,
model_id="gemini-2.5-flash",

# 性能优化参数
extraction_passes=3, # 多轮提取提高召回率
max_workers=20, # 并行处理加速
max_char_buffer=1000, # 较小上下文提高准确性
batch_length=10, # 批处理大小

# 输出控制参数
format_type=data.FormatType.JSON,
use_schema_constraints=True,
fence_output=False,
temperature=0.5
)

3. 内部处理流程

sequenceDiagram
    participant U as User
    participant E as extract()
    participant A as Annotator
    participant C as ChunkIterator
    participant P as PromptGenerator
    participant I as Inference
    participant R as Resolver

    U->>E: 调用 extract()
    E->>A: 创建 Annotator
    A->>C: 文档分块

    loop 批处理循环
        C->>P: 生成提示
        P->>I: LLM 推理
        I->>R: 解析结果
        R->>A: 对齐文本
    end

    A->>E: 返回注释文档
    E->>U: 返回结果

性能优化策略

1. 并行处理优化

批处理策略:

1
2
3
# 最优配置建议
batch_length = max_workers # 充分利用并行性
max_workers = min(20, cpu_count()) # 避免过度并发

内存优化:

  • 使用迭代器避免大文档全量加载
  • 分块处理降低内存峰值
  • 惰性计算标记化结果

2. API 调用优化

速率限制处理:

  • Gemini Tier 2 配额用于生产环境
  • 指数退避重试机制
  • 并发控制避免限流

成本控制:

1
2
3
4
5
6
7
# 成本估算公式
total_cost = (
num_chunks *
avg_tokens_per_chunk *
extraction_passes *
token_price
)

3. 准确性优化

分块策略优化:

  • 较小的 max_char_buffer (1000-2000) 提高准确性
  • 保持句子完整性减少上下文丢失
  • 多轮提取 (extraction_passes=3) 提高召回率

对齐算法优化:

  • 模糊对齐阈值调整 (0.75-0.85)
  • 标记标准化提高匹配率
  • 滑动窗口优化减少计算量

扩展和集成

1. 自定义 LLM 后端

1
2
3
4
5
6
7
8
9
class CustomLanguageModel(BaseLanguageModel):
"""自定义语言模型实现"""

def infer(self, batch_prompts):
# 实现自定义推理逻辑
responses = your_llm_api.batch_generate(batch_prompts)

for response in responses:
yield [ScoredOutput(score=1.0, output=response.text)]

2. 自定义解析器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class CustomResolver(AbstractResolver):
"""自定义结果解析器"""

def resolve(self, input_text):
# 实现自定义解析逻辑
parsed_data = custom_parse(input_text)
return [
data.Extraction(
extraction_class=item["class"],
extraction_text=item["text"],
attributes=item.get("attributes")
)
for item in parsed_data
]

3. 领域特化

医疗领域示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
medical_prompt = """
Extract medications, dosages, and routes of administration.
Use exact text from clinical notes. Include drug interactions if mentioned.
"""

medical_examples = [
lx.data.ExampleData(
text="Patient prescribed Metformin 500mg twice daily orally.",
extractions=[
lx.data.Extraction(
extraction_class="medication",
extraction_text="Metformin",
attributes={
"dosage": "500mg",
"frequency": "twice daily",
"route": "orally"
}
)
]
)
]

最佳实践建议

1. 提示设计原则

  • 明确性: 清晰描述提取目标和规则
  • 示例质量: 提供高质量、代表性示例
  • 属性丰富: 为每个实体提供有意义的属性
  • 避免重叠: 确保提取文本不重叠

2. 性能调优指南

小文档 (< 10K 字符):

1
2
3
4
5
6
config = {
"max_char_buffer": 5000,
"batch_length": 1,
"max_workers": 1,
"extraction_passes": 1
}

中等文档 (10K-100K 字符):

1
2
3
4
5
6
config = {
"max_char_buffer": 2000,
"batch_length": 5,
"max_workers": 10,
"extraction_passes": 2
}

大型文档 (> 100K 字符):

1
2
3
4
5
6
config = {
"max_char_buffer": 1000,
"batch_length": 20,
"max_workers": 20,
"extraction_passes": 3
}

3. 错误处理策略

1
2
3
4
5
6
7
8
9
10
11
try:
result = lx.extract(...)
except ValueError as e:
# API 密钥或参数错误
logging.error(f"Configuration error: {e}")
except requests.RequestException as e:
# 网络或 API 错误
logging.error(f"API error: {e}")
except inference.InferenceOutputError as e:
# LLM 推理错误
logging.error(f"Inference error: {e}")

总结

LangExtract 是一个设计优雅、功能强大的信息抽取框架,其核心优势在于:

  1. 模块化架构: 清晰的分层设计便于扩展和维护
  2. 高级算法: 智能分块、多轮提取、模糊对齐等先进技术
  3. 性能优化: 并行处理、批量推理、内存优化等性能策略
  4. 易用性: 简洁的 API 设计和丰富的配置选项
  5. 可扩展性: 支持多种 LLM 后端和自定义组件

该项目展现了现代 NLP 工程的最佳实践,为大规模文本信息抽取提供了完整的解决方案。无论是学术研究还是工业应用,LangExtract 都是一个值得深入学习和使用的优秀项目。