Routing is the mechanism by which ES decides which shard a document is stored on. When a document is indexed, ES must determine its primary shard (and, from that, where the replicas live).

A Pipeline can take routing from simple (ID-based) to smart (business-based), sharding data automatically and improving query performance and data-management efficiency.

The default routing mechanism uses _id as the routing key, so documents with different IDs are spread across shards by hash.

With a Pipeline you can implement automatic routing: any field and any strategy can serve as the routing key, e.g. the prefix or hash of orderNo.

graph TB
    subgraph "Cluster"
        subgraph "Node 1"
            N1[Data node]

            subgraph N1A["Index A"]
                IA1[Shard A0 - primary]
                IA2[Shard A1 - replica]
            end

            subgraph N1B["Index B"]
                IB1[Shard B0 - primary]
            end
        end

        subgraph "Node 2"
            N2[Data node]

            subgraph N2A["Index A"]
                IA3[Shard A0 - replica]
                IA4[Shard A1 - primary]
            end

            subgraph N2B["Index B"]
                IB2[Shard B0 - replica]
                IB3[Shard B1 - primary]
            end
        end

        subgraph "Node 3"
            N3[Data node]

            subgraph N3A["Index A"]
                IA5[Shard A1 - replica]
            end

            subgraph N3B["Index B"]
                IB4[Shard B1 - replica]
            end
        end

        subgraph "Node 4"
            N4[Master Node]
            N5[Coordinating Node]
        end
    end

    %% Management links
    N4 -.->|cluster management| N1
    N4 -.->|cluster management| N2
    N4 -.->|cluster management| N3

    %% Replica relationships
    IA1 -.->|replica sync| IA3
    IA4 -.->|replica sync| IA2
    IA4 -.->|replica sync| IA5
    IB1 -.->|replica sync| IB2
    IB3 -.->|replica sync| IB4

    %% Client requests
    Client[Client] --> N5
    N5 -->|routing| N1
    N5 -->|routing| N2
    N5 -->|routing| N3

Routing

An ES index consists of multiple primary shards; every document must be assigned to exactly one shard.

Assignment rule: shard_num = hash(_routing) % num_primary_shards

  • By default _routing = _id (the document ID determines the shard). You can override this by passing ?routing=xxx explicitly when indexing/updating a document.
  • num_primary_shards: the number of primary shards

Scenario                     Description
Related-document queries     Give documents belonging to the same entity (e.g. one order) the same routing value so they land on the same shard, avoiding cross-shard joins and reducing the number of shards searched, network overhead, and merge cost.
Hot-data isolation           Route by tenant ID to physically separate multi-tenant data.
Write load balancing         Avoid shard skew when the default _id hash distributes unevenly.
Ordering guarantees          Operations on documents with the same routing value are applied in order on a single shard.
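As a rough illustration, the default formula can be sketched in Python. Here zlib.crc32 is a stand-in for the Murmur3 hash ES actually uses, so the concrete shard numbers are illustrative only; the co-location property is what matters:

```python
import zlib

def shard_for(routing: str, num_primary_shards: int) -> int:
    """Sketch of shard_num = hash(_routing) % num_primary_shards.

    zlib.crc32 stands in for ES's Murmur3 hash; the modulo logic is the same.
    """
    return zlib.crc32(routing.encode("utf-8")) % num_primary_shards

# Documents that share a routing value always land on the same shard,
# which is why e.g. all orders of one user can be colocated.
assert shard_for("user_123", 5) == shard_for("user_123", 5)
```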
graph TB
    subgraph "Document routing flow"
        Client[Client] --> CO[Coordinating node]

        subgraph "Routing calculation"
            CO --> RC[Routing calculator]
            RC -->|document ID| HASH[Hash function]
            HASH --> MOD[Modulo]
            MOD --> SHARD_ID[Shard ID]
        end

        subgraph "Shard lookup"
            SHARD_ID --> SL[Shard locator]
            SL --> NLM[Node location map]
            NLM --> NODE_ID[Target node ID]
        end

        subgraph "Request forwarding"
            NODE_ID --> RF[Request forwarder]
            RF --> Primary[Primary shard node]
            RF --> Replica[Replica shard node]
        end

        Primary --> PR[Primary shard processing]
        Replica --> RR[Replica shard processing]

        PR --> Sync[Replica sync]
        Sync --> RR

        PR --> Response[Respond to client]
        Response --> Client
    end

    subgraph "Routing strategy types"
        RT1[Default routing: _id]
        RT2[Custom routing field]
        RT3[Index alias routing]
        RT4[Query-time routing]

        RC --> RT1
        RC --> RT2
        RC --> RT3
        RC --> RT4
    end

    subgraph "Hash algorithms"
        HA1[MurmurHash3]
        HA2[MD5 + modulo]
        HA3[Consistent hashing]

        HASH --> HA1
        HASH --> HA2
        HASH --> HA3
    end

Shard expansion

Routing supports growing the shard count dynamically, without deleting and rebuilding the index from scratch.

# Initial index
PUT old_index
{
  "settings": {
    "number_of_shards": 2,
    "number_of_routing_shards": 8   # reserved for future expansion (e.g. via the split API)
  }
}

# New index with more shards
PUT new_index
{
  "settings": {
    "number_of_shards": 4   # grow from 2 shards to 4
  }
}

# Copy the data over (reindex)
POST _reindex
{
  "source": {
    "index": "old_index"
  },
  "dest": {
    "index": "new_index"
  }
}

With index.routing_partition_size set (it must satisfy 1 < size < number_of_shards), the routing calculation becomes:

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
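A sketch of this variant (again with zlib.crc32 standing in for Murmur3): the same routing value now spreads across at most routing_partition_size shards instead of exactly one, trading some query fan-out for protection against hot shards.

```python
import zlib

def partitioned_shard(routing: str, doc_id: str,
                      routing_partition_size: int,
                      num_primary_shards: int) -> int:
    # shard_num = (hash(_routing) + hash(_id) % routing_partition_size)
    #             % num_primary_shards
    h = lambda s: zlib.crc32(s.encode("utf-8"))  # stand-in for Murmur3
    return (h(routing) + h(doc_id) % routing_partition_size) % num_primary_shards

# One routing value now maps to a small group of shards, not a single one.
shards = {partitioned_shard("user_123", f"doc_{i}", 3, 8) for i in range(200)}
assert 1 <= len(shards) <= 3
```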

Shard allocation strategies

  1. Balanced allocation

    • Collect all unassigned shards and pick the best node for each one
    • Score candidate nodes, exclude ineligible ones, and choose the highest-scoring node, weighing:
      • Disk space (highest weight)
      • Current shard load
      • Hardware resources
      • Network topology
       graph TB
        subgraph "Rebalancing decision flow"
            Monitor[Cluster monitor] --> Check{Check balance condition}

            Check -->|Unbalanced| Analyze[Assess degree of imbalance]
            Check -->|Balanced| NoAction[No action]

            Analyze --> Plan[Draw up rebalancing plan]

            subgraph "Strategy selection"
                Plan --> PS1[Shard move strategy]
                Plan --> PS2[Shard swap strategy]
                Plan --> PS3[Priority strategy]

                PS1 --> Action1[Move shards off overloaded nodes]
                PS2 --> Action2[Swap shards to optimize layout]
                PS3 --> Action3[Reassign by priority]
            end

            Action1 --> Execute[Execute rebalancing]
            Action2 --> Execute
            Action3 --> Execute

            Execute --> Verify[Verify result]
            Verify -->|Success| Complete[Done]
            Verify -->|Failure| Rollback[Roll back]

            Rollback --> Monitor
        end

        subgraph "Rebalancing triggers"
            TC1[Node joins/leaves]
            TC2[Shard distribution skew beyond threshold]
            TC3[Uneven disk usage]
            TC4[Manual trigger API]
            TC5[Index lifecycle change]
            TC1 --> Check
            TC2 --> Check
            TC3 --> Check
            TC4 --> Check
            TC5 --> Check
        end

        subgraph "Rebalancing constraints"
            CC1[Concurrent shard-move limit]
            CC2[Network bandwidth limit]
            CC3[Recovery priority]
            CC4[Index allocation filtering]
            CC5[Node role restrictions]
            Execute --> CC1
            Execute --> CC2
            Execute --> CC3
            Execute --> CC4
            Execute --> CC5
        end
  2. Rack-aware allocation

    Ensure a primary shard and its replicas never share a rack: each replica is placed on a node in a different rack.

  3. Hot/warm/cold separation

    1. Hot data goes to SSD nodes
    2. Warm data goes to high-performance HDD nodes
    3. Cold data goes to high-capacity HDD nodes

Routing walkthrough

Execute POST my_index/_doc/1001

Routing calculation:

  1. _routing = _id = "1001"

  2. hash("1001") = 123456789 (assumed value)

  3. num_primary_shards = 5 (assume 5 shards)

  4. shard_num = 123456789 % 5 = 4

  5. The document is stored on shard 4

sequenceDiagram
    participant C as Client
    participant CO as Coordinating node
    participant RC as Routing calculator
    participant CM as Cluster manager
    participant PN as Primary shard node
    participant RN as Replica shard node

    Note over C,RN: Document write routing flow

    C->>CO: 1. Send write request (doc ID, optional routing value)
    CO->>RC: 2. Compute target shard
    RC->>RC: 3. Apply routing algorithm shard = hash(routing) % num_shards
    alt Routing value present
        RC->>RC: Use custom routing value
    else Routing value absent
        RC->>RC: Use document ID as routing
    end
    RC->>CM: 4. Look up shard location
    CM->>RC: 5. Return node mapping
    RC->>CO: 6. Return primary shard node
    CO->>PN: 7. Forward write request
    PN->>PN: 8. Local write
    par Parallel replica sync
        PN->>RN: 9a. Sync to replica shard
        RN->>PN: 9b. Replica ack
    end
    PN->>CO: 10. Write acknowledged
    CO->>C: 11. Return success

    Note over C,RN: Query routing flow

    C->>CO: 12. Send query request
    CO->>RC: 13. Compute relevant shards
    alt Query includes routing
        RC->>RC: 14. Route to a specific shard
        RC->>CM: 15. Locate the shard's node
        CM->>CO: 16. Return node info
        CO->>PN: 17. Send query
        PN->>CO: 18. Return results
    else Query has no routing
        RC->>CM: 19. Get all relevant shards
        CM->>CO: 20. Return all shard nodes
        par Parallel shard queries
            CO->>PN: 21a. Query shard 1
            CO->>RN: 21b. Query shard 2
            CO->>PN: 21c. Query shard N
        end
        CO->>CO: 22. Merge results from all shards
    end
    CO->>C: 23. Return final result

Join document routing

For join-type fields, parent and child documents must live on the same shard, so their routing values must be equal:

PUT company
{
  "mappings": {
    "properties": {
      "join_field": {
        "type": "join",
        "relations": {
          "company": "employee"
        }
      }
    }
  }
}

# Parent document
POST company/_doc/1?routing=company_abc
{
  "name": "ABC Company",
  "join_field": {
    "name": "company"
  }
}

# Child document (must use the same routing)
POST company/_doc/2?routing=company_abc
{
  "name": "HR Department",
  "join_field": {
    "name": "employee",
    "parent": "1"
  }
}

Manual routing

Specify the routing explicitly: the routing value is "user_123" rather than the document ID.

POST orders/_doc/1?routing=user_123
{
  "order_id": "1",
  "user_id": "user_123",
  "amount": 100
}

This approach is flexible, but every insert has to specify routing by hand.

Routing strategies can instead be centrally managed with the Pipelines described below, enabling automatic routing.

Query-time routing

GET orders/_search?routing=user_123
{
  "query": {
    "term": {
      "user_id": "user_123"
    }
  }
}

Pros:

  • Full control over the routing logic

  • No extra Pipeline resources

  • The logic is explicit in the application layer

Cons:

  • Routing logic is coupled to application code

  • Every client must implement the same logic

  • Changing the strategy means updating every client
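The trade-off is easiest to see in terms of query fan-out. A minimal sketch (stand-in crc32 hash, not ES's Murmur3) of which shards a search must touch:

```python
import zlib

def shards_to_query(num_shards, routing=None):
    """Without routing a search fans out to every shard;
    with routing it narrows to the single computed shard."""
    if routing is None:
        return list(range(num_shards))
    return [zlib.crc32(routing.encode("utf-8")) % num_shards]

assert len(shards_to_query(5)) == 5              # full fan-out
assert len(shards_to_query(5, "user_123")) == 1  # one shard only
```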

Custom routing strategies

public class CustomRoutingStrategies {

    // 1. Time-range routing
    class TimeBasedRouting implements RoutingStrategy {
        private final String routingField;
        private final DateTimeFormatter formatter;

        public TimeBasedRouting(String routingField, String pattern) {
            this.routingField = routingField;
            this.formatter = DateTimeFormatter.ofPattern(pattern);
        }

        @Override
        public String getRoutingValue(Document document) {
            // Extract the time field from the document
            String timeValue = document.getField(routingField);

            try {
                // Parse the time and format it into a routing value
                LocalDateTime dateTime = LocalDateTime.parse(timeValue, formatter);
                return dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM"));
            } catch (Exception e) {
                // Fall back to the default routing
                return document.getId();
            }
        }
    }

    // 2. Geo-location routing
    class GeoHashRouting implements RoutingStrategy {
        private final int precision; // GeoHash precision

        public GeoHashRouting(int precision) {
            this.precision = precision;
        }

        @Override
        public String getRoutingValue(Document document) {
            // Extract latitude and longitude
            double lat = document.getDoubleField("latitude");
            double lon = document.getDoubleField("longitude");

            // Compute the GeoHash
            return GeoHash.encode(lat, lon, precision);
        }
    }

    // 3. Tenant-isolation routing
    class TenantAwareRouting implements RoutingStrategy {
        @Override
        public String getRoutingValue(Document document) {
            // Extract the tenant ID from the document or the context
            String tenantId = extractTenantId(document);

            // Combine the tenant ID and the document ID
            return tenantId + ":" + document.getId();
        }

        private String extractTenantId(Document document) {
            // From a document field
            if (document.hasField("tenant_id")) {
                return document.getField("tenant_id");
            }

            // From the security context
            SecurityContext context = SecurityContextHolder.getContext();
            if (context != null) {
                return context.getTenantId();
            }

            throw new IllegalArgumentException("Tenant ID not found");
        }
    }

    // 4. Composite routing strategy
    class CompositeRoutingStrategy implements RoutingStrategy {
        private final List<RoutingStrategy> strategies;
        private final RoutingStrategy fallback;

        public CompositeRoutingStrategy(List<RoutingStrategy> strategies,
                                        RoutingStrategy fallback) {
            this.strategies = strategies;
            this.fallback = fallback;
        }

        @Override
        public String getRoutingValue(Document document) {
            for (RoutingStrategy strategy : strategies) {
                try {
                    String routing = strategy.getRoutingValue(document);
                    if (routing != null && !routing.isEmpty()) {
                        return routing;
                    }
                } catch (Exception e) {
                    // The current strategy failed; try the next one
                }
            }

            // All strategies failed; use the fallback
            return fallback.getRoutingValue(document);
        }
    }
}

Pipeline

A Pipeline is Elasticsearch's data preprocessing pipeline: a mechanism for transforming a document's content before it is indexed.

It lets you run a series of processors automatically before data is written to Elasticsearch, e.g. to parse, transform, route, or clean the data.

Flow: document arrives → ingest node → pipeline processing → indexed into the target shard

A Pipeline is an ordered list of processors. It runs on a node with the ingest role, before the document is routed to its shard.

  • It applies to write operations such as index, create, update, and bulk.

Core components

  • Processor (the processor chain): each processor performs a specific operation on one or more fields of the document.

  • on_failure (error handling): handles failures or exceptions raised by the processors.

Elasticsearch ships with dozens of built-in processors, for example:

Processor    Purpose
set          Set a field's value
grok         Parse logs with regex patterns (similar to Logstash)
date         Parse a string into a date, e.g. into @timestamp
remove       Remove a field
rename       Rename a field
script       Run a Painless script
user_agent   Parse a User-Agent string
geoip        Add geo information based on an IP address
attachment   Parse Base64-encoded attachments (e.g. PDFs)

Custom processors can also be developed as plugins.

PUT _ingest/pipeline/my_pipeline
{
  "description": "Data processing pipeline",
  "processors": [            # ← processor chain, executed in order
    {
      "processor_type1": { ... }
    },
    {
      "processor_type2": { ... }
    }
  ],
  "on_failure": [            # ← error handling
    {
      "set": {
        "field": "error",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Example:

PUT _ingest/pipeline/safe_routing
{
  "processors": [
    {
      "set": {
        "field": "_routing",
        "value": "{{user_id}}",
        "ignore_failure": true        # keep going even if this fails
      }
    },
    {
      "set": {
        "field": "_routing",
        "value": "{{_id}}",           # fallback
        "if": "ctx._routing == null"  # if the previous processor did not set it
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error",
        "value": "{{ _ingest.on_failure_message }}"
      }
    },
    {
      "set": {
        "field": "_routing",
        "value": "fallback_route"
      }
    }
  ]
}

Use cases

  • Data cleansing

    Trim whitespace and special characters, convert and re-encode values, drop fields, etc.

    PUT _ingest/pipeline/data_clean
    {
      "processors": [
        {
          "trim": {
            "field": "username"
          }
        },
        {
          "lowercase": {
            "field": "email"
          }
        },
        {
          "remove": {
            "field": "password"
          }
        }
      ]
    }
  • Field enrichment

    Derive values in a given format

    PUT _ingest/pipeline/enrich_data
    {
      "processors": [
        {
          "set": {
            "field": "timestamp",
            "value": "{{_ingest.timestamp}}"
          }
        },
        {
          "date": {                      # set has no "format" option; reformat with a date processor
            "field": "timestamp",
            "formats": ["ISO8601"],
            "target_field": "index_day",
            "output_format": "yyyy-MM-dd"
          }
        }
      ]
    }
  • Enriching order data

    PUT _ingest/pipeline/order_enrichment
    {
      "description": "Enrich order data",
      "processors": [
        {
          "set": {
            "field": "processing_timestamp",
            "value": "{{_ingest.timestamp}}"
          }
        },
        {
          "date": {                              # set has no "format" option; derive with date processors
            "field": "processing_timestamp",
            "formats": ["ISO8601"],
            "target_field": "index_day",
            "output_format": "yyyy-MM-dd"
          }
        },
        {
          "date": {
            "field": "processing_timestamp",
            "formats": ["ISO8601"],
            "target_field": "index_hour",
            "output_format": "HH"
          }
        },
        {
          "enrich": {
            "policy_name": "product-info-policy",
            "field": "product_id",
            "target_field": "product_info",
            "max_matches": 1
          }
        },
        {
          "script": {                            # a set processor would store the literal string "{{price}} * {{quantity}}"
            "if": "ctx.price != null && ctx.quantity != null",
            "source": "ctx.total_amount = ctx.price * ctx.quantity"
          }
        },
        {
          "set": {
            "field": "price_category",
            "if": "ctx.price != null && ctx.price > 1000",
            "value": "high"
          }
        },
        {
          "set": {
            "field": "price_category",
            "if": "ctx.price != null && ctx.price <= 1000 && ctx.price > 100",
            "value": "medium"
          }
        },
        {
          "set": {
            "field": "price_category",
            "if": "ctx.price != null && ctx.price <= 100",
            "value": "low"
          }
        },
        {
          "set": {
            "field": "order_source",
            "value": "{{#order_channel}}{{order_channel}}{{/order_channel}}{{^order_channel}}web{{/order_channel}}"
          }
        }
      ]
    }
  • Data masking

    PUT _ingest/pipeline/gdpr_compliance
    {
      "description": "GDPR-compliant data masking",
      "processors": [
        {
          "set": {
            "field": "data_category",
            "value": "personal_identifiable_information"
          }
        },
        {
          "fingerprint": {
            "fields": ["email", "phone"],
            "method": "SHA-256",
            "salt": "gdpr_salt_2024",
            "target_field": "user_hash",
            "ignore_missing": true
          }
        },
        {
          "gsub": {                  # gsub does regex replace; the redact processor uses Grok patterns instead
            "field": "credit_card",
            "pattern": "(\\d{4})-?(\\d{4})-?(\\d{4})-?(\\d{4})",
            "replacement": "****-****-****-$4"
          }
        },
        {
          "gsub": {
            "field": "id_card",
            "pattern": "(\\d{6})(\\d{8})(\\d{4})",
            "replacement": "$1********$3"
          }
        },
        {
          "set": {
            "field": "retention_period_days",
            "value": 365
          }
        },
        {
          "date_index_name": {
            "field": "created_at",
            "date_rounding": "M",
            "index_name_prefix": "user_data-"
          }
        }
      ]
    }
  • IoT device data aggregation

    PUT _ingest/pipeline/iot_sensor_processing
    {
      "description": "IoT sensor data processing",
      "processors": [
        {
          "script": {
            "lang": "painless",
            "source": """
              // Normalize device data
              if (ctx.device_type == 'temperature_sensor') {
                ctx.metric_type = 'temperature';
                ctx.unit = 'celsius';
                // Fahrenheit to Celsius
                if (ctx.temperature_unit == 'fahrenheit') {
                  ctx.value = (ctx.value - 32) * 5 / 9;
                }
              } else if (ctx.device_type == 'humidity_sensor') {
                ctx.metric_type = 'humidity';
                ctx.unit = 'percentage';
              }

              // Data-quality checks
              if (ctx.metric_type == 'temperature') {
                if (ctx.value < -50 || ctx.value > 100) {
                  ctx.data_quality = 'invalid';
                  ctx.error_reason = 'value_out_of_range';
                } else {
                  ctx.data_quality = 'valid';
                }
              }

              // Not a true geohash: just "lat,lon" strings for quick grouping
              if (ctx.latitude != null && ctx.longitude != null) {
                ctx.geo_hash = ctx.latitude + ',' + ctx.longitude;
                ctx.location = ctx.latitude + ',' + ctx.longitude;
              }
            """
          }
        },
        {
          "set": {
            "field": "processing_timestamp",
            "value": "{{_ingest.timestamp}}"
          }
        },
        {
          "date": {
            "field": "event_time",
            "formats": ["yyyy-MM-dd HH:mm:ss", "epoch_millis"],
            "target_field": "event_timestamp"
          }
        },
        {
          "convert": {
            "field": "battery_level",
            "type": "float",
            "ignore_missing": true
          }
        },
        {
          "set": {
            "field": "battery_status",
            "value": "critical",    # a plain value; the condition decides whether it is set
            "if": "ctx.battery_level != null && ctx.battery_level < 30"
          }
        }
      ]
    }
  • User behavior event processing

    PUT _ingest/pipeline/user_behavior_processing
    {
      "description": "User behavior event processing for real-time recommendations",
      "processors": [
        {
          "set": {
            "field": "event_id",
            "value": "{{user_id}}_{{_ingest.timestamp}}_{{event_type}}"
          }
        },
        {
          "date": {
            "field": "event_time",
            "formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ", "epoch_millis"],
            "target_field": "timestamp"
          }
        },
        {
          "convert": {                 # convert before the script so numeric comparisons work
            "field": "dwell_time",
            "type": "long",
            "ignore_missing": true
          }
        },
        {
          "script": {
            "lang": "painless",
            "source": """
              // Event weights
              def eventWeights = [
                'purchase': 10,
                'add_to_cart': 5,
                'view': 1,
                'click': 2,
                'share': 3,
                'like': 1
              ];

              if (eventWeights.containsKey(ctx.event_type)) {
                ctx.event_weight = eventWeights[ctx.event_type];
              }

              // Session management
              if (ctx.session_id == null) {
                ctx.session_id = ctx.user_id + '_' +
                  (System.currentTimeMillis() / 3600000); // one session per hour
              }

              // Real-time engagement features
              if (ctx.dwell_time != null) {
                if (ctx.dwell_time > 30000) { // 30 seconds
                  ctx.engagement_level = 'high';
                } else if (ctx.dwell_time > 10000) { // 10 seconds
                  ctx.engagement_level = 'medium';
                } else {
                  ctx.engagement_level = 'low';
                }
              }

              // Content-category preference
              if (ctx.category != null) {
                ctx.user_preference = ctx.category + '_' + ctx.event_type;
              }
            """
          }
        },
        {
          "set": {
            "field": "is_mobile",
            "value": true,             # set stores templates literally, so test in the condition instead
            "if": "ctx.user_agent != null && ctx.user_agent =~ /Mobile|Android|iPhone/"
          }
        },
        {
          "date": {                    # set has no format/timezone options; derive with date processors
            "field": "event_time",
            "formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ", "epoch_millis"],
            "target_field": "local_hour",
            "output_format": "HH",
            "timezone": "{{timezone}}"
          }
        },
        {
          "date": {
            "field": "event_time",
            "formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ", "epoch_millis"],
            "target_field": "day_of_week",
            "output_format": "e"       # 1=Monday, 7=Sunday
          }
        }
      ]
    }
  • System metrics monitoring

    PUT _ingest/pipeline/system_monitoring
    {
      "description": "System monitoring metric processing and alert pre-processing",
      "processors": [
        {
          "set": {
            "field": "metric_type",
            "value": "system_monitoring"
          }
        },
        {
          "convert": {                 # convert before the script so numeric comparisons work
            "field": "value",
            "type": "float"
          }
        },
        {
          "script": {
            "lang": "painless",
            "source": """
              // Threshold checks
              if (ctx.metric_name == 'cpu_usage' && ctx.value > 90) {
                ctx.alert_level = 'critical';
                ctx.alert_message = 'CPU usage above 90%';
                ctx.needs_alert = true;
              }

              if (ctx.metric_name == 'memory_usage' && ctx.value > 85) {
                ctx.alert_level = 'warning';
                ctx.alert_message = 'Memory usage above 85%';
                ctx.needs_alert = true;
              }

              if (ctx.metric_name == 'disk_usage' && ctx.value > 95) {
                ctx.alert_level = 'critical';
                ctx.alert_message = 'Disk usage above 95%';
                ctx.needs_alert = true;
              }

              // Response-time monitoring
              if (ctx.metric_name == 'response_time' && ctx.value > 1000) {
                ctx.alert_level = 'warning';
                ctx.alert_message = 'Response time above 1000ms';
                ctx.needs_alert = true;
              }

              // Error-rate monitoring
              if (ctx.metric_name == 'error_rate' && ctx.value > 5) {
                ctx.alert_level = 'critical';
                ctx.alert_message = 'Error rate above 5%';
                ctx.needs_alert = true;
              }

              // Trend analysis
              if (ctx.previous_value != null) {
                ctx.change_percentage =
                  ((ctx.value - ctx.previous_value) / ctx.previous_value) * 100;

                if (Math.abs(ctx.change_percentage) > 50) {
                  ctx.significant_change = true;
                }
              }

              // Normalization and data-center tagging also need a script:
              // a set processor would store these expressions as literal strings
              if (ctx.max_value != null) {
                ctx.normalized_value = ctx.value / ctx.max_value;
              }
              if (ctx.hostname != null) {
                ctx.data_center = ctx.hostname.startsWith('us-') ? 'us-east' : 'cn-north';
              }
            """
          }
        },
        {
          "date": {
            "field": "timestamp",
            "formats": ["epoch_millis", "yyyy-MM-dd HH:mm:ss"],
            "target_field": "@timestamp"
          }
        },
        {
          "set": {
            "field": "collection_interval_minutes",
            "value": 5
          }
        },
        {
          "fingerprint": {
            "fields": ["hostname", "metric_name", "@timestamp"],
            "method": "SHA-1",
            "target_field": "metric_id"
          }
        }
      ]
    }

Automatic routing

How automatic routing executes

Pipeline execution:

  1. Resolve the template: "user_"
  2. Read the user_id value: "123"
  3. Compute the result: "user_123"
  4. Set _routing = "user_123"

Routing calculation:

  1. hash("user_123") = 87654321 (example value)

  2. Number of shards: 5

  3. Shard ID: 87654321 % 5 = 1

  4. The document is stored on shard 1
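The two stages above can be sketched together in Python. The template logic mimics a set processor with value "user_{{user_id}}"; zlib.crc32 stands in for Murmur3, so the concrete shard number will differ from the 87654321 % 5 example:

```python
import zlib

def apply_set_routing(doc: dict) -> dict:
    # Mimics: { "set": { "field": "_routing", "value": "user_{{user_id}}",
    #                    "if": "ctx.user_id != null" } }
    if doc.get("user_id") is not None:
        doc["_routing"] = f"user_{doc['user_id']}"
    return doc

def shard_for(routing: str, num_shards: int) -> int:
    # Stand-in hash for the routing step that follows the pipeline.
    return zlib.crc32(routing.encode("utf-8")) % num_shards

doc = apply_set_routing({"user_id": "123", "name": "Alice"})
assert doc["_routing"] == "user_123"
assert 0 <= shard_for(doc["_routing"], 5) < 5
```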

PUT _ingest/pipeline/set_routing
{
  "processors": [
    {
      "set": {
        "field": "_routing",           # set the routing metadata field
        "value": "user_{{userId}}",    # routing value
        "if": "ctx.userId != null"     # guard against null
      }
    }
  ]
}

# Variant: combine multiple fields
{
  "set": {
    "field": "_routing",
    "value": "{{tenant}}_{{userId}}"
  }
}
# Variant: hash-based routing
{
  "fingerprint": {
    "fields": ["userId", "tenant"],
    "method": "SHA-256",
    "target_field": "_routing"
  }
}
  1. Document arrives: {"user_id": "123", "name": "Alice"}

  2. Pipeline processing: set _routing = "user_123"

  3. Routing calculation: shard = hash("user_123") % num_shards

  4. Storage: the document is written to the computed shard

Automatic routing example

Scenario: as data is inserted normally, route it to specific shards according to a chosen strategy.

Example: route all of one user's data to the same shard (routing by user_id).

Implementing the routing strategy via a Pipeline:

  1. The client sends a document

  2. It passes through the ingest pipeline

  3. The pipeline computes the _routing value

  4. ES hashes _routing to select a shard (hash strategy)

  5. The document is stored on that shard

  6. Queries supply the same _routing

  7. ES searches only the corresponding shard

Defining the Pipeline

Several routing strategies follow.

Option 1: Set processors with a custom field, using the field value directly as the routing

PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Set routing from a document field",
  "processors": [
    {
      "set": {
        "field": "_routing",
        "value": "{{userId}}"       # use userId directly as the routing
      }
    },
    {
      "set": {
        "field": "_routing_key",    # an extra plain field, not the actual routing
        "value": "user_{{userId}}",
        "override": false
      }
    }
  ]
}

Option 2: a Fingerprint Processor plus a Set Processor, routing via a hash. Searches must supply the same hash value.

PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Routing pipeline based on a user-ID hash",
  "processors": [
    {
      "fingerprint": {
        "fields": ["userId"],
        "method": "SHA-256",
        "target_field": "_routing_hash"
      }
    },
    {
      "set": {
        "field": "_routing",
        "value": "{{_routing_hash}}"
      }
    },
    {
      "remove": {
        "field": "_routing_hash",
        "ignore_missing": true
      }
    }
  ]
}
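Conceptually the fingerprint step is just a deterministic hash of the field value. The sketch below uses hashlib directly; the real fingerprint processor's output encoding differs, so this is not byte-for-byte identical. The key point is that index and search requests must apply the same hash to obtain the same routing:

```python
import hashlib

def fingerprint_routing(user_id: str) -> str:
    # Deterministic SHA-256 of the field value (a stand-in for the
    # fingerprint processor; the processor's actual encoding differs).
    return hashlib.sha256(user_id.encode("utf-8")).hexdigest()

# The writer and every querying client derive the same routing value:
assert fingerprint_routing("501") == fingerprint_routing("501")
assert fingerprint_routing("501") != fingerprint_routing("502")
```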

Option 3: a Dissect/Grok Processor for composite userId formats

PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Handle prefixed userId formats",
  "processors": [
    {
      "grok": {
        "field": "userId",
        "patterns": [
          "%{DATA:tenant}_%{DATA:actual_userId}",
          "%{DATA:actual_userId}"
        ],
        "pattern_definitions": {
          "DATA": ".*"
        },
        "ignore_missing": true
      }
    },
    {
      "set": {
        "description": "Set the routing field",
        "field": "_routing",
        "value": "{{actual_userId}}",
        "override": true
      }
    },
    {
      "remove": {
        "field": ["tenant", "actual_userId"],
        "ignore_missing": true
      }
    }
  ]
}

Option 4: conditional processors (multiple Set processors with if clauses) for multi-condition routing

PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Routing strategy with multiple conditions",
  "processors": [
    {
      "set": {
        "description": "Default: use userId as the routing",
        "field": "_routing",
        "value": "{{userId}}"
      }
    },
    {
      "set": {
        "description": "If userId is numeric, add a prefix",
        "field": "_routing",
        "value": "user_{{userId}}",
        "override": true,
        "if": "ctx.userId != null && ctx.userId.matches('^\\d+$')"
      }
    },
    {
      "set": {
        "description": "If tenant is present, add a tenant prefix",
        "field": "_routing",
        "value": "{{tenant}}_user_{{userId}}",
        "override": true,
        "if": "ctx.tenant != null && ctx.userId != null"
      }
    }
  ]
}

Option 5: an Append Processor to build a composite routing key from several fields

PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Composite multi-field routing via append",
  "processors": [
    {
      "append": {
        "field": "routing_fields",
        "value": ["{{userId}}"]
      }
    },
    {
      "append": {
        "field": "routing_fields",
        "value": ["{{tenant}}"],
        "if": "ctx.tenant != null"
      }
    },
    {
      "append": {
        "field": "routing_fields",
        "value": ["{{region}}"],
        "if": "ctx.region != null"
      }
    },
    {
      "join": {
        "field": "routing_fields",    # join reads the array field…
        "separator": "_",
        "target_field": "_routing"    # …and writes the joined string here
      }
    },
    {
      "remove": {
        "field": "routing_fields",
        "ignore_missing": true
      }
    }
  ]
}

Option 6: URL-decode plus normalization, for when the userId value arrives URL-encoded

PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Handle URL-encoded userId values",
  "processors": [
    {
      "urldecode": {
        "field": "userId",
        "target_field": "userId_decoded",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "_routing",
        "value": "{{userId_decoded}}",
        "override": false
      }
    },
    {
      "remove": {
        "field": "userId_decoded",
        "ignore_missing": true
      }
    }
  ]
}

Option 7: a Convert Processor for type conversion before routing

PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Make the routing value type-safe",
  "processors": [
    {
      "convert": {
        "field": "userId",
        "type": "string",
        "target_field": "userId_string",
        "ignore_missing": true
      }
    },
    {
      "lowercase": {
        "field": "userId_string",
        "ignore_missing": true,
        "target_field": "routing_value"
      }
    },
    {
      "trim": {
        "field": "routing_value",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "_routing",
        "value": "{{routing_value}}"
      }
    },
    {
      "remove": {
        "field": ["userId_string", "routing_value"],
        "ignore_missing": true
      }
    }
  ]
}

Option 8: an Enrich Processor plus a routing lookup table, routing via a predefined mapping

###### Create the source index (routing mapping)
PUT user_routing_mapping
{
  "mappings": {
    "properties": {
      "userid": {
        "type": "keyword"
      },
      "shard_number": {
        "type": "integer"
      },
      "routing_hash": {
        "type": "keyword"
      }
    }
  }
}

###### Create the enrich policy
PUT /_enrich/policy/user-routing-policy
{
  "match": {
    "indices": "user_routing_mapping",
    "match_field": "userid",
    "enrich_fields": ["shard_number", "routing_hash"]
  }
}

###### Execute the policy
POST /_enrich/policy/user-routing-policy/_execute

###### Create the pipeline that uses the enrich lookup
PUT _ingest/pipeline/order_user_pipeline
{
  "description": "Look up routing info via enrich",
  "processors": [
    {
      "enrich": {
        "policy_name": "user-routing-policy",
        "field": "userId",
        "target_field": "routing_info",
        "max_matches": 1
      }
    },
    {
      "set": {
        "field": "_routing",
        "value": "{{routing_info.routing_hash}}",
        "override": false
      }
    },
    {
      "remove": {
        "field": "routing_info",
        "ignore_missing": true
      }
    }
  ]
}

Testing the Pipeline

You can inspect the routing value each of the strategies above produces:

POST _ingest/pipeline/order_user_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "userId": "501",
        "order_id": "ORD123",
        "amount": 100.50,
        "region": "CN-North"
      }
    },
    {
      "_source": {
        "userId": "503",
        "order_id": "ORD124",
        "amount": 500.00,
        "region": "US-West"
      }
    }
  ]
}

Verify the result. Routing by userId through the pipeline, a multi-document test produces:

GET /orders/_search?explain=true

  • id = 1001, userId = 501, _routing = 501, shard: 0

  • id = 1004, userId = 501, _routing = 501, shard: 0

  • id = 1007, userId = 501, _routing = 501, shard: 0

  • id = 1009, userId = 501, _routing = 501, shard: 0

  • id = 1003, userId = 503, _routing = 503, shard: 0

  • id = 1006, userId = 505, _routing = 505, shard: 1

  • id = 1005, userId = 504, _routing = 504, shard: 2

  • id = 1008, userId = 506, _routing = 506, shard: 2

  • id = 1002, userId = 502, _routing = 502, shard: 3

  • id = 1010, userId = 507, _routing = 507, shard: 3

This confirms the pipeline's automatic routing works: _routing equals userId, and the shard is derived from userId.

Binding the Pipeline

The binding between Pipeline and Routing: designate the pipeline that implements the routing.

Method 1: set default_pipeline when creating the index

PUT /orders
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2,
    "index.default_pipeline": "order_user_pipeline",  # the pipeline created above
    "index.final_pipeline": "order_user_pipeline",    # always runs last, even if a request overrides default_pipeline
    "analysis": {
      "analyzer": {
        "standard_analyzer": {
          "type": "standard",
          "stopwords": "_none_"
        }
      }
    }
  },
  "mappings": {
    "_routing": {
      "required": false
    },
    "properties": {
      "id": {
        "type": "long"
      },
      "orderNo": {
        "type": "keyword"
      },
      "status": {
        "type": "keyword"
      },
      "userId": {
        "type": "long"
      },
      "orderItems": {
        "type": "nested",
        "properties": {
          "id": {
            "type": "long"
          },
          "productName": {
            "type": "text",
            "analyzer": "standard_analyzer"
          },
          "unitPrice": {
            "type": "scaled_float",
            "scaling_factor": 100
          }
        }
      }
    }
  }
}

Method 2: set the default pipeline on an existing index

Every document written to the orders index (unless explicitly overridden) then goes through this pipeline automatically.

PUT orders/_settings
{
  "index.default_pipeline": "order_user_pipeline"
}

Method 3: specify the pipeline per request at index time (overrides the default)

POST orders/_doc/11?pipeline=order_user_pipeline
{
  "id": 11,
  "orderNo": "ORD202312030011",
  "status": "DELIVERED",
  "userId": 10003,
  "orderItems": [
    {
      "id": 105,
      "productName": "Smartphone",
      "unitPrice": 1889.99
    }
  ]
}

Writing data

Insert data the normal way; bulk inserts are supported too.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
POST /orders/_doc/1
{
"id": 1,
"orderNo": "ORD202312030001",
"status": "PAID",
"userId": 10001,
"orderItems": [
{
"id": 101,
"productName": "智能手机",
"unitPrice": 899.99
},
{
"id": 102,
"productName": "蓝牙耳机",
"unitPrice": 299.98
}
]
}


POST /orders/_doc/2
{
"id": 2,
"orderNo": "ORD202312030002",
"status": "DELIVERED",
"userId": 10001,
"orderItems": [
{
"id": 105,
"productName": "智能手环",
"unitPrice": 489.99
}
]
}


POST /orders/_doc/3
{
"id": 3,
"orderNo": "ORD202312030003",
"status": "SHIPPING",
"userId": 10002,
"orderItems": [
{
"id": 201,
"productName": "纸巾",
"unitPrice": 9.9
},
{
"id": 102,
"productName": "蓝牙耳机",
"unitPrice": 299.98
},
{
"id": 103,
"productName": "笔记本",
"unitPrice": 59.0
}
]
}

POST /orders/_doc/4
{
"id": 4,
"orderNo": "ORD202312030004",
"status": "DELIVERED",
"userId": 10003,
"orderItems": [
{
"id": 101,
"productName": "智能手机",
"unitPrice": 899.99
},
{
"id": 102,
"productName": "蓝牙耳机",
"unitPrice": 299.98
}
]
}


POST /orders/_doc/5
{
"id": 5,
"orderNo": "ORD202312030005",
"status": "PAID",
"userId": 10001,
"orderItems": [
{
"id": 107,
"productName": "鼠标",
"unitPrice": 49.99
}
]
}
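
For bulk inserts, the routing value can also be set per action in the _bulk metadata when it is not handled by a pipeline. A minimal sketch with two made-up documents (IDs 6 and 7):

```
POST /orders/_bulk
{ "index": { "_id": "6", "routing": "10002" } }
{ "id": 6, "orderNo": "ORD202312030006", "status": "PAID", "userId": 10002, "orderItems": [ { "id": 108, "productName": "键盘", "unitPrice": 129.00 } ] }
{ "index": { "_id": "7", "routing": "10003" } }
{ "id": 7, "orderNo": "ORD202312030007", "status": "PAID", "userId": 10003, "orderItems": [ { "id": 109, "productName": "显示器", "unitPrice": 899.00 } ] }
```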

Flink SQL: read the data from MySQL and bulk-write it to ES. Flink 1.19 and earlier do not support the standard SQL ARRAY_AGG, so use Flink 1.20+. Execute the commands below:

Download the required JARs in advance (Apache Archive Distribution Directory):

After extracting Flink, put all the JARs into the flink-1.20.3/lib/ directory

# Start the Flink cluster first (JobManager + TaskManager).
./bin/start-cluster.sh
# Check the Web UI:
curl http://localhost:8081

# Option 1: submit the script and pass the connector JARs explicitly
./bin/sql-client.sh -f orders_to_es.sql \
  -j flink-sql-connector-elasticsearch7-3.1.0-1.20.jar \
  -j flink-connector-jdbc-3.3.0-1.20.jar \
  -j mysql-connector-j-8.4.0.jar

# Option 2: with the JARs already copied into lib/, just run the script
./bin/sql-client.sh -f orders_to_es.sql

# Check that the data was synced
curl -XGET "http://localhost:9200/orders/_search?pretty"

Sync script orders_to_es.sql. Note that the script below does not do real-time synchronization; it is only for one-off bulk imports

-- Force the execution mode to streaming
SET 'execution.runtime-mode' = 'streaming';

-- ========= 1. Source table: orders =========
CREATE TEMPORARY TABLE `orders`
(
`id` BIGINT,
`order_no` STRING,
`status` STRING,
`user_id` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC',
'table-name' = 'orders',
'username' = 'root',
'password' = '123456',
'scan.fetch-size' = '1000',
'scan.partition.column' = 'id', -- optional: parallel reads
'scan.partition.num' = '2',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '10000'
);

-- ========= 2. Source table: order_item =========
CREATE TEMPORARY TABLE `order_item`
(
`id` BIGINT,
`order_id` BIGINT,
`product_name` STRING,
`unit_price` DECIMAL(10, 2), -- DECIMAL without precision defaults to DECIMAL(10, 0) and would drop the cents
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC',
'table-name' = 'order_item',
'username' = 'root',
'password' = '123456',
'scan.fetch-size' = '1000'
);

-- ========= 3. Target table: Elasticsearch =========
CREATE TEMPORARY TABLE `es_orders` (
`id` BIGINT,
`orderNo` STRING,
`status` STRING,
`userId` BIGINT,
`orderItems` ARRAY<ROW<
`id` BIGINT,
`productName` STRING,
`unitPrice` DOUBLE
>>,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.3.146:9200',
'index' = 'orders',
'sink.bulk-flush.max-actions' = '1000',
'sink.bulk-flush.interval' = '10s'
);


-- ========= 4. Bulk insert =========
INSERT INTO `es_orders`
SELECT
o.id,
o.order_no AS orderNo,
o.status,
o.user_id AS userId,
ARRAY_AGG(
ROW(
i.id,
i.product_name,
CAST(i.unit_price AS DOUBLE) -- unitPrice (scaled_float)
)
) AS orderItems
FROM orders AS o
LEFT JOIN order_item AS i ON o.id = i.order_id
GROUP BY o.id, o.order_no, o.status, o.user_id;
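
Once the job has finished, a quick aggregation shows how many documents each userId (and hence each routing value) received:

```
GET /orders/_search
{
  "size": 0,
  "aggs": {
    "docs_per_user": {
      "terms": { "field": "userId" }
    }
  }
}
```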

This connector version cannot invoke an ingest pipeline itself; however, because index.default_pipeline is set on the orders index, documents written by Flink still go through order_user_pipeline.

ES stores scaled_float internally as a long, but you still write a double; ES multiplies by scaling_factor automatically. For example, 99.99 → stored as 9999 (with scaling_factor=100).

Nested structure

  • ARRAY_AGG(ROW(...)) aggregates the multiple order_item rows into an ARRAY<ROW<...>>.

  • The Flink ES connector maps this to ES's nested type (provided the index mapping defines orderItems as nested, as above).

Bulk-write tuning

  • sink.bulk-flush.max-actions = '1000': trigger a bulk request every 1000 documents;

  • sink.bulk-flush.interval = '10s': flush at least every 10 seconds;

  • parallel writes are supported (adjust the Flink parallelism).
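
If payload size matters more than document count, the connector also exposes a size-based flush threshold and retry backoff options; the values below are illustrative, not recommendations:

```
'sink.bulk-flush.max-size' = '2mb',
'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
'sink.bulk-flush.backoff.max-retries' = '3'
```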

Searching the data

Without the routing parameter a search fans out to all shards; with it, only the shard(s) the routing value maps to are searched.

The search-time routing parameter must match the routing strategy used at write time:

First approach: search using the field value directly as the routing value:

# Specify routing values (comma-separated — one per userId queried; with only routing=10001, documents for 10002 could be missed)
GET /orders/_search?routing=10001,10002
{
"query": {
"terms": {
"userId": ["10001", "10002"]
}
}
}

# The body need not repeat userId, but note: routing only narrows to a shard, so other users on the same shard can still match
GET /orders/_search?routing=10001
{
"query": {
"bool": {
"filter": [
{"term": {"status": "PAID"}}
]
}
}
}

GET /orders/_search?routing=10003
{
"query": {
"bool": {
"filter": [
{"term": {"orderNo": "ORD202312030004"}}
]
}
}
}

# Routing values combined with additional filters
GET /orders/_search?routing=10001,10002
{
"query": {
"bool": {
"must": [
{
"terms": {
"userId": ["10001", "10002"]
}
}
],
"filter": [
{
"term": {
"status": "PAID"
}
}
]
}
},
"fields": ["_shard", "userId", "orderId", "status"],
"_source": false,
"size": 10
}


# Query with a nested clause
GET /orders/_search?routing=10001
{
"query": {
"bool": {
"must": [
{
"nested": {
"path": "orderItems",
"query": {
"bool": {
"should": [
{
"match": {
"orderItems.productName": {
"query": "蓝牙耳机"
}
}
}
]
}
}
}
}
],
"should": [
{
"wildcard": {
"status": "PAID"
}
}
]
}
}
}

For other routing strategies (e.g. a prefix or hash of orderNo), you must first compute the same routing value used at write time before searching.
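
To see which shard a routing value actually resolves to, the _search_shards API reports it without running any query:

```
GET /orders/_search_shards?routing=10001
```

The response lists only the shard that hash("10001") % 3 maps to, rather than all three primaries.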