您的位置:首页技术文章
文章详情页

Springboot之整合Socket连接案例

【字号: 日期:2023-03-26 17:10:20浏览:2作者:猪猪

Socket连接与硬件通信

一、如何让socket随着springboot项目一起启动

SpringBoot中CommandLineRunner的作用:平常开发中有可能需要实现在项目启动后执行的功能,SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的代码放在实现的run方法中

具体实现

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * @author 易水●墨龙吟 * @Description * @create 2019-04-14 23:40 */@Componentpublic class TestRunner implements CommandLineRunner { @Autowired private SocketProperties properties; @Override public void run(String... args) throws Exception { ServerSocket server = null; Socket socket = null; server = new ServerSocket(properties.getPort()); System.out.println('设备服务器已经开启, 监听端口:' + properties.getPort()); ThreadPoolExecutor pool = new ThreadPoolExecutor(properties.getPoolCore(),properties.getPoolMax(),properties.getPoolKeep(),TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(properties.getPoolQueueInit()),new ThreadPoolExecutor.DiscardOldestPolicy() ); while (true) { socket = server.accept(); pool.execute(new ServerConfig(socket)); } }}

此处使用了自定义的线程池,提高对于socket的客户端处理能力。

二、自定义配置并使用

此处将socket的端口和线程池的一些配置放到 application.yml中使用,方便使用和修改

# Socket配置socket: # 监听端口 2323 port: 2323 # 线程池 - 保持线程数 20 pool-keep: 20 # 线程池 - 核心线程数 10 pool-core: 10 # 线程池 - 最大线程数 20 pool-max: 30 # 线程队列容量 10 pool-queue-init: 10

import lombok.Getter;import lombok.Setter;import lombok.ToString;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.PropertySource;import org.springframework.stereotype.Component;/** * @author 易水●墨龙吟 * @Description * @create 2019-04-18 22:35 */@Setter@Getter@ToString@Component@Configuration@PropertySource('classpath:application.yml')@ConfigurationProperties(prefix = 'socket')public class SocketProperties { private Integer port; private Integer poolKeep; private Integer poolCore; private Integer poolMax; private Integer poolQueueInit;}三、Socket对于客户端发来的信息的处理和重发机制

当客户端端连接之后发送信息,如果超时未发送,将会关闭,发送数据有异常将会返回给客户端一个error,让客户端在发送一次数据。

import com.farm.config.socket.resolve.MessageChain;import com.farm.service.EnvironmentService;import com.farm.service.impl.EnvironmentServiceImpl;import java.io.*;import java.net.Socket;import java.net.SocketException;import java.net.SocketTimeoutException;import java.util.Map;/** * @author 易水●墨龙吟 * @Description * @create 2019-04-14 23:21 */public class ServerConfig extends Thread { private Socket socket; public ServerConfig(Socket socket) { this.socket = socket; }// 获取spring容器管理的类,可以获取到sevrice的类 private EnvironmentService service = SpringUtil.getBean(EnvironmentServiceImpl.class); private String handle(InputStream inputStream) throws IOException, DataFormException { byte[] bytes = new byte[1024]; int len = inputStream.read(bytes); if (len != -1) { StringBuffer request = new StringBuffer(); request.append(new String(bytes, 0, len, 'UTF-8')); System.out.println('接受的数据: ' + request); System.out.println('from client ... ' + request + '当前线程' + Thread.currentThread().getName()); Map<String, String> map = MessageChain.out(request.toString()); System.out.println('处理的数据' + map); Integer res = service.addEnvironment(map); if (res == 1) {return 'ok'; } else {throw new DataFormException('数据处理异常'); } } else { throw new DataFormException('数据处理异常'); } } @Override public void run() { BufferedWriter writer = null; try { // 设置连接超时9秒 socket.setSoTimeout(9000); System.out.println('客户 - ' + socket.getRemoteSocketAddress() + ' -> 机连接成功'); InputStream inputStream = socket.getInputStream(); writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); String result = null; try {result = handle(inputStream);writer.write(result);writer.newLine();writer.flush(); } catch (IOException | DataFormException | IllegalArgumentException e) {writer.write('error');writer.newLine();writer.flush();System.out.println('发生异常');try { System.out.println('再次接受!'); result = handle(inputStream); writer.write(result); writer.newLine(); writer.flush();} catch (DataFormException | SocketTimeoutException ex) { System.out.println('再次接受, 发生异常,连接关闭');} } } catch (SocketException socketException) { socketException.printStackTrace(); try {writer.close(); } catch (IOException ioException) {ioException.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } finally { try {writer.close(); } catch (IOException e) {e.printStackTrace(); } } }}

在此处有一个坑,如果客户端是用C/C++编写的,必须使用如下方法:

byte[] bytes = new byte[1024];int len = inputStream.read(bytes);

如果使用readLine或者 DataInputStream dataInputStream =new DataInputStream(socket.getInputStream())这样会出现使用TCP连接助手,客户端发送数据收不到。

四、如何在普通类中使用Spring注入类

这里需要使用一个工具类。

import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;/** * @author 易水●墨龙吟 * @Description * @create 2019-04-15 0:01 */@Componentpublic class SpringUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (SpringUtil.applicationContext == null) { SpringUtil.applicationContext = applicationContext; } } /** * 获取applicationContext * @return */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 通过name获取 Bean. * @param name * @return */ public static Object getBean(String name){ return getApplicationContext().getBean(name); } /** * 通过class获取Bean. * @param clazz * @param <T> * @return */ public static <T> T getBean(Class<T> clazz){ return getApplicationContext().getBean(clazz); } /** * 通过name,以及Clazz返回指定的Bean * @param name * @param clazz * @param <T> * @return */ public static <T> T getBean(String name,Class<T> clazz){ return getApplicationContext().getBean(name, clazz); }}

补充:springboot下websocket前台后端数据长连接

首先导入依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.security</groupId> <artifactId>spring-security-messaging</artifactId> </dependency>

spring-security-messaging 是后面继承 AbstractSecurityWebSocketMessageBrokerConfigurer需要用到的依赖

WebSocketConfig

@Configuration@EnableWebSocketMessageBroker //此注解表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { /** * 注册 Stomp的端点 * addEndpoint:添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址 * withSockJS:指定端点使用SockJS协议 */ registry.addEndpoint('/websocket/tracker') //物流消息通道, .setAllowedOrigins('*') //允许跨域,里面路径可以设定 .withSockJS() //指定协议 .setInterceptors(httpSessionHandshakeInterceptor()) ; //设置拦截器() } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { /** * 配置消息代理 * 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker */ registry.enableSimpleBroker('/topic','/user'); } //拦截器 @Bean public HandshakeInterceptor httpSessionHandshakeInterceptor() { return new HandshakeInterceptor() { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {//可以在这里先判断登录是否合法return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { //握手成功后, } }; }}

WebsocketSecurityConfiguration

@Configurationpublic class WebsocketSecurityConfiguration extends AbstractSecurityWebSocketMessageBrokerConfigurer { @Override protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) { messages .nullDestMatcher().authenticated() .simpDestMatchers('/topic/**').authenticated() .simpDestMatchers('/user/**').authenticated() .simpTypeMatchers(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE).denyAll() // catch all .anyMessage().denyAll(); } /** * Disables CSRF for Websockets. */ @Override protected boolean sameOriginDisabled() { return true; }}

WebSocketResource

package com.gleam.shopmall.web.rest;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationListener;import org.springframework.messaging.handler.annotation.MessageMapping;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.messaging.simp.SimpMessageHeaderAccessor;import org.springframework.messaging.simp.SimpMessageMappingInfo;import org.springframework.messaging.simp.SimpMessageSendingOperations;import org.springframework.stereotype.Controller;import org.springframework.web.socket.messaging.SessionDisconnectEvent;@Controllerpublic class WebSocketResource { private static final Logger log = LoggerFactory.getLogger(WebSocketResource.class); @Autowired SimpMessageSendingOperations messagingTemplate; //此方法适用于网页聊天室,从前端接收数据,返回订阅者(前端) @MessageMapping('/welcome') //指定要接收消息的地址,类似@RequestMapping @SendTo('/topic/getResponse') //默认消息将被发送到与传入消息相同的目的地,但是目的地前面附加前缀(默认情况下为“/topic”} public String say(String message) throws Exception { return message; } //发送指定用户(直接从后端发送数据到前端) public void sendToUser(String login,String channel, String info) { log.debug('[ToUser]WEBSOCKET发送消息, username={}, info={}', login, info); this.messagingTemplate.convertAndSendToUser(login, channel, info); log.debug('[ToUser]WEBSOCKET发送消息:完成'); } //发送所有订阅的(直接从后端发送数据到前端) public void send(String channel, String info) { log.debug('[ToAll]WEBSOCKET发送消息, info={}', info); // this.messagingTemplate.convertAndSend(channel, info); this.messagingTemplate.convertAndSend('/topic/getResponse', '接收到了吗?'); log.debug('[ToAll]WEBSOCKET发送消息:完成'); }}

前端html

<!DOCTYPE html><html xmlns:th='http://www.thymeleaf.org'><head> <meta charset='UTF-8' /> <script src='http://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js'></script> <script src='https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js'></script> <script src='http://code.jquery.com/jquery-1.7.2.min.js'></script> <script src='http://pv.sohu.com/cityjson?ie=utf-8'></script> <title>Spring Boot+WebSocket+广播式</title> <script type='text/javascript'> var stompClient = null; function setConnected(connected) { document.getElementById(’connect’).disabled = connected; document.getElementById(’disconnect’).disabled = !connected; document.getElementById(’conversationDiv’).style.visibility = connected ? ’visible’ : ’hidden’; $(’#response’).html(); } function connect() { // websocket的连接地址,此值等于WebSocketConfig中registry.addEndpoint('/websocket/tracker').withSockJS()配置的地址, //这里如果是微服务或者远端,需要全路径 var socket = new SockJS(’/websocket/tracker’); //1 stompClient = Stomp.over(socket);//2 stompClient.connect({}, function(frame) {//3setConnected(true);console.log(’开始进行连接Connected: ’ + frame);// 客户端订阅消息的目的地址:此值等于WebSocketResource中@SendTo('/topic/getResponse')注解的里配置的值stompClient.subscribe(’/topic/getResponse’, function(respnose){ //4 showResponse(respnose.body);}); }); } function disconnect() { if (stompClient != null) {stompClient.disconnect(); } setConnected(false); console.log('Disconnected'); } function sendName() { var name = $(’#name’).val(); stompClient.send('/welcome', {}, returnCitySN[’cip’] +':'+name);// JSON.stringify(name) } function showResponse(message) { var response = $('#response'); response.html(message+'<br>' + response.html()); } </script></head><body onload='disconnect()'><noscript><h2 style='color: red'>貌似你的浏览器不支持websocket</h2></noscript><div> <div> <button onclick='connect();' style='color: red'>连接</button> <button disabled='disabled' onclick='disconnect();'>断开连接</button> </div> <div id='conversationDiv'> <label>输入内容</label><input type='text' /> <button onclick='sendName();'>发送</button> <p id='response'></p> </div></div></body></html>```

以上为个人经验,希望能给大家一个参考,也希望大家多多支持好吧啦网。如有错误或未考虑完全的地方,望不吝赐教。

标签: Spring
相关文章: