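Synchronizing MySQL data to Elasticsearch (ES) or Redis is a common requirement. Depending on the business scenario, data volume, consistency requirements, and latency requirements, several strategies are available.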
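Dual writes (incremental)
The application writes a copy of the data to ES at the same time as it writes to MySQL.
Pros:
Simple to implement; no extra middleware required.
Low write latency (synchronous write).
Cons:
Risk of inconsistency: if the MySQL write commits but the ES write fails, the two stores diverge.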
sequenceDiagram
    participant Client as Client
    participant App as Application service
    participant MySQL as MySQL
    participant ES as Elasticsearch
    Client->>App: Send write request (e.g., update user)
    App->>MySQL: 1. Execute SQL insert/update
    MySQL-->>App: 2. Return success
    alt MySQL write succeeded
        App->>ES: 3. Synchronously write ES document
        alt ES write succeeded
            ES-->>App: 4. Return success
            App-->>Client: 5. Return operation success
        else ES write failed
            Note right of App: Data inconsistency! MySQL committed, ES not updated
            App-->>Client: 6. Return partial success or error
        end
    else MySQL write failed
        App-->>Client: Return failure (ES is not written)
    end
Use cases: low consistency requirements, small write volume, simple business logic.
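import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Map;

public class DualWriteDemo {
    private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
    private static final String DB_USER = "root";
    private static final String DB_PASS = "123456";

    public static void main(String[] args) throws Exception {
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        ElasticsearchClient esClient = new ElasticsearchClient(transport);

        long userId = 1001L;
        String name = "张三";

        try {
            // 1. Write to MySQL first (upsert by primary key).
            try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASS)) {
                String sql = "INSERT INTO users(id, name) VALUES (?, ?) "
                        + "ON DUPLICATE KEY UPDATE name = ?";
                try (PreparedStatement ps = conn.prepareStatement(sql)) {
                    ps.setLong(1, userId);
                    ps.setString(2, name);
                    ps.setString(3, name);
                    ps.executeUpdate();
                }
            }

            // 2. Then write the same record to ES. If this call throws, MySQL has
            //    already committed and the two stores are out of sync.
            esClient.index(new IndexRequest.Builder<>()
                    .index("users")
                    .id(String.valueOf(userId))
                    .document(Map.of("id", userId, "name", name))
                    .build());
        } finally {
            // Release the HTTP client even when one of the writes fails.
            esClient._transport().close();
        }
    }
}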
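The failure branch in the diagram (MySQL committed, ES not updated) is the weak point of dual writes. Below is a minimal sketch of one possible mitigation, assuming a retry queue that the original does not describe. In-memory maps stand in for MySQL and ES, and `esAvailable` is a hypothetical switch used only to simulate an outage.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class DualWriteSketch {
    final Map<Long, String> mysql = new HashMap<>();   // stands in for the users table
    final Map<Long, String> es = new HashMap<>();      // stands in for the users index
    final Queue<Long> pendingIds = new ArrayDeque<>(); // ids whose ES write failed
    boolean esAvailable = true;                        // hypothetical outage switch

    /** Writes MySQL first, then ES; returns false when ES is left stale. */
    boolean save(long id, String name) {
        mysql.put(id, name);          // the source of truth is written first
        try {
            writeEs(id, name);        // synchronous second write
            return true;
        } catch (IllegalStateException e) {
            pendingIds.add(id);       // remember the id instead of losing the update
            return false;
        }
    }

    /** Replays failed ids, always re-reading the latest value from MySQL. */
    void retryPending() {
        int n = pendingIds.size();
        for (int i = 0; i < n; i++) {
            long id = pendingIds.poll();
            try {
                writeEs(id, mysql.get(id));
            } catch (IllegalStateException e) {
                pendingIds.add(id);   // still failing, keep it for the next round
            }
        }
    }

    private void writeEs(long id, String name) {
        if (!esAvailable) {
            throw new IllegalStateException("ES unavailable");
        }
        es.put(id, name);
    }

    public static void main(String[] args) {
        DualWriteSketch s = new DualWriteSketch();
        s.esAvailable = false;
        System.out.println("first write ok? " + s.save(1001L, "张三"));
        s.esAvailable = true;
        s.retryPending();
        System.out.println("ES after retry: " + s.es);
    }
}
```

Replaying by id rather than by payload means a retry always pushes the newest MySQL value, so a late retry cannot overwrite a newer document with an older one.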
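Scheduled polling (full/incremental)
A scheduled job periodically queries MySQL (for example by an update_time or ID column) and syncs new or modified rows to ES.
Pros:
Simple to implement; no binlog privileges required.
Well suited to initializing historical data.
Cons:
High latency: bounded by the polling interval.
Load on MySQL: frequent queries can affect the database.
Deletes are hard to handle (a soft-delete flag is needed).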
sequenceDiagram
    participant Scheduler as Scheduler
    participant SyncService as Sync service
    participant MySQL as MySQL
    participant ES as Elasticsearch
    loop Runs every N seconds/minutes
        Scheduler->>SyncService: Trigger sync task
        SyncService->>MySQL: 1. Query incremental data (e.g. WHERE update_time > '${last_sync_time}')
        MySQL-->>SyncService: 2. Return list of changed records
        alt Changes exist
            loop For each record
                SyncService->>ES: 3. Write/update ES document
                ES-->>SyncService: 4. Confirm write
            end
            SyncService->>SyncService: 5. Update last_sync_time = max(update_time)
        else No new data
            Note over SyncService: No-op, wait for the next poll
        end
    end
Use cases: small systems with low real-time requirements and infrequent data changes.
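import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MySqlToEsSyncDemo {
    private static final Logger log = LoggerFactory.getLogger(MySqlToEsSyncDemo.class);
    private static final String ES_INDEX = "demo";
    private static final String MYSQL_QUERY_TEMPLATE =
            "SELECT id, name, update_time FROM demo WHERE update_time > ? ORDER BY update_time ASC";

    private DataSource mysqlDataSource;
    private RestHighLevelClient esClient;

    /** Row holder; the original uses this type without showing its definition. */
    static class Record {
        long id;
        String name;
        Instant updateTime;
    }

    public static void main(String[] args) {
        MySqlToEsSyncDemo sync = new MySqlToEsSyncDemo();
        sync.init();
        // Poll every 30 seconds on a single background thread.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(sync::syncData, 0, 30, TimeUnit.SECONDS);
    }

    private void init() {
        mysqlDataSource = createMySqlDataSource();
        esClient = createEsClient();
    }

    private void syncData() {
        try {
            Instant lastSyncTime = readLastSyncTime();
            List<Record> records = queryFromMysql(lastSyncTime);
            if (records.isEmpty()) {
                return;
            }

            // Index by primary key so that re-syncing a row overwrites the same document.
            BulkRequest bulkRequest = new BulkRequest();
            for (Record record : records) {
                Map<String, Object> source = new HashMap<>();
                source.put("id", record.id);
                source.put("name", record.name);
                source.put("update_time", record.updateTime);
                bulkRequest.add(new IndexRequest(ES_INDEX)
                        .id(String.valueOf(record.id))
                        .source(source));
            }

            BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (!bulkResponse.hasFailures()) {
                // Rows are ordered by update_time, so the last one carries the new watermark.
                Instant newSyncTime = records.get(records.size() - 1).updateTime;
                writeLastSyncTime(newSyncTime);
            } else {
                // Keep the old watermark so the same window is retried on the next poll.
                log.error("Bulk write to ES failed: {}", bulkResponse.buildFailureMessage());
            }
        } catch (Exception e) {
            // Swallow the exception: an uncaught one would cancel the scheduled task.
            log.error("Sync failed", e);
        }
    }

    // The bodies of the methods below are not shown in the original; implement them for your environment.
    private DataSource createMySqlDataSource() { throw new UnsupportedOperationException("not shown in the original"); }

    private RestHighLevelClient createEsClient() { throw new UnsupportedOperationException("not shown in the original"); }

    private Instant readLastSyncTime() { throw new UnsupportedOperationException("not shown in the original"); }

    private void writeLastSyncTime(Instant time) { throw new UnsupportedOperationException("not shown in the original"); }

    // Expected to run MYSQL_QUERY_TEMPLATE with sinceTime as the parameter.
    private List<Record> queryFromMysql(Instant sinceTime) { throw new UnsupportedOperationException("not shown in the original"); }
}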
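Polling cannot see a physically deleted row, which is why the cons above call for a soft-delete flag. The following is a minimal sketch of how such a flag could be turned into an ES delete during a poll. The `deleted` field (think of an `is_deleted` column), the numeric timestamps, and the in-memory maps are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SoftDeletePollSketch {
    /** One source row; `deleted` models an assumed is_deleted column. */
    static final class Row {
        final long id;
        final String name;
        final long updateTime;
        final boolean deleted;

        Row(long id, String name, long updateTime, boolean deleted) {
            this.id = id;
            this.name = name;
            this.updateTime = updateTime;
            this.deleted = deleted;
        }
    }

    final Map<Long, Row> table = new HashMap<>();    // stands in for MySQL
    final Map<Long, String> index = new HashMap<>(); // stands in for ES
    long lastSyncTime = 0L;                          // the polling watermark

    void upsert(long id, String name, long now) {
        table.put(id, new Row(id, name, now, false));
    }

    /** A soft delete keeps the row and bumps update_time, so the next poll sees it. */
    void softDelete(long id, long now) {
        Row old = table.get(id);
        table.put(id, new Row(id, old.name, now, true));
    }

    /** One round of: WHERE update_time > lastSyncTime ORDER BY update_time. */
    int poll() {
        List<Row> changed = new ArrayList<>();
        for (Row r : table.values()) {
            if (r.updateTime > lastSyncTime) {
                changed.add(r);
            }
        }
        changed.sort(Comparator.comparingLong(r -> r.updateTime));
        for (Row r : changed) {
            if (r.deleted) {
                index.remove(r.id);      // flagged rows become ES deletes
            } else {
                index.put(r.id, r.name); // everything else is an upsert
            }
            lastSyncTime = r.updateTime;
        }
        return changed.size();
    }

    public static void main(String[] args) {
        SoftDeletePollSketch s = new SoftDeletePollSketch();
        s.upsert(1L, "张三", 10L);
        s.poll();
        s.softDelete(1L, 20L);
        s.poll();
        System.out.println("index after soft delete: " + s.index);
    }
}
```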
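Message queue (full/incremental)
After writing to MySQL, the application sends a message to Kafka/RabbitMQ, and a consumer is responsible for writing to ES.
Pros:
Cons: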
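sequenceDiagram
    participant Client as Client
    participant App as Application service
    participant MQ as Message queue
    participant SyncConsumer as ES sync consumer
    participant MySQL as MySQL
    participant ES as Elasticsearch
    Client->>App: Send write request (e.g., create/update user)
    %% The application writes the database first (key point: the DB write must finish before the message is sent)
    App->>MySQL: 1. Execute SQL insert/update
    MySQL-->>App: 2. Return success
    alt MySQL write succeeded
        App->>MQ: 3. Send change message ({id:1001, name:"张三", op:"upsert"})
        MQ-->>App: 4. Message acknowledged (ACK)
        App-->>Client: 5. Return operation success
        %% Asynchronous consumption
        activate SyncConsumer
        MQ->>SyncConsumer: 6. Push/pull message
        SyncConsumer->>ES: 7. Write/update ES document
        alt ES write succeeded
            ES-->>SyncConsumer: 8a. Confirm
            SyncConsumer->>MQ: 9a. Commit offset
        else ES write failed
            Note right of SyncConsumer: Retry or route to the dead-letter queue
            loop Retry (with backoff)
                SyncConsumer->>ES: Retry write
            end
            %% Optional: log the failure for manual intervention
        end
        deactivate SyncConsumer
    else MySQL write failed
        App-->>Client: Return failure (no message is sent)
    end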
Producer: the business service sends a message to RocketMQ after the DB write succeeds.
For a full sync, send all of the data.
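import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import java.time.LocalDateTime;
import java.util.Map;

// User, UserMapper, UserSavedEvent and SyncEvent are application classes not shown in the original.
@Service
public class EsSyncProducer {
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    // The original calls this publisher without declaring it.
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Transactional
    public void createUser(String name, String email) {
        User user = new User();
        user.setName(name);
        user.setEmail(email);
        user.setCreateTime(LocalDateTime.now());
        userMapper.insert(user);
        // Only an in-process event is published here; the MQ send happens in the listener.
        sendSyncEventToMQ(user, "INSERT");
    }

    private void sendSyncEventToMQ(User user, String eventType) {
        // Note: eventType is not forwarded; handleUserSaved always emits "INSERT".
        applicationEventPublisher.publishEvent(
                new UserSavedEvent(this, user, true)
        );
    }

    // AFTER_COMMIT ensures the message is sent only once the DB transaction has committed.
    // A plain @EventListener combined with @Async could run before the commit.
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, condition = "#event.success")
    @Async
    public void handleUserSaved(UserSavedEvent event) {
        SyncEvent syncEvent = new SyncEvent();
        syncEvent.setEventType("INSERT");
        syncEvent.setTable("user");
        syncEvent.setId(event.getUser().getId());
        syncEvent.setData(Map.of(
                "id", event.getUser().getId(),
                "name", event.getUser().getName(),
                "email", event.getUser().getEmail()
        ));
        rocketMQTemplate.convertAndSend("MYSQL_TO_ES_TOPIC", syncEvent);
    }
}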
Consumer: consumes RocketMQ messages and syncs them to Elasticsearch.
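import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
// In client versions before 7.16 this class lives in org.elasticsearch.common.xcontent.
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RocketMQMessageListener(
        topic = "MYSQL_TO_ES_TOPIC",
        consumerGroup = "es-sync-group"
)
public class EsSyncConsumer implements RocketMQListener<SyncEvent> {
    private static final Logger log = LoggerFactory.getLogger(EsSyncConsumer.class);

    @Autowired
    private RestHighLevelClient esClient;

    @Override
    public void onMessage(SyncEvent event) {
        try {
            // The table name doubles as the index name; the primary key is the document id.
            String index = event.getTable();
            String docId = String.valueOf(event.getId());
            Map<String, Object> doc = event.getData();

            if ("DELETE".equals(event.getEventType())) {
                esClient.delete(
                        new DeleteRequest(index).id(docId),
                        RequestOptions.DEFAULT
                );
            } else {
                // INSERT and UPDATE are both handled as an upsert by id.
                esClient.index(
                        new IndexRequest(index)
                                .id(docId)
                                .source(doc, XContentType.JSON),
                        RequestOptions.DEFAULT
                );
            }
        } catch (Exception e) {
            log.error("ES sync failed, event: {}, error: {}", event, e.getMessage());
            // Rethrow so that RocketMQ redelivers the message.
            throw new RuntimeException(e);
        }
    }
}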
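The diagram's failure branch (retry with backoff, then the dead-letter queue) can be sketched independently of any broker. This is a minimal illustration rather than RocketMQ's own retry mechanism: `consume`, `maxAttempts`, and `baseDelayMs` are hypothetical names, and the backoff delays are recorded instead of slept so that the example runs instantly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RetryThenDeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>(); // messages that exhausted their retries
    final List<Long> delaysMs = new ArrayList<>();      // backoff delays that would be slept

    /**
     * Tries to write one message to ES up to maxAttempts times.
     * Returns true when the write succeeded and the offset may be committed.
     */
    boolean consume(String message, Consumer<String> esWriter, int maxAttempts, long baseDelayMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                esWriter.accept(message);
                return true;
            } catch (RuntimeException e) {
                if (attempt < maxAttempts) {
                    // Exponential backoff: base, 2 * base, 4 * base, ...
                    delaysMs.add(baseDelayMs << (attempt - 1));
                }
            }
        }
        deadLetters.add(message); // park the message for manual inspection
        return false;
    }

    public static void main(String[] args) {
        RetryThenDeadLetterSketch s = new RetryThenDeadLetterSketch();
        boolean ok = s.consume("{\"id\":1001}", m -> {
            throw new IllegalStateException("ES down");
        }, 3, 100L);
        System.out.println("written? " + ok + ", dead letters: " + s.deadLetters);
    }
}
```

Because the consumer in the text indexes by document id, re-delivering or retrying the same message is harmless: the second write simply overwrites the first.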
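Third-party tools (production)
Binlog-based tools
These tools parse the MySQL binlog (for example Canal, Debezium, or Maxwell), capture data change events, and push them to ES.
Flow: MySQL → Binlog → Canal/Debezium → Kafka → consumer → ES
Pros:
Decoupled: invisible to the business, with no changes to application code.
Highly reliable: because it is binlog-based, data changes are not lost (when combined with an ACK mechanism).
Supports incremental sync.
Cons: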
sequenceDiagram
    participant MySQL as MySQL
    participant CanalServer as Canal Server
    participant CanalClient as Canal Client (Java application)
    participant ES as Elasticsearch
    Note over MySQL: binlog enabled, binlog-format=ROW
    %% Initialization: Canal Server connects to MySQL
    CanalServer->>MySQL: 1. Pose as a MySQL slave and request a binlog dump
    MySQL-->>CanalServer: 2. Accept the connection and send the binlog stream
    %% A data change triggers the flow
    Note right of MySQL: User executes: UPDATE users SET name='李四' WHERE id=1001;
    MySQL->>MySQL: 3. Write binlog (ROW format)
    MySQL->>CanalServer: 4. Push binlog event in real time
    CanalServer->>CanalClient: 5. Push parsed, structured change event (e.g. JSON or Protocol Buffer)
    activate CanalClient
    CanalClient->>CanalClient: 6. Parse event: table=test.users, op=UPDATE, after={id:1001, name:"李四"}
    alt Target table (users)
        CanalClient->>ES: 7. Build ES document and write PUT /users/_doc/1001 { "name": "李四" }
        ES-->>CanalClient: 8. Write success response
        CanalClient->>CanalServer: 9. Acknowledge consumed position (ACK)
    else Not a target table
        CanalClient->>CanalServer: Ignore and ACK
    end
    deactivate CanalClient
    Note over CanalClient: Keeps listening, supports INSERT/UPDATE/DELETE
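Common tools: Canal, Debezium, Maxwell.
Use cases: highly reliable, near-real-time sync of large data volumes that must be decoupled from the business.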
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// `log` and createEsClient() are used as in the original, which does not show their definitions.
public class CanalSyncDemo {
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "",
                ""
        );
        connector.connect();
        connector.subscribe("test\\.users");

        ElasticsearchClient esClient = createEsClient();
        while (true) {
            // get() fetches up to 1000 entries and acknowledges them automatically.
            Message message = connector.get(1000);
            if (message.getEntries().isEmpty()) {
                Thread.sleep(1000); // avoid a busy loop when there is nothing to consume
                continue;
            }
            handleEntries(message.getEntries(), esClient);
        }
    }

    private static void handleEntries(List<CanalEntry.Entry> entries, ElasticsearchClient esClient) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row data carries changes.
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                handleRowChange(rowChange, esClient);
            } catch (InvalidProtocolBufferException e) {
                log.error("Failed to parse RowChange: " + e.getMessage());
            }
        }
    }

    private static void handleRowChange(CanalEntry.RowChange rowChange, ElasticsearchClient esClient) {
        CanalEntry.EventType eventType = rowChange.getEventType();
        // Only INSERT and UPDATE are synced here; DELETE events are ignored by this demo.
        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE) {
            return;
        }
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            extractAndSyncToEs(rowData, esClient);
        }
    }

    private static void extractAndSyncToEs(CanalEntry.RowData rowData, ElasticsearchClient esClient) {
        // Take every column of the after image. Filtering on Column::getUpdated would drop the
        // unchanged `id` column on an UPDATE, and the row would then be skipped below.
        Map<String, String> afterMap = rowData.getAfterColumnsList().stream()
                .collect(Collectors.toMap(
                        CanalEntry.Column::getName,
                        CanalEntry.Column::getValue,
                        (v1, v2) -> v1
                ));
        if (afterMap.isEmpty()) {
            return;
        }

        String id = afterMap.get("id");
        String name = afterMap.get("name");
        if (id == null || name == null) {
            return;
        }

        try {
            esClient.index(b -> b
                    .index("users")
                    .id(id)
                    .document(Map.of("id", Long.parseLong(id), "name", name))
            );
        } catch (Exception e) {
            log.error("Failed to sync to ES: id=" + id + ", error=" + e.getMessage());
        }
    }
}
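The demo above only handles INSERT and UPDATE, while the diagram notes that DELETE is supported too. As a rough sketch of how all three event types could map onto index operations, the following uses a hypothetical `Op` enum and plain maps for the before/after images; these are stand-ins for illustration, not Canal's own classes.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangeEventApplierSketch {
    enum Op { INSERT, UPDATE, DELETE }

    // Stands in for the ES index: document id -> document fields.
    final Map<String, Map<String, String>> index = new HashMap<>();

    /** Applies one row change; `before` may be null for INSERT, `after` for DELETE. */
    void apply(Op op, Map<String, String> before, Map<String, String> after) {
        if (op == Op.DELETE) {
            // A delete only carries the before image, so the id comes from there.
            index.remove(before.get("id"));
            return;
        }
        if (op == Op.UPDATE && before != null
                && !before.get("id").equals(after.get("id"))) {
            // The primary key itself changed: drop the document stored under the old id.
            index.remove(before.get("id"));
        }
        index.put(after.get("id"), new HashMap<>(after));
    }

    public static void main(String[] args) {
        ChangeEventApplierSketch s = new ChangeEventApplierSketch();
        s.apply(Op.INSERT, null, Map.of("id", "1001", "name", "张三"));
        s.apply(Op.UPDATE, Map.of("id", "1001", "name", "张三"), Map.of("id", "1001", "name", "李四"));
        System.out.println(s.index);
        s.apply(Op.DELETE, Map.of("id", "1001", "name", "李四"), null);
        System.out.println(s.index);
    }
}
```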
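Logstash plugin
Logstash queries MySQL periodically through the jdbc input plugin and syncs the results to ES.
Pros:
Simple configuration, officially supported by Elastic.
Supports custom SQL queries.
Cons:
Still polling at heart, so it cannot be real-time.
Cannot listen to the binlog.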
sequenceDiagram
    participant Scheduler as Logstash scheduler
    participant JDBCInput as JDBC input plugin
    participant MySQL as MySQL
    participant Filter as (optional) Filter plugin
    participant ESOutput as Elasticsearch output plugin
    participant ES as Elasticsearch
    loop Runs periodically per the schedule setting (e.g., every 30 seconds)
        Scheduler->>JDBCInput: Trigger the next poll
        JDBCInput->>MySQL: 1. Run the predefined SQL query (with an incremental condition such as update_time > :sql_last_value)
        MySQL-->>JDBCInput: 2. Return the result set (ResultSet)
        alt New data returned
            JDBCInput->>Filter: 3. (Optional) clean/transform fields
            Filter-->>ESOutput: Processed event (Event)
            ESOutput->>ES: 4. Bulk write to ES (document_id mapped to the primary key)
            ES-->>ESOutput: 5. Write confirmation
            ESOutput->>JDBCInput: 6. Update last_run_metadata (record the largest update_time of this run)
        else No new data
            Note over JDBCInput: No events generated, output skipped
        end
    end
Use cases: small to medium projects that can tolerate minute-level latency.
input {
  jdbc {
    jdbc_driver_library => "/path/mysql-connector-java-8.0.33.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
    jdbc_user => "logstash"
    jdbc_password => "logstash"

    # Key to incremental sync: track a timestamp column.
    # Six fields (with a leading seconds field) run the query every 30 seconds;
    # the five-field form "*/30 * * * *" would run it every 30 minutes.
    schedule => "*/30 * * * * *"
    statement => "SELECT id, name, update_time FROM users WHERE update_time > :sql_last_value"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"

    # File that records the last synced value
    last_run_metadata_path => "/etc/logstash/.users_last_run"
  }
}

filter {
  # Optional: rename fields, add tags, and so on
  mutate {
    copy => { "id" => "[@metadata][_id]" }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "users"
    # Keep the ES document ID identical to the MySQL primary key
    document_id => "%{[@metadata][_id]}"
  }
}
A Flink CDC connector (such as flink-cdc-connectors) reads the MySQL binlog directly, performs streaming ETL, and writes the result to ES.
MySQL → Flink CDC Source → Flink Job (Transform) → Flink ES Sink → Elasticsearch
Advantages:
sequenceDiagram
    participant MySQL as MySQL
    participant FlinkJob as Flink Job (with CDC Source)
    participant FlinkEngine as Flink Runtime Engine
    participant ESSink as Elasticsearch Sink
    participant ES as Elasticsearch
    Note over MySQL: binlog enabled, binlog-format=ROW
    %% Initialization: Flink CDC Source connects to MySQL
    FlinkJob->>MySQL: 1. On startup, read the table schema + current binlog position
    MySQL-->>FlinkJob: 2. Return schema and initial snapshot (optional)
    alt Full + incremental mode (default)
        FlinkJob->>MySQL: 3a. SELECT the full data set first (snapshot phase)
        MySQL-->>FlinkJob: 3b. Return the full result set
        FlinkJob->>ESSink: 4a. Write to ES (full initialization)
    end
    %% Keep listening to the binlog
    Note right of MySQL: User executes: INSERT INTO users VALUES(1002, '王五');
    MySQL->>MySQL: 5. Write binlog (ROW format)
    MySQL->>FlinkJob: 6. Flink CDC Source pulls binlog events in real time
    FlinkJob->>FlinkEngine: 7. Convert to DataStream
    activate FlinkEngine
    FlinkEngine->>FlinkEngine: 8. (Optional) streaming ETL: field filtering, type conversion, masking, etc.
    FlinkEngine->>ESSink: 9. Send records to the ES Sink
    ESSink->>ES: 10. Write to ES in bulk or one by one (primary key as document_id)
    ES-->>ESSink: 11. Write confirmation
    ESSink-->>FlinkEngine: 12. Trigger checkpoint ACK
    FlinkEngine->>FlinkJob: 13. Persist the binlog position (exactly-once guarantee)
    deactivate FlinkEngine
    Note over FlinkJob: Continuously processes INSERT/UPDATE/DELETE, a DELETE calls the ES delete API
Use cases: complex data pipelines that need real-time computation plus sync.
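The "full + incremental" branch in the diagram boils down to: take a snapshot at a known binlog position, then replay only the events after that position. Here is a deliberately simplified sketch of that idea; the `Event` type and the numeric positions are invented for illustration, and the real connector's snapshot algorithm is considerably more involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SnapshotThenBinlogSketch {
    /** One binlog event; a null value models a DELETE. */
    static final class Event {
        final long position;
        final long id;
        final String value;

        Event(long position, long id, String value) {
            this.position = position;
            this.id = id;
            this.value = value;
        }
    }

    /**
     * Builds the sink state from a snapshot taken at snapshotPosition,
     * then applies only the events that happened after it.
     */
    static Map<Long, String> sync(Map<Long, String> snapshot, long snapshotPosition, List<Event> binlog) {
        Map<Long, String> sink = new HashMap<>(snapshot); // full phase
        for (Event e : binlog) {                          // incremental phase
            if (e.position <= snapshotPosition) {
                continue; // already reflected in the snapshot
            }
            if (e.value == null) {
                sink.remove(e.id);
            } else {
                sink.put(e.id, e.value);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        Map<Long, String> snapshot = Map.of(1001L, "张三");
        List<Event> binlog = List.of(
                new Event(4L, 1001L, "old value"), // before the snapshot, skipped
                new Event(6L, 1002L, "王五"),
                new Event(7L, 1001L, null));       // delete
        System.out.println(sync(snapshot, 5L, binlog));
    }
}
```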
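| Component | Role | Manual install required? |
| flink-dist-*.jar | Flink core runtime | No, bundled |
| flink-table-api-java-*.jar | Table API base classes | No, bundled (SQL depends on the Table API) |
| flink-table-planner_*.jar | SQL parser, optimizer, executor (Blink Planner) | No, bundled |
| flink-table-runtime-*.jar | Runtime functions, type system | No, bundled |
| flink-json-*.jar | JSON format support (e.g. JSON_OBJECT) | No, bundled |
| flink-connector-jdbc-*.jar | MySQL / JDBC | Yes, download and place in lib/ |
| flink-sql-connector-mysql-cdc-*.jar | MySQL / CDC | Yes, download and place in lib/ |
| flink-sql-connector-elasticsearch7-*.jar | Elasticsearch 7 | Yes, download and place in lib/ |
| mysql-connector-j-8.0.33.jar (or 9.x) | MySQL driver | Yes, download and place in lib/ |
| flink-sql-connector-kafka-*.jar | Kafka | Optional |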
Example: sync from MySQL to Elasticsearch, joining the orders and order_items tables before writing to ES.
// Target document: one order with its line items nested inside.
public static class Order {
    public long id;
    public String orderNo;
    public String status;
    public long userId;
    public List<OrderItem> items;

    public Order(long id, String orderNo, String status, long userId, List<OrderItem> items) {
        this.id = id;
        this.orderNo = orderNo;
        this.status = status;
        this.userId = userId;
        this.items = items;
    }
}

// One line item; orderId links it back to its order.
public static class OrderItem {
    public long id;
    public long orderId;
    public String productName;
    public double unitPrice;

    public OrderItem(long id, long orderId, String productName, double unitPrice) {
        this.id = id;
        this.orderId = orderId;
        this.productName = productName;
        this.unitPrice = unitPrice;
    }
}
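Before looking at the Flink versions, it may help to see the target shape in plain Java: each order becomes one document with its items nested inside. The sketch below uses its own small `OrderRow` and `Item` types (hypothetical, to keep it self-contained) and builds the documents as maps. It only illustrates the join-and-nest step, not how Flink executes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NestOrderItemsSketch {
    static final class OrderRow {
        final long id;
        final String orderNo;

        OrderRow(long id, String orderNo) {
            this.id = id;
            this.orderNo = orderNo;
        }
    }

    static final class Item {
        final long id;
        final long orderId;
        final String productName;
        final double unitPrice;

        Item(long id, long orderId, String productName, double unitPrice) {
            this.id = id;
            this.orderId = orderId;
            this.productName = productName;
            this.unitPrice = unitPrice;
        }
    }

    /** Groups items by order id and nests them under their order. */
    static List<Map<String, Object>> toDocuments(List<OrderRow> orders, List<Item> items) {
        Map<Long, List<Item>> itemsByOrder = new HashMap<>();
        for (Item item : items) {
            itemsByOrder.computeIfAbsent(item.orderId, k -> new ArrayList<>()).add(item);
        }

        List<Map<String, Object>> documents = new ArrayList<>();
        for (OrderRow order : orders) {
            List<Map<String, Object>> nested = new ArrayList<>();
            for (Item item : itemsByOrder.getOrDefault(order.id, new ArrayList<>())) {
                Map<String, Object> itemDoc = new LinkedHashMap<>();
                itemDoc.put("id", item.id);
                itemDoc.put("productName", item.productName);
                itemDoc.put("unitPrice", item.unitPrice);
                nested.add(itemDoc);
            }
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("id", order.id);
            doc.put("orderNo", order.orderNo);
            doc.put("orderItems", nested); // an order without items gets an empty list
            documents.add(doc);
        }
        return documents;
    }

    public static void main(String[] args) {
        List<OrderRow> orders = List.of(new OrderRow(1L, "A-1"), new OrderRow(2L, "A-2"));
        List<Item> items = List.of(new Item(10L, 1L, "pen", 1.5), new Item(11L, 1L, "ink", 2.0));
        toDocuments(orders, items).forEach(System.out::println);
    }
}
```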
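There are two mainstream ways to implement data sync (such as MySQL → Elasticsearch) with Flink SQL:
JDBC-based batch/polling sync (polling-based)
CDC-based (Change Data Capture) real-time incremental sync
For a standard join query like this one, SQL is the most concise and efficient approach. The ES version I used is 7.17.29.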
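| Aspect | JDBC polling | CDC |
| Capture layer | SQL queries (application layer) | Database log (storage engine layer) |
| Change visibility | Only the final state | The full change history (I/U/D) |
| DELETE support | Not supported (unless soft delete is used) | Native |
| UPDATE detection | Requires a timestamp column | Identifies before/after values automatically |
| Impact on source DB | High (query load) | Low (only reads the binlog) |
| Initial sync | Full query | Full snapshot + incremental log |
| Exactly-once | Hard to achieve | Native (with checkpoints) |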
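The "change visibility" and "DELETE support" rows can be made concrete with a toy model: a table that also appends every change to an ordered log. A poll only reads the current state, whereas a log reader sees each step. The class below is a hypothetical illustration, with string entries standing in for binlog events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PollVsChangelogSketch {
    final Map<Long, String> table = new HashMap<>();  // current state: what a poll can see
    final List<String> changelog = new ArrayList<>(); // ordered log: what CDC can see

    void insert(long id, String status) {
        table.put(id, status);
        changelog.add("I " + id + " " + status);
    }

    void update(long id, String status) {
        table.put(id, status);
        changelog.add("U " + id + " " + status);
    }

    void delete(long id) {
        table.remove(id);
        changelog.add("D " + id);
    }

    /** A poll returns a copy of whatever rows exist right now. */
    Map<Long, String> poll() {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        PollVsChangelogSketch s = new PollVsChangelogSketch();
        // All three changes happen between two polls.
        s.insert(1L, "NEW");
        s.update(1L, "PAID");
        s.delete(1L);
        System.out.println("poll sees:      " + s.poll());
        System.out.println("changelog sees: " + s.changelog);
    }
}
```

In this run the poll returns nothing at all, so a polling sync would never learn that the row existed, while the log still holds the insert, the update, and the delete.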
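Download the following in advance (Apache Archive Distribution Directory):
Files shared via Baidu Netdisk: Flink: https://pan.baidu.com/s/1q_W6tcfnfapHs9Ps37M7hA?pwd=wdv4 (extraction code: wdv4)
After extracting Flink, put all the JARs into the flink-1.20.3/lib/ directory.
Note: this version (1.20.3) does not support Pipeline. If you need automatic routing, you may need another approach or an upgrade to the latest version.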
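# Start the local cluster
./bin/start-cluster.sh

# Check that the web UI / REST endpoint is up
curl http://localhost:8081

# Submit the job in detached mode with parallelism 4 (command as given in the original).
# NOTE: `flink run` may not accept a -sql option in your Flink version;
# if it is rejected, use the SQL client invocation below instead.
flink run -d \
  -p 4 \
  -sql "orders_to_es.sql" \
  --jarfile flink-sql-connector-elasticsearch7-3.1.0-1.20.jar \
  --jarfile flink-connector-jdbc-3.3.0-1.20.jar \
  --jarfile mysql-connector-j-8.4.0.jar

# Or run the script through the SQL client
./bin/sql-client.sh -f orders_to_es.sql

# Verify the result in ES
curl -XGET "http://localhost:9200/orders/_search?pretty"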
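JDBC batch/polling
Cannot capture DELETE operations (unless there is a soft-delete column such as is_deleted).
Flink uses the JDBC connector to scan the whole source table or to query it by condition (e.g. SELECT * FROM orders WHERE update_time > last_max_time). Each run is a bounded scan, so periodic sync means re-running the job on a schedule.
The data is treated as a static snapshot (bounded stream) or a periodic append-only stream.
It usually runs in batch mode, or as a bounded stream in streaming mode.
orders_to_es.sql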
-- Source: orders, read through JDBC with a partitioned parallel scan
CREATE TEMPORARY TABLE `orders` (
  `id` BIGINT,
  `order_no` STRING,
  `status` STRING,
  `user_id` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC',
  'table-name' = 'orders',
  'username' = 'root',
  'password' = '123456',
  'scan.fetch-size' = '1000',
  'scan.partition.column' = 'id',
  'scan.partition.num' = '2',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '10000'
);

-- Source: order items
CREATE TEMPORARY TABLE `order_item` (
  `id` BIGINT,
  `order_id` BIGINT,
  `product_name` STRING,
  -- A bare DECIMAL means DECIMAL(10, 0) in Flink and would drop the fraction;
  -- adjust precision and scale to match the MySQL column.
  `unit_price` DECIMAL(10, 2),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC',
  'table-name' = 'order_item',
  'username' = 'root',
  'password' = '123456',
  'scan.fetch-size' = '1000'
);

-- Sink: one ES document per order, with the items nested as an array
CREATE TEMPORARY TABLE `es_orders` (
  `id` BIGINT,
  `orderNo` STRING,
  `status` STRING,
  `userId` BIGINT,
  `orderItems` ARRAY<ROW<`id` BIGINT, `productName` STRING, `unitPrice` DOUBLE>>,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.3.146:9200',
  'index' = 'orders',
  'sink.bulk-flush.max-actions' = '1000',
  'sink.bulk-flush.interval' = '10s'
);

-- Join orders with their items and aggregate the items into an array per order
INSERT INTO `es_orders`
SELECT
  o.id,
  o.order_no AS orderNo,
  o.status,
  o.user_id AS userId,
  ARRAY_AGG(
    ROW(i.id, i.product_name, CAST(i.unit_price AS DOUBLE))
  ) AS orderItems
FROM orders AS o
LEFT JOIN order_item AS i ON o.id = i.order_id
GROUP BY o.id, o.order_no, o.status, o.user_id;
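CDC full + incremental
Supports automatic switching from full to incremental sync (Flink CDC 2.0+).
Flink uses a CDC connector (such as Debezium) to read the database transaction log (the MySQL binlog, the PostgreSQL WAL).
It parses the raw INSERT / UPDATE / DELETE change events directly (RowKind: +I, -U, +U, -D).
Data flows into Flink as a changelog stream, which naturally supports exactly-once semantics.
'scan.incremental.snapshot.enabled' = 'true' -- enables full + incremental
orders_to_es.sql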
-- Source: orders, read from the binlog through the mysql-cdc connector
CREATE TEMPORARY TABLE `orders` (
  `id` BIGINT,
  `order_no` STRING,
  `status` STRING,
  `user_id` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.3.146',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'demo_order',
  'table-name' = 'orders',
  'server-time-zone' = 'Asia/Shanghai'
);

-- Source: order items
CREATE TEMPORARY TABLE `order_item` (
  `id` BIGINT,
  `order_id` BIGINT,
  `product_name` STRING,
  -- A bare DECIMAL means DECIMAL(10, 0) in Flink and would drop the fraction;
  -- adjust precision and scale to match the MySQL column.
  `unit_price` DECIMAL(10, 2),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.3.146',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'demo_order',
  'table-name' = 'order_item',
  'server-time-zone' = 'Asia/Shanghai'
);

-- Sink: one ES document per order, keyed by the primary key
CREATE TEMPORARY TABLE `es_orders` (
  `id` BIGINT,
  `orderNo` STRING,
  `status` STRING,
  `userId` BIGINT,
  `orderItems` ARRAY<ROW<`id` BIGINT, `productName` STRING, `unitPrice` DOUBLE>>,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.3.146:9200',
  'index' = 'orders',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-actions' = '1000',
  'sink.bulk-flush.interval' = '1000',
  'format' = 'json'
);

-- Continuous join: every change to an order or its items re-emits the order document
INSERT INTO `es_orders`
SELECT
  o.id,
  o.order_no AS orderNo,
  o.status,
  o.user_id AS userId,
  ARRAY_AGG(
    ROW(i.id, i.product_name, CAST(i.unit_price AS DOUBLE))
  ) AS orderItems
FROM orders AS o
LEFT JOIN order_item AS i ON o.id = i.order_id
GROUP BY o.id, o.order_no, o.status, o.user_id;
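The RowKind markers mentioned above (+I, -U, +U, -D) matter once a changelog feeds an aggregation: an update first retracts the old row and then adds the new one. The following sketch applies such a stream to a per-status count. The `Kind` enum merely mirrors the four markers and is not Flink's `RowKind` class; the whole example is an illustration under that assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogCountSketch {
    /** Mirrors the four changelog markers: +I, -U, +U, -D. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    final Map<String, Integer> countByStatus = new HashMap<>();

    /** Applies one changelog row to a COUNT(*) ... GROUP BY status style aggregate. */
    void apply(Kind kind, String status) {
        // +I and +U add a row to the group; -U and -D retract one.
        int delta = (kind == Kind.INSERT || kind == Kind.UPDATE_AFTER) ? 1 : -1;
        countByStatus.merge(status, delta, Integer::sum);
        countByStatus.remove(status, 0); // drop groups that became empty
    }

    public static void main(String[] args) {
        ChangelogCountSketch s = new ChangelogCountSketch();
        s.apply(Kind.INSERT, "NEW");         // order created
        s.apply(Kind.UPDATE_BEFORE, "NEW");  // order paid: retract the old row ...
        s.apply(Kind.UPDATE_AFTER, "PAID");  // ... and add the new one
        System.out.println(s.countByStatus);
        s.apply(Kind.DELETE, "PAID");        // order deleted
        System.out.println(s.countByStatus);
    }
}
```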
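Common errors
Version incompatibility or missing support (Flink 1.19 and earlier do not support the standard SQL ARRAY_AGG) can cause errors:
If you see: [ERROR] Could not execute SQL statement. Reason: java.net.ConnectException: 拒绝连接 (Connection refused)
First check the detailed log under flink/log: vim log/flink-root-sql-client-<hostname>.log.
If the refused connection is to localhost/127.0.0.1:8081: the Flink SQL Client (sql-client.sh) connects to the JobManager at localhost:8081 by default, so this error means you only extracted Flink and never started the cluster. To fix it: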
./bin/start-cluster.sh
If MySQL cannot be reached, troubleshoot with the following steps:
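# 1. Check basic network reachability
ping 192.168.3.146

# 2. Try to log in with the MySQL client
mysql -h 192.168.3.146 -u root -p

# 3. Check that port 3306 is reachable
telnet 192.168.3.146 3306

-- 4. On the MySQL server, check the bind address
SHOW VARIABLES LIKE 'bind_address';

-- 5. Check which hosts the root user may connect from
SELECT host, user FROM mysql.user WHERE user = 'root';

-- 6. Double-check the connector settings in the Flink table definition
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.3.146:3306/demo_order?useSSL=false&serverTimezone=UTC'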
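If telnet is not installed on the Flink host, the same port check can be done from Java. This is a small sketch using only the standard library; `canConnect` is a hypothetical helper, and the host and port in `main` are the ones from the steps above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheckSketch {
    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: all count as unreachable here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("MySQL reachable: " + canConnect("192.168.3.146", 3306, 2000));
    }
}
```

A `true` result only proves that the port accepts TCP connections; authentication and host-based grants (step 5) still have to be checked separately.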
If the JDBC sync fails, check the log for the detailed error: vim log/flink-root-taskexecutor-0-<hostname>.log
If the CDC sync fails, check the log: vim log/flink-root-standalonesession-0-<hostname>.log. The error Failed to create Source Enumerator for source Source: order_item[3] java.lang.NullPointerException: null means the MySQL connection options are misconfigured; remove any superfluous options.
If you see: [ERROR] Could not execute SQL statement. Reason: java.io.StreamCorruptedException: unexpected block data, possible causes:
Flink Pipeline Config
Pipeline Config suits simple sync: single-table sync with no join processing. Support for complex joins is limited.
YAML configuration file: mysql-to-es-pipeline.yaml
# Source: MySQL
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  database: test_db
  tables: test_db.orders
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

# Sink: Elasticsearch
sink:
  type: elasticsearch
  hosts: http://localhost:9200
  index: orders
  username: elastic
  password: elastic
  document-type: _doc
  bulk-flush:
    max-actions: 1000
    interval: 1s

# Pipeline settings
pipeline:
  name: MySQL Orders to ES
  parallelism: 2
Run script: run-pipeline.sh
#!/bin/bash

# Submit the pipeline job in detached mode with parallelism 2
./bin/flink run \
  -d \
  -p 2 \
  -c com.ververica.cdc.pipeline.cli.PipelineCLI \
  flink-cdc-pipeline-connectors-3.0.0.jar \
  --config mysql-to-es-pipeline.yaml

# List the running jobs
./bin/flink list
Flink Table API
The Table API suits medium complexity, where you need a balance between SQL and DataStream.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class MySQLToESWithTableAPI {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source: orders (CDC)
        tableEnv.executeSql(
                "CREATE TABLE mysql_orders (\n" +
                "  id BIGINT,\n" +
                "  order_no STRING,\n" +
                "  status STRING,\n" +
                "  user_id BIGINT,\n" +
                "  PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'test_db',\n" +
                "  'table-name' = 'orders'\n" +
                ")"
        );

        // Source: order items (CDC)
        tableEnv.executeSql(
                "CREATE TABLE mysql_order_items (\n" +
                "  id BIGINT,\n" +
                "  order_id BIGINT,\n" +
                "  product_name STRING,\n" +
                "  unit_price DECIMAL(10, 2),\n" +
                "  PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'test_db',\n" +
                "  'table-name' = 'order_items'\n" +
                ")"
        );

        // Sink: Elasticsearch
        tableEnv.executeSql(
                "CREATE TABLE es_orders (\n" +
                "  id BIGINT,\n" +
                "  orderNo STRING,\n" +
                "  status STRING,\n" +
                "  userId BIGINT,\n" +
                "  orderItems ARRAY<ROW<\n" +
                "    id BIGINT,\n" +
                "    productName STRING,\n" +
                "    unitPrice DOUBLE\n" +
                "  >>,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-7',\n" +
                "  'hosts' = 'http://localhost:9200',\n" +
                "  'index' = 'orders',\n" +
                "  'document-id.key-delimiter' = '$',\n" +
                "  'format' = 'json'\n" +
                ")"
        );

        Table orders = tableEnv.from("mysql_orders");
        // Rename the item id first: both inputs have an `id` column, and a Table API join
        // requires distinct field names on the two sides.
        Table orderItems = tableEnv.from("mysql_order_items")
                .select($("id").as("item_id"), $("order_id"), $("product_name"), $("unit_price"));

        Table result = orders
                .leftOuterJoin(orderItems, $("id").isEqual($("order_id")))
                .groupBy($("id"), $("order_no"), $("status"), $("user_id"))
                .select(
                        $("id"),
                        $("order_no").as("orderNo"),
                        $("status"),
                        $("user_id").as("userId"),
                        // Row fields are matched to the sink's ROW type by position.
                        // Assumption: arrayAgg() is the Table API counterpart of SQL ARRAY_AGG in
                        // this Flink version; collect() would yield a MULTISET, which does not
                        // match the ARRAY column of the sink.
                        row(
                                $("item_id"),
                                $("product_name"),
                                $("unit_price").cast(DataTypes.DOUBLE())
                        ).arrayAgg().as("orderItems")
                );

        tableEnv.createTemporaryView("result_view", result);
        tableEnv.executeSql("INSERT INTO es_orders SELECT * FROM result_view");
    }
}
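Flink DataStream API
This is the most flexible approach. Use the DataStream API when complex processing is required, for example special business logic such as risk control or complex state.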
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
// In Flink CDC 3.1+ these two classes live under org.apache.flink.cdc instead of com.ververica.cdc.
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
// The package of XContentType depends on the ES client version bundled with the connector.
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Order and OrderItem are the classes defined earlier in this article.
public class MySQLToESWithDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // checkpoint every 10 s so binlog positions are persisted

        MySqlSource<String> ordersSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test_db")
                .tableList("test_db.orders")
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        MySqlSource<String> itemsSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test_db")
                .tableList("test_db.order_items")
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        DataStream<String> ordersStream = env.fromSource(
                ordersSource, WatermarkStrategy.noWatermarks(), "MySQL Orders Source");
        DataStream<String> itemsStream = env.fromSource(
                itemsSource, WatermarkStrategy.noWatermarks(), "MySQL Order Items Source");

        // flatMap instead of map + filter: returning null from a map function can fail
        // during serialization, and flatMap lets us simply emit nothing.
        DataStream<Order> orders = ordersStream
                .flatMap(new FlatMapFunction<String, Order>() {
                    private final ObjectMapper mapper = new ObjectMapper();

                    @Override
                    public void flatMap(String value, Collector<Order> out) throws Exception {
                        JsonNode after = mapper.readTree(value).get("after");
                        // DELETE events carry a JSON null "after" image; they are skipped here.
                        if (after == null || after.isNull()) {
                            return;
                        }
                        out.collect(new Order(
                                after.get("id").asLong(),
                                after.get("order_no").asText(),
                                after.get("status").asText(),
                                after.get("user_id").asLong(),
                                new ArrayList<>()));
                    }
                });

        // NOTE: the items stream is parsed but never joined into `orders` in this example,
        // so Order.items stays empty and orderItems is written as an empty array.
        DataStream<OrderItem> items = itemsStream
                .flatMap(new FlatMapFunction<String, OrderItem>() {
                    private final ObjectMapper mapper = new ObjectMapper();

                    @Override
                    public void flatMap(String value, Collector<OrderItem> out) throws Exception {
                        JsonNode after = mapper.readTree(value).get("after");
                        if (after == null || after.isNull()) {
                            return;
                        }
                        out.collect(new OrderItem(
                                after.get("id").asLong(),
                                after.get("order_id").asLong(),
                                after.get("product_name").asText(),
                                after.get("unit_price").asDouble()));
                    }
                });

        List<HttpHost> httpHosts = Arrays.asList(
                new HttpHost("localhost", 9200, "http")
        );

        ElasticsearchSink.Builder<Order> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                (Order order, RuntimeContext ctx, RequestIndexer indexer) -> {
                    ObjectMapper mapper = new ObjectMapper();
                    ObjectNode json = mapper.createObjectNode();
                    json.put("id", order.id);
                    json.put("orderNo", order.orderNo);
                    json.put("status", order.status);
                    json.put("userId", order.userId);

                    List<ObjectNode> itemList = new ArrayList<>();
                    for (OrderItem item : order.items) {
                        ObjectNode itemJson = mapper.createObjectNode();
                        itemJson.put("id", item.id);
                        itemJson.put("productName", item.productName);
                        itemJson.put("unitPrice", item.unitPrice);
                        itemList.add(itemJson);
                    }
                    json.set("orderItems", mapper.valueToTree(itemList));

                    // json.toString() renders the JSON without the checked exception that
                    // writeValueAsString would throw inside this lambda.
                    IndexRequest indexRequest = new IndexRequest("orders")
                            .id(String.valueOf(order.id))
                            .source(json.toString(), XContentType.JSON);
                    indexer.add(indexRequest);
                }
        );

        esSinkBuilder.setBulkFlushMaxActions(1000);
        esSinkBuilder.setBulkFlushInterval(1000L);
        esSinkBuilder.setBulkFlushBackoff(true);

        orders.addSink(esSinkBuilder.build());
        env.execute("MySQL to ES DataStream Job");
    }
}
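The DataStream job above reads both tables but writes orders with an empty item list, because joining two change streams needs state. As a sketch of what that state could look like, the class below keeps orders and items per order id and re-emits the document whenever either side changes. It is plain Java with in-memory maps and hypothetical method names; in a Flink job this logic would typically sit in a keyed co-process function backed by keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class OrderItemJoinStateSketch {
    // Per-key state, keyed by order id.
    final Map<Long, String> orderNoById = new HashMap<>();
    final Map<Long, Map<Long, String>> itemsByOrder = new HashMap<>();
    // Stands in for the ES index: order id -> product names of its items.
    final Map<Long, List<String>> esItemsByOrder = new HashMap<>();

    /** Called for every change event from the orders stream. */
    void onOrder(long orderId, String orderNo) {
        orderNoById.put(orderId, orderNo);
        emit(orderId);
    }

    /** Called for every change event from the order items stream. */
    void onItem(long orderId, long itemId, String productName) {
        itemsByOrder.computeIfAbsent(orderId, k -> new TreeMap<>()).put(itemId, productName);
        emit(orderId);
    }

    /** Re-writes the whole document for one order from the current state. */
    private void emit(long orderId) {
        if (!orderNoById.containsKey(orderId)) {
            return; // an item arrived before its order: keep it in state and wait
        }
        Map<Long, String> items = itemsByOrder.getOrDefault(orderId, new TreeMap<>());
        esItemsByOrder.put(orderId, new ArrayList<>(items.values()));
    }

    public static void main(String[] args) {
        OrderItemJoinStateSketch s = new OrderItemJoinStateSketch();
        s.onItem(1L, 10L, "pen");   // buffered, nothing written yet
        s.onOrder(1L, "A-1");       // document written with one item
        s.onItem(1L, 11L, "ink");   // document re-written with two items
        System.out.println(s.esItemsByOrder);
    }
}
```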
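Apache NiFi
A visual dataflow engine that captures changes with the CaptureChangeMySQL or QueryDatabaseTable processor and outputs them to ES.
Advantages: drag-and-drop UI; supports complex routing, retries, and backpressure control.
Disadvantages: relatively high resource consumption and a steep learning curve.
Other tools
The community offers some lightweight Go/Python binlog sync tools (such as go-mysql-elasticsearch, Go-MySQL-ES) that listen to the binlog directly and write to ES.
They suit scenarios that call for something lightweight and highly controllable, but you have to maintain them yourself.
Cloud native
AWS
DMS (Database Migration Service) + OpenSearch (managed ES)
Alibaba Cloud
DTS (Data Transmission Service) + Elasticsearch
Serverless
Pros: pay-as-you-go billing, no servers to operate.
Cons: cold-start latency; best suited to low-frequency or small-to-medium traffic.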