Canal

简介

https://github.com/alibaba/canal/wiki

https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B

架构

img

**canal [kə’næl]**,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

搭建

准备

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

给表增加权限

1
2
3
GRANT INSERT ON canal_node_server TO 'canal'@'%';
GRANT INSERT ON canal_cluster TO 'canal'@'%';
GRANT INSERT ON canal_config TO 'canal'@'%';

查看同步配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)

mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)

admin

先安装 admin 更好

https://github.com/alibaba/canal/releases 下载最新 admin

解压缩

1
2
mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz -C /tmp/canal-admin

配置修改

1
vi conf/application.yml

一般来说,默认即可,或者修改 MySQL 对应配置,例如 MySQL 8.x 版本增加 &allowPublicKeyRetrieval=true

MySQL 8.0 的使用方式:https://help.aliyun.com/zh/es/use-cases/use-canal-to-synchronize-mysql-data-to-alibaba-cloud-elasticsearch

1
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true

初始化元数据库

1
2
3
4
mysql -h127.1 -uroot -p

# 导入初始化SQL
> source conf/canal_manager.sql

初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql

启动

1
sh bin/startup.sh

查看 server 日志

1
vi logs/canal/canal.log</pre>

如果启动后没有日志,且进程没有启动,可修改 startup.sh 脚本

1
2
3
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.admin.CanalAdminApplication 1>>/dev/null 2>&1 &
# 改为
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.admin.CanalAdminApplication

可以直接看到打印后的日志

例如可能会出现 AggressiveOpts 参数在高 Java 版本中失效,需要注释

1
2
3
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"
# 改为
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"

启动后,可以通过 :8089 打开管理界面

Server 端

release 页面下载对应版本包

解压缩

1
2
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal

配置修改

1
vi conf/example/instance.properties

一般使用默认即可

启动,因为需要通过 admin 进行管理,携带 local 参数即可增加 admin 的配置

1
sh bin/startup.sh local

查看 server 日志

1
vi logs/canal/canal.log</pre>

日志中有正常输出代表服务启动正常

查看 instance 的日志

1
vi logs/example/example.log

Adapter

canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能, 目前支持功能:

  • 客户端启动器
  • 同步管理REST接口
  • 日志适配器, 作为DEMO
  • 关系型数据库的数据同步(表对表同步), ETL功能
  • HBase的数据同步(表对表同步), ETL功能
  • (后续支持) ElasticSearch多表数据同步,ETL功能

client-adapter 分为适配器和启动器两部分, 适配器为多个 fat jar, 每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载, 目前所有支持的适配器都放置在 plugin 目录下

启动器为 SpringBoot 项目, 支持canal-client启动的同时提供相关REST管理接口, 运行目录结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- bin
restart.sh
startup.bat
startup.sh
stop.sh
- lib
...
- plugin
client-adapter.logger-1.1.1-jar-with-dependencies.jar
client-adapter.hbase-1.1.1-jar-with-dependencies.jar
...
- conf
application.yml
- hbase
mytest_person2.yml
- logs

以上目录结构最终会打包成 canal-adapter-*.tar.gz 压缩包

修改bootstrap.yml配置

1
2
3
4
5
6
canal:
manager:
jdbc:
url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
username: canal
password: canal

可以将本地application.yml文件和其他子配置文件删除或清空, 启动工程将自动从远程加载配置

修改mysql中的配置信息后会自动刷新到本地动态加载相应的实例或者应用

修改conf/application.yml为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
server:
port: 8081
logging:
level:
com.alibaba.otter.canal.client.adapter.hbase: DEBUG
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
username: root
password: root

canal.conf:
canalServerHost: 127.0.0.1:11111
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
mode: tcp
canalAdapters:
- instance: example
groups:
- groupId: g1
outerAdapters:
- name: logger

将 adapter 使用 logger,直接以日志的方式打印出来

1
2
3
4
5
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
username: root
password: root

代表数据源,mytest 为对应数据库

创建测试表

1
2
3
4
5
CREATE TABLE canal_test (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`content` VARCHAR(10) NOT NULL COMMENT '内容',
PRIMARY KEY (`id`) USING BTREE
)

查看适配器日志

1
2
3
4
5
6
7
8
9
tail -f logs/adapter/adapter.log

# 新增数据
2024-04-07 21:38:23.913 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":2,"content":"test5"}],"database":"mytest","destination":"example","es":1712496311000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"canal_test","ts":1712497103761,"type":"INSERT"}

2024-04-07 21:38:50.545 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":3,"content":"test5"}],"database":"mytest","destination":"example","es":1712497130000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"canal_test","ts":1712497130545,"type":"INSERT"}

# 更新数据
2024-04-07 21:41:34.702 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"content":"test110"}],"database":"mytest","destination":"example","es":1712497294000,"groupId":"g1","isDdl":false,"old":[{"content":"test5"}],"pkNames":["id"],"sql":"","table":"canal_test","ts":1712497294702,"type":"UPDATE"}