Elasticsearch 批量写入
Elasticsearch 批量写入的工程演进:从手写 HTTP 到 BulkIndexer(以及我踩过的坑)
在做 Elasticsearch 大规模数据写入时,我完整经历了三种实现方式:
- 手动拼接 HTTP Bulk 请求
- 使用官方
client.Bulk()API - 使用
esutil.BulkIndexer
这篇文章不是教程,而是一次真实工程演进的复盘:
👉 每一版为什么存在
👉 每一版解决了什么问题
👉 以及它为什么最终会被下一版替代
如果你现在也在写 ES 批量写入,希望这篇文章能帮你少踩一次坑。
一、第一阶段:手动 HTTP Bulk —— 能跑,但非常危险
1️⃣ 为什么一开始会这么写?
最初需求其实很简单:
- 批量写入 ES
- 控制单次请求大小(避免 OOM)
- 快速实现
于是最直接的方案就是:
👉 直接调用 Elasticsearch 的 _bulk HTTP API
核心思路:
- 用
bytes.Buffer手动拼接 NDJSON - 超过大小阈值就 POST 一次
1 | { action } |
2️⃣ 这个方案的优点
说实话,第一版并不是“错误设计”:
- ✅ 非常直观
- ✅ 不依赖 SDK
- ✅ 适合 PoC / 验证阶段
- ✅ 对 Bulk 协议理解最深入
3️⃣ 致命问题:HTTP 200 ≠ 写入成功
真正的问题,出在 Bulk API 的返回语义。
在最初的实现中,我只做了这件事:
1 | if resp.StatusCode != http.StatusOK { |
并简单打印了一点 response body。
结果是一个非常隐蔽、但极其危险的坑:
Bulk 请求 HTTP 返回 200,但部分文档写入失败
4️⃣ 为什么这是一个“事故级”问题?
Bulk API 的真实返回是这样的:
1 | { |
关键点只有一个:
❗ 失败信息在 response body,而不在 HTTP status
而我当时:
- ❌ 没有完整读取 body
- ❌ 没有解析
errors - ❌ 没有逐条检查
items
结果就是:
任务“成功结束”,但 ES 里实际数据少了一部分
这是典型的 静默数据丢失,比直接报错危险得多。
二、第二阶段:client.Bulk() —— 正确,但还不够工程化
1️⃣ 为什么要换?
第一版最大的问题不是性能,而是可靠性。
于是我转向了 官方 Go SDK 提供的 Bulk API:
1 | bulk := client.Bulk().Index(index) |
2️⃣ 这一版解决了什么?
✅ SDK 自动解析 Bulk 响应
✅ 明确暴露 res.Errors
✅ 不可能再忽略部分失败
✅ 不需要手写 NDJSON
1 | res, err := bulk.Do(ctx) |
这是一个非常重要的进步:
👉 从“可能悄悄丢数据”变成“失败一定能感知”
3️⃣ 但新的问题也很明显
随着数据量继续增长,工程问题开始暴露:
❌ flush 策略需要自己控制
- 按条数?
- 按字节?
- 按时间?
❌ 并发写入要自己管理
- goroutine
- channel
- backpressure
❌ 缺少全局统计信息
- 总写入条数
- 失败条数
- flush 次数
这时我意识到:
client.Bulk()更像“底层构件”,而不是生产级工具
三、第三阶段:esutil.BulkIndexer —— 生产级方案
1️⃣ 为什么最终选择 BulkIndexer?
esutil.BulkIndexer 本质上是 官方给出的“Bulk 最终形态”,它解决的正是前两版遗留的问题:
- 自动 batching
- 自动 flush
- worker 池并发
- 背压控制
- 统一统计
- 明确的失败回调
一句话总结:
你只管 Add,剩下的工程问题交给 BulkIndexer
2️⃣ 核心使用方式
1 | bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ |
写入时:
1 | bi.Add(ctx, esutil.BulkIndexerItem{ |
3️⃣ BulkIndexer 如何避免“静默失败”?
这是它最重要的价值之一:
- 每条失败都会进入
OnFailure - 全局失败可以通过
Stats()感知
1 | stats := bi.Stats() |
你可以明确知道:
- 加了多少
- 成功多少
- 失败多少
- flush 了几次
数据一致性终于变成“可验证的事实”。
四、三种方案的本质对比
| 方案 | 本质定位 | 最大风险 |
|---|---|---|
| 手写 HTTP Bulk | 协议级 | 静默数据丢失 |
| client.Bulk() | API 级 | 工程复杂度 |
| BulkIndexer | 工具级 | 几乎没有 |
五、我的最终结论(血泪版)
❗ Bulk API 最危险的不是性能,而是“你以为成功了”
❗ HTTP 200 在 Bulk 语义下毫无意义
❗ 如果你没有解析 Bulk Response,你根本不知道自己写没写成功
因此:
- PoC / 学习:可以手写 HTTP
- 中等规模任务:
client.Bulk()勉强可用 - 生产环境 / 长期任务:直接用
esutil.BulkIndexer
六、写在最后
这次演进给我最大的感受是:
工程问题不是“能不能写进去”,而是“你怎么确认它真的写进去了”
BulkIndexer 并不是为了省代码,
而是为了避免你在凌晨两点对账时,才发现数据少了一截。
如果你现在还在:
- 手写 NDJSON
- 只看 HTTP status
- 自己控制 flush
那你很可能正站在我曾经踩过的那个坑边上。
早点跳出来,会轻松很多。