https://www.huangdf.xyz/categories/study-notes
南风
南风
发布于 2025-01-23 / 11 阅读
1
0

java文件传输及通信

前言

socet归属于java.net这个包。该包提供了实现网络应用程序所需的类。java.net大致可以分为两类。

低级api:处理嵌套字、接口和地址

高级api:处理统一资源定位符,统一资源标识符,链接

再简单说一下socket。socket是tcp/ip协议族通信的中间抽象层,是一组接口。

梳理一下socket交互的过程:

socket的通信先由服务端进行初始化,再绑定端口,最后再通过accept进行阻塞等待,直到有客户端进行连接。如果这个时候有客户端初始化了另一个socket,并连接(connect)成功,至此socket连接成功,客户端发送数据,服务端接收数据,服务端将处理结果传递给客户端,一次交互结束。

使用场景

传输文件

将某个服务器或者电脑的文件传递到另一个地方,这里有个前提如果是服务器的话,直接传就行,服务器大多是有固定公网ip的,而一般的家用电脑则是ip并不固定,当然,如果两台电脑处于同一个局域网也是可以的。

首先是服务端,有前言的描述,这里就不过多赘述了。代码如下

private void server() throws IOException {
        ServerSocket socket = new ServerSocket(8080);
        Socket accept = socket.accept();
        Path path = Paths.get("F:" + File.separator + "images");
        Path path1 = Paths.get("F:" + File.separator + "images", File.separator, "a.txt");
        File file1 = new File(path.toString());
        byte[] bytes = new byte[1024];
        int len;
        File file = new File(String.valueOf(path1));
        InputStream inputStream = accept.getInputStream();
        if (!file1.exists()) {
            file1.mkdirs();
        }
        FileOutputStream fileOutputStream = new FileOutputStream(path1.toString());
        while ((len = inputStream.read(bytes)) != -1) {
            fileOutputStream.write(bytes, 0, len);
        }
        inputStream.close();
        fileOutputStream.close();
        socket.close();
    }

服务端先监听8080这个端口,并通过accept来阻塞等待链接。当客户端链接成功,则程序继续进行。客户端代码如下。

private void client() throws Exception {
        log.info("pdjAccpetData ===== ");
        Socket socket=new Socket("192.168.3.4",8080);
        //读文件
        FileInputStream fileInputStream = new FileInputStream("E:\\a.txt");
        OutputStream ops = socket.getOutputStream();

        byte[] bytes = new byte[1024];
        int len;
        while ((len = fileInputStream.read(bytes)) != -1) {
            ops.write(bytes, 0, len);
        }
        ops.flush();
        socket.close();
    }

客户端初始化一个连接,连接到目标计算机的特定端口,将文件转换成流传递给目标计算机。。

文件传输完成之后客户端和服务端都将进行socket的关闭,清空流等操作。

这里只是简单的接收文件,这种应用属于单向通信且只能使用一次。下面是简单的双向通信。

这种连接是可靠的,使用的是tcp连接,还有另一种不可靠的udp连接这里不再赘述。

双向通信

要实现双向通信且不断开或者自己掌握断开时机的通信,需要将数据传输进行独立,简单来说就是写个死循环让socket一直读取流就行,读到特定字符就停止,反之一直循环下去。

下面是范例:

服务端监听端口,等待连接,连接成功后开启两个线程,一个负责读取信息,一个负责发送信息,两个线程读取和发送都是死循环。只有当读取或者发送的信息是q时会跳出循环。

public class Server{
    public static void main(String[] args) {
       try {
            ServerSocket server = new ServerSocket(8888);
            Socket socket = server.accept();
            System.out.println("----程序已经连接++++");
            InputStream is = socket.getInputStream();
            OutputStream os = socket.getOutputStream();
            InputStreamReader isr = new InputStreamReader(is);
            BufferedReader br = new BufferedReader(isr);
            PrintWriter pw = new PrintWriter(os);
            Runnable r1 = () -> {
                while (true) {
                    // 接受信息
                    String str;
                    try {
                        str = br.readLine();
                        System.out.println("sender的信息:" + str);
                        if("q".equals(str)){
                            break;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            };
            Runnable r2 = () -> {
                // 返回信息
                while (true) {
                    Scanner scan = new Scanner(System.in);
                    String msg = scan.nextLine();
                    pw.println(msg);
                    pw.flush();
                    if("q".equals(msg)){
                        break;
                    }
                }
            };
            Thread t1 = new Thread(r1);
            Thread t2 = new Thread(r2);
            t1.start();
            t2.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端同服务端区别只在于客户端是主动去连接服务端。

public class Client{
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 8888);
            InputStream is = socket.getInputStream();
            OutputStream os = socket.getOutputStream();
            InputStreamReader isr = new InputStreamReader(is);
            BufferedReader br = new BufferedReader(isr);
            PrintWriter pw = new PrintWriter(os);
            Runnable r = () -> {
                while (true) {
                    // 发送信息
                    try {
                        Scanner scan = new Scanner(System.in);
                        String msg = scan.nextLine();
                        pw.println(msg);
                        pw.flush();
                        if("q".equals(msg)){
                            break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        break;
                    }
                }
            };
            Runnable r2 = () -> {
                // 接受发送的信息
                while (true) {
                    String str;
                    try {
                        str = br.readLine();
                        System.out.println("接受者receiver:" + str);
                        if("q".equals(str)){
                            break;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        break;
                    }
                }
            };
            Thread t1 = new Thread(r);
            Thread t2 = new Thread(r2);
            t1.start();
            t2.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

以上是socket双向通信,如果将信息传递改成文件流,那也可以用来传输文件,但是进入了循环中,外部在想通过api去改变内部的参数是不太现实的,所以这个不适合用来传输一些不确定的东西,例如只当作是聊天用,那没问题,聊天过程中想传输文件就得话一番公司改造了,例如识别一下前缀file,如果是以file开头的话,就让他去获取文件等等。但这是中是不能集成到代码让他自适应运行的。所以下面粗浅介绍一下websocket。

websocket

一提到websocket大多会想到聊天室之类的东西,事实上也确实如此,聊天室作为它最基础的功能,可以衍生出很多新奇的东西,尤其是配合前端。

websocket具体的实现不太清楚,所以只介绍一下怎么用。依赖什么的就不赘述了,只说代码了。

websocket同样分为客户端和服务端。

服务端:

@Slf4j
@Component
@ServerEndpoint("/websocket-server/{name}")
public class WsServer {
    /**
     * 记录在线连接客户端数量
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);
    /**
     * 存放每个连接进来的客户端对应的websocketServer对象,用于转发消息
     */
    private static ConcurrentHashMap<String, WsServer> wsServers = new ConcurrentHashMap<>();
    private Session session;
    /**
     * 服务端与客户端连接成功时执行
     *
     * @param session 会话
     */
    @OnOpen
    public void onOpen(@PathParam("name") String name, Session session) {
        log.info("=====================open===================");
        this.session = session;
        //集合中存入客户端对象+1
        Set<Map.Entry<String, WsServer>> entries = wsServers.entrySet();
        for (Map.Entry<String, WsServer> entry : entries) {
            if (entry.getKey().equals(name)) {
                try {
                    log.warn("清除旧链接{}", name);
                    entry.getValue().session.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        //接入的客户端+1
        int count = onlineCount.incrementAndGet();
        wsServers.put(name, this);
        log.info("与客户端连接成功,当前连接的客户端数量为:{}", count);
        log.info(wsServers.entrySet().toString());
        log.info("=====================open===================");
    }
    /**
     * 收到客户端的消息时执行
     *
     * @param file    消息
     * @param session 会话
     */
    @OnMessage(maxMessageSize = 8 * 1024 * 3)
    public void onMessage(String file, Session session) throws IOException {
        log.info("=====================message===================");
        DomeDTO dto = JSONObject.parseObject(file, DomeDTO.class);
        log.info(dto.getFilename());
        log.info("共有{}客户端建立链接,收到来自客户端 {} 的消息,转发给:{}", wsServers.size(), session.getId(), dto.getDestination());
        sendMessage(file, dto.getDestination());
        log.info("=====================message===================");
    }
    /**
     * 连接发生报错时执行
     *
     * @param session   会话
     * @param throwable 报错
     */
    @OnError
    public void onError(Session session, @NonNull Throwable throwable) {
        log.info("=====================error===================");
        log.error("连接发生报错");
        throwable.printStackTrace();
        log.info("=====================error===================");
    }
    /**
     * 连接断开时执行
     */
    @OnClose
    public void onClose() {
        log.info("=====================close===================");
        //接入客户端连接数-1
        int count = onlineCount.decrementAndGet();
        //集合中的客户端对象-1
        for (Map.Entry<String, WsServer> entry : wsServers.entrySet()) {
            if (entry.getValue().equals(this)) {
                wsServers.remove(entry.getKey());
            }
        }
        log.info("服务端断开连接,当前连接的客户端数量为:{},分别为:{}", count, wsServers.entrySet());
        log.info("=====================close===================");
    }
    /**
     * 向客户端推送消息
     *
     * @param message 消息
     */
    public void sendMessage(String message, String destination) {
        for (Map.Entry<String, WsServer> entry : wsServers.entrySet()) {
            if (destination.equalsIgnoreCase(entry.getKey())) {
                log.info(entry.getKey());
                entry.getValue().session.getAsyncRemote().sendText(message);
            }
        }
    }
    /**
     * 群发消息
     *
     * @param message 消息
     */
    public void sendMessageToAll(String message) {
        ConcurrentHashMap<String, WsServer> ws = wsServers;
        for (Map.Entry<String, WsServer> entry : ws.entrySet()) {
            entry.getValue().sendMessage(message, entry.getKey());
        }
    }
}

@Configuration
public class WsServerConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return  new ServerEndpointExporter();
    }
}

客户端:

@Slf4j
@Configuration
public class WsClientConfig {
    @Bean
    public WebSocketClient webSocketClient() {
        WebSocketClient client = null;
        try {
            client = new Client(new URI("ws://localhost:29091/websocket-server/No2"));
            client.connect();
            return client;
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return client;
    }
}

@Slf4j
public class Client extends WebSocketClient {


    public Client(URI serverUri) {
        super(serverUri);
    }

    public Client(URI serverUri, Draft protocolDraft) {
        super(serverUri, protocolDraft);
    }

    public Client(URI serverUri, Map<String, String> httpHeaders) {
        super(serverUri, httpHeaders);
    }

    public Client(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders) {
        super(serverUri, protocolDraft, httpHeaders);
    }

    public Client(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders, int connectTimeout) {
        super(serverUri, protocolDraft, httpHeaders, connectTimeout);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("连接成功");
        System.out.println(System.getProperty("user.dir"));
    }

    @Override
    public void onMessage(String s) {
        FileOutputStream fileOutputStream = null;
        try {
            SendFileDTO dto = JSONObject.parseObject(s, SendFileDTO.class);
            //写文件到本地
            byte[] bytes = dto.getData();
            String fileName = dto.getFilename();
            System.out.println(fileName);
            String substring = fileName.replace("\\",File.separator).substring(fileName.lastIndexOf(File.separator) + 1);
            Path path = Paths.get(System.getProperty("user.dir"), substring);
            File fileLocal=new File(path.toString().substring(0,path.toString().lastIndexOf(File.separator)));
            if (!fileLocal.exists()) {
                fileLocal.mkdirs();
            }
            fileOutputStream = new FileOutputStream(path.toString());
            fileOutputStream.write(bytes);
            fileOutputStream.close();
//返回消息
            File file = new File(path.toString());
            sendFileDTO.setDestination("No1");
            sendFileDTO.setMsg("ok");
            String string = JSON.toJSONString(sendFileDTO);
            //发送信息
            this.send(string);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("连接断开");
    }

    @Override
    public void onError(Exception e) {
        log.info("连接错误");
    }
}

上面服务端设定好了连接的规则,即需要是websocket-server/name类型的才能连接上。当连接成功之后,服务端会将会话session保存在一个map中,当出现重复name的session时,清除掉旧的连接,防止重复链接。在这里server仅作转发信息用,这也是无奈之举,业务中需要将云服务器上的文件传输到内网中的电脑上。

这个的优点是能够将其嵌入到代码中,使用起来更加灵活。这个也是能够聊天的,稍加改造就可以了。

以上为本次记录,从最开始的一次性socket到死循环socket再到最后的长连接。在业务的处理上也做到了因地制宜,根据不同的业务选择不同的技术。


评论