十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
为了实现 实时同步数据,在mac环境搭建了canal,MySQL,kafka的一套流程
10年积累的网站建设、成都网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有孝南免费网站建设让你可以放心的选择与我们合作。
使用canal加mysql加kafka的方式传递数据
mysql 数据源头
canal模仿slave冲mysql取数据。。是一个管道
kafka 将canal获取的数据放入kafka 。然后消费(程序获取kafka的队列。消费数据)
mysql安装
这个就不详述了。。不要安装最新版的mysql。本人亲测8.0版本和canal不一定能很好的兼容
所以。。安装了mysql5.7
安装命令(这样安装 安装的是8.0版本)
brew install mysql
2.java安装
不管是canal还是kafka都是需要java环境的。。
也不能用最新版的java,推荐使用java8也就是jdk8
最好是从官网oracle下载
安装方法:
https://blog.csdn.net/oumuv/article/details/84064169
3.canal安装
在装calal之前 确保mysql 及 java8 环境安装好了
下载地址: https://github.com/alibaba/canal/releases
下载 canal.deployer-1.1.4.tar.gz
在家目录下面新建一个canal目录.直接解压就行
官方文件已经编译好了,不需再编译
需要添加账户(canal@%及canal@local)
在canal/bin目录下有几个脚本文件,startup.sh 启动服务用的,stop.sh 停止服务用的
在canal/logs目录下放的是日志文件。
在canal/conf目录下放的是配置文件。
实例配置文件
canal/conf/example/instance.properties
实例的配置文件。。决定了 你连接哪个实例的数据库 可以精确到表
instance.properties中比较重要的参数
#实例
canal.instance.master.address = 127.0.0.1:3306
#db
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =test
canal.instance.connectionCharset = UTF-8
#table
# table regex
canal.instance.filter.regex = test.ttt
#mq topic
canal.mq.topic=test12345
上面这样配置 就代表了。。通过canal连接127.0.0.1中的test库的ttt表,放到一个叫做test12345的topic里面
验证canal是否和mysql连接 只需要在mysql的进程里面查看是否有一个复制连接(因为canal就是模仿了一个slave)
还有一个全局的文件
表示了需要连接的kafka zookeeper等
canal/conf/canal.properties
列举canal连接kafka的重要参数
canal.id= 1#如果有多个canal 这个值和集群中的canal不能冲突
canal.ip=172.17.61.113#canal的ip
canal.port= 11111
canal.zkServers=172.17.61.113:2181#zookeeper的ip:port
canal.serverMode = kafka
canal.destinations = example
#mq
canal.mq.servers = 172.17.61.113:9092#kafka的ip:port
上面有些关于kafka的参数 是 需要kafka安装好之后才能陪得(因为已经装好了 所以先列举出来)
4.kafka安装
kafka安装时需要zookeeper的。。
但是一般zookeeper会集成在kafka里面 所以不需要特别单独安装(也可以单独安装)
mac环境安装命令如下
brew install kafka
一直等安装完成就行
运行kafka是需要依赖于zookeeper的,所以安装kafka的时候也会包含zookeeper。
kafka的安装目录:/usr/local/Cellar/kafka/2.0.0/bin
kafka的配置文件目录:/usr/local/Cellar/kafka/2.0.0/bin
kafka服务的配置文件:/usr/local/etc/kafka/server.properties
zookeeper配置文件: /usr/local/etc/kafka/zookeeper.properties
listeners=PLAINTEXT://172.17.61.113:9092
advertised.listeners=PLAINTEXT://172.17.61.113:9092
zookeeper 配置文件我在安装的时候 没有做任何修改
然后 通过上面配置canal.properties 就连接上kafka
kafka的基本命令(可以通过find / -name zookeeper-server-start 查找具体的位置)
首先,启动zookeeper:
zookeeper-server-start /usr/ local /etc/kafka/zookeeper.properties
然后,启动kafka
kafka- server -start /usr/local/etc/kafka/ server .properties
创建一个“使用单个分区”、“只有一个副本”、名为“test”的主题的topic
kafka-topics --create --zookeeper localhost :2181 --replication-factor 1 --partitions 1 --topic test
(创建topic,这个topic。我们在canal中已经有了配置)
使用如下命令创建topic
kafka-topics --create --zookeeper localhost:2181/kafka --replication-factor 1
--partitions 1 --topic topic1 查看创建的topic,运行list topic命令:
kafka-topics --list --zookeeper 172.17.61.113:2181 生产消息
kafka-console-producer --broker-list 172.17.61.113:9092 --topic test
(消息数据由canal自动发送。所以这里的命令我们只有明白就行)
消费消息(这个地方就可以最后来查看mysql+canal+kafka是否已经联动起来了)
kafka-console-consumer --bootstrap-server 172.17.61.113:9092 --topic test12345 --from-beginning
我在mysql里面的test库的ttt表里加了一条记录
然后反应在kafka就是如下结果
{"data":[{"id":"13","var":"ded"}],"database":"test","es":1575448571000,"id":8,"isDdl":false,"mysqlType":{"id":"int(11)","var":"varchar(5)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"var":12},"table":"ttt","ts":1575448571758,"type":"INSERT"}
这就表示整个mysql+canal+kafka已经成功了
接下来只要等着程序那边去消费这个队列信息就好了
##############
有可能的报错
服务端:com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
是由于你改了配置文件,导致meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
解决方法:删除meta.dat删除,再重启canal,问题解决;
详见
https://www.cnblogs.com/shaozhiqi/p/11534658.html