import time from consul import Consul, Check class ConsulServiceManager: def __init__(self, host, port=8500, schema="http"): self._host = host self._port = port self._schema = schema self._local_ip = "127.0.0.1" # 通过环境变量获取 self.consul = Consul(host=self._host, port=self._port, scheme=self._schema) def register_service(self, service_name, service_id, service_address, service_port): self.consul.agent.service.register( name=service_name, service_id=service_id, address=service_address, port=service_port, check=Check.http( f'https://{service_address}:{service_port}/health', interval='10s', timeout='5s', ), ) print(f'Service {service_name} registered with ID {service_id}.') def deregister_service(self, service_id): self.consul.agent.service.deregister(service_id) print(f'Service with ID {service_id} deregistered.') def discover_service(self, service_name): index, services = self.consul.catalog.services() print("Available services:") for service in services: print(f'- {service}') # 获取具体服务的信息 index, service_info = self.consul.catalog.service(service_name) if service_info: print(f"Service '{service_name}' found with info: {service_info}") else: print(f"Service '{service_name}' not found.") if __name__ == '__main__': # 创建 ConsulServiceManager 实例 consul_manager = ConsulServiceManager(host='192.168.20.50') # 服务信息 _service_name = 'my_service' _service_id = 'my_service_id' _service_address = 'www.xxx.com' # 使用 Consul 的主机名 _service_port = 8080 # 注册服务 consul_manager.register_service( _service_name, _service_id, _service_address, _service_port ) # 等待一段时间,以便服务注册 time.sleep(5) # 发现服务 consul_manager.discover_service(_service_name) # 注销服务 time.sleep(5) consul_manager.deregister_service(_service_id)
标签:name,service,Consul,self,port,consul,测试代码,id From: https://www.cnblogs.com/52-qq/p/18653204