SpringBoot 開啟 UDP 服務,進行接收 UDP,及發送 UDP,這裡依賴的是 SpringBoot 內置 integration 包
代碼地址
- Github: https://github.com/dolyw/ProjectStudy/tree/master/SpringBoot/AsyncDemo
- Gitee(碼云): https://gitee.com/dolyw/ProjectStudy/tree/master/SpringBoot/AsyncDemo
1. Config
添加 Jar,下面用的是 SpringBoot 內置 integration 依賴
<!-- Spring Boot starter for Spring Integration. The default logging starter is excluded
     (presumably because the project configures a different logging backend; confirm in the full pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring Integration IP module; the UDP adapters used by the classes below live in
     org.springframework.integration.ip. No version is given here, so it is expected to be managed elsewhere. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
</dependency>
2. Receiving
/**
 * UDP message receiving service.
 *
 * <p>Declares an inbound UDP adapter on the port configured by {@code udp.port} and wires a
 * small message pipeline across named channels:
 * {@code udpChannel -> transformer -> udpFilter -> filter -> udpRouter -> router -> udpHandle1 | udpHandle2}.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 14:16
 */
@Configuration
public class UdpServer {

    private static final Logger logger = LoggerFactory.getLogger(UdpServer.class);

    /** Port the inbound UDP adapter listens on, read from the {@code udp.port} property. */
    @Value("${udp.port}")
    private Integer udpPort;

    @Autowired
    private BusinessService businessService;

    /**
     * UDP message receiving service, approach one (kept for reference, disabled).
     * https://docs.spring.io/spring-integration/reference/html/ip.html#inbound-udp-adapters-java-configuration
     *
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/14 11:00
     */
    /*@Bean
    public UnicastReceivingChannelAdapter unicastReceivingChannelAdapter() {
        // Instantiate a UDP message receiving adapter
        UnicastReceivingChannelAdapter unicastReceivingChannelAdapter = new UnicastReceivingChannelAdapter(udpPort);
        // unicastReceivingChannelAdapter.setOutputChannel(new DirectChannel());
        unicastReceivingChannelAdapter.setOutputChannelName("udpChannel");
        logger.info("UDP服務(wù)啟動(dòng)成功,端口號(hào)為: {}", udpPort);
        return unicastReceivingChannelAdapter;
    }*/

    /**
     * UDP message receiving service, approach two (Java DSL).
     * https://docs.spring.io/spring-integration/reference/html/ip.html#inbound-udp-adapters-java-dsl-configuration
     *
     * <p>Note: the log line is written when this bean definition method runs, which is when the
     * flow is declared rather than when the socket is actually bound.
     *
     * @return a flow that forwards every datagram received on {@code udpPort} to {@code udpChannel}
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 16:08
     */
    @Bean
    public IntegrationFlow integrationFlow() {
        logger.info("UDP服務(wù)啟動(dòng)成功,端口號(hào)為: {}", udpPort);
        return IntegrationFlows.from(Udp.inboundAdapter(udpPort)).channel("udpChannel").get();
    }

    /**
     * Transformer: converts the raw datagram bytes into a {@code String}.
     *
     * @param payload raw bytes of the received datagram
     * @param headers message headers (not used by the conversion)
     * @return the payload decoded as UTF-8 text
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:30
     */
    @Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
    public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers) {
        // Decode with an explicit charset: new String(byte[]) would use the JVM's platform
        // default, so the same datagram could decode differently from host to host.
        // Example of a further transformation: message = message.toUpperCase();
        // TODO: replying to the sending client is not implemented here.
        return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Filter: decides whether a message continues to the router.
     *
     * <p>Currently accepts every message. The sender's address and port are read from the
     * {@code ip_address} and {@code ip_port} headers and could be used for IP-based filtering;
     * a content rule (for example rejecting messages that contain no {@code "-"}) can be added
     * here by returning {@code false}.
     *
     * @param message decoded message text
     * @param headers message headers
     * @return {@code true} to pass the message on to {@code udpRouter}
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:30
     */
    @Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
    public boolean filter(String message, @Headers Map<String, Object> headers) {
        // Read the headers null-safely: a missing header is logged as "null" instead of
        // aborting the message with a NullPointerException.
        logger.debug("UDP message id={}, ip={}, port={}",
                headers.get("id"), headers.get("ip_address"), headers.get("ip_port"));
        return true;
    }

    /**
     * Router: selects the channel of the handler that processes the message.
     *
     * @param message decoded message text
     * @param headers message headers (available for routing decisions, e.g. {@code ip_address})
     * @return the name of the target channel
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:35
     */
    @Router(inputChannel = "udpRouter")
    public String router(String message, @Headers Map<String, Object> headers) {
        // No routing rule is defined yet, so everything goes to the first handler.
        // Return "udpHandle2" from a condition placed here to use the second handler.
        return "udpHandle1";
    }

    /**
     * Final handler 1: hands the message to the business service, then logs it.
     *
     * @param message decoded message text
     * @throws Exception if the business service call fails
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:12
     */
    @ServiceActivator(inputChannel = "udpHandle1")
    public void udpMessageHandle(String message) throws Exception {
        // The business call may be processed asynchronously by the service implementation.
        businessService.udpHandleMethod(message);
        logger.info("UDP1:{}", message);
    }

    /**
     * Final handler 2: only logs the message.
     *
     * @param message decoded message text
     * @throws Exception declared for symmetry with handler 1; nothing here throws a checked exception
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/14 11:02
     */
    @ServiceActivator(inputChannel = "udpHandle2")
    public void udpMessageHandle2(String message) throws Exception {
        logger.info("UDP2:{}", message);
    }
}
/**
 * Demo implementation of {@link BusinessService} that simulates slow business work.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 11:59
 */
@Service
public class BusinessServiceImpl implements BusinessService {

    private static final Logger LOG = LoggerFactory.getLogger(BusinessServiceImpl.class);

    /** How long the simulated work blocks, in milliseconds. */
    private static final long SIMULATED_WORK_MILLIS = 3000L;

    /**
     * Logs a start line, blocks for {@link #SIMULATED_WORK_MILLIS}, then logs a completion line.
     * The method is marked {@code @Async} with the executor name {@code threadPoolTaskExecutor},
     * which is expected to be defined elsewhere in the project.
     *
     * @param message the received UDP message (not used by this demo implementation)
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    @Async("threadPoolTaskExecutor")
    public void udpHandleMethod(String message) throws Exception {
        LOG.info("業(yè)務(wù)開(kāi)始處理");
        Thread.sleep(SIMULATED_WORK_MILLIS);
        LOG.info("業(yè)務(wù)處理完成");
    }
}
3. Sending
兩種發送
3.1. UdpSimpleClient
/**
 * Plain-JDK UDP sender: sends a text message to {@code localhost} on the port configured by
 * {@code udp.port} using {@link DatagramSocket}.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 15:53
 */
@Service
public class UdpSimpleClient {

    private final static Logger logger = LoggerFactory.getLogger(UdpSimpleClient.class);

    /** Target port, read from the {@code udp.port} property. */
    @Value("${udp.port}")
    private Integer udpPort;

    /**
     * Sends {@code message} as a single UDP datagram.
     *
     * <p>I/O failures are logged and not rethrown; the success line is logged only when the
     * datagram was actually handed to the socket.
     *
     * @param message text to send; encoded as UTF-8
     */
    public void sendMessage(String message) {
        logger.info("發(fā)送UDP: {}", message);
        InetSocketAddress target = new InetSocketAddress("localhost", udpPort);
        // Encode with an explicit charset: getBytes() without one uses the JVM's platform
        // default, so the bytes on the wire would differ from host to host.
        byte[] udpMessage = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        try (DatagramSocket datagramSocket = new DatagramSocket()) {
            datagramSocket.send(new DatagramPacket(udpMessage, udpMessage.length, target));
            // Logged inside the try so a failed send is not reported as a success.
            logger.info("發(fā)送成功");
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
}
3.2. UdpIntegrationClient
/**
 * Spring Integration configuration for the outbound UDP client.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 15:59
 */
@Configuration
public class UdpIntegrationClientConfig {

    /** Target port, read from the {@code udp.port} property. */
    @Value("${udp.port}")
    private Integer udpPort;

    /**
     * Declares the sending handler that targets {@code localhost} on {@code udpPort} and
     * subscribes it to the {@code udpOut} channel.
     *
     * @return the unicast UDP sending handler
     */
    @Bean
    @ServiceActivator(inputChannel = "udpOut")
    public UnicastSendingMessageHandler unicastSendingMessageHandler() {
        return new UnicastSendingMessageHandler("localhost", udpPort);
    }
}
/**
 * UDP sender built on Spring Integration's {@code UnicastSendingMessageHandler}.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 15:53
 */
@Service
public class UdpIntegrationClient {

    private final static Logger LOG = LoggerFactory.getLogger(UdpIntegrationClient.class);

    @Autowired
    private UnicastSendingMessageHandler unicastSendingMessageHandler;

    /**
     * Wraps {@code message} in a Spring message and passes it straight to the injected sending
     * handler (the handler is invoked directly, not through a channel). A log line is written
     * before the call and another after it returns.
     *
     * @param message text payload to send
     */
    public void sendMessage(String message) {
        LOG.info("發(fā)送UDP: {}", message);
        unicastSendingMessageHandler.handleMessage(
                MessageBuilder.withPayload(message).build());
        LOG.info("發(fā)送成功");
    }
}
參考