Elasticsearch 批量写入

Elasticsearch 批量写入的工程演进:从手写 HTTP 到 BulkIndexer(以及我踩过的坑)

在做 Elasticsearch 大规模数据写入时,我完整经历了三种实现方式:

  1. 手动拼接 HTTP Bulk 请求
  2. 使用官方 client.Bulk() API
  3. 使用 esutil.BulkIndexer

这篇文章不是教程,而是一次真实工程演进的复盘
👉 每一版为什么存在
👉 每一版解决了什么问题
👉 以及它为什么最终会被下一版替代

如果你现在也在写 ES 批量写入,希望这篇文章能帮你少踩一次坑


一、第一阶段:手动 HTTP Bulk —— 能跑,但非常危险

1️⃣ 为什么一开始会这么写?

最初需求其实很简单:

  • 批量写入 ES
  • 控制单次请求大小(避免 OOM)
  • 快速实现

于是最直接的方案就是:
👉 直接调用 Elasticsearch 的 _bulk HTTP API

核心思路:

  • bytes.Buffer 手动拼接 NDJSON
  • 超过大小阈值就 POST 一次
1
2
3
4
{ action }
{ document }
{ action }
{ document }

2️⃣ 这个方案的优点

说实话,第一版并不是“错误设计”:

  • ✅ 非常直观
  • ✅ 不依赖 SDK
  • ✅ 适合 PoC / 验证阶段
  • ✅ 对 Bulk 协议理解最深入

3️⃣ 致命问题:HTTP 200 ≠ 写入成功

真正的问题,出在 Bulk API 的返回语义

在最初的实现中,我只做了这件事:

1
2
3
if resp.StatusCode != http.StatusOK {
return error
}

并简单打印了一点 response body。

结果是一个非常隐蔽、但极其危险的坑:

Bulk 请求 HTTP 返回 200,但部分文档写入失败


4️⃣ 为什么这是一个“事故级”问题?

Bulk API 的真实返回是这样的:

1
2
3
4
5
6
7
8
{
"took": 30,
"errors": true,
"items": [
{ "index": { "status": 201 } },
{ "index": { "status": 409, "error": { ... } } }
]
}

关键点只有一个:

失败信息在 response body,而不在 HTTP status

而我当时:

  • ❌ 没有完整读取 body
  • ❌ 没有解析 errors
  • ❌ 没有逐条检查 items

结果就是:

任务“成功结束”,但 ES 里实际数据少了一部分

这是典型的 静默数据丢失,比直接报错危险得多。


二、第二阶段:client.Bulk() —— 正确,但还不够工程化

1️⃣ 为什么要换?

第一版最大的问题不是性能,而是可靠性
于是我转向了 官方 Go SDK 提供的 Bulk API

1
2
3
bulk := client.Bulk().Index(index)
bulk.IndexOp(op, doc)
bulk.Do(ctx)

2️⃣ 这一版解决了什么?

✅ SDK 自动解析 Bulk 响应
✅ 明确暴露 res.Errors
✅ 不可能再忽略部分失败
✅ 不需要手写 NDJSON

1
2
3
4
res, err := bulk.Do(ctx)
if res.Errors {
return fmt.Errorf("bulk response contains errors")
}

这是一个非常重要的进步
👉 从“可能悄悄丢数据”变成“失败一定能感知”


3️⃣ 但新的问题也很明显

随着数据量继续增长,工程问题开始暴露:

❌ flush 策略需要自己控制

  • 按条数?
  • 按字节?
  • 按时间?

❌ 并发写入要自己管理

  • goroutine
  • channel
  • backpressure

❌ 缺少全局统计信息

  • 总写入条数
  • 失败条数
  • flush 次数

这时我意识到:

client.Bulk() 更像“底层构件”,而不是生产级工具


三、第三阶段:esutil.BulkIndexer —— 生产级方案

1️⃣ 为什么最终选择 BulkIndexer?

esutil.BulkIndexer 本质上是 官方给出的“Bulk 最终形态”,它解决的正是前两版遗留的问题:

  • 自动 batching
  • 自动 flush
  • worker 池并发
  • 背压控制
  • 统一统计
  • 明确的失败回调

一句话总结:

你只管 Add,剩下的工程问题交给 BulkIndexer


2️⃣ 核心使用方式

1
2
3
4
5
6
7
8
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
Index: index,

NumWorkers: 5,
FlushBytes: 5 << 20,
FlushInterval: 30 * time.Second,
})

写入时:

1
2
3
4
bi.Add(ctx, esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(body),
})

3️⃣ BulkIndexer 如何避免“静默失败”?

这是它最重要的价值之一:

  • 每条失败都会进入 OnFailure
  • 全局失败可以通过 Stats() 感知
1
stats := bi.Stats()

你可以明确知道:

  • 加了多少
  • 成功多少
  • 失败多少
  • flush 了几次

数据一致性终于变成“可验证的事实”


四、三种方案的本质对比

方案 本质定位 最大风险
手写 HTTP Bulk 协议级 静默数据丢失
client.Bulk() API 级 工程复杂度
BulkIndexer 工具级 几乎没有

五、我的最终结论(血泪版)

Bulk API 最危险的不是性能,而是“你以为成功了”

HTTP 200 在 Bulk 语义下毫无意义

如果你没有解析 Bulk Response,你根本不知道自己写没写成功

因此:

  • PoC / 学习:可以手写 HTTP
  • 中等规模任务client.Bulk() 勉强可用
  • 生产环境 / 长期任务直接用 esutil.BulkIndexer

六、写在最后

这次演进给我最大的感受是:

工程问题不是“能不能写进去”,而是“你怎么确认它真的写进去了”

BulkIndexer 并不是为了省代码,
而是为了避免你在凌晨两点对账时,才发现数据少了一截

如果你现在还在:

  • 手写 NDJSON
  • 只看 HTTP status
  • 自己控制 flush

那你很可能正站在我曾经踩过的那个坑边上。

早点跳出来,会轻松很多。