
5. ZooKeeper Java Clients: Curator (Service Registration and Discovery)

Date: 2023-05-15 19:04:42
Tags: java, String, zookeeper, Curator, curator, org, apache, import, serviceDiscovery




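Contents

  • ZooKeeper series table of contents
  • I. Background
  • 1. ServiceInstance
  • 2. ServiceProvider
  • 3. ServiceDiscovery
  • 1) Registering/unregistering services
  • 2) Querying services
  • 3) Service cache
  • II. Example 1
  • 1. pom.xml
  • 2. ServiceInstance
  • 3. ServiceProvider and ServiceDiscovery
  • 4. Adding, deleting, and querying registered services
  • 5. Verifying Example 1
  • III. Example 2
  • 1. InstanceDetails
  • 2. ProviderService
  • 3. ConsumerService
  • 4. Verification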



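This article shows how to implement service registration and discovery with ZooKeeper's Curator library.
The examples build on other articles in this series, such as "4. Introduction to ZooKeeper's three Java clients: Curator (CRUD, transactions, listeners, distributed counters, distributed locks)".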

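I. Background

Curator's Service Discovery extension is built around three core abstractions: ServiceInstance, ServiceProvider, and ServiceDiscovery. Together they make it straightforward to implement service discovery.

1. ServiceInstance

Curator uses ServiceInstance to represent a single service instance. A ServiceInstance has a name, an ID, an address, a port and/or SSL port, and an optional user-defined payload. ServiceInstances are serialized and stored in ZooKeeper in the following way:

[Figure: how serialized ServiceInstances are laid out in ZooKeeper]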
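
As a rough illustration of that layout: Curator's discovery recipe stores each instance as a znode under `<basePath>/<serviceName>/<instanceId>`, with the serialized instance as the node data. The sketch below only builds such paths as strings. `InstancePathSketch` and its methods are hypothetical names for illustration, not Curator APIs, and nothing here talks to ZooKeeper.

```java
// Hypothetical helper: mirrors the <basePath>/<serviceName>/<instanceId> layout as plain strings.
class InstancePathSketch {

    // Path of the node that groups all instances of one service.
    static String pathForName(String basePath, String serviceName) {
        return stripTrailingSlashes(basePath) + "/" + serviceName;
    }

    // Path of the node holding one serialized instance.
    static String pathForInstance(String basePath, String serviceName, String instanceId) {
        return pathForName(basePath, serviceName) + "/" + instanceId;
    }

    private static String stripTrailingSlashes(String path) {
        while (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        return path;
    }

    public static void main(String[] args) {
        // The instance id here is a made-up value.
        System.out.println(pathForInstance("/discovery/example", "OrderService", "id-1"));
    }
}
```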

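2. ServiceProvider

ServiceProvider is the main abstraction. It wraps the discovery service for one particular named service together with a provider strategy. A provider strategy is a scheme for selecting one instance from a set of service instances. Three strategies are bundled: round-robin, random, and sticky (always select the same instance).
ServiceProviders are allocated through a ServiceProviderBuilder, which consumers obtain from ServiceDiscovery. The builder lets you set the service name and several other optional values.
A ServiceProvider must be started by calling start(). When you are finished with it, call close().
ServiceProvider has the following two important methods:

/** Get one service instance */
public ServiceInstance<T> getInstance() throws Exception;
/** Get all service instances */
public Collection<ServiceInstance<T>> getAllInstances() throws Exception;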
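
To make the three selection strategies concrete, here is a minimal stand-alone sketch. It does not use Curator's own strategy classes: `StrategySketch` and its `Strategy` interface are hypothetical, instances are reduced to plain strings, and the sticky variant is not thread-safe.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for round-robin, random, and sticky selection.
class StrategySketch {
    interface Strategy {
        String pick(List<String> instances);
    }

    // Round-robin: walk the list in order and wrap around at the end.
    static Strategy roundRobin() {
        AtomicInteger next = new AtomicInteger();
        return instances -> instances.isEmpty() ? null
                : instances.get(Math.floorMod(next.getAndIncrement(), instances.size()));
    }

    // Random: every instance is equally likely.
    static Strategy random() {
        return instances -> instances.isEmpty() ? null
                : instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
    }

    // Sticky: keep returning the first choice for as long as it is still listed.
    static Strategy sticky(Strategy delegate) {
        String[] current = new String[1];
        return instances -> {
            if (current[0] == null || !instances.contains(current[0])) {
                current[0] = delegate.pick(instances);
            }
            return current[0];
        };
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("a:8080", "b:8080", "c:8080");
        Strategy rr = roundRobin();
        for (int i = 0; i < 4; i++) {
            System.out.println("round-robin -> " + rr.pick(instances));
        }
        Strategy st = sticky(random());
        System.out.println("sticky is stable: " + st.pick(instances).equals(st.pick(instances)));
    }
}
```

Sticky wraps another strategy in this sketch so that it can fall back to a fresh pick when its current instance disappears from the list.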

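When using Curator 2.x (ZooKeeper 3.4.x), ServiceProvider objects must be cached and reused by the application. The internal NamespaceWatcher objects that a provider adds cannot be removed in ZooKeeper 3.4.x, so creating a new provider for every call to the same service will eventually exhaust the JVM's memory.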
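
One way to follow that advice is to keep a single long-lived provider per service name and hand out the same object on every lookup. The sketch below shows only the caching pattern: `ProviderCacheSketch` and its inner `Provider` class are hypothetical placeholders for a started ServiceProvider, not Curator types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one provider per service name, created lazily and then reused.
class ProviderCacheSketch {
    // Counts how many providers were ever created, to show that reuse happens.
    static final AtomicInteger created = new AtomicInteger();

    // Placeholder for a real, started ServiceProvider.
    static final class Provider {
        final String serviceName;

        Provider(String serviceName) {
            this.serviceName = serviceName;
            created.incrementAndGet();
        }
    }

    private final Map<String, Provider> providers = new ConcurrentHashMap<>();

    // computeIfAbsent builds the provider only on the first request for a given name.
    Provider providerFor(String serviceName) {
        return providers.computeIfAbsent(serviceName, Provider::new);
    }

    public static void main(String[] args) {
        ProviderCacheSketch cache = new ProviderCacheSketch();
        Provider first = cache.providerFor("OrderService");
        Provider second = cache.providerFor("OrderService");
        System.out.println("same object reused: " + (first == second));
        System.out.println("providers created: " + created.get());
    }
}
```

In real code the factory would build and start() the provider, and every cached provider would be closed on shutdown, which is what Example 1 below does with its providers map.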

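3. ServiceDiscovery

To create a ServiceProvider you need a ServiceDiscovery, which is created by a ServiceDiscoveryBuilder. Call start() before using it and close() when you are done.
If a particular instance has an I/O error or a similar failure, call ServiceProvider.noteError() and pass in the instance. The ServiceProvider will temporarily treat instances with errors as "down". The error threshold and timeout are set through a DownInstancePolicy, which can be passed to the ServiceProviderBuilder (note: if no DownInstancePolicy is specified, a default one is used).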
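
The idea of "down after a threshold of errors within a timeout" can be sketched as follows. This is a simplified model written for this article, not Curator's implementation: `DownInstanceSketch` is a hypothetical class, the window logic is an assumption, time is passed in explicitly to keep the example deterministic, and the class is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: mark an instance as down after N errors inside one time window.
class DownInstanceSketch {
    private final long timeoutMs;
    private final int errorThreshold;
    // Per instance id: { window start time, error count in that window }.
    private final Map<String, long[]> errors = new HashMap<>();

    DownInstanceSketch(long timeoutMs, int errorThreshold) {
        this.timeoutMs = timeoutMs;
        this.errorThreshold = errorThreshold;
    }

    // Similar in spirit to noteError(instance): record one failure at time nowMs.
    void noteError(String instanceId, long nowMs) {
        long[] state = errors.get(instanceId);
        if (state == null || nowMs - state[0] >= timeoutMs) {
            state = new long[] { nowMs, 0 }; // start a fresh window
            errors.put(instanceId, state);
        }
        state[1]++;
    }

    // An instance is skipped while its current window holds at least errorThreshold errors.
    boolean isDown(String instanceId, long nowMs) {
        long[] state = errors.get(instanceId);
        if (state == null || nowMs - state[0] >= timeoutMs) {
            return false; // no errors recorded, or the window has expired
        }
        return state[1] >= errorThreshold;
    }

    public static void main(String[] args) {
        DownInstanceSketch policy = new DownInstanceSketch(30_000, 2);
        policy.noteError("a", 0);
        System.out.println(policy.isDown("a", 1));      // false: only one error so far
        policy.noteError("a", 10);
        System.out.println(policy.isDown("a", 20));     // true: threshold reached
        System.out.println(policy.isDown("a", 30_000)); // false: window expired
    }
}
```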

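For finer-grained control, you can use the following methods:

1) Registering/unregistering services

Normally you hand your application's service descriptor to ServiceDiscovery when it is constructed (the examples below do this with the builder's thisInstance()), and it registers/unregisters the service automatically. If you need to do this manually, use the following methods:

/** Register a service */
public void registerService(ServiceInstance<T> service) throws Exception;
/** Unregister a service */
public void unregisterService(ServiceInstance<T> service) throws Exception;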

2) Querying services

You can query for all service names, all instances of a particular service, or a single service instance.

/** Query all service names */
public Collection<String> queryForNames() throws Exception;
/** Query all instances of a particular service */
public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception;
/** Query a single service instance */
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception;
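
The register/unregister and query methods fit together as operations on a two-level map from service name to instance id. The following in-memory model is a hypothetical illustration of those semantics only (`RegistrySketch` is not a Curator class, and instances are reduced to address strings); the real methods read from and write to ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of register/unregister/query; no ZooKeeper involved.
class RegistrySketch {
    // service name -> (instance id -> address)
    private final Map<String, Map<String, String>> services = new TreeMap<>();

    void registerService(String name, String id, String address) {
        services.computeIfAbsent(name, k -> new LinkedHashMap<>()).put(id, address);
    }

    void unregisterService(String name, String id) {
        Map<String, String> instances = services.get(name);
        if (instances != null) {
            instances.remove(id);
        }
    }

    Collection<String> queryForNames() {
        return new ArrayList<>(services.keySet());
    }

    Collection<String> queryForInstances(String name) {
        Map<String, String> instances = services.get(name);
        return instances == null ? new ArrayList<>() : new ArrayList<>(instances.values());
    }

    String queryForInstance(String name, String id) {
        Map<String, String> instances = services.get(name);
        return instances == null ? null : instances.get(id);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.registerService("OrderService", "id-1", "127.0.0.1:8082");
        registry.registerService("OrderService", "id-2", "127.0.0.1:8083");
        System.out.println(registry.queryForNames());
        System.out.println(registry.queryForInstances("OrderService"));
        registry.unregisterService("OrderService", "id-1");
        System.out.println(registry.queryForInstance("OrderService", "id-1"));
    }
}
```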

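3) Service cache

Each of the query methods above calls ZooKeeper directly.
If you query services frequently, you can use a ServiceCache instead. It keeps the instance list of one particular service in memory and uses a Watcher to keep that list up to date.
A ServiceCache is obtained from the builder returned by ServiceDiscovery.serviceCacheBuilder().
Call start() to start the ServiceCache, and call close() when you are done with it.
The currently known instances of the service can then be retrieved by calling:

/** Get the cached list of service instances */
public List<ServiceInstance<T>> getInstances();

ServiceCache also supports listeners that are notified when the Watcher updates the instance list:

/**
 * Listener for changes to a service cache
 */
public interface ServiceCacheListener extends ConnectionStateListener {
    /**
     * Called when the cache has changed (instances added/deleted, etc.)
     */
    public void cacheChanged();
}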
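
The cache-plus-listener pattern described above can be sketched without ZooKeeper. In this hypothetical model (`CacheListenerSketch` and its `Listener` interface are made up for illustration), the caller pushes updates by hand where the real ServiceCache would react to a Watcher event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: a local snapshot of instances that notifies listeners on change.
class CacheListenerSketch {
    interface Listener {
        void cacheChanged();
    }

    private volatile List<String> instances = Collections.emptyList();
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Reads are served from memory; no remote call is made here.
    List<String> getInstances() {
        return instances;
    }

    // Stand-in for the watcher callback: swap in the new list and notify if it differs.
    void onRemoteUpdate(List<String> latest) {
        List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(latest));
        if (!snapshot.equals(instances)) {
            instances = snapshot;
            for (Listener listener : listeners) {
                listener.cacheChanged();
            }
        }
    }

    public static void main(String[] args) {
        CacheListenerSketch cache = new CacheListenerSketch();
        cache.addListener(() -> System.out.println("cache changed: " + cache.getInstances()));
        cache.onRemoteUpdate(Arrays.asList("a:8080"));
        cache.onRemoteUpdate(Arrays.asList("a:8080"));           // unchanged, no notification
        cache.onRemoteUpdate(Arrays.asList("a:8080", "b:8080"));
    }
}
```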

II. Example 1

1. pom.xml

<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<version>1.18.22</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>5.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-client</artifactId>
	<version>5.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-x-discovery -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-x-discovery</artifactId>
	<version>5.3.0</version>
</dependency>

2. ServiceInstance

The payload class attached to each ServiceInstance here is an extended one; you do not need this many properties.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Data;

/**
 * @author alanchan
 *
 */
@Data
public class InstanceDetails {
	public static final String ROOT_PATH = "/service";
	
	// Methods exposed by this service
	public Map<String, Service> services = new HashMap<>();

	// Service description
	private String serviceDescription;

	public InstanceDetails() {
		this.serviceDescription = "";
	}

	public InstanceDetails(String serviceDescription) {
		this.serviceDescription = serviceDescription;
	}

	@Data
	public static class Service {
		// Method name
		private String methodName;

		// Method description
		private String description;

		// Parameters the method requires
		private List<String> params;

	}

}

3. ServiceProvider and ServiceDiscovery

import java.io.Closeable;
import java.io.IOException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

/**
 * @author alanchan
 *
 */
public class ExampleServer implements Closeable {

	private final ServiceDiscovery<InstanceDetails> serviceDiscovery;
	private final ServiceInstance<InstanceDetails> serviceInstance;

	public ExampleServer(CuratorFramework client, String path, String serviceName, String description)
			throws Exception {
		UriSpec uriSpec = new UriSpec("{scheme}://foo.com:{port}");

		serviceInstance = ServiceInstance
				.<InstanceDetails>builder()
				.name(serviceName)
				.payload(new InstanceDetails(description))
				.port((int) (65535 * Math.random()))
				.uriSpec(uriSpec)
				.build();

		JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
				InstanceDetails.class);

		serviceDiscovery = ServiceDiscoveryBuilder
				.builder(InstanceDetails.class)
				.client(client)
				.basePath(path)
				.serializer(serializer)
				.thisInstance(serviceInstance)
				.build();
	}

	public ServiceInstance<InstanceDetails> getServiceInstance() {
		return serviceInstance;
	}

	public void start() throws Exception {
		serviceDiscovery.start();
	}

	@Override
	public void close() throws IOException {
		CloseableUtils.closeQuietly(serviceDiscovery);
	}

}
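
The UriSpec template passed in the constructor, `{scheme}://foo.com:{port}`, is filled in when buildUriSpec() is called on an instance. The sketch below is a hypothetical stand-alone re-implementation of that kind of placeholder substitution (`UriTemplateSketch` and `expand` are made-up names, not Curator APIs). It assumes `{scheme}` resolves to http when no SSL port is set, which is consistent with the http://foo.com:35438 output shown later in this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical re-implementation of "{name}" placeholder substitution, for illustration only.
class UriTemplateSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{(\\w+)\\}");

    static String expand(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            // Placeholders without a value become an empty string in this sketch.
            String value = values.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("scheme", "http"); // assumed: no SSL port configured
        values.put("port", "35438");
        System.out.println(expand("{scheme}://foo.com:{port}", values)); // http://foo.com:35438
    }
}
```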

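4. Adding, deleting, and querying registered services

This example registers, deletes, and queries services through commands typed at the keyboard.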

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
//import org.apache.curator.shaded.com.google.common.base.Predicate;
//import org.apache.curator.shaded.com.google.common.collect.Iterables;
//import org.apache.curator.shaded.com.google.common.collect.Lists;
//import org.apache.curator.shaded.com.google.common.collect.Maps;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;
import org.apache.zookeeper.KeeperException;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 *
 */
public class ConsumerExampleServer {
	private static final String PATH = "/discovery/example";

	public static void main(String[] args) throws Exception {
		CuratorFramework client = null;
		ServiceDiscovery<InstanceDetails> serviceDiscovery = null;
		Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
		try {
			client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy, Constant.CONNECTION_TIMEOUT,
					Constant.SESSION_TIMEOUT);
			client.start();

			JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
					InstanceDetails.class);

			serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH)
					.serializer(serializer).build();
			serviceDiscovery.start();
			// The helper methods can also be called directly instead of through the prompt:
			//   List<ExampleServer> servers = Lists.newArrayList();
			//   addInstance("testingServiceName", "desc", client, servers);
			//   listInstances(serviceDiscovery);
			//   listRandomInstance("testingServiceName", serviceDiscovery, providers);
			//   deleteInstance("testingServiceName", servers);
			// After the add, listing the instances showed:
			//   testingServiceName
			//       desc: http://foo.com:57438

			// Drive the methods above through typed commands
			command(serviceDiscovery, providers, client);
		} finally {
			for (ServiceProvider<InstanceDetails> cache : providers.values()) {
				CloseableUtils.closeQuietly(cache);
			}
			CloseableUtils.closeQuietly(serviceDiscovery);
			CloseableUtils.closeQuietly(client);
		}
	}

	private static void command(ServiceDiscovery<InstanceDetails> serviceDiscovery,
			Map<String, ServiceProvider<InstanceDetails>> providers, CuratorFramework client) throws Exception {
		List<ExampleServer> servers = Lists.newArrayList();
		try {
			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
			boolean done = false;
			while (!done) {
				System.out.print("> ");

				String line = in.readLine();
				if (line == null) {
					break;
				}

				String command = line.trim();
				if (command.isEmpty()) {
					continue;
				}
				// Split on runs of whitespace so repeated spaces do not yield empty tokens
				String[] parts = command.split("\\s+");
				String operation = parts[0];
				String args[] = Arrays.copyOfRange(parts, 1, parts.length);

				// Check the argument count first so a short command cannot index past the array
				if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
					done = true;
				} else if (operation.equals("add") && args.length >= 2) {
					addInstance(args[0], args[1], client, servers);
				} else if (operation.equals("delete") && args.length >= 1) {
					deleteInstance(args[0], servers);
				} else if (operation.equals("random") && args.length >= 1) {
					listRandomInstance(args[0], serviceDiscovery, providers);
				} else if (operation.equals("list")) {
					listInstances(serviceDiscovery);
				} else {
					System.err.println("Unrecognized or incomplete command: " + command);
				}
			}
		} finally {
			for (ExampleServer server : servers) {
				CloseableUtils.closeQuietly(server);
			}
		}
	}

	// Delete a service
	private static void deleteInstance(String serviceName, List<ExampleServer> servers) {
		ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() {
			@Override
			public boolean apply(ExampleServer server) {
				// Match the exact name; endsWith() would also match "xreg" when asked for "reg"
				return server.getServiceInstance().getName().equals(serviceName);
			}
		}, null);
		if (server == null) {
			System.err.println("No service found with name: " + serviceName);
			return;
		}

		servers.remove(server);
		CloseableUtils.closeQuietly(server);
		System.out.println("Removed service with name: " + serviceName);
	}

	// Look up one randomly selected instance of a service
	private static void listRandomInstance(String serviceName, ServiceDiscovery<InstanceDetails> serviceDiscovery,
			Map<String, ServiceProvider<InstanceDetails>> providers) throws Exception {

		ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
		if (provider == null) {
			provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName)
					.providerStrategy(new RandomStrategy<InstanceDetails>()).build();
			providers.put(serviceName, provider);
			provider.start();

			Thread.sleep(2500); // give the provider time to warm up - in a real application you wouldn't need
								// to do this
		}

		ServiceInstance<InstanceDetails> instance = provider.getInstance();
		if (instance == null) {
			System.err.println("No service found with name: " + serviceName);
		} else {
			System.out.println("\t" + instance.getPayload().getServiceDescription() + ": " + instance.buildUriSpec());
		}
	}

	// List all registered services
	private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception {
		try {
			Collection<String> serviceNames = serviceDiscovery.queryForNames();

			for (String serviceName : serviceNames) {
				Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery
						.queryForInstances(serviceName);
				System.out.println(serviceName);
				for (ServiceInstance<InstanceDetails> instance : instances) {
					System.out.println(
							"\t" + instance.getPayload().getServiceDescription() + ": " + instance.buildUriSpec());
				}
			}

		} catch (KeeperException.NoNodeException e) {
			e.printStackTrace();
			System.err.println("No registered service instances were found.");
		}
		// Do not close serviceDiscovery here: main() closes it, and later commands still need it.
	}

	// Add a service
	private static void addInstance(String serviceName, String description, CuratorFramework client,
			List<ExampleServer> servers) throws Exception {
		ExampleServer server = new ExampleServer(client, PATH, serviceName, description);
		servers.add(server);
		server.start();

		System.out.println(serviceName + " added successfully");
	}
}

5. Verifying Example 1

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
//import org.apache.curator.shaded.com.google.common.base.Predicate;
//import org.apache.curator.shaded.com.google.common.collect.Iterables;
//import org.apache.curator.shaded.com.google.common.collect.Lists;
//import org.apache.curator.shaded.com.google.common.collect.Maps;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;
import org.apache.zookeeper.KeeperException;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

public class Test {
	private static final String PATH = "/discovery/example";

	public static void main(String[] args) throws Exception {

		CuratorFramework client = null;
		ServiceDiscovery<InstanceDetails> serviceDiscovery = null;
		Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
		try {
			client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy, Constant.CONNECTION_TIMEOUT,
					Constant.SESSION_TIMEOUT);
			client.start();

			JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
					InstanceDetails.class);

			serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH)
					.serializer(serializer).build();
			serviceDiscovery.start();

			processCommands(serviceDiscovery, providers, client);
		} finally {
			for (ServiceProvider<InstanceDetails> cache : providers.values()) {
				CloseableUtils.closeQuietly(cache);
			}
			CloseableUtils.closeQuietly(serviceDiscovery);
			CloseableUtils.closeQuietly(client);
		}
	}

	private static void processCommands(ServiceDiscovery<InstanceDetails> serviceDiscovery,
			Map<String, ServiceProvider<InstanceDetails>> providers, CuratorFramework client) throws Exception {
		printHelp();
		List<ExampleServer> servers = Lists.newArrayList();
		try {
			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
			boolean done = false;
			while (!done) {
				System.out.print("> ");

				String line = in.readLine();
				if (line == null) {
					break;
				}

				String command = line.trim();
				String[] parts = command.split("\\s");
				if (parts.length == 0) {
					continue;
				}
				String operation = parts[0];
				String args[] = Arrays.copyOfRange(parts, 1, parts.length);

				if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
					printHelp();
				} else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
					done = true;
				} else if (operation.equals("add")) {
					addInstance(args, client, command, servers);
				} else if (operation.equals("delete")) {
					deleteInstance(args, command, servers);
				} else if (operation.equals("random")) {
					listRandomInstance(args, serviceDiscovery, providers, command);
				} else if (operation.equals("list")) {
					listInstances(serviceDiscovery);
				}
			}
		} finally {
			for (ExampleServer server : servers) {
				CloseableUtils.closeQuietly(server);
			}
		}
	}

	private static void deleteInstance(String[] args, String command, List<ExampleServer> servers) {
		if (args.length != 1) {
			System.err.println("syntax error (expected delete <name>): " + command);
			return;
		}

		final String serviceName = args[0];
		ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() {
			@Override
			public boolean apply(ExampleServer server) {
				return server.getServiceInstance().getName().equals(serviceName);
			}
		}, null);

		if (server == null) {
			System.err.println("No servers found named: " + serviceName);
			return;
		}

		servers.remove(server);
		CloseableUtils.closeQuietly(server);
		System.out.println("Removed a random instance of: " + serviceName);
	}

	private static void listRandomInstance(String[] args, ServiceDiscovery<InstanceDetails> serviceDiscovery,
			Map<String, ServiceProvider<InstanceDetails>> providers, String command) throws Exception {
		if (args.length != 1) {
			System.err.println("syntax error (expected random <name>): " + command);
			return;
		}

		String serviceName = args[0];
		ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
		if (provider == null) {
			provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName)
					.providerStrategy(new RandomStrategy<InstanceDetails>()).build();
			providers.put(serviceName, provider);
			provider.start();

			Thread.sleep(2500); // give the provider time to warm up - in a real application you wouldn't need
								// to do this
		}

		ServiceInstance<InstanceDetails> instance = provider.getInstance();
		if (instance == null) {
			System.err.println("No instances named: " + serviceName);
		} else {
			outputInstance(instance);
		}
	}

	private static void outputInstance(ServiceInstance<InstanceDetails> instance) {
		System.out.println("\t" + instance.getPayload().getServiceDescription() + ": " + instance.buildUriSpec());
	}

	private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception {
		try {
			Collection<String> serviceNames = serviceDiscovery.queryForNames();
			System.out.println(serviceNames.size() + " type(s)");
			for (String serviceName : serviceNames) {
				Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery
						.queryForInstances(serviceName);
				System.out.println(serviceName);
				for (ServiceInstance<InstanceDetails> instance : instances) {
					outputInstance(instance);
				}
			}

		} catch (KeeperException.NoNodeException e) {
			System.err.println("There are no registered instances.");
		}
		// Do not close serviceDiscovery here: main() closes it, and later commands still need it.
	}

	private static void printHelp() {
		System.out.println(
				"An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:\n");
		System.out.println("add <name> <description>: Adds a mock service with the given name and description");
		System.out.println("delete <name>: Deletes one of the mock services with the given name");
		System.out.println("list: Lists all the currently registered services");
		System.out.println("random <name>: Lists a random instance of the service with the given name");
		System.out.println("quit: Quit the example");
		System.out.println();
	}

	private static void addInstance(String[] args, CuratorFramework client, String command, List<ExampleServer> servers)
			throws Exception {
		// simulate a new instance coming up
		// in a real application, this would be a separate process

		if (args.length < 2) {
			System.err.println("syntax error (expected add <name> <description>): " + command);
			return;
		}

		StringBuilder description = new StringBuilder();
		for (int i = 1; i < args.length; ++i) {
			if (i > 1) {
				description.append(' ');
			}
			description.append(args[i]);
		}

		String serviceName = args[0];
		ExampleServer server = new ExampleServer(client, PATH, serviceName, description.toString());
		servers.add(server);
		server.start();

		System.out.println(serviceName + " added");
	}
}

Running this class produces the following session:

An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:

add <name> <description>: Adds a mock service with the given name and description
delete <name>: Deletes one of the mock services with the given name
list: Lists all the currently registered services
random <name>: Lists a random instance of the service with the given name
quit: Quit the example

> list
There are no registered instances.
> add reg  testing
reg added
> list
1 type(s)
reg
	 testing: http://foo.com:35438
> delete reg
Removed a random instance of: reg
> list
There are no registered instances.
> random reg2
No instances named: reg2
> list
1 type(s)
reg2

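Note: a deleted service may not disappear immediately. The removal can take until the next polling interval, so a list issued right after delete may still show it.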
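
The stray space before `testing` in the session above is consistent with the input `add reg  testing` containing two spaces: `split("\\s")` splits on every single whitespace character, so the doubled space yields an empty token, which the Test class then joins back into the description. The small sketch below compares that with splitting on runs of whitespace; `TokenizeSketch` is a hypothetical helper, not part of the example project.

```java
import java.util.Arrays;

// Hypothetical helper comparing two ways of tokenizing a command line.
class TokenizeSketch {
    // Splits on each single whitespace character; repeated spaces produce empty tokens.
    static String[] splitSingle(String line) {
        return line.trim().split("\\s");
    }

    // Splits on runs of whitespace; repeated spaces are treated as one separator.
    static String[] splitRuns(String line) {
        return line.trim().split("\\s+");
    }

    public static void main(String[] args) {
        String line = "add reg  testing"; // note the two spaces before "testing"
        System.out.println(Arrays.toString(splitSingle(line))); // [add, reg, , testing]
        System.out.println(Arrays.toString(splitRuns(line)));   // [add, reg, testing]
    }
}
```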

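III. Example 2

This example mimics real-world usage: define a ServiceInstance, define service providers and register them in ZooKeeper, and let consumers discover and use them. It follows the producer-consumer pattern.
It relies on the pom.xml and some of the files from Example 1; those shared pieces are not described again here.

1. InstanceDetails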

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Data;

/**
 * @author alanchan
 *
 */
@Data
public class InstanceDetails {
	public static final String ROOT_PATH = "/service";
	
	// Methods exposed by this service
	public Map<String, Service> services = new HashMap<>();

	// Service description
	private String serviceDescription;

	public InstanceDetails() {
		this.serviceDescription = "";
	}

	public InstanceDetails(String serviceDescription) {
		this.serviceDescription = serviceDescription;
	}

	@Data
	public static class Service {
		// Method name
		private String methodName;

		// Method description
		private String description;

		// Parameters the method requires
		private List<String> params;

	}

}

2. ProviderService

Many services can be registered this way.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceInstanceBuilder;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

public class ProviderService {

	public static void main(String[] args) throws Exception {
		CuratorFramework client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy,
				Constant.CONNECTION_TIMEOUT, Constant.SESSION_TIMEOUT);
		client.start();
		client.blockUntilConnected();

		// All interfaces (methods) exposed by this service
		Map<String, InstanceDetails.Service> services = new HashMap<>();

		// The "add order" interface
		InstanceDetails.Service addOrderService = new InstanceDetails.Service();
		addOrderService.setDescription("添加订单");
		addOrderService.setMethodName("addOrder");
		ArrayList<String> addOrderParams = new ArrayList<>();
		addOrderParams.add("createTime");
		addOrderParams.add("state");
		addOrderService.setParams(addOrderParams);

		services.put("addOrder", addOrderService);

		// The "delete order" interface
		InstanceDetails.Service delOrderService = new InstanceDetails.Service();
		delOrderService.setDescription("删除订单");
		delOrderService.setMethodName("delOrder");
		ArrayList<String> delOrderParams = new ArrayList<>();
		delOrderParams.add("orderId");
		delOrderService.setParams(delOrderParams);

		services.put("delOrder", delOrderService);

		// Additional information about the service (the payload)
		InstanceDetails payload = new InstanceDetails();
		payload.setServiceDescription("订单服务");
		payload.setServices(services);

		// Builder for the service instance
		ServiceInstanceBuilder<InstanceDetails> serviceInstanceBuilder = ServiceInstance.builder();

		// Describe the order service as a ServiceInstance
		ServiceInstance<InstanceDetails> orderService = serviceInstanceBuilder
				.address("127.0.0.1")
				.port(8082)
				.name("OrderService")
				.payload(payload)
				.uriSpec(new UriSpec("{scheme}://{address}:{port}"))
				.build();

		// Build the ServiceDiscovery used to register the service
		ServiceDiscovery<InstanceDetails> serviceDiscovery = ServiceDiscoveryBuilder
				.builder(InstanceDetails.class)
				.client(client)
				.serializer(new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class))
				.basePath(InstanceDetails.ROOT_PATH)
				.build();

		// Register the service
		serviceDiscovery.registerService(orderService);
		serviceDiscovery.start();

		System.out.println("第一台服务注册成功......");

		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

		serviceDiscovery.close();
		client.close();
	}

}

3. ConsumerService

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.zookeeper_curator.App;
import org.zookeeper_curator.Constant;

/**
 * @author alanchan
 *
 */
public class ConsumerService {

	public static void main(String[] args) throws Exception {
		CuratorFramework client = App.createWithOptions(Constant.zkServerAddress, Constant.retryPolicy,
				Constant.CONNECTION_TIMEOUT, Constant.SESSION_TIMEOUT);
		client.start();
		client.blockUntilConnected();

		ServiceDiscovery<InstanceDetails> serviceDiscovery = ServiceDiscoveryBuilder
				.builder(InstanceDetails.class)
				.client(client)
				.basePath(InstanceDetails.ROOT_PATH)
				.serializer(new JsonInstanceSerializer<>(InstanceDetails.class))
				.build();
		serviceDiscovery.start();

		// Look up the service instances by name
		Collection<ServiceInstance<InstanceDetails>> services = serviceDiscovery.queryForInstances("OrderService");
		System.out.println(services.toString());

		for (ServiceInstance<InstanceDetails> service : services) {
			// Build the request URI from the UriSpec, e.g. http://127.0.0.1:8082
			String uriSpec = service.buildUriSpec();
			// Get the additional information (payload) of the service
			InstanceDetails payload = service.getPayload();

			// Service description
			String serviceDesc = payload.getServiceDescription();
			// Get all interfaces (methods) exposed by this service
			Map<String, InstanceDetails.Service> allService = payload.getServices();
			Set<Map.Entry<String, InstanceDetails.Service>> entries = allService.entrySet();

			for (Map.Entry<String, InstanceDetails.Service> entry : entries) {
				System.out.println(serviceDesc + uriSpec + "/" + service.getName() + "/" + entry.getKey()
						+ " 该方法需要的参数为:" + entry.getValue().getParams().toString());
			}
		}
		System.out.println("---------------------");
		Thread.sleep(3 * 1000);

	}

}

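4. Verification

To verify this example, start ProviderService first, then start ConsumerService.
After ProviderService starts, its console prints (the message means "first service registered successfully"):
第一台服务注册成功…
After ConsumerService starts, its console prints (the label 该方法需要的参数为 means "the parameters required by this method are"):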

[ServiceInstance{name='OrderService', id='b6014a82-b924-4004-8790-27211457a23f', address='127.0.0.1', port=8082, sslPort=null, payload=InstanceDetails(services={delOrder=InstanceDetails.Service(methodName=delOrder, description=删除订单, params=[orderId]), addOrder=InstanceDetails.Service(methodName=addOrder, description=添加订单, params=[createTime, state])}, serviceDescription=订单服务), registrationTimeUTC=1659926704836, serviceType=DYNAMIC, uriSpec=org.apache.curator.x.discovery.UriSpec@6c2ac0dc, enabled=true}]
订单服务http://127.0.0.1:8082/OrderService/delOrder 该方法需要的参数为:[orderId]
订单服务http://127.0.0.1:8082/OrderService/addOrder 该方法需要的参数为:[createTime, state]
---------------------

If several ProviderService instances are started (two in this example), each console prints:
第一台服务注册成功…
After ConsumerService starts, its console prints:

[ServiceInstance{name='OrderService', id='f79b7fb0-4979-4b32-a330-da028583fbf8', address='127.0.0.1', port=8082, sslPort=null, payload=InstanceDetails(services={delOrder=InstanceDetails.Service(methodName=delOrder, description=删除订单, params=[orderId]), addOrder=InstanceDetails.Service(methodName=addOrder, description=添加订单, params=[createTime, state])}, serviceDescription=订单服务), registrationTimeUTC=1682233116512, serviceType=DYNAMIC, uriSpec=org.apache.curator.x.discovery.UriSpec@6c2ac0dc, enabled=true}, ServiceInstance{name='OrderService', id='89e1f334-eae0-4116-8e83-dec65cfca8ea', address='127.0.0.1', port=8082, sslPort=null, payload=InstanceDetails(services={delOrder=InstanceDetails.Service(methodName=delOrder, description=删除订单, params=[orderId]), addOrder=InstanceDetails.Service(methodName=addOrder, description=添加订单, params=[createTime, state])}, serviceDescription=订单服务), registrationTimeUTC=1682232987649, serviceType=DYNAMIC, uriSpec=org.apache.curator.x.discovery.UriSpec@6c2ac0dc, enabled=true}]
订单服务http://127.0.0.1:8082/OrderService/delOrder 该方法需要的参数为:[orderId]
订单服务http://127.0.0.1:8082/OrderService/addOrder 该方法需要的参数为:[createTime, state]
订单服务http://127.0.0.1:8082/OrderService/delOrder 该方法需要的参数为:[orderId]
订单服务http://127.0.0.1:8082/OrderService/addOrder 该方法需要的参数为:[createTime, state]


From: https://blog.51cto.com/alanchan2win/6280457
