首页 > 其他分享 >kafka的使用—系统保卫战

kafka的使用—系统保卫战

时间:2023-06-11 18:01:59浏览次数:48  
标签:goods zookeeper 系统 保卫战 topic dict mysql kafka


前言

最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。

讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~

下面就使用python来模拟一下我们的方案(希望大家来吐槽 :) )

软件介绍

在这里我们使用 zookeeper + kafka 的方案来做。

软件

版本

其他

zookeeper

3.4.6

 

kafka

2.10-0.9.0.0

 

pykafka

2.1.2

python的kafka API

zookeeper + kafka 基本使用教程

http://www.linuxidc.com/Linux/2014-07/104470.htm

先决条件

  1. 使用zookeeper、kafka创建一个topic名为 goods-topic
  2. 需要安装pykafka一个python的zookeeper、kafka API
  3. 一个goods示例数据库
  • 使用消息队列:

 

# 启动zookeeper
/ usr / local / zookeeper / bin / zkServer . sh start
# 启动kafka
/ usr / local / kafka / bin / kafka - server - start . sh / usr / local / kafka / config / server . properties > / tmp / kafka - logs / kafka . out 2 > & 1 &
# 创建 goods-topic
/ usr / local / kafka / bin / kafka - topics . sh \
   -- create \
   -- zookeeper localhost : 2181 \
   -- replication - factor 1 \
   -- partitions 1 \
   -- topic test



 




 

  • 安装pykafka:

 

	
pip install pykafka


 




官网:http://readthedocs.org/projects/pykafka/

  • 创建示例数据库:

 

CREATE TABLE goods (
   goods_id INT NOT NULL AUTO_INCREMENT ,
   goods_name VARCHAR ( 30 ) NOT NULL ,
   goods_price DECIMAL ( 13 , 2 ) NOT NULL DEFAULT 0.00 ,
   create_time DATETIME NOT NULL ,
   PRIMARY KEY ( goods_id )
) ;



 




 

伪代码展示

  • 生产者端伪代码-python

 

import time , json
from pykafka import KafkaClient
 
# 相关的mysql操作
mysql_op ( )
 
# 可接受多个Client这是重点
client = KafkaClient ( hosts = "192.168.1.233:9092, \
                            192.168.1.233:9093, \
                            192.168.1.233:9094" )
# 选择一个topic
topic = client . topics [ 'goods-topic' ]
# 创建一个生产者
producer = topic . get_producer ( )
# 模拟接收前端生成的商品信息
goods_dict = {
   'option_type' : 'insert'
   'option_obj' : {
     'goods_name' : 'goods-1' ,
     'goods_price' : 10.00 ,
     'create_time' : time . strftime ( '%Y-%m-%d %H:%M:%S' )
   }
}
goods_json = json . dumps ( goods_dict )
# 生产消息
producer . produce ( msg )



 




 

  • 消费者端伪代码-python(作为后台进程在跑)

 

import time , json
from pykafka import KafkaClient
# 可接受多个Client这是重点
client = KafkaClient ( hosts = "192.168.1.233:9092, \
                            192.168.1.233:9093, \
                            192.168.1.233:9094" )
# 选择一个topic
topic = client . topics [ 'goods-topic' ]
# 生成一个消费者
balanced_consumer = topic . get_balanced_consumer (
   consumer_group = 'goods_group' ,
   auto_commit_enable = True ,
   zookeeper_connect = 'localhost:2181'
)
# 消费信息
for message in balanced_consumer :
   if message is not None :
     # 解析json为dict
     goods_dict = json . loads ( message )
     # 对数据库进行操作
     if goods_dict [ 'option_type' ] == 'insert' :
       mysql_insert ( )
     elif goods_dict [ 'option_type' ] == 'update' :
       mysql_update ( )
     elif goods_dict [ 'option_type' ] == 'delete' :
       mysql_delete ( )
     else :
       order_option ( )
 


 




 

 作者信息

昵称:HH


标签:goods,zookeeper,系统,保卫战,topic,dict,mysql,kafka
From: https://blog.51cto.com/u_6186189/6458431

相关文章

  • Python使用Redis实现一个简单作业调度系统
        概述Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。下面是实现上......
  • Python使用multiprocessing实现一个最简单的分布式作业调度系统
    介绍Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。实现Job首先创建一个Job类,为......
  • m基于FPGA的QPSK调制解调通信系统verilog实现,包含testbench,不包含载波同步
    1.算法仿真效果本系统进行了两个平台的开发,分别是:Vivado2019.2Quartusii18.0+ModelSim-Altera6.6dStarterEdition其中Vivado2019.2仿真结果如下:Quartusii18.0+ModelSim-Altera6.6dStarterEdition的测试结果如下:2.算法涉及理论知识概要QPSK是一种数字调制方式,它将......
  • Debian 12 "bookworm" 发布 - 通用操作系统
    Debian12"bookworm"发布-通用操作系统基于Linuxkernel6.1LTS,支持APFS读写请访问原文链接:https://sysin.org/blog/debian-12/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgDebian12“bookworm”发布2023年6月10日经过1年9个月28天的开......
  • m基于FPGA的QPSK调制解调通信系统verilog实现,包含testbench,不包含载波同步
    1.算法仿真效果 本系统进行了两个平台的开发,分别是: Vivado2019.2 Quartusii18.0+ModelSim-Altera6.6d StarterEdition 其中Vivado2019.2仿真结果如下:   Quartusii18.0+ModelSim-Altera6.6d StarterEdition的测试结果如下:    2.算法涉及理......
  • Vue_Django 登录注册+图书管理系统
    Vue前端注册页面点击查看代码<template><divclass="register"><el-row:gutter="20"><el-col:span="12":offset="6"><divclass="grid-contentbg-purple">&......
  • Spring Boot&Vue3前后端分离实战wiki知识库系统<八>--分类管理功能开发二
    接着上一次SpringBoot&Vue3前后端分离实战wiki知识库系统<七>--分类管理功能开发的分类功能继续完善。分类编辑功能优化:概述:现在分类编辑时的界面长这样:很明显目前的父分类的展现形式不太人性,这里需要指定父分类的id才可以,对于用户来说这种交互是反人道的,用户怎么知道父分类......
  • 雷达原理与系统 第十八讲 雷达终端(1)
    雷达原理与系统第十八讲雷达终端(1)0.概述——1).主要分为2讲,即4个方面内容,包括——A.雷达终端任务B.雷达终端显示器C.雷达点迹录取D.雷达数据处理2).本文主要介绍——A.雷达终端任务B.雷达终端显示器3).雷达接收机将天线接收到的回波信号进行射频放大、混频......
  • 基于QT实现的影院票务系统[2023-06-11]
    基于QT实现的影院票务系统[2023-06-11]1系统权限管理系统分3种用户权限:A游客权限-注册会员,查看电影场次信息,购买电影票。B会员权限-登录系统,管理个人信息,查看电影场次信息,购买电影票。C票务管理权限-登录系统,管理电影场次信息,查看电影票售卖情况,管理会员。以上为基础需......
  • 【Java技术专题】「Guava开发指南」手把手教你如何进行使用Guava工具箱进行开发系统实
    异常传播有时候,您可能需要重新抛出捕获到的异常。这种情况通常发生在捕获到Error或RuntimeException时,因为您可能没有预料到这些异常,但在声明捕获Throwable和Exception时,它们也被包含在内了。为了解决这个问题,Guava提供了多种方法来判断异常类型并重新抛出异常。例如:try{......