ES 手动路由和自动路由的实现过程
Routing 是 ES 决定文档存储在哪个分片(shard)上的机制。当文档被索引时,ES 需要确定它应该存储在哪个主分片和副本分片上。
Pipeline 让 Routing 从简单(基于ID)变成智能(基于业务),实现数据自动分片,提升查询性能和数据管理效率。
默认路由机制,使用 _id 作为路由键,不同的ID值随机分配到不同分片上。
你可以通过 Pipeline 实现自动路由,可指定任意字段,任意策略作为路由键,如:通过 orderNo 的前缀或HASH
graph TB
subgraph "集群 Cluster"
subgraph "节点 Node 1"
N1[数据节点]
subgraph "索引 Index A"
IA1[分片 A0 - 主]
IA2[分片 A1 - 副本]
end
subgraph "索引 Index B"
IB1[分片 B0 - 主]
end
end
subgraph "节点 Node 2"
N2[数据节点]
subgraph "索引 Index A"
IA3[分片 A0 - 副本]
IA4[分片 A1 - 主]
end
subgraph "索引 Index B"
IB2[分片 B0 - 副本]
IB3[分片 B1 - 主]
end
end
subgraph "节点 Node 3"
N3[数据节点]
subgraph "索引 Index A"
IA5[分片 A1 - 副本]
end
subgraph "索引 Index B"
IB4[分片 B1 - 副本]
end
end
subgraph "节点 Node 4"
N4[主节点 Master Node]
N5[协调节点 Coordinating Node]
end
end
%% 连接关系
N4 -.->|集群管理| N1
N4 -.->|集群管理| N2
N4 -.->|集群管理| N3
%% 副本关系
IA1 -.->|副本同步| IA3
IA4 -.->|副本同步| IA2
IA4 -.->|副本同步| IA5
IB1 -.->|副本同步| IB2
IB3 -.->|副本同步| IB4
%% 客户端请求
Client[客户端] --> N5
N5 -->|路由| N1
N5 -->|路由| N2
N5 -->|路由| N3
Routing(路由)
ES 索引由多个 主分片(primary shards) 组成,文档必须分配到一个确定的分片。
分配规则:
shard_num = hash(_routing) % num_primary_shards
- 默认
_routing = _id(即文档 ID 决定分片位置)。你可以在索引/更新文档时显式指定?routing=xxx,覆盖默认值。num_primary_shards:主分片数量
| 场景 | 说明 |
|---|---|
| 关联文档查询性能优化 | 将属于同一实体的文档(如订单)用相同 routing 值,确保它们落在同一分片,避免跨分片 join。 减少搜索的分片数、减少网络开销和合并成本 |
| 热点数据隔离 | 按租户 ID routing,实现多租户数据物理隔离 |
| 控制写入负载均衡 | 避免默认 _id 哈希不均导致分片数据倾斜 |
| 事务保证 | 同一路由的文档操作有序 |
graph TB
subgraph "文档路由流程"
Client[客户端] --> CO[协调节点]
subgraph "路由计算"
CO --> RC[路由计算器]
RC -->|文档ID| HASH[哈希函数]
HASH --> MOD[取模运算]
MOD --> SHARD_ID[分片ID]
end
subgraph "分片定位"
SHARD_ID --> SL[分片定位器]
SL --> NLM[节点位置映射]
NLM --> NODE_ID[目标节点ID]
end
subgraph "请求转发"
NODE_ID --> RF[请求转发器]
RF --> Primary[主分片节点]
RF --> Replica[副本分片节点]
end
Primary --> PR[主分片处理]
Replica --> RR[副本分片处理]
PR --> Sync[副本同步]
Sync --> RR
PR --> Response[响应客户端]
Response --> Client
end
subgraph "路由策略类型"
RT1[默认路由: _id]
RT2[自定义路由字段]
RT3[索引别名路由]
RT4[查询时路由]
RC --> RT1
RC --> RT2
RC --> RT3
RC --> RT4
end
subgraph "哈希算法"
HA1[MurmurHash3]
HA2[MD5取模]
HA3[一致性哈希]
HASH --> HA1
HASH --> HA2
HASH --> HA3
end
分片扩容
支持路由动态扩容,而不需要删除重建
1 | # 初始索引 |
添加参数
number_of_routing_shards后路由计算:
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
分片分配策略
-
平衡分配策略
- 收集所有未分配分片,对每个未分配分片选择最佳节点
- 候选节点评分,排除不符合条件的节点,选择得分最高的节点
- 磁盘空间(权重最高)
- 当前分片负载
- 硬件资源
- 网络拓扑
graph TB subgraph "再平衡决策流程" Monitor[集群监控器] --> Check{检查平衡条件} Check -->|不平衡| Analyze[分析不平衡程度] Check -->|平衡| NoAction[无操作] Analyze --> Plan[制定再平衡计划] subgraph "平衡策略选择" Plan --> PS1[分片移动策略] Plan --> PS2[分片交换策略] Plan --> PS3[优先级策略] PS1 --> Action1[移动过载节点分片] PS2 --> Action2[交换分片优化布局] PS3 --> Action3[按优先级重新分配] end Action1 --> Execute[执行再平衡] Action2 --> Execute Action3 --> Execute Execute --> Verify[验证再平衡结果] Verify -->|成功| Complete[完成] Verify -->|失败| Rollback[回滚] Rollback --> Monitor end subgraph "再平衡触发条件" TC1[节点加入/离开] TC2[分片分布不均
超过阈值] TC3[磁盘使用率不均] TC4[手动触发API] TC5[索引生命周期变更] TC1 --> Check TC2 --> Check TC3 --> Check TC4 --> Check TC5 --> Check end subgraph "再平衡约束条件" CC1[并发分片移动限制] CC2[网络带宽限制] CC3[恢复优先级] CC4[索引分配过滤] CC5[节点角色限制] Execute --> CC1 Execute --> CC2 Execute --> CC3 Execute --> CC4 Execute --> CC5 end -
机架感知分配
确保主分片和副本不在同一机架,为每个副本选择不同机架的节点
-
热冷数据分离
- 热数据分配到SSD节点
- 温数据分配到高性能HDD节点
- 冷数据分配到大容量HDD节点
路由过程
执行 POST my_index/_doc/1001 时
路由计算过程:
_routing = _id = “1001”
hash(“1001”) = 123456789 (假设值)
num_primary_shards = 5 (假设5个分片)
shard_num = 123456789 % 5 = 4
文档存储在分片4
sequenceDiagram
participant C as 客户端
participant CO as 协调节点
participant RC as 路由计算器
participant CM as 集群管理器
participant PN as 主分片节点
participant RN as 副本分片节点
Note over C,RN: 文档写入路由流程
C->>CO: 1. 发送写入请求
(文档ID, 路由值可选)
CO->>RC: 2. 计算目标分片
RC->>RC: 3. 应用路由算法
shard = hash(routing) % num_shards
alt 路由值存在
RC->>RC: 使用自定义路由值
else 路由值不存在
RC->>RC: 使用文档ID作为路由
end
RC->>CM: 4. 查询分片位置
CM->>RC: 5. 返回节点映射
alt 写入主分片
RC->>CO: 6. 返回主分片节点
CO->>PN: 7. 转发写入请求
PN->>PN: 8. 本地写入处理
par 并行副本同步
PN->>RN: 9a. 同步到副本分片
RN->>PN: 9b. 副本确认
end
PN->>CO: 10. 写入完成确认
CO->>C: 11. 返回写入成功
end
Note over C,RN: 查询路由流程
C->>CO: 12. 发送查询请求
CO->>RC: 13. 计算相关分片
alt 查询包含路由
RC->>RC: 14. 路由到特定分片
RC->>CM: 15. 定位主分片节点
CM->>CO: 16. 返回节点信息
CO->>PN: 17. 发送查询请求
PN->>CO: 18. 返回查询结果
else 查询不包含路由
RC->>CM: 19. 获取所有相关分片
CM->>CO: 20. 返回全部分片节点
par 并行分片查询
CO->>PN: 21a. 查询主分片1
CO->>RN: 21b. 查询副本分片2
CO->>PN: 21c. 查询主分片N
end
CO->>CO: 22. 合并所有分片结果
CO->>C: 23. 返回最终结果
end
Join 文档路由
对于 join 类型字段,父子文档必须路由到相同分片,所以 routing 必须相等:
1 | PUT company |
手动路由
手动指定路由:路由值 = “user_123”,而不是文档ID。
1 | POST orders/_doc/1?routing=user_123 |
这种方式好处是灵活,但是每次插入数据都需要手动指定 routing
可通过以下 Pipeline 统一管理维护路由策略,实现自动路由
查询时使用路由
1 | GET orders/_search?routing=user_123 |
优点:
-
完全控制路由逻辑
-
无需额外 Pipeline 资源
-
应用层逻辑清晰可见
缺点:
-
路由逻辑与应用代码耦合
-
多个客户端需要实现相同逻辑
-
更改路由策略需要更新所有客户端
自定义路由策略
1 | public class CustomRoutingStrategies { |
Pipeline(管道)
Pipeline 是 Elasticsearch 的数据预处理管道,在文档被索引之前对其内容进行预处理的机制。
它允许你在数据写入 Elasticsearch 之前自动执行一系列 processors(处理器),比如解析、转换、路由或清理数据。
路由过程:文档进入 → Ingest Node → Pipeline处理 → 索引到目标分片
一个 Pipeline 是一组按顺序执行的 processors。它在 协调节点(coordinating node) 上运行(在文档被路由到分片之前)。
适用于
index、create、update、bulk等写入操作。
核心组件
-
Processor(处理器链),每个 processor 对文档的一个或多个字段执行特定操作。
-
on_failure(错误处理),对失败或异常后的结果做相应处理
Elasticsearch 内置了数十种处理器,例如:
| Processor | 功能 |
|---|---|
set |
设置字段值 |
grok |
用正则解析日志(类似 Logstash) |
date |
解析字符串为日期,并设置为 @timestamp |
remove |
删除字段 |
rename |
重命名字段 |
script |
执行 Painless 脚本 |
user_agent |
解析 User-Agent 字符串 |
geoip |
根据 IP 地址添加地理位置信息 |
attachment |
解析 Base64 编码的附件(如 PDF) |
可以通过插件开发自定义 processor。
1 | PUT _ingest/pipeline/my_pipeline |
示例:
1 | PUT _ingest/pipeline/safe_routing |
应用场景
-
数据清洗
空格等特殊字符处理、转换、编码、移除字段等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20PUT _ingest/pipeline/data_clean
{
"processors": [
{
"trim": {
"field": "username"
}
},
{
"lowercase": {
"field": "email"
}
},
{
"remove": {
"field": "password"
}
}
]
} -
字段扩展
指定格式,值转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18PUT _ingest/pipeline/enrich_data
{
"processors": [
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "index_day",
"value": "{{_ingest.timestamp}}",
"format": "yyyy-MM-dd"
}
}
]
} -
丰富订单数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67PUT _ingest/pipeline/order_enrichment
{
"description": "丰富订单数据",
"processors": [
{
"set": {
"field": "processing_timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "index_day",
"value": "{{_ingest.timestamp}}",
"format": "yyyy-MM-dd"
}
},
{
"set": {
"field": "index_hour",
"value": "{{_ingest.timestamp}}",
"format": "HH"
}
},
{
"enrich": {
"policy_name": "product-info-policy",
"field": "product_id",
"target_field": "product_info",
"max_matches": 1
}
},
{
"set": {
"field": "total_amount",
"value": "{{price}} * {{quantity}}"
}
},
{
"set": {
"field": "price_category",
"if": "ctx.price > 1000",
"value": "high"
}
},
{
"set": {
"field": "price_category",
"if": "ctx.price <= 1000 && ctx.price > 100",
"value": "medium"
}
},
{
"set": {
"field": "price_category",
"if": "ctx.price <= 100",
"value": "low"
}
},
{
"set": {
"field": "order_source",
"value": "{{#order_channel}}{{order_channel}}{{/order_channel}}{{^order_channel}}web{{/order_channel}}"
}
}
]
} -
数据脱敏
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48PUT _ingest/pipeline/gdpr_compliance
{
"description": "GDPR 合规数据脱敏",
"processors": [
{
"set": {
"field": "data_category",
"value": "personal_identifiable_information"
}
},
{
"fingerprint": {
"field": ["email", "phone"],
"method": "SHA-256",
"salt": "gdpr_salt_2024",
"target_field": "user_hash",
"ignore_missing": true
}
},
{
"redact": {
"field": "credit_card",
"pattern": "(\\d{4})-?(\\d{4})-?(\\d{4})-?(\\d{4})",
"replacement": "****-****-****-$4"
}
},
{
"redact": {
"field": "id_card",
"pattern": "(\\d{6})(\\d{8})(\\d{4})",
"replacement": "$1********$3"
}
},
{
"set": {
"field": "retention_period_days",
"value": 365
}
},
{
"date_index_name": {
"field": "created_at",
"date_rounding": "M",
"index_name_prefix": "user_data-"
}
}
]
} -
IoT 设备数据聚合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68PUT _ingest/pipeline/iot_sensor_processing
{
"description": "IoT传感器数据处理",
"processors": [
{
"script": {
"lang": "painless",
"source": """
// 设备数据标准化
if (ctx.device_type == 'temperature_sensor') {
ctx.metric_type = 'temperature';
ctx.unit = 'celsius';
// 华氏转摄氏
if (ctx.temperature_unit == 'fahrenheit') {
ctx.value = (ctx.value - 32) * 5 / 9;
}
} else if (ctx.device_type == 'humidity_sensor') {
ctx.metric_type = 'humidity';
ctx.unit = 'percentage';
}
// 数据质量检查
if (ctx.metric_type == 'temperature') {
if (ctx.value < -50 || ctx.value > 100) {
ctx.data_quality = 'invalid';
ctx.error_reason = 'value_out_of_range';
} else {
ctx.data_quality = 'valid';
}
}
// 添加地理位置哈希
if (ctx.latitude != null && ctx.longitude != null) {
ctx.geo_hash = ctx.latitude + ',' + ctx.longitude;
ctx.location = ctx.latitude + ',' + ctx.longitude;
}
"""
}
},
{
"set": {
"field": "processing_timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"date": {
"field": "event_time",
"formats": ["yyyy-MM-dd HH:mm:ss", "epoch_millis"],
"target_field": "event_timestamp"
}
},
{
"convert": {
"field": "battery_level",
"type": "float",
"ignore_missing": true
}
},
{
"set": {
"field": "battery_status",
"value": "{{#battery_level}}{{^30}}critical{{/30}}{{/battery_level}}",
"if": "ctx.battery_level < 30"
}
}
]
} -
用户行为事件处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90PUT _ingest/pipeline/user_behavior_processing
{
"description": "用户行为事件处理,用于实时推荐",
"processors": [
{
"set": {
"field": "event_id",
"value": "{{user_id}}_{{_ingest.timestamp}}_{{event_type}}"
}
},
{
"date": {
"field": "event_time",
"formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ", "epoch_millis"],
"target_field": "timestamp"
}
},
{
"script": {
"lang": "painless",
"source": """
// 事件权重计算
def eventWeights = [
'purchase': 10,
'add_to_cart': 5,
'view': 1,
'click': 2,
'share': 3,
'like': 1
];
if (eventWeights.containsKey(ctx.event_type)) {
ctx.event_weight = eventWeights[ctx.event_type];
}
// 会话管理
if (ctx.session_id == null) {
ctx.session_id = ctx.user_id + '_' +
(System.currentTimeMillis() / 3600000); // 每小时一个会话
}
// 实时特征计算
if (ctx.dwell_time != null) {
if (ctx.dwell_time > 30000) { // 30秒
ctx.engagement_level = 'high';
} else if (ctx.dwell_time > 10000) { // 10秒
ctx.engagement_level = 'medium';
} else {
ctx.engagement_level = 'low';
}
}
// 内容类别偏好
if (ctx.category != null) {
ctx.user_preference = ctx.category + '_' + ctx.event_type;
}
"""
}
},
{
"convert": {
"field": "dwell_time",
"type": "long",
"ignore_missing": true
}
},
{
"set": {
"field": "is_mobile",
"value": "{{user_agent}} =~ /Mobile|Android|iPhone/",
"if": "ctx.user_agent != null"
}
},
{
"set": {
"field": "local_hour",
"value": "{{timestamp}}",
"format": "HH",
"timezone": "{{timezone}}"
}
},
{
"set": {
"field": "day_of_week",
"value": "{{timestamp}}",
"format": "e" // 1=Monday, 7=Sunday
}
}
]
} -
系统指标监控处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101PUT _ingest/pipeline/system_monitoring
{
"description": "系统监控指标处理与告警预处理",
"processors": [
{
"set": {
"field": "metric_type",
"value": "system_monitoring"
}
},
{
"script": {
"lang": "painless",
"source": """
// 阈值检查
if (ctx.metric_name == 'cpu_usage' && ctx.value > 90) {
ctx.alert_level = 'critical';
ctx.alert_message = 'CPU使用率超过90%';
ctx.needs_alert = true;
}
if (ctx.metric_name == 'memory_usage' && ctx.value > 85) {
ctx.alert_level = 'warning';
ctx.alert_message = '内存使用率超过85%';
ctx.needs_alert = true;
}
if (ctx.metric_name == 'disk_usage' && ctx.value > 95) {
ctx.alert_level = 'critical';
ctx.alert_message = '磁盘使用率超过95%';
ctx.needs_alert = true;
}
// 响应时间监控
if (ctx.metric_name == 'response_time' && ctx.value > 1000) {
ctx.alert_level = 'warning';
ctx.alert_message = '响应时间超过1000ms';
ctx.needs_alert = true;
}
// 错误率监控
if (ctx.metric_name == 'error_rate' && ctx.value > 5) {
ctx.alert_level = 'critical';
ctx.alert_message = '错误率超过5%';
ctx.needs_alert = true;
}
// 趋势分析
if (ctx.previous_value != null) {
ctx.change_percentage =
((ctx.value - ctx.previous_value) / ctx.previous_value) * 100;
if (Math.abs(ctx.change_percentage) > 50) {
ctx.significant_change = true;
}
}
"""
}
},
{
"convert": {
"field": "value",
"type": "float"
}
},
{
"set": {
"field": "normalized_value",
"value": "{{value}} / {{max_value}}",
"if": "ctx.max_value != null"
}
},
{
"date": {
"field": "timestamp",
"formats": ["epoch_millis", "yyyy-MM-dd HH:mm:ss"],
"target_field": "@timestamp"
}
},
{
"set": {
"field": "collection_interval_minutes",
"value": 5
}
},
{
"set": {
"field": "data_center",
"value": "{{hostname}} =~ /^us-/ ? 'us-east' : 'cn-north'",
"if": "ctx.hostname != null"
}
},
{
"fingerprint": {
"field": ["hostname", "metric_name", "@timestamp"],
"method": "SHA-1",
"target_field": "metric_id"
}
}
]
}
自动路由(Routing)
自动路由执行的过程

Pipeline 执行:
- 解析模板: “user_”
- 获取 user_id 值: “123”
- 计算结果: “user_123”
- 设置 _routing = “user_123”
路由计算:
hash(“user_123”) = 87654321 (示例值)
分片数: 5
分片ID: 87654321 % 5 = 1
文档存储在分片1
1 | PUT _ingest/pipeline/set_routing |
文档到达: {“user_id”: “123”, “name”: “Alice”}
Pipeline处理: set _routing = “user_123”
路由计算: shard = hash(“user_123”) % num_shards
存储: 文档存储在计算出的分片
自动路由示例
场景:当数据正常插入时,根据指定的方式和策略自动将数据路由到指定分片上
示例:将同一用户数据路由到同一分片(根据user_id 路由分片)
通过 Pipeline 实现 Routing 路由分片策略功能:
客户端发送文档
经过 Ingest Pipeline
Pipeline 计算 _routing 值
ES 根据 _routing 哈希选择分片(HASH 策略)
文档存储到选定分片
查询时提供相同的 _routing
ES 只搜索对应的分片
定义 Pipeline
以下提供多种路由策略。
方式一:使用 KV Processor + 自定义字段,直接通过字段值作为路由
1 | PUT _ingest/pipeline/order_user_pipeline |
方式二:使用 Fingerprint Processor + Set Processor,通过 hash 算法路由。在检索时也需要给 hash 值。
1 | PUT _ingest/pipeline/order_user_pipeline |
方式三:使用 Dissect/Grok Processor 处理复杂关系 userId
1 | PUT _ingest/pipeline/order_user_pipeline |
方式四:使用 Conditional Processor + 多个 Set 处理器,可以添加多条件判断
1 | PUT _ingest/pipeline/order_user_pipeline |
方式五:使用 Append Processor 组合路由键,可以拼接多个字段
1 | PUT _ingest/pipeline/order_user_pipeline |
方式六:使用 URL Decode + 路由标准化,如果 userId 值是通过编码的可以执行解码
1 | PUT _ingest/pipeline/order_user_pipeline |
方式七:使用 Convert Processor + 类型转换,可以将值转为其他类型后再路由
1 | PUT _ingest/pipeline/order_user_pipeline |
方式八:使用 Enrich Processor + 路由映射表,可以通过指定的映射作为路由键
1 | ###### 创建源索引(路由映射) |
测试 Pipeline
可以查看根据以上策略生成的 routing 值
1 | POST _ingest/pipeline/order_user_pipeline/_simulate |
验证 Pipeline 结果。使用 pipeline 根据 userId 路由分区,多条数据测试结果如下:
GET /orders/_search?explain=true
id = 1001,userId= 501,*routing = 501,*shard:0
id = 1004,userId= 501,*routing = 501,*shard:0
id = 1007,userId= 501,*routing = 501,*shard:0
id = 1009,userId= 501,*routing = 501,*shard:0
id = 1003,userId= 503,*routing = 503,*shard:0
id = 1006,userId= 505,*routing = 505,*shard:1
id = 1005,userId= 504,*routing = 504,*shard:2
id = 1008,userId= 506,*routing = 506,*shard:2
id = 1002,userId= 502,*routing = 502,*shard:3
id = 1010,userId= 507,*routing = 507,*shard:3
证明 pipeline 的自动路由生效,routing 值已经为 userId,且 shard 也是根据 userId
绑定 Pipeline
Pipeline 与 Routing 的绑定关系,指定 Pipeline 实现 Routing 路由
方法一:在 创建索引时指定 default_pipeline 值
1 | PUT /orders |
方法二:已创建的索引设置默认 Pipeline
所有写入 orders 索引的文档(除非显式覆盖)都会自动应用该 pipeline
1 | PUT orders/_settings |
方法三:在索引数据插入时使用指定 Pipeline(覆盖默认)
1 | POST orders/_doc/11?pipeline=order_user_pipeline |
命令写入数据
正常方式插入数据即可,支持批量插入
1 | POST /orders/_doc/1 |
Flink SQL写入
Flink SQL ,从Mysql 中读取数据(批量写入 ES),Flink 1.19 及之前不支持标准 SQL 的 ARRAY_AGG,执行命令:
需提前下载(Apache Archive Distribution Directory):
- Flink(
flink-1.20.3-bin-scala_2.12.tgz) 必须与 connector 版本一致,且各 connector 版本都要一致(1.20)- flink-sql-connector-elasticsearch7(
flink-sql-connector-elasticsearch7-3.1.0-1.20.jar)- flink-table-api-java-bridge(
flink-table-api-java-bridge-1.20.3.jar)- flink-connector-jdbc(
flink-connector-jdbc-3.3.0-1.20.jar)- mysql-connector-java(
mysql-connector-j-8.4.0.jar)解压 Flink 后,将所有 JAR 都放入
flink-1.20.3/lib/目录
1 | # 先启动 Flink 集群,JobManager + TaskManager。 |
同步脚本 orders_to_es.sql,以下不支持实时同步,只用于批量数据导入
1 | -- 强制设置执行模式为 streaming |
该版本不支持 Ingest Pipeline
ES 的
scaled_float底层存储为long,但写入时仍用double,ES 自动乘以scaling_factor。例如:99.99→ ES 存为9999(当scaling_factor=100)。嵌套结构实现
使用
ARRAY_AGG(ROW(...))将多行order_items聚合成ARRAY<ROW<...>>;Flink ES connector 会自动将其映射为 ES 的
nested类型(前提是 ES 索引 mapping 已正确定义)。批量写入优化
sink.bulk-flush.max-actions = '1000':每 1000 条触发一次 bulk;
sink.bulk-flush.interval = '10s':最多 10 秒 flush 一次;支持 并行写入(调整 Flink 并行度)。
检索数据
若不添加参数 routing 则会检索所有分片,加了会根据路由检索指定分片。
针对不同的路由策略,检索时的参数也需要对应:
第一种方式,直接使用 属性值的检索:
1 | # 指定路由值 |
其他方式,需要获取指定的检索值。