
本文旨在解决使用debezium java嵌入式连接器捕获mysql数据变更时遇到的常见配置问题。重点阐述了`database.include.list`与`table.include.list`的正确用法,并指出`whitelist`属性的弃用。通过优化连接器配置,确保debezium能够准确监控并捕获数据库中的增删改事件,从而实现可靠的变更数据捕获。
Debezium Java嵌入式连接器概述
Debezium是一个开源的分布式平台,用于捕获各种数据库的变更数据(Change Data Capture, CDC)。它通过读取数据库的事务日志(如MySQL的binlog)来实时地捕获数据行级别的变更,并将这些变更事件流式传输到Apache Kafka或其他消息队列。Debezium提供了一个Java嵌入式连接器(Embedded Engine),允许开发者在自己的Java应用程序中直接运行Debezium连接器,无需依赖完整的Kafka Connect集群。
在使用Debezium Java嵌入式连接器时,常见的挑战是正确配置连接器以确保它能够准确地监控并捕获所需数据库或表的数据变更。即使连接成功,如果配置不当,Debezium也可能无法触发相应的事件处理逻辑。
核心配置问题分析
当Debezium连接到MySQL数据库并显示连接成功日志(例如Connected to localhost:3305 at binlog.000008/3443),但却未能捕获到数据变更事件时,问题通常出在连接器的配置上。以下是常见的配置误区:
database.include.list与table.include.list的混淆:
立即学习“Java免费学习笔记(深入)”;
database.include.list属性用于指定Debezium应监控的数据库名称列表,其值应为逗号分隔的数据库名(例如db1,db2)。table.include.list属性则用于指定Debezium应监控的表名称列表,其值应为逗号分隔的完全限定表名(例如db1.table1,db2.table2)。在原始配置中,database.include.list被错误地设置为String.join(“,”, mysql.getTables()),这实际上是将表名列表传递给了数据库列表属性,导致Debezium无法正确识别要监控的数据库。
database.whitelist和table.whitelist属性的弃用:
在Debezium的早期版本中,曾使用database.whitelist和table.whitelist来指定包含列表。然而,这些属性在较新的Debezium版本中已被弃用并移除,取而代之的是database.include.list、database.exclude.list、table.include.list和table.exclude.list。原始配置中同时使用了database.whitelist和table.whitelist,并将其值设置为mysql.*。这不仅使用了已弃用的属性,而且其值也可能与database.include.list的意图冲突,进一步导致配置混乱。
正确的配置实践
为了确保Debezium能够正确捕获MySQL的数据变更,我们需要根据其设计意图来配置database.include.list和table.include.list。
假设我们希望监控名为my_application_db数据库中的user表。
示例代码:优化Debezium连接器配置
以下是DebeziumConfigLoader.load方法的优化版本,展示了如何正确配置Debezium MySQL连接器:
Reclaim.ai
为优先事项创建完美的时间表
90 查看详情
import io.debezium.config.Configuration;import java.util.Properties;public class DebeziumConfigLoader { public static Configuration load(Connection connection) { final MysqlConnection mysql = connection.getMysqlConnection(); // 获取要监控的数据库名称 String databaseToMonitor = mysql.getDbName(); // 获取要监控的表名称列表,例如 "my_application_db.user" // 假设 mysql.getTables() 返回的是 "user" String tablesToMonitor = databaseToMonitor + "." + String.join("," , mysql.getTables()); return Configuration.create() .with("name", "customer-mysql-connector") .with("connector.class", "io.debezium.connector.mysql.MySqlConnector") .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") .with("offset.flush.interval.ms", "60000") .with("database.hostname", mysql.getHost()) .with("database.port", mysql.getPort()) .with("database.user", mysql.getUsername()) .with("database.password", mysql.getPassword()) // 正确配置:指定要监控的数据库名称 .with("database.include.list", databaseToMonitor) // 正确配置:指定要监控的完全限定表名称 .with("table.include.list", tablesToMonitor) .with("include.schema.changes", "false") .with("database.server.id", "10181") // 确保此ID在所有Debezium连接器实例中唯一 .with("database.server.name", "customer-mysql-db-server") .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") .with("database.history.file.filename", "/tmp/dbhistory.dat") .with("offset.storage.file.filename", "/tmp/offsets.dat") // 移除已弃用的 'database.whitelist' 和 'table.whitelist' // .with("database.whitelist", "mysql.*") // 此行应被移除 // .with("table.whitelist", "mysql.*") // 此行应被移除 .build(); }}
关键修改点:
database.include.list: 现在只包含实际的数据库名称(例如my_application_db)。table.include.list: 现在包含完全限定的表名(例如my_application_db.user)。如果mysql.getTables()返回的是不带数据库名的表名,需要手动拼接。移除database.whitelist和table.whitelist: 避免使用已弃用的属性,并消除潜在的配置冲突。
注意事项与最佳实践
Debezium版本兼容性: 始终查阅您所使用的Debezium版本的官方文档,以了解最新的配置属性和任何弃用信息。属性名称和行为可能在不同版本之间发生变化。
MySQL用户权限: 确保Debezium连接的MySQL用户拥有足够的权限来读取binlog和访问被监控的数据库和表。通常需要REPLICATION SLAVE和SELECT权限。
MySQL Binlog配置:
MySQL的log_bin必须开启。binlog_format应设置为ROW,这是Debezium捕获详细变更所必需的。server_id在MySQL服务器配置中必须是唯一的,并且Debezium连接器配置中的database.server.id也必须是唯一的,且与MySQL服务器的server_id不同。
database.server.id的唯一性: database.server.id用于Debezium标识自身在MySQL复制协议中的从属ID。如果运行多个Debezium连接器实例(即使是不同的应用程序),每个实例都必须配置一个唯一的database.server.id,以避免与MySQL复制协议中的其他从库或Debezium实例冲突。
偏移量存储 (offset.storage) 和数据库历史 (database.history):
offset.storage用于持久化Debezium已处理的binlog位置。当应用程序重启时,Debezium可以从上次中断的地方继续捕获事件,避免数据丢失或重复。在嵌入式模式下,FileOffsetBackingStore是一个简单的选择,但生产环境可能考虑使用更健壮的存储,如数据库。database.history用于存储数据库模式(schema)变更的历史。这对于Debezium正确解析变更事件至关重要。FileDatabaseHistory同样适用于简单场景,但生产环境可能需要考虑Kafka或其他持久化存储。确保offset.storage.file.filename和database.history.file.filename指向的路径是可写且持久化的,以防止应用程序重启后丢失状态。
总结
Debezium Java嵌入式连接器为应用程序集成CDC功能提供了强大而灵活的方式。然而,其有效性高度依赖于准确的配置。通过理解database.include.list和table.include.list的正确用法,避免使用已弃用的属性,并关注MySQL的binlog配置和Debezium的持久化机制,开发者可以确保Debezium能够可靠地捕获数据库变更事件,从而构建出响应式和数据驱动的应用程序。在遇到事件未捕获问题时,仔细检查这些配置项是解决问题的关键。
以上就是Debezium Java嵌入式连接器:解决MySQL数据变更捕获配置问题的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1061763.html
微信扫一扫
支付宝扫一扫