前言
socet归属于java.net这个包。该包提供了实现网络应用程序所需的类。java.net大致可以分为两类。
低级api:处理嵌套字、接口和地址
高级api:处理统一资源定位符,统一资源标识符,链接
再简单说一下socket。socket是tcp/ip协议族通信的中间抽象层,是一组接口。
梳理一下socket交互的过程:
socket的通信先由服务端进行初始化,再绑定端口,最后再通过accept进行阻塞等待,直到有客户端进行连接。如果这个时候有客户端初始化了另一个socket,并连接(connect)成功,至此socket连接成功,客户端发送数据,服务端接收数据,服务端将处理结果传递给客户端,一次交互结束。
使用场景
传输文件
将某个服务器或者电脑的文件传递到另一个地方,这里有个前提如果是服务器的话,直接传就行,服务器大多是有固定公网ip的,而一般的家用电脑则是ip并不固定,当然,如果两台电脑处于同一个局域网也是可以的。
首先是服务端,有前言的描述,这里就不过多赘述了。代码如下
private void server() throws IOException {
ServerSocket socket = new ServerSocket(8080);
Socket accept = socket.accept();
Path path = Paths.get("F:" + File.separator + "images");
Path path1 = Paths.get("F:" + File.separator + "images", File.separator, "a.txt");
File file1 = new File(path.toString());
byte[] bytes = new byte[1024];
int len;
File file = new File(String.valueOf(path1));
InputStream inputStream = accept.getInputStream();
if (!file1.exists()) {
file1.mkdirs();
}
FileOutputStream fileOutputStream = new FileOutputStream(path1.toString());
while ((len = inputStream.read(bytes)) != -1) {
fileOutputStream.write(bytes, 0, len);
}
inputStream.close();
fileOutputStream.close();
socket.close();
}
服务端先监听8080这个端口,并通过accept来阻塞等待链接。当客户端链接成功,则程序继续进行。客户端代码如下。
private void client() throws Exception {
log.info("pdjAccpetData ===== ");
Socket socket=new Socket("192.168.3.4",8080);
//读文件
FileInputStream fileInputStream = new FileInputStream("E:\\a.txt");
OutputStream ops = socket.getOutputStream();
byte[] bytes = new byte[1024];
int len;
while ((len = fileInputStream.read(bytes)) != -1) {
ops.write(bytes, 0, len);
}
ops.flush();
socket.close();
}
客户端初始化一个连接,连接到目标计算机的特定端口,将文件转换成流传递给目标计算机。。
文件传输完成之后客户端和服务端都将进行socket的关闭,清空流等操作。
这里只是简单的接收文件,这种应用属于单向通信且只能使用一次。下面是简单的双向通信。
这种连接是可靠的,使用的是tcp连接,还有另一种不可靠的udp连接这里不再赘述。
双向通信
要实现双向通信且不断开或者自己掌握断开时机的通信,需要将数据传输进行独立,简单来说就是写个死循环让socket一直读取流就行,读到特定字符就停止,反之一直循环下去。
下面是范例:
服务端监听端口,等待连接,连接成功后开启两个线程,一个负责读取信息,一个负责发送信息,两个线程读取和发送都是死循环。只有当读取或者发送的信息是q时会跳出循环。
public class Server{
public static void main(String[] args) {
try {
ServerSocket server = new ServerSocket(8888);
Socket socket = server.accept();
System.out.println("----程序已经连接++++");
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
PrintWriter pw = new PrintWriter(os);
Runnable r1 = () -> {
while (true) {
// 接受信息
String str;
try {
str = br.readLine();
System.out.println("sender的信息:" + str);
if("q".equals(str)){
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
};
Runnable r2 = () -> {
// 返回信息
while (true) {
Scanner scan = new Scanner(System.in);
String msg = scan.nextLine();
pw.println(msg);
pw.flush();
if("q".equals(msg)){
break;
}
}
};
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端同服务端区别只在于客户端是主动去连接服务端。
public class Client{
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 8888);
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
PrintWriter pw = new PrintWriter(os);
Runnable r = () -> {
while (true) {
// 发送信息
try {
Scanner scan = new Scanner(System.in);
String msg = scan.nextLine();
pw.println(msg);
pw.flush();
if("q".equals(msg)){
break;
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
};
Runnable r2 = () -> {
// 接受发送的信息
while (true) {
String str;
try {
str = br.readLine();
System.out.println("接受者receiver:" + str);
if("q".equals(str)){
break;
}
} catch (IOException e) {
e.printStackTrace();
break;
}
}
};
Thread t1 = new Thread(r);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
以上是socket双向通信,如果将信息传递改成文件流,那也可以用来传输文件,但是进入了循环中,外部在想通过api去改变内部的参数是不太现实的,所以这个不适合用来传输一些不确定的东西,例如只当作是聊天用,那没问题,聊天过程中想传输文件就得话一番公司改造了,例如识别一下前缀file,如果是以file开头的话,就让他去获取文件等等。但这是中是不能集成到代码让他自适应运行的。所以下面粗浅介绍一下websocket。
websocket
一提到websocket大多会想到聊天室之类的东西,事实上也确实如此,聊天室作为它最基础的功能,可以衍生出很多新奇的东西,尤其是配合前端。
websocket具体的实现不太清楚,所以只介绍一下怎么用。依赖什么的就不赘述了,只说代码了。
websocket同样分为客户端和服务端。
服务端:
@Slf4j
@Component
@ServerEndpoint("/websocket-server/{name}")
public class WsServer {
/**
* 记录在线连接客户端数量
*/
private static AtomicInteger onlineCount = new AtomicInteger(0);
/**
* 存放每个连接进来的客户端对应的websocketServer对象,用于转发消息
*/
private static ConcurrentHashMap<String, WsServer> wsServers = new ConcurrentHashMap<>();
private Session session;
/**
* 服务端与客户端连接成功时执行
*
* @param session 会话
*/
@OnOpen
public void onOpen(@PathParam("name") String name, Session session) {
log.info("=====================open===================");
this.session = session;
//集合中存入客户端对象+1
Set<Map.Entry<String, WsServer>> entries = wsServers.entrySet();
for (Map.Entry<String, WsServer> entry : entries) {
if (entry.getKey().equals(name)) {
try {
log.warn("清除旧链接{}", name);
entry.getValue().session.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
//接入的客户端+1
int count = onlineCount.incrementAndGet();
wsServers.put(name, this);
log.info("与客户端连接成功,当前连接的客户端数量为:{}", count);
log.info(wsServers.entrySet().toString());
log.info("=====================open===================");
}
/**
* 收到客户端的消息时执行
*
* @param file 消息
* @param session 会话
*/
@OnMessage(maxMessageSize = 8 * 1024 * 3)
public void onMessage(String file, Session session) throws IOException {
log.info("=====================message===================");
DomeDTO dto = JSONObject.parseObject(file, DomeDTO.class);
log.info(dto.getFilename());
log.info("共有{}客户端建立链接,收到来自客户端 {} 的消息,转发给:{}", wsServers.size(), session.getId(), dto.getDestination());
sendMessage(file, dto.getDestination());
log.info("=====================message===================");
}
/**
* 连接发生报错时执行
*
* @param session 会话
* @param throwable 报错
*/
@OnError
public void onError(Session session, @NonNull Throwable throwable) {
log.info("=====================error===================");
log.error("连接发生报错");
throwable.printStackTrace();
log.info("=====================error===================");
}
/**
* 连接断开时执行
*/
@OnClose
public void onClose() {
log.info("=====================close===================");
//接入客户端连接数-1
int count = onlineCount.decrementAndGet();
//集合中的客户端对象-1
for (Map.Entry<String, WsServer> entry : wsServers.entrySet()) {
if (entry.getValue().equals(this)) {
wsServers.remove(entry.getKey());
}
}
log.info("服务端断开连接,当前连接的客户端数量为:{},分别为:{}", count, wsServers.entrySet());
log.info("=====================close===================");
}
/**
* 向客户端推送消息
*
* @param message 消息
*/
public void sendMessage(String message, String destination) {
for (Map.Entry<String, WsServer> entry : wsServers.entrySet()) {
if (destination.equalsIgnoreCase(entry.getKey())) {
log.info(entry.getKey());
entry.getValue().session.getAsyncRemote().sendText(message);
}
}
}
/**
* 群发消息
*
* @param message 消息
*/
public void sendMessageToAll(String message) {
ConcurrentHashMap<String, WsServer> ws = wsServers;
for (Map.Entry<String, WsServer> entry : ws.entrySet()) {
entry.getValue().sendMessage(message, entry.getKey());
}
}
}
@Configuration
public class WsServerConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
客户端:
@Slf4j
@Configuration
public class WsClientConfig {
@Bean
public WebSocketClient webSocketClient() {
WebSocketClient client = null;
try {
client = new Client(new URI("ws://localhost:29091/websocket-server/No2"));
client.connect();
return client;
} catch (URISyntaxException e) {
e.printStackTrace();
}
return client;
}
}
@Slf4j
public class Client extends WebSocketClient {
public Client(URI serverUri) {
super(serverUri);
}
public Client(URI serverUri, Draft protocolDraft) {
super(serverUri, protocolDraft);
}
public Client(URI serverUri, Map<String, String> httpHeaders) {
super(serverUri, httpHeaders);
}
public Client(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders) {
super(serverUri, protocolDraft, httpHeaders);
}
public Client(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders, int connectTimeout) {
super(serverUri, protocolDraft, httpHeaders, connectTimeout);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("连接成功");
System.out.println(System.getProperty("user.dir"));
}
@Override
public void onMessage(String s) {
FileOutputStream fileOutputStream = null;
try {
SendFileDTO dto = JSONObject.parseObject(s, SendFileDTO.class);
//写文件到本地
byte[] bytes = dto.getData();
String fileName = dto.getFilename();
System.out.println(fileName);
String substring = fileName.replace("\\",File.separator).substring(fileName.lastIndexOf(File.separator) + 1);
Path path = Paths.get(System.getProperty("user.dir"), substring);
File fileLocal=new File(path.toString().substring(0,path.toString().lastIndexOf(File.separator)));
if (!fileLocal.exists()) {
fileLocal.mkdirs();
}
fileOutputStream = new FileOutputStream(path.toString());
fileOutputStream.write(bytes);
fileOutputStream.close();
//返回消息
File file = new File(path.toString());
sendFileDTO.setDestination("No1");
sendFileDTO.setMsg("ok");
String string = JSON.toJSONString(sendFileDTO);
//发送信息
this.send(string);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void onClose(int i, String s, boolean b) {
log.info("连接断开");
}
@Override
public void onError(Exception e) {
log.info("连接错误");
}
}
上面服务端设定好了连接的规则,即需要是websocket-server/name类型的才能连接上。当连接成功之后,服务端会将会话session保存在一个map中,当出现重复name的session时,清除掉旧的连接,防止重复链接。在这里server仅作转发信息用,这也是无奈之举,业务中需要将云服务器上的文件传输到内网中的电脑上。
这个的优点是能够将其嵌入到代码中,使用起来更加灵活。这个也是能够聊天的,稍加改造就可以了。
以上为本次记录,从最开始的一次性socket到死循环socket再到最后的长连接。在业务的处理上也做到了因地制宜,根据不同的业务选择不同的技术。