照着官方文档上写,最后发现在消费端怎么也返回不了数据。在文档中也找不到怎么返回数据,查看官方demo也没有案例,各种搜索都找不到。
最后在源码中发现有一个RpcServer
类,经过一通研究终于跑通了,真不神人容易,消费端代码如下:
- 第一步:定义一个类继承
RpcServer
,重写父类的handleCall方法,在这个方法中返回需要的数据和接收客户端的数据,代码如下:
public class HandbookRpcServer extends RpcServer {
public HandbookRpcServer(Channel channel, String queueName) throws IOException {
super(channel, queueName);
}
@Override
public byte[] handleCall(Delivery request, AMQP.BasicProperties replyProperties) {
String input = new String(request.getBody());
System.out.println ("*** " + input + " ***");
return ("*** " + "hello rabbitmq" + " ***").getBytes();
}
}
- 第二步,在handler中 起动监听,代码如下:
@Autowired
Mono<Connection> connectionMono;
@PostConstruct
public void run() {
connectionMono.mapNotNull(f->{
Channel channel = null;
try {
channel = f.createChannel();
} catch (IOException e) {
e.printStackTrace();
}
return channel;
}).subscribe(channel -> {
try {
RpcServer rpcServer = new HandbookRpcServer(channel,RPC_QUEUE);
new Thread(() -> {
try {
rpcServer.mainloop();
} catch (Exception e) {
// safe to ignore when loops ends/server is canceled
}
}).start();
} catch (IOException e) {
e.printStackTrace();
}
});
}
来自为知笔记(Wiz)
标签:reactor,RpcServer,rabbitmq,try,RPC,IOException,public,channel From: https://www.cnblogs.com/baiyifengyun/p/17132671.html