首页 > 数据库 >Django 实现Canal 读取 MySQL 写入到 Kafka中

Django 实现Canal 读取 MySQL 写入到 Kafka中

时间:2023-03-08 16:22:52浏览次数:35  
标签:Canal canal settings CANAL SETTINGS Django MySQL message

安装canal-python库:
pip install canal-python
修改settings.py 文件,添加Canal相关配置
CANAL_SETTINGS = {
    "canal_host": "127.0.0.1",
    "canal_port": 11111,
    "canal_username": "",
    "canal_password": "",
    "canal_destination": "example",
    "canal_filter": ".*\\..*",
}
#  其中,canal_host、canal_port、canal_username、canal_password是Canal服务器的相关配置信息;canal_destination是要同步的数据库的名称;canal_filter是要同步的表的名称,可以使用正则表达式匹配多个表。
在Django中创建一个canal.py文件,添加以下代码
from django.conf import settings
from canal.client import Client
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)

def message_handler(message):
    # 将同步的数据发送到Kafka中
    producer.send("example", message['data'])

def start_canal():
    # 连接Canal服务器
    client = Client()
    client.connect(host=settings.CANAL_SETTINGS["canal_host"],
                   port=settings.CANAL_SETTINGS["canal_port"],
                   username=settings.CANAL_SETTINGS["canal_username"],
                   password=settings.CANAL_SETTINGS["canal_password"])
    # 订阅binlog日志
    client.subscribe(
        destination=settings.CANAL_SETTINGS["canal_destination"],
        filter=settings.CANAL_SETTINGS["canal_filter"]
    )
    # 处理同步的数据
    while True:
        message = client.get(1)
        for entry in message['entries']:
            if entry.entryType == "ROWDATA":
                for rowChange in entry.rowChanges:
                    if rowChange.eventType == "INSERT" or rowChange.eventType == "UPDATE":
                        message_handler(rowChange)
    # 关闭Canal连接
    client.disconnect()
在Django的启动文件中(例如manage.py)中添加以下代码:
from canal import start_canal

if __name__ == '__main__':
    start_canal()
需要注意的是,为了确保Canal和Kafka的正常运行,需要在Django的环境中安装canal-python和kafka-python库,并在settings.py文件中添加Kafka的相关配置信息。

标签:Canal,canal,settings,CANAL,SETTINGS,Django,MySQL,message
From: https://www.cnblogs.com/qingtianyu2015/p/17192447.html

相关文章

  • 安装goldengate软件for mysql(ogg)
    os:centos7ogg版本:19.1.0.0.3mysql:5.71.将安装文件上传到指定的目录#cd/#mkdir/soft/ogg将ogg安装包上传到该目录2.创建安装软件的目录[root@localhost/]#cd/[ro......
  • 如何学习MySQL,这几本书初学者必看!
    《高性能MySQL》第四版发布后,收到了很多读者的反馈,其中关注最多的是作为一个初学者,应该如何能够较为系统的学习MySQL,从而应对日常工作或者获得更好的职业发展。于是和多个......
  • 云图说丨云数据库GaussDB(for MySQL)事务拆分大揭秘
    摘要:数据库代理提供事务拆分的功能,能够将事务内写操作之前的读请求转发到只读节点,降低主节点负载。本文分享自华为云社区《【云图说】第270期云数据库GaussDB(forMySQL)......
  • redis之列表、redis之hash、redis其他操作、redis 管道、django中使用redis、celery介
    目录1redis之列表2redis之hash3redis其他操作4redis管道5django中使用redis5celery介绍和安装6celery快速使用7celery包结构#1登录注册前端 -登录 -手......
  • mysql5.7msi安装
    本文介绍的是只安装MySQL数据库的过程,并不包含各种其他附加工具。安装完成之后通常使用Navicat或SQLyog进行可视化操作。清华的镜像网站只保存最新的几个MySQL版本,所以直......
  • 【黄啊码】MySQL入门—4、掌握这些数据筛选技能比你学python还有用-1
    大家好!我是黄啊码,今天没继续select*了吧,如果还继续,那接下来的课程先别学,回去好好把之前的课程重复复习一遍,学明白了我们再会?废话不多说,学今天的课程之前我们先来说说这几......
  • MYSQL性能优化的最佳20+条经验
    MYSQL性能优化的最佳20+条经验 2009年11月27日  陈皓 评论 169条评论  254,080人阅读今天,数据库的操作越来越成为整个应用的性能瓶颈了,这点对于Web应用尤其......
  • mysql invalid conn排查
    mysqlinvalidconn排查服务监控系列文章服务监控系列视频问题背景服务使用golang,客户端库是go-mysql-driver,系统测试环境频繁但是不总是报出invalidconn错误,但实......
  • 记- django通过celery beat results实现定时任务
    1.实验环境python版本:3.7.8django版本:3.2.15celery版本:5.2.7django-celery版本:3.2.1django-celery-beat版本:2.4.0django-celery-results版本:2.4.0django-redis版本......
  • MySQL timestamp类型
    MySQLTIMESTAMP简介MySQLTIMESTAMP是一种保存日期和时间组合的时间数据类型。TIMESTAMP列的格式为YYYY-MM-DDHH:MM:SS,固定为19个字符。TIMESTAMP值的范围从'1970-01-......