canal实现mysql与redis数据同步
一、canal
1.简介
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
2.工作原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
二、部署
1.mysql
a) docker安装mysql
docker run -p 3306:3306 --name mysql --privileged --restart=always -v /home/mysql/conf:/etc/mysql/conf.d -v /home/mysql/logs:/var/log -v /home/mysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --lower_case_table_names=1
b) 开启binlog记录功能,修改/home/mysql/conf/mysqld.cnf,在[mysqld]下加入
server-id=1 #主从复制时主机的服务id
log-bin=mysql-bin #开启nbinlog日志记录
binlog_format = ROW #日志记录格式
c) 验证是否开启binlog
docker restart mysql #重启mymsql
docekr exec -it mysql bash #进入mysql容器
mysql -uroot -p123456 #登录mysql
show variables like 'log_bin'; #查询binlong是否开启
show variables like 'binlog_format'; #查询binlong日志记录格式
d) 创建数据同步账号
use mysql;
grant all privileges on *.* to [email protected]'%' identified by 'canal';
flush privileges;
2.redis
a) docker安装redis
docker run -p 6379:6379 --name redis --privileged --restart=always -v /home/redis/conf/redis.conf:/etc/redis/redis.conf -v /home/redis/data:/data -d redis redis-server --appendonly yes
3.canal
a) docker安装canal
docker run -p 11111:11111 --name canal -v /home/canal/conf/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/logs:/home/admin/canal-server/logs -d canal/canal-server
b) 修改/home/canal/conf/example/instance.properties配置
canal.instance.mysql.slaveId=1 #与server-id一致
canal.instance.master.address=172.17.0.1:3306 #mysql服务ip和端口
canal.instance.dbUsername=canal #mysql登录账号
canal.instance.dbPassword=canal #mysql登录账号密码
三、开发
a) 引入依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
b) 开发测试代码
public static void main(String[] args) { //创建单链接的客户端链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal"); int batchSize = 1000; //建立链接 connector.connect(); //客户端订阅,重复订阅时会更新对应的filter信息 connector.subscribe("shed-reform\\..*"); //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿 connector.rollback(); try { while(true) { /** * 获取数据,自动进行确认 * 该方法返回的条件: * a. 拿够batchSize条记录或者超过timeout时间 * b. 如果timeout=0,则阻塞至拿到batchSize记录才返回 */ Message message = connector.get(batchSize, 5000L, TimeUnit.MILLISECONDS); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { continue; } else { dataHandle(message.getEntries()); } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } finally { connector.unsubscribe(); connector.disconnect(); } }
public static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.ROWDATA) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.INSERT) { System.out.println("insert"); } else if (eventType == EventType.UPDATE) { System.out.println("update"); update(entry); } else if (eventType == EventType.DELETE) { System.out.println("delete"); } else if (eventType == EventType.QUERY) { System.out.println("query"); } } } }
public static void update(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> newColumnList = rowData.getAfterColumnsList(); JSONObject json = new JSONObject(); for (Column column : newColumnList) { json.put(column.getName(), column.getValue()); } System.out.println(json.toJSONString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }