帮助中心/最新通知

质量为本、客户为根、勇于拼搏、务实创新

< 返回文章列表

【服务器相关】Canal监听MySQL的实现步骤

发表时间:2025-06-16 03:46:00 小编:主机乐-Yutio

1、Mysql数据库开启binlog模式

注意:Mysql容器,此处Mysql版本为5.7

修改instance.properties,配置数据库连接地址:

这里的canal.instance.filter.regex有多种配置,如下:

可以参考地址如下: https://github.com/alibaba/canal/wiki/AdminGuide

mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 

常见例子:

1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

配置完成后,设置开机启动,并记得重启canal。

  • 首先解压spring-boot-starter-canal-master.zip
  • 在spring-boot-starter-canal-master\starter-canal文件夹下执行mvn clean install
  • 此时在本地仓库就会存在jar包
  • 引入依赖

<!–canal依赖–>
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

(2)启动类编写


@SpringBootApplication
@EnableCanalClient
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}

(3)监听器编写


@CanalEventListener
public class CanalDataEventListener {


@InsertListenPoint
public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
rowData.getAfterColumnsList().forEach((c) -> System.out.println(“By–Annotation: ” + c.getName() + ” :: ” + c.getValue()));
}


@UpdateListenPoint
public void onEventUpdate(CanalEntry.RowData rowData) {
System.out.println(“UpdateListenPoint”);
rowData.getAfterColumnsList().forEach((c) -> System.out.println(“By–Annotation: ” + c.getName() + ” :: ” + c.getValue()));
}


@DeleteListenPoint
public void onEventDelete(CanalEntry.EventType eventType) {
System.out.println(“DeleteListenPoint”);
}


@ListenPoint(destination = “example”, schema = “torlesse_test”, table = {“tb_user”, “tb_order”}, eventType = CanalEntry.EventType.UPDATE)
public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println(“DeleteListenPoint”);
rowData.getAfterColumnsList().forEach((c) -> System.out.println(“By–Annotation: ” + c.getName() + ” :: ” + c.getValue()));
}

@ListenPoint(destination = “example”,
schema = “test_canal”, //所要监听的数据库名
table = {“tb_user”}, //所要监听的数据库表名
eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
public void onEventCustomUpdateForTbUser(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
getChangeValue(eventType,rowData);
}

public static void getChangeValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
if(eventType == CanalEntry.EventType.DELETE){
rowData.getBeforeColumnsList().forEach(column -> {
//获取删除前的数据
System.out.println(column.getName() + ” == ” + column.getValue());
});
}else {
rowData.getBeforeColumnsList().forEach(column -> {
//打印改变前的字段名和值
System.out.println(column.getName() + ” == ” + column.getValue());
});

rowData.getAfterColumnsList().forEach(column -> {
//打印改变后的字段名和值
System.out.println(column.getName() + ” == ” + column.getValue());
});
}
}
}

到此就可以实现Canal监听Mysql

项目gitee地址:test-canal

详情可以查看:

https://github.com/alibaba/canal (阿里官方)

https://github.com/alibaba/canal/wiki/AdminGuide (阿里官方)

https://github.com/chenqian56131/spring-boot-starter-canal (自制starter)

到此这篇关于Canal监听MySQL的实现步骤的文章就介绍到这了,更多相关Canal监听MySQL内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!


联系我们
返回顶部