pom中加入依赖
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>2.1.0</version>
</dependency>
netty socketio 配置信息
# netty-socketio 配置
socketio:
host: 127.0.0.1
port: 8889
contextPath: /mwapi/ws/spl
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
maxFramePayloadLength: 1048576
# 设置http交互最大内容长度
maxHttpContentLength: 1048576
# socket连接数大小(如只监听一个端口boss线程组为1即可)
bossCount: 1
workCount: 100
allowCustomRequests: true
# 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
upgradeTimeout: 1000000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
pingTimeout: 6000000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
pingInterval: 25000
java socketio config配置
@Configuration
public class SocketIOConfig {
@Value("${socketio.host}")
private String host;
@Value("${socketio.port}")
private Integer port;
@Value("${socketio.contextPath}")
private String contextPath;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Bean
public SocketIOServer socketIOServer() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setSoLinger(0);
socketConfig.setReuseAddress(true);
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
config.setSocketConfig(socketConfig);
config.setHostname(host);
config.setPort(port);
// config.setContext(contextPath);
config.setBossThreads(bossCount);
config.setWorkerThreads(workCount);
config.setAllowCustomRequests(allowCustomRequests);
config.setUpgradeTimeout(upgradeTimeout);
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);
config.setOrigin("*");
SocketIOServer socketIOServer = new SocketIOServer(config);
socketIOServer.addNamespace("/mynamespace");
return socketIOServer;
}
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
java server 通用代码编写
@Component
public class MessageEventHandler {
private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);
/**
* 服务器socket对象
*/
public SocketIOServer socketIoServer;
/**
* 客户端集合
*/
public List<String> listClient = new CopyOnWriteArrayList<>();
public SocketInstance socketInstance = SocketInstance.getSocketInstance();
/**
* 超时时间
*/
static final int limitSeconds = 60;
/**
* 初始化消息事件处理器
*
* @param server 服务器socket对象
*/
@Autowired
public MessageEventHandler(SocketIOServer server) {
logger.info("初始化SOCKET消息事件处理器");
this.socketIoServer = server;
}
/**
* 客户端发起连接时触发
*
* @param client 客户端Socket对象信息
*/
@OnConnect
public void onConnect(SocketIOClient client) {
logger.info("客户端{}已连接", client.getSessionId());
String sessionId = getSessionId(client);
listClient.add(sessionId);
socketInstance.insertSocketClient(sessionId, client);
//向前端发送接收数据成功标识
client.sendEvent("connect_success", "已经成功连接");
}
/**
* 客户端断开连接时触发
*
* @param client 客户端Socket对象信息
*/
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
logger.info("客户端{}断开连接", client.getSessionId());
String sessionId = getSessionId(client);
listClient.remove(sessionId);
socketInstance.remoteClient(sessionId);
}
/**
* 客户端发送消息时触发
*
* @param client 客户端Socket对象信息
* @param request AckRequest 回调对象
* @param data 消息信息实体
*/
@OnEvent(value = SocketConstants.SocketEvent.MESSAGE)
public void onEvent(SocketIOClient client, AckRequest request, String data) {
System.out.println("发来消息:" + data);
request.sendAckData("服务端已收到");
client.sendEvent("messageevent", "back data");
//socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data");
}
@OnEvent(value = SocketConstants.SocketEvent.BROADCAST)
public void onEventByBroadcast(SocketIOClient client, AckRequest request, String data) {
System.out.println("发来消息:" + data);
request.sendAckData("服务端-广播事件已收到");
client.sendEvent(SocketConstants.SocketEvent.BROADCAST, "广播事件 " + DateUtil.now());
//socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data");
}
/**
* 广播消息 函数可在其他类中调用
*/
public void sendBroadcast(byte[] data) {
//向已连接的所有客户端发送数据,map实现客户端的存储
for (SocketIOClient client : socketInstance.getClientSocketAll().values()) {
if (client.isChannelOpen()) {
client.sendEvent("message_event", data);
}
}
}
/**
* 获取客户端的session Id
*
* @param client: 客户端
*/
private String getSessionId(SocketIOClient client) {
return client.getSessionId().toString();
}
/**
* 获取连接的客户端ip地址
*
* @param client: 客户端
* @return 获取连接的客户端ip地址
*/
private String getIpByClient(SocketIOClient client) {
String sa = client.getRemoteAddress().toString();
return sa.substring(1, sa.indexOf(":"));
}
}
自定义命名空间,事件处理
@Slf4j
@Component
public class MyNamespaceHandler {
//测试使用
@OnEvent("message")
public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
log.info("SplSearch:{}", data);
if (ackRequest.isAckRequested()) {
//返回给客户端,说我接收到了
ackRequest.sendAckData("SplSearch", data);
}
}
}
项目启动加载并且项目关闭时关闭socket io
可以使用springboot方式也可以使用spring的方式,这里方式很多根据自己喜好来定
如果使用spring方式来处理可以实现:ApplicationListener<...>
实现类同时希望拥有启动加载和关闭销毁两个功能,可以这样做
- spring 实现应用监听
@Component
public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextRefreshedEvent) {
// 处理应用程序启动事件
System.out.println("Application Started");
} else if (event instanceof ContextClosedEvent) {
// 处理应用程序关闭事件
System.out.println("Application Closed");
}
}
}
- springboot方式
@Slf4j
@Component
@Order(value = 1)
public class MyCommandLineRunner implements CommandLineRunner, DisposableBean {
private final SocketIOServer server;
private final MyNamespaceHandler myNamespaceHandler;
@Autowired
public MyCommandLineRunner(SocketIOServer server, MyNamespaceHandler myNamespaceHandler) {
this.myNamespaceHandler = myNamespaceHandler;
this.server = server;
System.out.println("初始化MyCommandLineRunner");
}
@Override
public void run(String... args) {
try {
server.getNamespace("/mynamespace").addListeners(myNamespaceHandler);
server.start();
System.out.println("socket.io启动成功!");
} catch (Exception ex) {
ex.printStackTrace();
}
}
@Override
public void destroy() {
//如果用kill -9 这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉
server.stop();
log.info("SocketIOServer==============================关闭成功");
}
}
唯一socket实例
public class SocketInstance {
/**
* 客户端Socket连接对象容器
*/
private static Map<String, SocketIOClient> socketClients = null;
/**
* 私有构造
*/
private SocketInstance() {
//从缓存中获取socketClients
socketClients = new HashMap<>();
}
/**
* 定义一个私有的内部类,在第一次用这个嵌套类时,会创建一个实例。而类型为SocketInstanceHolder的类,
* 只有在SocketInstance.getSocketInstance()中调用,
* 由于私有的属性,他人无法使用SocketInstanceHolder,不调用SocketInstance.getSocketInstance()就不会创建实例。
* 优点:达到了lazy loading的效果,即按需创建实例。
* 无法适用于分布式集群部署
*/
private static class SocketInstanceHolder {
/**
* 创建全局唯一实例
*/
private final static SocketInstance instance = new SocketInstance();
}
/**
* 获取全局唯一实例
*
* @return SocketInstance对象
*/
public static SocketInstance getSocketInstance() {
return SocketInstanceHolder.instance;
}
/**
* 新增客户端连接到容器
*
* @param encode 设备En号
* @param socketIOClient 客户端socket对象
*/
public void insertSocketClient(String encode, SocketIOClient socketIOClient) {
socketClients.put(encode, socketIOClient);
}
/**
* 获取客户端Socket对象
*
* @param encode 设备encode
* @return 客户端Socket对象
*/
public SocketIOClient getClientSocket(String encode) {
return socketClients.get(encode);
}
/**
* 获取所有客户端Socket对象
*
* @return 客户端Socket对象
*/
public Map<String, SocketIOClient> getClientSocketAll() {
return socketClients;
}
/**
* 删除客户端
* @param sessionId 客户端的id
*/
public void remoteClient(String sessionId) {
SocketIOClient oldSocketIOClient = socketClients.get(sessionId);
if (oldSocketIOClient != null) {
try {
//关闭客户端连接
oldSocketIOClient.disconnect();
} catch (Exception ex) {
ex.printStackTrace();
}
}
socketClients.remove(sessionId);
}
}
public class SocketConstants {
/**
* Socket事件类
*/
public class SocketEvent {
/**
* 效验连接状况
*/
public static final String HEALTH_CHECK = "HEALTH_CHECK";
/**
* 消息接收事件名称
*/
public static final String MESSAGE = "message";
public static final String BROADCAST = "broadcast";
}
}
客户端代码
默认没有命名空间的,需要命名空间在url后面加/xxx
public class SocketIOClientLaunch {
public static void main(String[] args) {
// 服务端socket.io连接通信地址
String url = "http://127.0.0.1:8889";
try {
IO.Options options = new IO.Options();
options.transports = new String[]{"websocket"};
options.reconnectionAttempts = 2;
// 失败重连的时间间隔
options.reconnectionDelay = 1000;
// 连接超时时间(ms)
options.timeout = 500;
// userId: 唯一标识 传给服务端存储
final Socket socket = IO.socket(url + "?userId=1", options);
socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));
// 自定义事件`connected` -> 接收服务端成功连接消息
socket.on(SocketConstant.CONNECTION, objects -> {
int length = objects.length;
log.info("服务端自定义事件`connected`:" + objects[0].toString());
});
// 自定义事件`push_data_event` -> 接收服务端消息
socket.on(SocketConstant.PUSH_DATA_EVENT, objects -> log.info("服务端自定义`push_data_event`:" + objects[0].toString()));
// 自定义事件`myBroadcast` -> 接收服务端广播消息
socket.on(SocketConstant.BROADCAST, objects -> log.info("服务端广播消息:" + objects[0].toString()));
socket.connect();
while (true) {
Thread.sleep(3000);
// 自定义事件`push_data_event` -> 向服务端发送消息
socket.emit(SocketConstant.PUSH_DATA_EVENT, "发送数据 " + DateUtil.now());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
postman 调用方式
回调事件
发送事件
配置
下一篇:k8s中配置socket io,gateway 配置socket io
标签:springboot,记录,socketio,String,client,socket,public,客户端 From: https://www.cnblogs.com/xxsdnol/p/17884549.html