SpringBoot開啟UDP服務發送及消費UDP協議信息

SpringBoot 開啟 UDP 服務,進行接收 UDP,及發送 UDP,這里依賴的是 SpringBoot 內置 integration 包

代碼地址

1. Config

添加 Jar,下面用的是 SpringBoot 內置 integration 依賴

<!-- Spring Boot starter for Spring Integration. The default logging starter is excluded
     here; presumably the project supplies its own logging setup (confirm against the full pom). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Spring Integration IP module, used by the UDP receiving and sending classes in this article.
     No version is declared; presumably it is managed by the parent/BOM (confirm). -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-ip</artifactId>
</dependency>

2. Receiving

/**
 * UDP message receiving service.
 *
 * <p>Declares a Spring Integration flow that listens on the configured UDP port and passes
 * every received datagram through the channels
 * {@code udpChannel -> udpFilter -> udpRouter -> udpHandle1 | udpHandle2}.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 14:16
 */
@Configuration
public class UdpServer {

    private static final Logger logger = LoggerFactory.getLogger(UdpServer.class);

    /** Port to listen on, read from the {@code udp.port} property. */
    @Value("${udp.port}")
    private Integer udpPort;

    /** Business service invoked by the first terminal handler. */
    @Autowired
    private BusinessService businessService;

    /**
     * UDP receiving service, variant one (kept for reference, disabled).
     * https://docs.spring.io/spring-integration/reference/html/ip.html#inbound-udp-adapters-java-configuration
     *
     * @return org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/14 11:00
     */
    /*@Bean
    public UnicastReceivingChannelAdapter unicastReceivingChannelAdapter() {
        // Instantiate a UDP message receiving adapter
        UnicastReceivingChannelAdapter unicastReceivingChannelAdapter = new UnicastReceivingChannelAdapter(udpPort);
        // unicastReceivingChannelAdapter.setOutputChannel(new DirectChannel());
        unicastReceivingChannelAdapter.setOutputChannelName("udpChannel");
        logger.info("UDP服務(wù)啟動(dòng)成功,端口號(hào)為: {}", udpPort);
        return unicastReceivingChannelAdapter;
    }*/

    /**
     * UDP receiving service, variant two (Java DSL).
     * https://docs.spring.io/spring-integration/reference/html/ip.html#inbound-udp-adapters-java-dsl-configuration
     *
     * @return a flow that forwards datagrams received on {@code udpPort} to {@code udpChannel}
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 16:08
     */
    @Bean
    public IntegrationFlow integrationFlow() {
        logger.info("UDP服務(wù)啟動(dòng)成功,端口號(hào)為: {}", udpPort);
        return IntegrationFlows.from(Udp.inboundAdapter(udpPort)).channel("udpChannel").get();
    }

    /**
     * Transformer: converts the raw datagram bytes into text.
     *
     * @param payload raw bytes of the received datagram
     * @param headers message headers (currently unused)
     * @return the payload decoded as UTF-8 text
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:30
     */
    @Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
    public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers) {
        // Decode with an explicit charset: the bytes come from the network, so the
        // JVM's platform default charset must not decide how they are interpreted.
        // Further conversion (e.g. upper-casing) could be applied here.
        // Replying to the client from this step is not implemented.
        return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Filter: decides whether a message continues to the router.
     *
     * @param message decoded message text
     * @param headers message headers
     * @return {@code true} to let the message through; currently every message passes
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:30
     */
    @Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
    public boolean filter(String message, @Headers Map<String, Object> headers) {
        // Headers that filtering rules can use: "id" (message id), "ip_address"
        // (sender IP, e.g. for IP filtering) and "ip_port" (sender port).
        // They are not read here because no rule needs them yet; read them
        // null-safely when a rule is added.
        // Example payload rule: drop messages without "-" via
        //     return message.indexOf("-") >= 0;
        return true;
    }

    /**
     * Router: selects the channel of the handler that processes the message.
     *
     * @param message decoded message text
     * @param headers message headers
     * @return name of the target channel; currently always {@code udpHandle1}
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:35
     */
    @Router(inputChannel = "udpRouter")
    public String router(String message, @Headers Map<String, Object> headers) {
        // Routing criteria may use the payload or the "id", "ip_address" and
        // "ip_port" headers. Return "udpHandle2" to dispatch to the second handler.
        return "udpHandle1";
    }

    /**
     * Terminal handler 1: hands the message to the business service, then logs it.
     *
     * @param message decoded message text
     * @throws Exception if the business service call fails
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:12
     */
    @ServiceActivator(inputChannel = "udpHandle1")
    public void udpMessageHandle(String message) throws Exception {
        // The business method can run asynchronously
        businessService.udpHandleMethod(message);
        logger.info("UDP1:{}", message);
    }

    /**
     * Terminal handler 2: only logs the message.
     *
     * @param message decoded message text
     * @throws Exception declared for signature compatibility; nothing is thrown here
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/14 11:02
     */
    @ServiceActivator(inputChannel = "udpHandle2")
    public void udpMessageHandle2(String message) throws Exception {
        logger.info("UDP2:{}", message);
    }

}
/**
 * Default {@link BusinessService} implementation used by the UDP handler.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 11:59
 */
@Service
public class BusinessServiceImpl implements BusinessService {

    private static final Logger logger = LoggerFactory.getLogger(BusinessServiceImpl.class);

    /** Length of the pause that stands in for real business work, in milliseconds. */
    private static final long SIMULATED_WORK_MILLIS = 3000L;

    /**
     * Handles one UDP message: logs the start, pauses for the simulated work, logs completion.
     * Annotated to run on the {@code threadPoolTaskExecutor} executor.
     *
     * @param message decoded message text (not inspected here)
     * @throws Exception if the pause is interrupted
     */
    @Override
    @Async("threadPoolTaskExecutor")
    public void udpHandleMethod(String message) throws Exception {
        logger.info("業(yè)務(wù)開(kāi)始處理");
        Thread.sleep(SIMULATED_WORK_MILLIS);
        logger.info("業(yè)務(wù)處理完成");
    }
}

3. Sending

兩種發送

3.1. UdpSimpleClient

/**
 * Plain-socket sender: sends a text message as a single UDP datagram using
 * {@link DatagramSocket}.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 15:53
 */
@Service
public class UdpSimpleClient {

    private static final Logger logger = LoggerFactory.getLogger(UdpSimpleClient.class);

    /** Destination port, read from the {@code udp.port} property. */
    @Value("${udp.port}")
    private Integer udpPort;

    /**
     * Sends {@code message} to {@code localhost} on the configured port.
     *
     * <p>An {@link IOException} raised while sending is logged and not rethrown;
     * the success message is logged only when the send completed.
     *
     * @param message text to send; encoded as UTF-8
     */
    public void sendMessage(String message) {
        logger.info("發(fā)送UDP: {}", message);
        // Explicit charset so the bytes on the wire do not depend on the JVM's platform default.
        byte[] udpMessage = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", udpPort);
        try (DatagramSocket datagramSocket = new DatagramSocket()) {
            datagramSocket.send(new DatagramPacket(udpMessage, udpMessage.length, inetSocketAddress));
            // Logged inside the try block so a failed send is never reported as successful.
            logger.info("發(fā)送成功");
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
}

3.2. UdpIntegrationClient

/**
 * Configuration for the Spring Integration based UDP sender.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 15:59
 */
@Configuration
public class UdpIntegrationClientConfig {

    /** Destination port, read from the {@code udp.port} property. */
    @Value("${udp.port}")
    private Integer udpPort;

    /**
     * Creates the outbound handler that sends to {@code localhost} on {@code udpPort};
     * it is registered as the service activator for the {@code udpOut} channel.
     *
     * @return the unicast sending handler
     */
    @Bean
    @ServiceActivator(inputChannel = "udpOut")
    public UnicastSendingMessageHandler unicastSendingMessageHandler() {
        return new UnicastSendingMessageHandler("localhost", udpPort);
    }

}
/**
 * Sender that uses the Spring Integration outbound handler.
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/5/20 15:53
 */
@Service
public class UdpIntegrationClient {

    private static final Logger logger = LoggerFactory.getLogger(UdpIntegrationClient.class);

    /** Outbound UDP handler injected by Spring. */
    @Autowired
    private UnicastSendingMessageHandler unicastSendingMessageHandler;

    /**
     * Wraps {@code message} as a message payload and passes it directly to the outbound handler.
     *
     * @param message text to send
     */
    public void sendMessage(String message) {
        logger.info("發(fā)送UDP: {}", message);
        unicastSendingMessageHandler.handleMessage(
                MessageBuilder
                        .withPayload(message)
                        .build());
        logger.info("發(fā)送成功");
    }

}

參考

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容