# -*- coding: utf-8 -*-
import json
import os
import threading
import time
from datetime import datetime, timedelta
from typing import Optional
import requests
import LogConfig as logging
class ApolloClient(object):
    """Client that keeps an in-memory copy of configuration from a config server.

    Configuration is fetched over HTTP with ``requests`` and kept per namespace
    in ``self._cache``. Every successful ``/configs`` fetch is also written to a
    JSON file under ``cache_file_dir``; when polling fails, the ``application``
    namespace is reloaded from that file.

    NOTE(review): log calls pass ``{}``-style templates plus positional
    arguments to the project's ``LogConfig`` module (imported as ``logging``);
    this presumably matches that wrapper's API -- confirm.
    """

    def __init__(self, app_id, cluster='default',
                 config_server_url=os.getenv('CONFIG_SERVERS', 'http://10.160.3.160:8080'),
                 timeout=61,
                 ip=None,
                 cache_file_dir="config"):
        """Store the connection settings and make sure the cache directory exists.

        :param app_id: application id used in request URLs and cache file names
        :param cluster: cluster name used in request URLs
        :param config_server_url: base URL of the config server. The default is
            read from the ``CONFIG_SERVERS`` environment variable once, when the
            class body is evaluated, not on every call.
        :param timeout: seconds used as the HTTP timeout of every request and as
            the pause after an unexpected long-poll status code
        :param ip: client ip sent to the server; detected via ``init_ip`` if None
        :param cache_file_dir: directory holding the local cache files
        """
        self.config_server_url = config_server_url
        self.appId = app_id
        self.cluster = cluster
        self.timeout = timeout
        # Set to True by the listener loop once it has exited.
        self.stopped = False
        self.ip = self.init_ip(ip)
        self.cache_file_dir = cache_file_dir
        # Set to True by stop() / the signal handler to ask the loop to exit.
        self._stopping = False
        # namespace -> {key: value}
        self._cache = {}
        # namespace -> last notification id seen (-1 means "never polled")
        self._notification_map = {'application': -1}
        self.init_cache_file(cache_file_dir)

    @staticmethod
    def init_ip(ip: Optional[str]) -> str:
        """Return ``ip`` unchanged, or detect the local outbound address.

        Detection connects a UDP socket towards 8.8.8.8:53 and reads the local
        address the OS selected for it.

        :param ip: explicit ip, or None to auto-detect
        :return: the ip to report to the server; "127.0.0.1" if detection fails
        """
        if ip is not None:
            return ip
        import socket
        try:
            # The context manager closes the socket even if connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 53))
                return s.getsockname()[0]
        except OSError:
            logging.error("Get ip error use default ip:127.0.0.1 ")
            return "127.0.0.1"

    @staticmethod
    def init_cache_file(cache_file_dir: str):
        """Create the cache directory (including parents) if it does not exist.

        :param cache_file_dir: directory that will hold the cache files
        """
        # exist_ok avoids a race with another process creating the directory.
        os.makedirs(cache_file_dir, exist_ok=True)

    def _cache_file_path(self, namespace: str) -> str:
        """Return the cache file path for ``namespace`` (shared by reader and writer)."""
        return os.path.join(
            self.cache_file_dir,
            "%s-configuration-%s.txt" % (self.appId, namespace),
        )

    # Main method
    def get_value(self, key, default_val=None, namespace='application', auto_fetch_on_cache_miss=False):
        """Return the cached value of ``key`` in ``namespace``.

        A namespace seen for the first time is registered and populated with
        one blocking long poll.

        :param key: configuration key to look up
        :param default_val: value returned when the key is not found
        :param namespace: namespace to read from
        :param auto_fetch_on_cache_miss: when True, a missing key triggers an
            HTTP fetch of the namespace before falling back to ``default_val``
        :return: the configured value, or ``default_val``
        """
        if namespace not in self._notification_map:
            self._notification_map[namespace] = -1
            logging.info("Add namespace {} to local notification map", namespace)
        if namespace not in self._cache:
            self._cache[namespace] = {}
            logging.info("Add namespace {} to local cache", namespace)
            # This is a new namespace, need to do a blocking fetch to populate the local cache
            self._long_poll()
        namespace_cache = self._cache[namespace]
        if key in namespace_cache:
            return namespace_cache[key]
        if auto_fetch_on_cache_miss:
            return self._cached_http_get(key, default_val, namespace)
        return default_val

    def update_local_cache(self, data: dict, namespace: str = "application") -> None:
        """Write ``data`` as JSON to the cache file of ``namespace``.

        The file is created if missing and overwritten otherwise.

        :param data: new configuration content
        :param namespace: namespace the content belongs to
        :return: None
        """
        # UTF-8 matches the encoding used when the file is read back.
        with open(self._cache_file_path(namespace), "w", encoding="utf-8") as f:
            f.write(json.dumps(data))

    def _load_local_cache(self, namespace: str = "application") -> bool:
        """Load the cache file of ``namespace`` into the in-memory cache.

        :param namespace: namespace whose cache file should be read
        :return: True if the file was loaded; False if it is missing,
            unreadable or not valid JSON (the in-memory cache is then untouched)
        """
        try:
            with open(self._cache_file_path(namespace), encoding='utf-8') as f:
                self._cache[namespace] = json.loads(f.read())
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            logging.error('Local configuration cache file is missing or unreadable')
            return False
        return True

    def start(self):
        """Populate the cache if it is empty, then start the polling thread.

        The worker is a daemon thread running ``_listener``; call ``stop()`` to
        ask it to exit. Nothing is started once the listener has stopped.
        """
        # Blocking first poll so readers do not race against an empty cache.
        if not self._cache:
            self._long_poll()
        # start the thread to get config server with schedule
        if not self.stopped:
            t = threading.Thread(target=self._listener, daemon=True)
            t.start()

    def stop(self):
        """Ask the listener loop to exit after its current poll."""
        self._stopping = True
        logging.info("Stopping listener...")

    def _cached_http_get(self, key, default_val, namespace='application'):
        """Fetch ``namespace`` from the ``configfiles/json`` endpoint and look up ``key``.

        On a successful response the in-memory cache of the namespace is
        replaced; otherwise the existing cache is used.

        :return: the value of ``key``, or ``default_val`` if it is absent
        """
        url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace,
                                                          self.ip)
        # Without a timeout requests may block forever on an unresponsive server.
        r = requests.get(url, timeout=self.timeout)
        if r.ok:
            data = r.json()
            self._cache[namespace] = data
            logging.info('Updated local cache for namespace {}', namespace)
        else:
            data = self._cache.get(namespace, {})
        return data.get(key, default_val)

    def _uncached_http_get(self, namespace='application'):
        """Fetch ``namespace`` from the ``configs`` endpoint.

        On HTTP 200 the ``configurations`` object of the response replaces the
        in-memory cache and is written to the local cache file. Any other
        status is ignored.
        """
        url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip)
        r = requests.get(url, timeout=self.timeout)
        if r.status_code == 200:
            data = r.json()
            self._cache[namespace] = data['configurations']
            self.update_local_cache(data['configurations'], namespace)
            logging.info('Updated local cache for namespace {} release key {}: {}',
                         namespace, data['releaseKey'],
                         repr(self._cache[namespace]))

    def _signal_handler(self, signal, frame):
        """Signal-handler-shaped callback that asks the listener loop to exit."""
        logging.info('You pressed Ctrl+C!')
        self._stopping = True

    def _long_poll(self):
        """Run one ``notifications/v2`` request and refresh changed namespaces.

        * 304: nothing changed, return.
        * 200: re-fetch every namespace listed in the response and remember its
          new notification id.
        * any other status: sleep ``self.timeout`` seconds.
        * any exception: log it and reload the ``application`` namespace from
          the local cache file, if one can be read.

        NOTE(review): a failure that returns immediately (e.g. connection
        refused) makes ``_listener`` call this again without any pause --
        consider adding a back-off in the loop.
        """
        url = '{}/notifications/v2'.format(self.config_server_url)
        # Snapshot the map: get_value() may add namespaces from another thread.
        notifications = [
            {'namespaceName': ns, 'notificationId': notification_id}
            for ns, notification_id in list(self._notification_map.items())
        ]
        try:
            r = requests.get(url=url, params={
                'appId': self.appId,
                'cluster': self.cluster,
                'notifications': json.dumps(notifications, ensure_ascii=False)
            }, timeout=self.timeout)
            logging.debug('Long polling returns {}: url={}', r.status_code, r.request.url)
            if r.status_code == 304:
                # no change, loop
                logging.debug('No change, loop...')
                return
            if r.status_code == 200:
                for entry in r.json():
                    ns = entry['namespaceName']
                    nid = entry['notificationId']
                    logging.info("{} has changes: notificationId={}", ns, nid)
                    self._uncached_http_get(ns)
                    self._notification_map[ns] = nid
            else:
                logging.debug('Sleep...')
                time.sleep(self.timeout)
        except Exception:
            # Exception (not BaseException) lets KeyboardInterrupt/SystemExit through.
            logging.error(
                'Failed to get the configuration remotely, the default configuration will be used')
            # Best effort: a missing or corrupt cache file must not raise from here.
            self._load_local_cache("application")

    def _listener(self):
        """Long-poll repeatedly until a stop is requested, then mark the client stopped."""
        logging.info('Entering listener loop...')
        while not self._stopping:
            # self.timeout seconds to pull a configuration
            logging.info("Long polling pulls Apollo configuration, next execution time: {}",
                         datetime.now() + timedelta(seconds=self.timeout))
            self._long_poll()
        logging.info("Listener stopped!")
        self.stopped = True

    @staticmethod
    def create_apollo_client():
        """Build a client for the "Flight-Schedule-Service" app with the default settings."""
        return ApolloClient(app_id="Flight-Schedule-Service", cluster="default", timeout=61, cache_file_dir="config")
if __name__ == '__main__':
    # Manual smoke run: build a client with the project defaults.
    # Call client.start() to begin background polling and client.stop() to
    # request that the polling loop exits.
    client = ApolloClient.create_apollo_client()