将 MySQL 数据同步至 Elasticsearch(ES)或 Redis 是系统中的常见需求。根据业务场景、数据量、一致性要求和实时性要求,可以采用多种策略。

双写机制(增量)

应用在写入 MySQL 的同时,也向 ES 写入一份数据。

优点

  • 实现简单,无需额外中间件。
  • 写入延迟低(同步写)。

缺点

  • 一致性难保障:若 ES 写入失败,MySQL 已提交,会造成数据不一致。

  • 耦合度高:业务代码需同时处理两个数据源。

sequenceDiagram
    participant Client as 客户端
    participant App as 应用服务
    participant MySQL as MySQL
    participant ES as Elasticsearch

    Client->>App: 发送写请求 (e.g., 更新用户)
    App->>MySQL: 1. 执行 SQL 写入/更新
    MySQL-->>App: 2. 返回成功
    alt MySQL 写入成功
        App->>ES: 3. 同步写入 ES 文档
        alt ES 写入成功
            ES-->>App: 4. 返回成功
            App-->>Client: 5. 返回操作成功
        else ES 写入失败
            Note right of App: 数据不一致!
MySQL 已提交,ES 未更新 App-->>Client: 6. 返回部分成功或错误 end else MySQL 写入失败 App-->>Client: 返回失败(ES 不会写入) end

适用场景:数据一致性要求不高、写入量小、业务逻辑简单的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class DualWriteDemo {
private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
private static final String DB_USER = "root";
private static final String DB_PASS = "123456";

public static void main(String[] args) throws Exception {
// 初始化 ES Client
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient esClient = new ElasticsearchClient(transport);

long userId = 1001L;
String name = "张三";

// 1. 写 MySQL
try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASS)) {
String sql = "INSERT INTO users(id, name) VALUES (?, ?) " +
"ON DUPLICATE KEY UPDATE name = ?";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setLong(1, userId);
ps.setString(2, name);
ps.setString(3, name);
ps.executeUpdate();
}
}

// 2. 写 ES
esClient.index(new IndexRequest.Builder<>()
.index("users")
.id(String.valueOf(userId))
.document(java.util.Map.of("id", userId, "name", name))
.build()
);

esClient._transport().close();
}
}

定时轮询(全量/增量)

通过定时任务,定时查询 MySQL(如通过 update_timeID 字段),将新增或修改的数据同步到 ES。

优点

  • 实现简单,无需 binlog 权限。
  • 适合历史数据初始化。

缺点

  • 延迟高:取决于轮询间隔。

  • 性能压力:频繁查询可能影响 MySQL。

  • 难以处理删除操作(需软删除标记)。

sequenceDiagram
    participant Scheduler as 定时调度器
    participant SyncService as 同步服务
    participant MySQL as MySQL
    participant ES as Elasticsearch

    loop 每 N 秒/分钟执行一次
        Scheduler->>SyncService: 触发同步任务
        SyncService->>MySQL: 1. 查询增量数据
(如 WHERE update_time > '${last_sync_time}') MySQL-->>SyncService: 2. 返回变更记录列表 alt 存在变更数据 loop 遍历每条记录 SyncService->>ES: 3. 写入/更新 ES 文档 ES-->>SyncService: 4. 确认写入 end SyncService->>SyncService: 5. 更新 last_sync_time = max(update_time) else 无新数据 Note over SyncService: 无操作,等待下次轮询 end end

适用场景:对实时性要求低、数据变更频率低的小型系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// 伪代码:定时轮询 MySQL 同步到 Elasticsearch
public class MySqlToEsSyncDemo {
private static final String ES_INDEX = "demo";
private static final String MYSQL_QUERY_TEMPLATE =
"SELECT id, name, update_time FROM demo WHERE update_time > ? ORDER BY update_time ASC";

private DataSource mysqlDataSource; // MySQL 连接池
private RestHighLevelClient esClient; // Elasticsearch 客户端

// 定时任务入口
public static void main(String[] args) {
MySqlToEsSyncDemo sync = new MySqlToEsSyncDemo();
sync.init();

// 使用 ScheduledExecutorService 定时执行
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(sync::syncData, 0, 30, TimeUnit.SECONDS); // 每30秒同步一次
}

private void init() {
// 初始化 MySQL 数据源
mysqlDataSource = createMySqlDataSource();
// 初始化 ES 客户端
esClient = createEsClient();
}

private void syncData() {
try {
// 读取上次同步的时间戳
Instant lastSyncTime = readLastSyncTime();

// 查询 MySQL 中 update_time > lastSyncTime 的记录
List<Record> records = queryFromMysql(lastSyncTime);
if (records.isEmpty()) {
return;
}

// 批量写入 Elasticsearch
BulkRequest bulkRequest = new BulkRequest();
for (Record record : records) {
// 构造 ES Document
Map<String, Object> source = new HashMap<>();
source.put("id", record.id);
source.put("name", record.name);
source.put("update_time", record.updateTime);

// 使用 id 作为 ES 的 _id,避免重复
bulkRequest.add(new IndexRequest(ES_INDEX)
.id(String.valueOf(record.id))
.source(source));
}

BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (!bulkResponse.hasFailures()) {
// 更新最后同步时间(取本次同步的最新 update_time)
Instant newSyncTime = records.get(records.size() - 1).updateTime;
writeLastSyncTime(newSyncTime);
} else {
log.error("Bulk write to ES failed");
}

} catch (Exception e) {
log.error("Sync failed", e);
}
}

private Instant readLastSyncTime() {
// 从 Redis / DB 中读取上次同步时间,若首次运行,返回一个较早的时间(如 1970-01-01)
}

private void writeLastSyncTime(Instant time) {
// 持久化最新的同步时间
}

private List<Record> queryFromMysql(Instant sinceTime) {
// 注意:update_time 字段需有索引以提高查询效率
}
}

消息队列(全量/增量)

应用在写 MySQL 后,发送消息到 Kafka/RabbitMQ,由消费者负责写入 ES。

优点

  • 解耦应用与 ES。
  • 可重试、削峰填谷。

缺点

  • 存在“先写 DB 成功、发消息失败”的一致性问题(需事务消息或本地消息表)。

    • 本地事务表 + 定时补偿
    • RocketMQ 事务消息Kafka 事务(需强一致性)。
  • 架构复杂度增加。

sequenceDiagram
    participant Client as 客户端
    participant App as 应用服务
    participant MQ as 消息队列
    participant SyncConsumer as ES 同步消费者
    participant MySQL as MySQL
    participant ES as Elasticsearch

    Client->>App: 发送写请求 (e.g., 创建/更新用户)
    
    %% 应用先写数据库(关键:DB 写入必须在发消息前完成)
    App->>MySQL: 1. 执行 SQL 写入/更新
    MySQL-->>App: 2. 返回成功

    alt MySQL 写入成功
        App->>MQ: 3. 发送变更消息
({id:1001, name:"张三", op:"upsert"}) MQ-->>App: 4. 消息确认 (ACK) App-->>Client: 5. 返回操作成功 %% 异步消费 activate SyncConsumer MQ->>SyncConsumer: 6. 推送/拉取消息 SyncConsumer->>ES: 7. 写入/更新 ES 文档 alt ES 写入成功 ES-->>SyncConsumer: 8a. 确认 SyncConsumer->>MQ: 9a. 提交偏移量 (Commit Offset) else ES 写入失败 Note right of SyncConsumer: 重试或进入死信队列 loop 重试 (带退避) SyncConsumer->>ES: 重试写入 end %% 可选:记录失败日志供人工干预 end deactivate SyncConsumer else MySQL 写入失败 App-->>Client: 返回失败(不发消息) end

Producer:业务服务写 DB成功好发送 RocketMQ

全量同步时,发送所有数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Service
public class EsSyncProducer {

@Autowired
private UserMapper userMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;

@Transactional
public void createUser(String name, String email) {
User user = new User();
user.setName(name);
user.setEmail(email);
user.setCreateTime(LocalDateTime.now());
userMapper.insert(user);

// 构造同步事件(必须在事务提交后发送,避免回滚后消息已发)推荐使用 TransactionalEventListener 或异步线程池
sendSyncEventToMQ(user, "INSERT");
}

// 手动触发事件(在事务内调用)
private void sendSyncEventToMQ(User user, String eventType) {
// Spring 会在事务成功提交后触发 @EventListener
applicationEventPublisher.publishEvent(
new UserSavedEvent(this, user, true)
);
}

// 使用 TransactionalEventListener 确保在事务成功提交后才发消息
@EventListener(condition = "#event.success")
@Async
public void handleUserSaved(UserSavedEvent event) {
SyncEvent syncEvent = new SyncEvent();
syncEvent.setEventType("INSERT");
syncEvent.setTable("user");
syncEvent.setId(event.getUser().getId());
syncEvent.setData(Map.of(
"id", event.getUser().getId(),
"name", event.getUser().getName(),
"email", event.getUser().getEmail()
));

rocketMQTemplate.convertAndSend("MYSQL_TO_ES_TOPIC", syncEvent);
}

}

Consumer:消费 RocketMQ 消息并同步到 Elasticsearch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
@RocketMQMessageListener(
topic = "MYSQL_TO_ES_TOPIC",
consumerGroup = "es-sync-group"
)
public class EsSyncConsumer implements RocketMQListener<SyncEvent> {

@Autowired
private RestHighLevelClient esClient;

@Override
public void onMessage(SyncEvent event) {
try {
String index = event.getTable();
String docId = String.valueOf(event.getId());
Map<String, Object> doc = event.getData();

if ("DELETE".equals(event.getEventType())) {
esClient.delete(
new DeleteRequest(index).id(docId),
RequestOptions.DEFAULT
);
} else {
esClient.index(
new IndexRequest(index)
.id(docId)
.source(doc, XContentType.JSON),
RequestOptions.DEFAULT
);
}

} catch (Exception e) {
log.error("ES sync failed, event: {}, error: {}", event, e.getMessage());
throw new RuntimeException(e); // 触发重试
}
}
}

三方工具(生产)

基于 Binlog 工具

通过解析 MySQL 的 binlog(如使用 Canal、Debezium、Maxwell),捕获数据变更事件,再推送到 ES。

流程:MySQL → Binlog → Canal/Debezium → Kafka → 消费者 → ES

优点

  • 解耦:业务无感知,不侵入应用代码。
  • 可靠性高:基于 binlog,能保证数据变更不丢失(配合 ACK 机制)。
  • 支持增量同步

缺点

  • 部署维护较复杂(需部署 binlog 解析服务 + 消息队列)。

  • 需处理 DDL 变更、字段映射、ES 文档结构变更等问题。

  • 涉及节点较多,变更维护成本高

sequenceDiagram
    participant MySQL as MySQL
    participant CanalServer as Canal Server
    participant CanalClient as Canal Client (Java 应用)
    participant ES as Elasticsearch

    Note over MySQL: 开启 binlog
binlog-format=ROW %% 初始化:Canal Server 连接 MySQL CanalServer->>MySQL: 1. 模拟 MySQL Slave
请求 binlog dump MySQL-->>CanalServer: 2. 允许连接,发送 binlog 流 %% 数据变更触发 Note right of MySQL: 用户执行:
UPDATE users SET name='李四' WHERE id=1001; MySQL->>MySQL: 3. 写入 binlog (Row 格式) MySQL->>CanalServer: 4. 实时推送 binlog 事件 CanalServer->>CanalClient: 5. 推送解析后的结构化变更事件
(如 JSON 或 Protocol Buffer) activate CanalClient CanalClient->>CanalClient: 6. 解析事件:
table=test.users, op=UPDATE,
after={id:1001, name:"李四"} alt 是目标表(users) CanalClient->>ES: 7. 构造 ES 文档并写入
PUT /users/_doc/1001 { "name": "李四" } ES-->>CanalClient: 8. 写入成功响应 CanalClient->>CanalServer: 9. 确认消费位点 (ACK) else 非目标表 CanalClient->>CanalServer: 忽略并 ACK end deactivate CanalClient Note over CanalClient: 持续监听,支持 INSERT/UPDATE/DELETE

常见工具

  • Canal(阿里开源,Java)

  • Debezium(基于 Kafka Connect,支持多种数据库)

  • Maxwell(轻量,输出 JSON 到 Kafka/Stdout)

适用场景:高可靠、准实时同步,数据量大,需解耦业务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class CanalSyncDemo {

public static void main(String[] args) throws Exception {
// 1. 连接 Canal Server
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe("test\\.users");

// 2. 初始化 ES
ElasticsearchClient esClient = createEsClient();

while (true) {
Message message = connector.get(1000);
handleEntries(message.getEntries(), esClient);
}
}

private static void handleEntries(List<CanalEntry.Entry> entries, ElasticsearchClient esClient) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue; // 非行数据,跳过
}

try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
handleRowChange(rowChange, esClient);
} catch (InvalidProtocolBufferException e) {
log.error("Failed to parse RowChange: " + e.getMessage());
}
}
}

private static void handleRowChange(CanalEntry.RowChange rowChange, ElasticsearchClient esClient) {
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE) {
return; // 只处理 INSERT 和 UPDATE
}

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
extractAndSyncToEs(rowData, esClient);
}
}

private static void extractAndSyncToEs(CanalEntry.RowData rowData, ElasticsearchClient esClient) {
Map<String, String> afterMap = rowData.getAfterColumnsList().stream()
.filter(CanalEntry.Column::getUpdated) // 只取更新过的列(或保留所有)
.collect(Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue,
(v1, v2) -> v1 // 防重,理论上不会有重复列名
));

// 如果 afterColumns 为空(比如 DELETE),跳过
if (afterMap.isEmpty()) {
return;
}

String id = afterMap.get("id");
String name = afterMap.get("name");

if (id == null || name == null) {
return;
}

try {
esClient.index(b -> b
.index("users")
.id(id)
.document(Map.of("id", Long.parseLong(id), "name", name))
);
} catch (Exception e) {
log.error("Failed to sync to ES: id=" + id + ", error=" + e.getMessage());
}
}
}

Logstash 插件

Logstash 通过 jdbc input 插件定期查询 MySQL,同步到 ES。

优点

  • 配置简单,Elastic 官方支持。
  • 支持 SQL 定制查询。

缺点

  • 本质仍是轮询,无法做到实时。

  • 不支持监听 binlog。

sequenceDiagram
    participant Scheduler as Logstash 调度器
    participant JDBCInput as JDBC Input 插件
    participant MySQL as MySQL
    participant Filter as (可选) Filter 插件
    participant ESOutput as Elasticsearch Output 插件
    participant ES as Elasticsearch

    loop 按 schedule 配置周期执行 (e.g., 每30秒)
        Scheduler->>JDBCInput: 触发下一次轮询
        JDBCInput->>MySQL: 1. 执行预定义 SQL 查询
(含增量条件,如 update_time > :sql_last_value) MySQL-->>JDBCInput: 2. 返回结果集 (ResultSet) alt 有新数据返回 JDBCInput->>Filter: 3. (可选) 字段清洗/转换 Filter-->>ESOutput: 处理后的事件 (Event) ESOutput->>ES: 4. 批量写入 ES
(使用 document_id 映射主键) ES-->>ESOutput: 5. 写入确认 ESOutput->>JDBCInput: 6. 更新 last_run_metadata
(记录本次最大 update_time) else 无新数据 Note over JDBCInput: 无事件生成,跳过输出 end end

适用场景:中小型项目,允许分钟级延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
input {
jdbc {
jdbc_driver_library => "/path/mysql-connector-java-8.0.33.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
jdbc_user => "logstash"
jdbc_password => "logstash"

# 增量同步关键:使用时间戳字段
schedule => "*/30 * * * *" # 每30秒执行
statement => "SELECT id, name, update_time FROM users WHERE update_time > :sql_last_value"
use_column_value => true
tracking_column => "update_time"
tracking_column_type => "timestamp"

# 记录上次同步时间的文件
last_run_metadata_path => "/etc/logstash/.users_last_run"
}
}

filter {
# 可选:重命名字段、添加标签等
mutate {
copy => { "id" => "[@metadata][_id]" }
}
}

output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "users"
document_id => "%{[@metadata][_id]}" # 确保 ES 文档 ID 与 MySQL 主键一致
}
}

基于 Flink 的 CDC 连接器(如 flink-cdc-connectors),直接读取 MySQL binlog,进行流式 ETL 后写入 ES。

MySQL → Flink CDC Source → Flink Job(Transform) → Flink ES Sink → Elasticsearch

优势

  • 真正的流处理,低延迟、高吞put。

  • 支持多种数据库(MySQL、Doris、Oracle、Postgres、SQLServer、DB2、MongoDB、Elasticsearch等)

  • 支持 Exactly-Once 语义(需配合 Flink Checkpoint)。

  • 可在同步过程中做字段转换、JOIN、聚合等。

  • 支持多种同步模式:

    • Flink SQL:声明式 (SQL),适用于 简单ETL、分析场景
    • Table API:程序式声明,适用于中等复杂度场景
    • DataStream API:命令式,适用于复杂业务逻辑场景
    • Pipeline Config:配置声明式,适用于快速同步任务场景
sequenceDiagram
    participant MySQL as MySQL
    participant FlinkJob as Flink Job (含 CDC Source)
    participant FlinkEngine as Flink Runtime Engine
    participant ESSink as Elasticsearch Sink
    participant ES as Elasticsearch

    Note over MySQL: 开启 binlog
binlog-format=ROW %% 初始化:Flink CDC Source 连接 MySQL FlinkJob->>MySQL: 1. 启动时读取表结构 + 当前 binlog 位点 MySQL-->>FlinkJob: 2. 返回 schema 和初始快照(可选) alt 全量 + 增量模式(默认) FlinkJob->>MySQL: 3a. 先 SELECT 全量数据(快照阶段) MySQL-->>FlinkJob: 3b. 返回全量结果集 FlinkJob->>ESSink: 4a. 写入 ES(全量初始化) end %% 持续监听 binlog Note right of MySQL: 用户执行:
INSERT INTO users VALUES(1002, '王五'); MySQL->>MySQL: 5. 写入 binlog (Row 格式) MySQL->>FlinkJob: 6. Flink CDC Source 实时拉取 binlog 事件 FlinkJob->>FlinkEngine: 7. 转换为 DataStream activate FlinkEngine FlinkEngine->>FlinkEngine: 8. (可选) 流式 ETL:
字段过滤、类型转换、脱敏等 FlinkEngine->>ESSink: 9. 发送记录到 ES Sink ESSink->>ES: 10. 批量/逐条写入 ES
(使用主键作为 document_id) ES-->>ESSink: 11. 写入确认 ESSink-->>FlinkEngine: 12. 触发 checkpoint ACK FlinkEngine->>FlinkJob: 13. 持久化 binlog 位点(Exactly-Once 保障) deactivate FlinkEngine Note over FlinkJob: 持续处理 INSERT/UPDATE/DELETE
DELETE → 调用 ES delete API

适用场景:需要实时计算 + 同步的复杂数据管道。

组件 作用 是否需手动安装
flink-dist-*.jar Flink 核心运行时 无需,自带
flink-table-api-java-*.jar Table API 基础类 无需,自带(SQL 依赖 Table API)
flink-table-planner_*.jar SQL 解析器、优化器、执行器(Blink Planner) 无需,自带
flink-table-runtime-*.jar 运行时函数、类型系统 无需,自带
flink-json-*.jar JSON 格式支持(如 JSON_OBJECT 无需,自带
MySQL / JDBC flink-connector-jdbc-*.jar 手动下载并放入 lib/ 目录
MySQL / CDC flink-sql-connector-mysql-cdc-*.jar 手动下载并放入 lib/ 目录
Elasticsearch 7 flink-sql-connector-elasticsearch7-*.jar 手动下载并放入 lib/ 目录
MySQL 驱动 mysql-connector-j-8.0.33.jar(或 9.x) 手动下载并放入 lib/ 目录
Kafka flink-sql-connector-kafka-*.jar 非必须

示例:从 MySQL 到 Elasticsearch 的同步,将 orders 和 order_items 表关联后写入 ES。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 定义数据结构
public static class Order {
public long id;
public String orderNo;
public String status;
public long userId;
public List<OrderItem> items;

public Order(long id, String orderNo, String status, long userId, List<OrderItem> items) {
this.id = id;
this.orderNo = orderNo;
this.status = status;
this.userId = userId;
this.items = items;
}
}

public static class OrderItem {
public long id;
public long orderId;
public String productName;
public double unitPrice;

public OrderItem(long id, long orderId, String productName, double unitPrice) {
this.id = id;
this.orderId = orderId;
this.productName = productName;
this.unitPrice = unitPrice;
}
}

Flink SQL 实现数据同步(如 MySQL → Elasticsearch)主要有两种主流方式:

  1. 基于 JDBC 的批量/轮询同步(Polling-based)

  2. 基于 CDC(Change Data Capture)的实时增量同步

对于这种标准的关联查询,SQL 是最简洁、最高效的方式。我用的 ES 版本 为 7.17.29

对比项 JDBC 轮询 CDC
数据捕获层 SQL 查询(应用层) 数据库日志(存储引擎层)
变更可见性 只能看到最终状态 看到完整变更过程(I/U/D)
DELETE 支持 不支持(除非软删除) 原生支持
UPDATE 识别 需时间戳字段 自动识别 before/after 值
对源库影响 高(查询压力) 低(只读 binlog)
首次同步 全量查询 全量快照 + 增量日志
Exactly-Once 难实现 原生支持(配合 checkpoint)

需提前下载(Apache Archive Distribution Directory):

百度网盘分享的文件:Flink: https://pan.baidu.com/s/1q_W6tcfnfapHs9Ps37M7hA?pwd=wdv4 提取码: wdv4

解压 Flink 后,将所有 JAR 都放入 flink-1.20.3/lib/ 目录下

注意:该版本(1.20.3)不支持 Pipeline,如果你需要自动路由可能得需要其他方式或升级最新版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 先启动 Flink 集群,JobManager + TaskManager。
./bin/start-cluster.sh
# 查看 Web UI:
curl http://localhost:8081

# 第1种方式:直接运行
flink run -d \
-p 4 \
-sql "orders_to_es.sql" \
--jarfile flink-sql-connector-elasticsearch7-3.1.0-1.20.jar \
--jarfile flink-connector-jdbc-3.3.0-1.20.jar \
--jarfile mysql-connector-j-8.4.0.jar

# 第2种方式:启动 SQL Client
./bin/sql-client.sh -f orders_to_es.sql

# 检查数据同步情况
curl -XGET "http://localhost:9200/orders/_search?pretty"
JDBC 批量/轮询

无法捕获 DELETE 操作(除非有软删除字段如 is_deleted

  • Flink 通过 JDBC 连接器,周期性地全表扫描或按条件查询源表(如 SELECT * FROM orders WHERE update_time > last_max_time)。

  • 数据被视为静态快照(bounded stream)或周期性 append-only 流

  • 通常运行在 Batch 模式或 Streaming 模式下的有限流

orders_to_es.sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
-- ========= 1. 源表:orders =========
CREATE TEMPORARY TABLE `orders`
(
`id` BIGINT,
`order_no` STRING,
`status` STRING,
`user_id` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC',
'table-name' = 'orders',
'username' = 'root',
'password' = '123456',
'scan.fetch-size' = '1000',
'scan.partition.column' = 'id', -- 可选:并行读取
'scan.partition.num' = '2',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '10000'
);

-- ========= 2. 源表:order_items =========
CREATE TEMPORARY TABLE `order_item`
(
`id` BIGINT,
`order_id` BIGINT,
`product_name` STRING,
`unit_price` DECIMAL,
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC',
'table-name' = 'order_item',
'username' = 'root',
'password' = '123456',
'scan.fetch-size' = '1000'
);

-- ========= 3. 目标表:Elasticsearch =========
CREATE TEMPORARY TABLE `es_orders` (
`id` BIGINT,
`orderNo` STRING,
`status` STRING,
`userId` BIGINT,
`orderItems` ARRAY<ROW<
`id` BIGINT,
`productName` STRING,
`unitPrice` DOUBLE
>>,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.3.146:9200',
'index' = 'orders',
'sink.bulk-flush.max-actions' = '1000',
'sink.bulk-flush.interval' = '10s'
);


-- ========= 4. 批量插入 =========
INSERT INTO `es_orders`
SELECT
o.id,
o.order_no AS orderNo,
o.status,
o.user_id AS userId,
ARRAY_AGG(
ROW(
i.id,
i.product_name,
CAST(i.unit_price AS DOUBLE) -- unitPrice (scaled_float)
)
) AS orderItems
FROM orders AS o
LEFT JOIN order_item AS i ON o.id = i.order_id
GROUP BY o.id, o.order_no, o.status, o.user_id;
CDC 全量+增量

支持 全量 + 增量自动切换(Flink CDC 2.0+)。

  • Flink 通过 CDC 连接器(如 Debezium)读取数据库的事务日志(MySQL 的 binlog、PostgreSQL 的 WAL)。

  • 直接解析 INSERT / UPDATE / DELETE 的原始变更事件(RowKind+I, -U, +U, -D)。

  • 数据以 changelog stream 形式流入 Flink,天然支持精确一次语义

‘scan.incremental.snapshot.enabled’ = ‘true’ – 启用全量+增量

orders_to_es.sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
-- ========= 1. 源表:orders =========
CREATE TEMPORARY TABLE `orders`
(
`id` BIGINT,
`order_no` STRING,
`status` STRING,
`user_id` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.146',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'demo_order',
'table-name' = 'orders',
'server-time-zone' = 'Asia/Shanghai'
);

-- ========= 2. 源表:order_items =========
CREATE TEMPORARY TABLE `order_item`
(
`id` BIGINT,
`order_id` BIGINT,
`product_name` STRING,
`unit_price` DECIMAL,
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.146',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'demo_order',
'table-name' = 'order_item',
'server-time-zone' = 'Asia/Shanghai'
);

-- ========= 3. 目标表:Elasticsearch =========
CREATE TEMPORARY TABLE `es_orders` (
`id` BIGINT,
`orderNo` STRING,
`status` STRING,
`userId` BIGINT,
`orderItems` ARRAY<ROW<
`id` BIGINT,
`productName` STRING,
`unitPrice` DOUBLE
>>,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.3.146:9200',
'index' = 'orders',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.max-actions' = '1000',
'sink.bulk-flush.interval' = '1000',
'format' = 'json'
);


-- ========= 4. 批量插入 =========
INSERT INTO `es_orders`
SELECT
o.id,
o.order_no AS orderNo,
o.status,
o.user_id AS userId,
ARRAY_AGG(
ROW(
i.id,
i.product_name,
CAST(i.unit_price AS DOUBLE) -- unitPrice (scaled_float)
)
) AS orderItems
FROM orders AS o
LEFT JOIN order_item AS i ON o.id = i.order_id
GROUP BY o.id, o.order_no, o.status, o.user_id;
常见异常
  • 对于版本不兼容或不支持(Flink 1.19 及之前不支持标准 SQL 的 ARRAY_AGG),可能出错:

    image-20251227231702027
  • 如果报错:[ERROR] Could not execute SQL statement. Reason:java.net.ConnectException: 拒绝连接

    先查看 flink/log 下的日志详细信息 vim log/flink-root-sql-client-主机名.log

    如果出错,拒绝连接:localhost/127.0.0.1:8081,Flink SQL Client(sql-client.sh)默认会连接 本地 localhost:8081 的 JobManager,这是因为你 只解压了 Flink,没有启动集群导致的,解决方法:

    1
    2
    3
    # Flink SQL 不能独立运行,它需要一个 运行中的 Flink 集群来执行作业
    # 在 Flink 目录下,启动:JobManager(默认监听 localhost:8081)和 TaskManager
    ./bin/start-cluster.sh

    如果是mysql 无法连接,使用以下步骤排查:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    ping 192.168.3.146

    # 1.使用 MySQL 客户端连接
    mysql -h 192.168.3.146 -u root -p

    # 2.登录到 运行 Flink 任务的机器,执行:
    # 如果 不通,说明 Flink 节点网络隔离,无法访问 MySQL
    telnet 192.168.3.146 3306

    # 3.检查 MySQL 配置(在 MySQL 服务器上):
    # 如果返回 127.0.0.1,只允许本地连接,拒绝远程连接。正确值应为 0.0.0.0 或 192.168.3.146
    # 编辑 my.cnf(通常在 /etc/mysql/my.cnf 或 /etc/my.cnf 或 C:\ProgramData\MySQL\MySQL Server 8.0)
    # 在 [mysqld] 节点下添加:bind-address = 0.0.0.0,然后重启 MySQL:
    SHOW VARIABLES LIKE 'bind_address';

    # 4.MySQL 用户可能只允许从特定主机连接,检查用户权限:
    # 如果 host 是 localhost 或 127.0.0.1 不能从其他 IP 连。你需要 host = '%' 或 host = '你的Flink节点IP'。
    SELECT host, user FROM mysql.user WHERE user = 'root';

    # 5.防火墙或安全组拦截
    # MySQL 服务器防火墙(如 ufw、iptables)可能阻止 3306 端口。

    # 6.Flink 使用了错误的 URL(拼写、端口)
    # MySQL 8+ 默认要求 SSL,useSSL=false 可临时绕过(测试环境)
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC'

    # 7.MySQL 驱动版本兼容性问题,MySQL 9.x 驱动可能有行为变更(如默认 SSL、认证插件)
    # link 1.20 + MySQL 8.0 推荐使用 mysql-connector-j:8.0.33

    # 其他情况:如果你用的 VMware Linux服务器执行Flink,但是数据库在主机
    # 你需要使用这个网络:以太网适配器 VMware Network Adapter VMnet8,而不是:无线局域网适配器 WLAN
  • 如果JDBC数据同步失败,查看日志:vim log/flink-root-taskexecutor-0-主机名.log,详细出错信息

  • 如果CDC数据同步失败,查看日志:vim log/flink-root-standalonesession-0-主机名.log,报错:Failed to create Source Enumerator for source Source: order_item[3] java.lang.NullPointerException: null,说明Mysql连接的配置有误,可以删除多余的。

  • 如果出错:[ERROR] Could not execute SQL statement. Reason:java.io.StreamCorruptedException: unexpected block data,可能原因:

    • 版本不匹配:CDC 连接器版本与 Flink 版本不兼容

    • 序列化问题:Debezium 序列化器配置错误

    • 数据格式不一致:期望的数据格式与实际接收的不匹配

    • 清除检查点数据,重新启动

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      # 停止 Flink 集群
      cd flink-1.20.3
      ./bin/stop-cluster.sh

      # 删除所有检查点和状态数据
      rm -rf /tmp/flink-checkpoints /tmp/flink-savepoints 2>/dev/null
      rm -rf /tmp/blobStore-* 2>/dev/null
      rm -rf /tmp/hadoop-unjar-* 2>/dev/null

      # 删除 Flink 的临时目录
      find /tmp -name "*flink*" -type d -exec rm -rf {} + 2>/dev/null || true

      # 重启集群
      ./bin/start-cluster.sh

Pipeline Config 适合简单同步,只需要单表同步,无需关联处理的场景。对复杂关联支持有限

YAML 配置文件:mysql-to-es-pipeline.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 方案一:分别同步两个表到 ES(简单但无法关联)
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
database: test_db
tables: test_db.orders # 只能同步一个表
server-id: 5400-5404
server-time-zone: Asia/Shanghai

sink:
type: elasticsearch
hosts: http://localhost:9200
index: orders
username: elastic # 如果 ES 有认证
password: elastic # 如果 ES 有认证
document-type: _doc
bulk-flush:
max-actions: 1000
interval: 1s

pipeline:
name: MySQL Orders to ES
parallelism: 2

# 方案二:使用 Transformation 进行简单处理(需要 Flink CDC 3.0+)
# source: ...
# sink: ...
#
# transformation:
# - type: column-rename
# source-column: order_no
# target-column: orderNo
# - type: column-rename
# source-column: user_id
# target-column: userId
# - type: column-drop
# columns: [column_to_drop]

运行脚本:run-pipeline.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/bin/bash
# 运行 Pipeline 模式

# 1. 下载 Flink CDC Pipeline JAR
# wget https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connectors/3.0.0/flink-cdc-pipeline-connectors-3.0.0.jar

# 2. 提交任务到 Flink
./bin/flink run \
-d \
-p 2 \
-c com.ververica.cdc.pipeline.cli.PipelineCLI \
flink-cdc-pipeline-connectors-3.0.0.jar \
--config mysql-to-es-pipeline.yaml

# 3. 查看任务状态
./bin/flink list

Table API 适合中等复杂度,需要在 SQL 和 DataStream 之间平衡的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class MySQLToESWithTableAPI {
public static void main(String[] args) {
// 1. 创建 Table 环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// 2. 创建 MySQL CDC 源表 - orders
tableEnv.executeSql(
"CREATE TABLE mysql_orders (\n" +
" id BIGINT,\n" +
" order_no STRING,\n" +
" status STRING,\n" +
" user_id BIGINT,\n" +
" PRIMARY KEY(id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'test_db',\n" +
" 'table-name' = 'orders'\n" +
")"
);

// 3. 创建 MySQL CDC 源表 - order_items
tableEnv.executeSql(
"CREATE TABLE mysql_order_items (\n" +
" id BIGINT,\n" +
" order_id BIGINT,\n" +
" product_name STRING,\n" +
" unit_price DECIMAL(10, 2),\n" +
" PRIMARY KEY(id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'test_db',\n" +
" 'table-name' = 'order_items'\n" +
")"
);

// 4. 创建 Elasticsearch 目标表
tableEnv.executeSql(
"CREATE TABLE es_orders (\n" +
" id BIGINT,\n" +
" orderNo STRING,\n" +
" status STRING,\n" +
" userId BIGINT,\n" +
" orderItems ARRAY<ROW<\n" +
" id BIGINT,\n" +
" productName STRING,\n" +
" unitPrice DOUBLE\n" +
" >>,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'elasticsearch-7',\n" +
" 'hosts' = 'http://localhost:9200',\n" +
" 'index' = 'orders',\n" +
" 'document-id.key-delimiter' = '$',\n" +
" 'format' = 'json'\n" +
")"
);

// 5. 使用 Table API 进行查询和写入
Table orders = tableEnv.from("mysql_orders");
Table orderItems = tableEnv.from("mysql_order_items");

// 使用 Table API 进行关联和聚合
Table result = orders
.leftOuterJoin(orderItems, $("id").isEqual($("order_id")))
.groupBy($("id"), $("order_no"), $("status"), $("user_id"))
.select(
$("id"),
$("order_no").as("orderNo"),
$("status"),
$("user_id").as("userId"),
collect(
row(
orderItems.$("id"),
orderItems.$("product_name").as("productName"),
orderItems.$("unit_price").cast(DataTypes.DOUBLE()).as("unitPrice")
)
).as("orderItems")
);

// 6. 执行并写入 ES
tableEnv.createTemporaryView("result_view", result);
tableEnv.executeSql("INSERT INTO es_orders SELECT * FROM result_view");
// result.executeInsert("es_orders");
}
}

这是最灵活的方式,需要复杂处理时用 DataStream API:如果有特殊的业务逻辑(如风控、复杂状态)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
public class MySQLToESWithDataStream {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);

// 2. 创建 MySQL CDC Source - orders
MySqlSource<String> ordersSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("test_db")
.tableList("test_db.orders")
.username("root")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

// 3. 创建 MySQL CDC Source - order_items
MySqlSource<String> itemsSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("test_db")
.tableList("test_db.order_items")
.username("root")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

// 4. 添加 Source 到数据流
DataStream<String> ordersStream = env.fromSource(
ordersSource,
WatermarkStrategy.noWatermarks(),
"MySQL Orders Source"
);

DataStream<String> itemsStream = env.fromSource(
itemsSource,
WatermarkStrategy.noWatermarks(),
"MySQL Order Items Source"
);

// 5. 解析和处理数据
// 解析 orders 流
DataStream<Order> orders = ordersStream
.map(new MapFunction<String, Order>() {
private final ObjectMapper mapper = new ObjectMapper();

@Override
public Order map(String value) throws Exception {
ObjectNode node = mapper.readValue(value, ObjectNode.class);
ObjectNode after = (ObjectNode) node.get("after");

if (after != null) {
long id = after.get("id").asLong();
String orderNo = after.get("order_no").asText();
String status = after.get("status").asText();
long userId = after.get("user_id").asLong();

return new Order(id, orderNo, status, userId, new ArrayList<>());
}
return null;
}
})
.filter(order -> order != null);

// 解析 order_items 流
DataStream<OrderItem> items = itemsStream
.map(new MapFunction<String, OrderItem>() {
private final ObjectMapper mapper = new ObjectMapper();

@Override
public OrderItem map(String value) throws Exception {
ObjectNode node = mapper.readValue(value, ObjectNode.class);
ObjectNode after = (ObjectNode) node.get("after");

if (after != null) {
long id = after.get("id").asLong();
long orderId = after.get("order_id").asLong();
String productName = after.get("product_name").asText();
double unitPrice = after.get("unit_price").asDouble();

return new OrderItem(id, orderId, productName, unitPrice);
}
return null;
}
})
.filter(item -> item != null);

// 6. 关联 orders 和 items(这里简化处理,实际需要 keyBy 和 connect)
// 实际生产环境可能需要使用 CoProcessFunction 或 IntervalJoin

// 7. 创建 Elasticsearch Sink
List<HttpHost> httpHosts = Arrays.asList(
new HttpHost("localhost", 9200, "http")
);

ElasticsearchSink.Builder<Order> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
(Order order, RuntimeContext ctx, RequestIndexer indexer) -> {
ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();

json.put("id", order.id);
json.put("orderNo", order.orderNo);
json.put("status", order.status);
json.put("userId", order.userId);

// 处理 orderItems
List<ObjectNode> itemList = new ArrayList<>();
for (OrderItem item : order.items) {
ObjectNode itemJson = mapper.createObjectNode();
itemJson.put("id", item.id);
itemJson.put("productName", item.productName);
itemJson.put("unitPrice", item.unitPrice);
itemList.add(itemJson);
}
json.set("orderItems", mapper.valueToTree(itemList));

IndexRequest indexRequest = new IndexRequest("orders")
.id(String.valueOf(order.id))
.source(mapper.writeValueAsString(json), XContentType.JSON);

indexer.add(indexRequest);
}
);

// 配置 ES Sink
esSinkBuilder.setBulkFlushMaxActions(1000);
esSinkBuilder.setBulkFlushInterval(1000L);
esSinkBuilder.setBulkFlushBackoff(true);

// 8. 添加 Sink
orders.addSink(esSinkBuilder.build());

// 9. 执行任务
env.execute("MySQL to ES DataStream Job");
}

}

Apache NiFi

可视化数据流引擎,通过 CaptureChangeMySQLQueryDatabaseTable 处理器捕获变更,输出到 ES。

优势:拖拽式 UI,支持复杂路由、重试、背压控制。

缺点:资源消耗较大,学习曲线较陡。

其他工具

社区有一些基于 Go/Python 的轻量 binlog 同步工具(如 go-mysql-elasticsearchGo-MySQL-ES),直接监听 binlog 并写 ES。

适合对轻量级、可控性要求高的场景,但需自行维护。

云原生

AWS

DMS(Database Migration Service) + OpenSearch(ES 托管版)

  • AWS DMS 支持从 RDS MySQL 持续复制(CDC) 到 OpenSearch。

  • 全托管,自动处理 binlog、断点续传、网络加密。

阿里云

DTS(Data Transmission Service) + Elasticsearch

  • DTS 支持 MySQL 到 阿里云 ES 的实时数据订阅与同步

  • 支持结构初始化、全量 + 增量同步。

  • 可过滤表、列,支持冲突处理。

Serverless 方案

  • AWS Lambda + RDS Binlog:DMS 将变更发布到 Kafka(MSK)或 SNS/SQS,触发 Lambda 写入 OpenSearch。

  • 阿里云函数计算(FC) + DTS 消息订阅:DTS 支持将变更以消息形式投递到 MNS/MQ,函数计算消费后写 ES。

优点:按需计费,无服务器运维。

缺点:冷启动延迟,适合低频或中小流量场景。