首页 > 编程语言 >Python + Apollo 实现配置中心

Python + Apollo 实现配置中心

时间:2024-02-26 17:33:50浏览次数:41  
标签:配置 Python ip self cache namespace url ._ Apollo

-- coding: utf-8 --

import json
import os
import threading
import time
from datetime import datetime, timedelta
from typing import Optional

import requests

import LogConfig as logging

class ApolloClient(object):
def init(self, app_id, cluster='default', config_server_url=os.getenv('CONFIG_SERVERS',
'http://10.160.3.160:8080'), timeout=61,
ip=None,
cache_file_dir="config"):
self.config_server_url = config_server_url
self.appId = app_id
self.cluster = cluster
self.timeout = timeout
self.stopped = False
self.ip = self.init_ip(ip)
self.cache_file_dir = cache_file_dir
self._stopping = False
self._cache = {}
self._notification_map = {'application': -1}
self.init_cache_file(cache_file_dir)

@staticmethod
def init_ip(ip: Optional[str]) -> str:
    """
    get ip
    :param ip:
    :return:
    """
    if ip is None:
        try:
            import socket
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("8.8.8.8", 53))
            ip = s.getsockname()[0]
            s.close()
        except BaseException:
            logging.error("Get ip error use default ip:127.0.0.1 ")
            return "127.0.0.1"
    return ip

@staticmethod
def init_cache_file(cache_file_dir: str):
    if not os.path.isdir(cache_file_dir):
        os.mkdir(cache_file_dir)

# Main method
def get_value(self, key, default_val=None, namespace='application', auto_fetch_on_cache_miss=False):
    if namespace not in self._notification_map:
        self._notification_map[namespace] = -1
        logging.info("Add namespace {} to local notification map", namespace)

    if namespace not in self._cache:
        self._cache[namespace] = {}
        logging.info("Add namespace {} to local cache", namespace)
        # This is a new namespace, need to do a blocking fetch to populate the local cache
        self._long_poll()
    if key in self._cache[namespace]:
        return self._cache[namespace][key]
    else:
        if auto_fetch_on_cache_miss:
            return self._cached_http_get(key, default_val, namespace)
        else:
            return default_val

def update_local_cache(self, data: json, namespace: str = "application") -> None:
    """
    if local cache file exits, update the content
    if local cache file not exits, create a version
    :param data: new configuration content
    :param namespace::s
    :return:
    """
    # trans the config map to md5 string, and check it's been updated or not
    # if it's updated, update the local cache file
    with open(
            os.path.join(
                self.cache_file_dir,
                "%s-configuration-%s.txt" % (self.appId, namespace),
            ), "w", ) as f:
        new_string = json.dumps(data)
        f.write(new_string)
        f.close()

# Start the long polling loop. Two modes are provided:
# 1: thread mode (default), create a worker thread to do the loop. Call self.stop() to quit the loop
# 2: eventlet mode (recommended), no need to call the .stop() since it is async
#    def start(self, use_eventlet=False, eventlet_monkey_patch=False, catch_signals=True):
def start(self):
    # # First do a blocking long poll to populate the local cache, otherwise we may get racing problems
    # if len(self._cache) == 0:
    #     self._long_poll()
    # if use_eventlet:
    #     import eventlet
    #     if eventlet_monkey_patch:
    #         eventlet.monkey_patch()
    #     eventlet.spawn(self._listener)
    # else:
    #     if catch_signals:
    #         import signal
    #         signal.signal(signal.SIGINT, self._signal_handler)
    #         signal.signal(signal.SIGTERM, self._signal_handler)
    #         signal.signal(signal.SIGABRT, self._signal_handler)
    #     t = threading.Thread(target=self._listener)
    #     t.setDaemon(True)
    #     t.start()
    if len(self._cache) == 0:
        self._long_poll()
    # start the thread to get config server with schedule
    if not self.stopped:
        t = threading.Thread(target=self._listener)
        t.setDaemon(True)
        t.start()

def stop(self):
    self._stopping = True
    logging.info("Stopping listener...")

def _cached_http_get(self, key, default_val, namespace='application'):
    url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace,
                                                      self.ip)
    r = requests.get(url)
    if r.ok:
        data = r.json()
        self._cache[namespace] = data
        logging.info('Updated local cache for namespace {}', namespace)
    else:
        data = self._cache[namespace]

    if key in data:
        return data[key]
    else:
        return default_val

def _uncached_http_get(self, namespace='application'):
    url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip)
    r = requests.get(url)
    if r.status_code == 200:
        data = r.json()
        self._cache[namespace] = data['configurations']
        self.update_local_cache(data['configurations'], namespace)
        logging.info('Updated local cache for namespace {} release key {}: {}',
                     namespace, data['releaseKey'],
                     repr(self._cache[namespace]))

def _signal_handler(self, signal, frame):
    logging.info('You pressed Ctrl+C!')
    self._stopping = True

def _long_poll(self):
    url = '{}/notifications/v2'.format(self.config_server_url)
    notifications = []
    for key in self._notification_map:
        notification_id = self._notification_map[key]
        notifications.append({
            'namespaceName': key,
            'notificationId': notification_id
        })
    try:
        r = requests.get(url=url, params={
            'appId': self.appId,
            'cluster': self.cluster,
            'notifications': json.dumps(notifications, ensure_ascii=False)
        }, timeout=self.timeout)
        logging.debug('Long polling returns {}: url={}', r.status_code, r.request.url)
        if r.status_code == 304:
            # no change, loop
            logging.debug('No change, loop...')
            return

        if r.status_code == 200:
            data = r.json()
            for entry in data:
                ns = entry['namespaceName']
                nid = entry['notificationId']
                logging.info("{} has changes: notificationId={}", ns, nid)
                self._uncached_http_get(ns)
                self._notification_map[ns] = nid
        else:
            logging.debug('Sleep...')
            time.sleep(self.timeout)
    except BaseException:
        logging.error(
            'Failed to get the configuration remotely, the default configuration will be used')
        f = open("./%s/%s-configuration-%s.txt" % (self.cache_file_dir, self.appId, "application"),
                 encoding='utf-8')
        # # 读取文件
        content = f.read()
        self._cache["application"] = json.loads(content)

def _listener(self):
    logging.info('Entering listener loop...')
    while not self._stopping:
        # self.timeout seconds to pull a configuration
        logging.info("Long polling pulls Apollo configuration, next execution time: {}",
                     datetime.now() + timedelta(seconds=self.timeout))
        self._long_poll()

    logging.info("Listener stopped!")
    self.stopped = True

@staticmethod
def create_apollo_client():
    # f = open("./apollo_config_url", encoding='utf-8')
    # content = f.read()
    # f.close()
    # apollo_config_url = content
    return ApolloClient(app_id="Flight-Schedule-Service", cluster="default", timeout=61, cache_file_dir="config")

if name == 'main':
client = ApolloClient.create_apollo_client()
pass
# client = ApolloClient(app_id="Flight-Schedule-Service")
# client.start()
# if sys.version_info[0] < 3:
# v = raw_input('Press any key to quit...\n')
# else:
# v = input('Press any key to quit...\n')
#
# client.stop()
# while not client.stopped:
# pass

标签:配置,Python,ip,self,cache,namespace,url,._,Apollo
From: https://www.cnblogs.com/lyuSky/p/18034814

相关文章

  • 简化 Python 日志管理:Loguru 入门指南
    简化Python日志管理:Loguru入门指南在开发和维护软件项目时,高效的日志管理系统对于监控应用程序的行为、调试代码和追踪异常至关重要。Python的标准日志模块虽然功能强大,但其配置和使用往往较为复杂,尤其是对于新手开发者。这就是Loguru库发挥作用的地方,它以极简的方式重新定......
  • Python中字典setdefault()方法和append()的配合使用
    1.setdefault()方法语法dict.setdefault(key,default=None)说明:如果字典中包含给定的键值,那么返回该键对应的值。否则,则返回给定的默认值。Syntax:dict.setdefault(key,default_value)Parameters:Ittakestwoparameters:key–Keytobesearchedinthedictionar......
  • Python报错symbol lookup error: xxx.so: undefined symbol: cufftxxx解决办法
    技术背景在上一篇文章中介绍过如何实现本地MindSpore的CUDA算子,那么在算子编译和使用的过程中可能会出现一些小问题,这里介绍的是编译成功为so动态链接库之后,在python中调用,提示找不到xxx函数/字符的报错。这里使用的编译指令为:$nvcc--shared-Xcompiler-fPIC-oxxx.soxxx.c......
  • GB28181视频监控平台EasyCVR如何通过配置实现级联不响应下级平台的检索消息?
    AI视频智能分析/视频监控管理平台EasyCVR能在复杂的网络环境中(专网、内网、局域网、广域网、公网等)将前端海量的设备进行统一集中接入与视频汇聚管理,平台支持设备通过4G、5G、WIFI、有线等方式进行视频流的快捷接入和传输。平台能将接入的视频流进行汇聚、转码与多格式分发,可分发......
  • Python嵌套绘图并为条形图添加自定义标注
    论文绘图时经常需要多图嵌套,正好最近绘图用到了,记录一下使用Python实现多图嵌套的过程。首先,实现Seaborn分别绘制折线图和柱状图。'''绘制折线图'''importseabornassnsimportmatplotlib.pyplotaspltimportwarningswarnings.filterwarnings("ignore","use_inf_as_n......
  • Python函数每日一讲29 - 一文让你彻底掌握Python中的getattr函数
    引言在Python中,getattr()函数是一种强大的工具,它允许我们在运行时动态地访问对象的属性和方法。本文将介绍getattr()函数的基本语法、常见用法和高级技巧,帮助大家更好地理解和应用这一函数。语句概览getattr()函数的语法如下:getattr(object,name[,default])其中:ob......
  • Python Django适配dm8(达梦)数据库
    官方文档https://eco.dameng.com/document/dm/zh-cn/start/python-development.htmlDjango适配达梦https://blog.csdn.net/qq_35349982/article/details/132165581https://blog.csdn.net/weixin_61894388/article/details/126330168项目适配达梦升级或安装依赖Django==3......
  • gin环境&路由配置
    Gin是一个Go(Golang)编写的轻量级httpweb框架,运行速度非常快Gin的官网:https://gin-gonic.com/zh-cn/GinGithub地址:https://github.com/gin-gonic/gingin环境搭建下载并安装#gomod同级目录下goget-ugithub.com/gin-gonic/gin......
  • Android 多渠道配置
    Android多包名,icon本篇文章主要记录下android下的同一工程,打包时配置不同的包名,icon,名称等信息.1:多包名首先讲述下如何配置多包名.在build.gralde的android标签下添加:productFlavors{xiaomi{applicationId"com.test.usagetest"}......
  • umijs 项目配置问题汇总
    umi配置问题汇总umi或@umijs/max集成tailwindcss正常umi内置了tailwindcss插件,详情可参考官方文档TailwindCSS配置生成器但是由于内置的tailwindcss插件过老,umi官方已不推荐使用内置,建议使用tailwindcss官方的配置。详情可见issue同时,umi官方也不推荐使用p......