首页 > 编程语言 >Java Websocket 02: 原生模式通过 Websocket 传输文件

Java Websocket 02: 原生模式通过 Websocket 传输文件

时间:2023-06-19 09:12:31浏览次数:46  
标签:02 info Java byteBuffer buffer ByteBuffer Websocket public condition

目录

Websocket 原生模式 传输文件

关于 Websocket 传输的消息类型, 允许的参数包括以下三类

  1. 以下类型之一, 同时只能出现一个
    • 文本类型 (text messages) 的消息: String, Java primitive, 阻塞的 Stream Reader, 带text decoder(Decoder.Text or Decoder.TextStream)的对象
    • 二进制类型 (binary messages) 的消息: byte[] 或 ByteBuffer, 阻塞的 InputStream, 带 binary decoder (Decoder.Binary or Decoder.BinaryStream)的对象
    • Pong messages: PongMessage
  2. 通过 PathParam 指定的0个或多个基础类型
  3. 会话参数 Session, 可选

因此对于不同的消息类型, 可以有不同参数类型的 onMessage() 方法, 分别用于处理不同格式的内容, 对于传输文件, 需要使用 ByteBuffer 类型的参数

void onMessage(ByteBuffer byteBuffer, Session session)

在处理过程中和普通的文件传输是一样的, 需要将文件分片传输, 并约定合适的消息头用于判断文件传输的阶段, 在服务端根据不同的阶段进行文件创建, 写入和结束.

演示项目

与前一篇项目结构相同, 只需要修改 SocketServer 和 SocketClient

完整示例代码: https://github.com/MiltonLai/websocket-demos/tree/main/ws-demo02

SocketServer.java

增加了 onMessage(ByteBuffer byteBuffer, Session session) 方法用于处理二进制消息, 在方法中

  1. 先读取第一个字节的值, 根据不同的值对应不同的操作
    • 1 表示文件传输前的准备
    • 3 表示文件内容写入
    • 5 表示文件结束
  2. 再读取后续的值
    • 1 解析出文件元信息, 并创建文件通道
    • 3 将内容写入文件
    • 5 关闭文件通道, 清除buffer
  3. 回传ACK
    • 1 ACK 2
    • 3 不ACK
    • 5 ACK 6
@Component
@ServerEndpoint("/websocket/server/{sessionId}")
public class SocketServer {

    //...

    @OnMessage
    public void onMessage(ByteBuffer byteBuffer, Session session) throws IOException {
        if (byteBuffer.limit() == 0) {
            return;
        }

        byte mark = byteBuffer.get(0);
        if (mark == 1) {
            log.info("mark 1");
            byteBuffer.get();
            String info = new String(
                    byteBuffer.array(),
                    byteBuffer.position(),
                    byteBuffer.limit() - byteBuffer.position());
            FileInfo fileInfo = new JsonMapper().readValue(info, FileInfo.class);
            byteChannel = Files.newByteChannel(
                    Path.of("D:/data/" + fileInfo.getFileName()),
                    new StandardOpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE});
            //ack
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            buffer.put((byte) 2);
            buffer.put("receive fileinfo".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            session.getBasicRemote().sendBinary(buffer);
        } else if (mark == 3) {
            log.info("mark 3");
            byteBuffer.get();
            byteChannel.write(byteBuffer);
        } else if (mark == 5) {
            log.info("mark 5");
            //ack
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            buffer.clear();
            buffer.put((byte) 6);
            buffer.put("receive end".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            session.getBasicRemote().sendBinary(buffer);
            byteChannel.close();
            byteChannel = null;
        }
    }

    //...

    public static class FileInfo implements Serializable {
        private String fileName;
        private long fileSize;

        public String getFileName() {return fileName;}
        public void setFileName(String fileName) {this.fileName = fileName;}
        public long getFileSize() {return fileSize;}
        public void setFileSize(long fileSize) {this.fileSize = fileSize;}
    }
}

SocketClient.java

client 测试类, 连接后可以在命令行向 server 发送消息

首先是消息处理中增加了 void onMessage(ByteBuffer bytes), 这个是用来接收服务端回传的ACK的, 根据第一个字节, 判断服务端的处理结果. 这里使用了一个 condition.notify() 用来通知发送线程继续发送

其次是消息发送中, 用输入的1触发文件发送. 文件发送在 void sendFile(WebSocketClient webSocketClient, Object condition) 方法中进行, 通过一个 condition 对象, 在文件开始传输和结束传输时控制线程的暂停和继续. byteBuffer.flip()用于控制 byteBuffer 从状态变为状态, 用于发送. flip is used to flip the ByteBuffer from "reading from I/O" (putting) to "writing to I/O" (getting).

public class SocketClient {

    private static final Logger log = LoggerFactory.getLogger(SocketClient.class);

    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {

        Object condition = new Object();

        WebSocketClient wsClient = new WebSocketClient(new URI("ws://127.0.0.1:8763/websocket/server/10001")) {

            //...

            @Override
            public void onMessage(ByteBuffer bytes) {
                //To overwrite
                byte mark = bytes.get(0);
                if (mark == 2) {
                    synchronized (condition) {
                        condition.notify();
                    }
                    log.info("receive ack for file info");
                } else if (mark == 6){
                    synchronized (condition) {
                        condition.notify();
                    }
                    log.info("receive ack for file end");
                }
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                log.info("On close: {}, {}, {}", i, s, b);
            }

            @Override
            public void one rror(Exception e) {
                log.error("On error: {}", e.getMessage());
            }
        };

        wsClient.connect();

        log.info("Connecting ...");
        while (!ReadyState.OPEN.equals(wsClient.getReadyState())) {

        }
        log.info("Connected");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String line = scanner.next();
            if ("1".equals(line))
                sendFile(wsClient, condition);
            else
                wsClient.send(line);
        }
    }

    public static void sendFile(WebSocketClient webSocketClient, Object condition){
        new Thread(() -> {
            try {
                SeekableByteChannel byteChannel = Files.newByteChannel(
                        Path.of("/home/milton/Backup/linux/apache-tomcat-8.5.58.tar.gz"),
                        new StandardOpenOption[]{StandardOpenOption.READ});

                ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);

                byteBuffer.put((byte)1);
                String info = "{\"fileName\": \"greproto.tar.gz\", \"fileSize\":"+byteChannel.size()+"}";
                byteBuffer.put(info.getBytes(StandardCharsets.UTF_8));
                byteBuffer.flip();
                webSocketClient.send(byteBuffer);
                synchronized (condition) {
                    condition.wait();
                }

                byteBuffer.clear();
                byteBuffer.put((byte)3);
                while (byteChannel.read(byteBuffer) > 0) {
                    byteBuffer.flip();
                    webSocketClient.send(byteBuffer);
                    byteBuffer.clear();
                    byteBuffer.put((byte)3);
                }

                byteBuffer.clear();
                byteBuffer.put((byte)5);
                byteBuffer.put("end".getBytes(StandardCharsets.UTF_8));
                byteBuffer.flip();
                webSocketClient.send(byteBuffer);
                synchronized (condition) {
                    condition.wait();
                }
                byteChannel.close();

            } catch (InterruptedException|IOException e) {
                log.error(e.getMessage(), e);
            }

        }).start();
    }
}

运行示例

示例是一个普通的 Spring Boot jar项目, 可以通过mvn clean package进行编译, 再通过java -jar ws-demo01.jar运行, 启动后工作在8763端口

将 SocketClient.java 中的文件路径 D:/WorkJava/tmp/greproto.tar.gz 换成自己本地的文件路径, 运行 SocketClient, 可以观察到服务端接收到的消息. 如果输入1并回车, 就会触发客户端往服务端传输文件

参考

标签:02,info,Java,byteBuffer,buffer,ByteBuffer,Websocket,public,condition
From: https://www.cnblogs.com/milton/p/17489019.html

相关文章

  • java限流
    @ComponentpublicclassLimiterUtil{@ResourceprivateRedisTemplate<String,String>redisTemplate;/***固定窗口限流算法**@returntrue限流false放行*/publicbooleanfixedWindow(Stringkey,intcount){longcountCache=redisTemplate.op......
  • daka :p java day 1!
    书写helloworld!publicclasshelloworld{publicstaticvoidmain(String[]arges){System.out.println("helloworld!");}}  ......
  • 2020羊城杯easyre
    2020羊城杯easyre无壳64位程序GCC编译回显如下所示:放IDA中继续分析:int__cdeclmain(intargc,constchar**argv,constchar**envp){intv3;//eaxintv4;//eaxintv5;//eaxcharStr[48];//[rsp+20h][rbp-60h]BYREFcharStr1[64];//[rsp......
  • 要禁用 Windows Server 2022 2025时自动打开服务器管理器,可以通过以下批处理命令实现
    要禁用WindowsServer20222025时自动打开服务器管理器,可以通过以下批处理命令实现:首先打开记事本,输入以下命令:@echooffregadd"HKLM\Software\Microsoft\ServerManager"/vDoNotOpenServerManagerAtLogon/tREG_DWORD/d1/f保存文件,将文件名后缀改为.bat。......
  • Java基础
    数据类型INFINITY和NaN//INFINITY定义publicstaticfinaldoublePOSITIVE_INFINITY=1.0/0.0;publicstaticfinaldoubleNEGATIVE_INFINITY=-1.0/0.0;publicstaticfinalfloatPOSITIVE_INFINITY=1.0f/0.0f;publicstaticfinalfloatNEGATIVE_INFINITY......
  • Java Websocket 01: 原生模式 Websocket 基础通信
    目录JavaWebsocket01:原生模式Websocket基础通信JavaWebsocket02:原生模式通过Websocket传输文件Websocket原生模式原生模式下服务端通过@ServerEndpoint实现其对应的@OnOpen,@OnClose,@OnMessage,@OnError方法客户端创建WebSocketClient实现对应的......
  • 2023.6.18拷逝
    T1如图,从\(x_1\)能且只能走到\(x_1+2,x_1+4,x_1+6...\)设\(f[x]\)表示从\(x_1\)走到\(x\)的方案数,那么如果\(x-x_1\)是偶数,那么\(f[x]=f[x-2]+f[x-4]+...+f[x_1]\),否则\(f[x]=0\)。初始值:\(f[x_1]=1\)。考虑\(f[x]\)的前几项。\(f[x_1]=1,f[x_1+2]=1,f[x_......
  • 到底什么是php javascript
    php就是将 静转动(静态页面转为动态页面),有些页面在你访问之前他不是真实存在的,而是依据你提交的东西而动态生成的html页面,比如使用搜索引擎时候,你提交了关键字php,搜索引擎会到数据库中找到与php相关的信息,然后将这些信息排序和组装成一个html页面,将这个实时生成的页面返回给你的浏......
  • Azure Blob Storage Java SDK使用SAS Token授权读取文件403报错
    问题描述代码如下,内容十分简单,只是listpath的操作。点击查看代码DataLakeServiceClientdataLakeServiceClient=newDataLakeServiceClientBuilder().endpoint(blob).sasToken(sasToken).buildClient();DataLakeFileSystemClienttestFs=dataLakeServic......
  • linux java调用sh脚本
    1、2、importorg.jeecg.zhongyi.auto_dep.util.CommandStreamGobbler;importorg.jeecg.zhongyi.util.LogbackUtil;importorg.jeecg.zhongyi.util.vo.Result;importjava.io.IOException;importjava.io.InputStreamReader;importjava.util.LinkedList;importjava.......