// Kick off the long-polling loop once the DOM is ready.
$(document).ready(function () {
    getMsg();
});
/**
 * Long-polls GET /polling/msg and re-issues the request as soon as the
 * previous one finishes, so one request is always pending on the server.
 *
 * A non-empty response is shown via alertShow(data.msg). An empty response
 * (nothing arrived before the server answered) is skipped silently.
 */
function getMsg() {
    $.ajax({
        url: "/polling/msg",
        type: "get",
        data: {},
        success: function (data) {
            if (data != null && data != "") {
                alertShow(data.msg);
            }
            // Immediately open the next long-poll request.
            getMsg();
        },
        error: function () {
            // Without this handler a single failed request would end the
            // polling loop for good. Retry after a short pause so a failing
            // server is not hammered with back-to-back requests.
            setTimeout(getMsg, 5000);
        }
    });
}
/**
 * Long-polling example controller mapped under {@code /polling/}.
 *
 * <p>{@code GET /polling/msg} parks the request on a {@link DeferredResult}
 * registered under the caller's user name; {@code POST /polling/msg} publishes
 * a message for a given user to RabbitMQ.
 *
 * @author {chensg}: 2016-06-01
 */
@Controller
@RequestMapping("/polling/")
public class PollingController {

    /** Shared registry of pending results: key = userId, value = that user's DeferredResult. */
    @Autowired
    MessageContainer messageContainer;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Long-poll endpoint.
     *
     * <p>Creates a {@link DeferredResult} with a 30 s timeout (timeout result
     * {@code null}), registers it under the current user's name and returns it.
     *
     * @return the deferred result registered for the current user
     */
    @RequestMapping(value="msg", method=RequestMethod.GET)
    public @ResponseBody DeferredResult<UserMessage> getMessage() {
        // The cast must be applied to the principal itself, before getUsername() is called.
        // NOTE(review): assumes the principal is always a UserDetails - confirm for anonymous access.
        final UserDetails principal = (UserDetails) SecurityContextHolder.getContext()
                .getAuthentication()
                .getPrincipal();
        final String userId = principal.getUsername();

        // 30 s timeout; on timeout the result is null.
        final DeferredResult<UserMessage> result = new DeferredResult<UserMessage>(30000L, null);
        messageContainer.getUserMessages().put(userId, result);

        result.onCompletion(new Runnable() {
            @Override
            public void run() {
                // Remove only if the entry is still THIS result. If the same user has
                // already registered a newer DeferredResult, an unconditional remove
                // would evict it and that poll would never receive its message.
                messageContainer.getUserMessages().remove(userId, result);
            }
        });
        return result;
    }

    /**
     * Test endpoint: submits a message that should be pushed to a user.
     *
     * <p>Publishes the message to exchange {@code test.exchange} with routing
     * key {@code test.binding}.
     *
     * @param msg    message text
     * @param userId id of the user the message is addressed to
     * @return currently always {@code null}
     */
    @RequestMapping(value="msg", method=RequestMethod.POST)
    public @ResponseBody RestResult addMessage(String msg,String userId) {
        UserMessage userMsg = new UserMessage();
        userMsg.setUserId(userId);
        userMsg.setMsg(msg);
        // Messages from the system or from other users go onto the message queue.
        rabbitTemplate.convertAndSend("test.exchange", "test.binding", userMsg);
        return null;
    }
}
页面加载完成时,该用户请求/polling/msg控制器接口,接口里会创建一个DeferredResult实例,设置超时30s,超时返回null。DeferredResult<?> 允许处理请求的线程先行返回(释放),响应则保持挂起,直到其他线程调用setResult()或超时,才真正返回给页面。
消息实体类
/**
 * Serializable message payload: the text of a message together with the id
 * of the user it is addressed to.
 */
public class UserMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Id of the user this message belongs to. */
    private String userId;

    /** Message text. */
    private String msg;

    /** @return the user id, or {@code null} if none has been set */
    public String getUserId() {
        return this.userId;
    }

    /** @return the message text, or {@code null} if none has been set */
    public String getMsg() {
        return this.msg;
    }

    /** @param userId the user id to store */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @param msg the message text to store */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
配置rabbitMQ
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd">
<!-- RabbitMQ wiring for the long-polling example: connection, template, queue/exchange/binding and listener. -->
<!-- Connection factory; host, port and credentials are taken from the rabbitmq.* placeholders. -->
<!-- NOTE(review): presumably resolved from application.properties; confirm a placeholder configurer is registered. -->
<rabbit:connection-factory id="rabbitConnectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="/" />
<!-- RabbitTemplate used for publishing, configured with retryTemplate. -->
<rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory"
retry-template="retryTemplate" />
<!-- RetryTemplate with exponential back-off: initial interval 500, multiplier 10.0, max interval 10000. -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<!-- Queue used for message push. -->
<rabbit:queue id="testQueue" name="test.polling" />
<!-- Direct exchange routing key "test.binding" to queue "test.polling". -->
<rabbit:direct-exchange name="test.exchange">
<rabbit:bindings>
<rabbit:binding queue="test.polling" key="test.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- Message handler bean. -->
<bean id="servicePollingHandler"
class="com.xxx.controller.test.ServicePollingHandler" />
<!-- Bind the listener to the queue: messages from testQueue are passed to testPollingHandle. -->
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener ref="servicePollingHandler"
method="testPollingHandle"
queues="testQueue" />
</rabbit:listener-container>
</beans>
/**
 * Message handler that completes a user's pending long-poll request when a
 * {@link UserMessage} addressed to that user arrives.
 */
public class ServicePollingHandler {

    /** Shared registry of pending results, keyed by user id. */
    @Autowired
    MessageContainer messageContainer;

    /**
     * Looks up the {@link DeferredResult} registered for the message's user id
     * and, if one exists, completes it with the message.
     *
     * <p>Messages without a user id, and messages for users with no pending
     * request, are dropped.
     *
     * @param userMessage the message to deliver; may be {@code null}
     */
    public void testPollingHandle(UserMessage userMessage)
    {
        // The registry is a ConcurrentHashMap, which throws NullPointerException on a
        // null key. Guard here so a malformed message does not make the handler fail.
        if (userMessage == null || userMessage.getUserId() == null) {
            return;
        }
        Map<String, DeferredResult> msgContainer = messageContainer.getUserMessages();
        // The registry stores raw DeferredResult; entries are registered as DeferredResult<UserMessage>.
        @SuppressWarnings("unchecked")
        DeferredResult<UserMessage> deferredResult = msgContainer.get(userMessage.getUserId());
        if (deferredResult != null) {
            // setResult() completes the pending long-poll request with this message.
            deferredResult.setResult(userMessage);
        }
    }
}
/**
 * Root configuration: reads {@code application.properties} from the classpath,
 * imports the XML definitions in {@code amqp.xml} and declares the
 * {@link MessageContainer} bean.
 */
@PropertySource("classpath:application.properties")
@ImportResource("classpath:amqp.xml")
public class RootConfig {

    /**
     * @return a new {@link MessageContainer} to be registered as a bean
     */
    @Bean
    public MessageContainer messageContainer() {
        final MessageContainer container = new MessageContainer();
        return container;
    }
}
/**
 * Holder for the pending {@link DeferredResult} of each user, keyed by user id.
 */
public class MessageContainer {

    // final: the map reference is shared across threads and must never be reassigned.
    // The raw DeferredResult type is kept because callers assign the map to
    // Map<String, DeferredResult>.
    private final ConcurrentHashMap<String, DeferredResult> userMessages =
            new ConcurrentHashMap<String, DeferredResult>();

    /**
     * @return the live map of pending results (not a copy); changes made by
     *         callers are visible to every other user of this container
     */
    public ConcurrentHashMap<String, DeferredResult> getUserMessages() {
        return userMessages;
    }
}
该例子的用途:当一个用户登录页面时,页面异步请求后台/polling/msg,后台创建一个DeferredResult实例(并不占用请求处理线程),维持该长连接30s;当超时或者返回信息时,页面则再次请求后台,重新维持一个30s的长连接(长轮询)。
系统或者其他用户以POST方式调用/polling/msg,传入msg与userId;控制器把消息放入消息队列,消息队列把消息推送到ServicePollingHandler类的testPollingHandle()方法;该方法根据userId获得该用户登录之后由页面长轮询创建的DeferredResult实例,并调用setResult(),页面随即接收到返回的消息。
可以基于以上代码,实现web聊天
转自http://chenshangge.iteye.com/blog/2302710