Canal
Canal介绍
功能:通过读取Mysql的Binlog,实时采集数据库数据的变化写到消息队列。
原理:将自己伪装成Slave,假装从Master复制数据
使用场景:1.异地数据库之间的同步 2.更新缓存,读取主库更新,在缓存服务器中更新 3.实时更新
文档地址:https://github.com/alibaba/canal/wiki
Binlog
Binlog:记录了DDL和DML的语句,以事件的方式记录,包含了消耗的事件。会有1%性能损耗
Binlog好处:1.主节点Master开启Binlog,Master把他的二进制日志传递给Slaves来达到主从一致
2.可以通过Binlog工具恢复数据
Binlog内容:.index二进制日志索引文件 .00000*记录数据库所有的DDL,DML
Binlog分类:1.statsment 语句级别:记录每一次写操作的语句
节省空间,但是可能会造成数据不一致(当前事件,随机数等)
2.row 行级别:记录每次操作后每行的变化 数据一致,但是比较占用空间
3.mixed statsment结合版本 默认使用statsment,特殊情况用row
单机环境安装:https://www.cnblogs.com/wuxiaolong4/p/16548910.html
mysql开启Binlog
show variables like 'log_%'; log_bin=on ##默认开启
show variables like 'binlog_format'; ##开启方式
vim /etc/my.cnf ##没有验证,mysql是默认开启的
log_bin=mysql-bin
binlog_format=row
binlog-do-db=test(也可以不要,监听所有库)
service mysql stop;service mysql start;
mysql授权canal用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
Canal 安装
mkdir /opt/canal
cd /opt/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.deployer-1.1.7-SNAPSHOT.tar.gz
tar -xvf canal.deployer-1.1.7-SNAPSHOT.tar.gz
mv canal.deployer-1.1.7-SNAPSHOT.tar.gz canal.tar.gz
mv canal.tar.gz /opt/pkg/
cd /opt/canal/conf
example:下面是监控的实例 canal.properties:配置
vim canal.properties
canal.serverMode = tcp //数据输出的位置
canal.destinations = example //监控多个实例
#canal.admin.user = admin
#canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
vim example/instance.properties
canal.instance.mysql.slaveId=20 //和mysql的ID不一样就行
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal 代码依赖、数据类介绍
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
Message:一次Canal从抓取的信息,可以包含多个SQL的结果
Entry:对应一个sql结果,一个sql可能会对多行记录造成影响
TableName EntryType(操作类型DDM,DML) StoreValue(序列化)==>RowChange(反序列化)
RowChange:EventType(当前数据类型) RowDataList数据集合(RowData:column)
canal 代码
import java.net.InetSocketAddress
import scala.collection.JavaConversions._
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, RowChange}
val canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("114.116.44.117",11111), "example", "", "");
canalConnector.connect()
canalConnector.subscribe("test.*")//订阅数据库
while(true){
val message = canalConnector.get(100).getEntries //获取100条数据,非阻塞式
if(message.size>0){
for(messageiter <- message){
val tablename = messageiter.getHeader.getTableName
val entryType= messageiter.getEntryType//数据类型
val storeValue = messageiter.getStoreValue//数据二进制
if(entryType.equals(EntryType.ROWDATA)){//数据类型必须是ROWDATA
val rowChange = RowChange.parseFrom(storeValue)//反序列化数据
val eventType= rowChange.getEventType //当前事件的操作类型
println("eventType:"+eventType)
val rowData= rowChange.getRowDatasList.iterator() //返回数据集合
for(data <- rowData){
val before = data.getBeforeColumnsList
for(beforeItem <- before){
println("eventType:"+eventType+"|before:"+beforeItem.getName+":"+beforeItem.getValue)
}
val after = data.getAfterColumnsList
for(afterItem <- after){
println("eventType:"+eventType+"|before:"+afterItem.getName+":"+afterItem.getValue)
}
}
}
}
}else{
println("nodata")
Thread.sleep(5000);
}
}
mysql kafka环境信息
cd /opt/mysql/mysql/bin
./mysql -u root -p'123456'
cd /opt/kafka/kafka/bin/
##启动ZK
./zookeeper-server-start.sh -daemon /opt/kafka/kafka/config/zookeeper.properties
##启动kafka
./kafka-server-start.sh -daemon /opt/kafka/kafka/config/server.properties
##删除topics
./kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic events
##创建topics
./kafka-topics.sh --create --topic events --bootstrap-server localhost:9092
##desc topics
./kafka-topics.sh --describe --topic events --bootstrap-server localhost:9092
##创建命令行生产者
./kafka-console-producer.sh --topic events --bootstrap-server localhost:9092
##打印topic
./kafka-console-consumer.sh --topic events --from-beginning --bootstrap-server localhost:9092
canal kafka
canal.properties
canal.serverMode = kafka
instance.properties
canal.mq.topic=events
canal.mq.partitionsNum=3 ##分区数量
canal.mq.partitionHash=test.table:id ##列名作为hash的值
标签:Canal,canal,val,--,kafka,##,mysql
From: https://www.cnblogs.com/wuxiaolong4/p/16804587.html