Server-Side Message Push with WebFlux

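There are many ways to implement real-time messaging, such as WebSocket and SSE (Server-Sent Events). SSE in turn can be implemented with either Spring MVC or WebFlux. Plenty of Spring MVC examples already exist online, whereas WebFlux examples are scarce and incomplete, so this article focuses on real-time messaging with WebFlux. SSE itself is not covered in depth here; if anything is unclear, a quick search on Baidu or Google will help.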
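For orientation only: an SSE response is a long-lived `text/event-stream` body made up of small text frames. Each frame has an optional `id:` line, one `data:` line per line of payload, and a blank line as terminator. The sketch below builds such frames by hand with plain JDK code. The class and method names are made up for illustration; in a WebFlux application the framework performs this encoding itself.

```java
public class SseFrameDemo {

    /**
     * Formats one event as a text/event-stream frame:
     * an optional "id:" line, one "data:" line per payload line,
     * and an empty line that terminates the event.
     */
    static String frame(String id, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id:").append(id).append('\n');
        }
        // A multi-line payload becomes several consecutive "data:" lines
        for (String line : data.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("7", "2022-03-01T12:00:00"));
        System.out.print(frame(null, "line one\nline two"));
    }
}
```

The browser-side `EventSource` API (or any SSE client) splits the stream on the blank lines and hands each event's data to the application.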
The Maven dependencies are listed at the end of the article.
The following is the simplest implementation, and also the one with the fewest use cases:

    @GetMapping(path = "/sse/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
        // Push one event every two seconds
        return Flux.interval(Duration.ofSeconds(2))
                .map(seq -> Tuples.of(seq, LocalDateTime.now()))
                .log()
                // Use the sequence number as the event id so that every event gets a distinct id
                .map(data -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(data.getT1()))
                        .data(data.getT2().toString())
                        .build());
    }

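The endpoint above suits periodic messages such as stock quotes, where a message is sent every two seconds. But what about non-periodic messages? Suppose I need to publish an announcement in the application: the announcement is sudden and unpredictable, so this logic no longer fits.
Non-periodic messages are covered next.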


import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * @author haoran
 */
@RestController
@RequestMapping("/sse")
public class MessageController implements ApplicationListener<ApplicationEvent> {
    private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();

    @GetMapping(value = "/message",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    public Flux<String> getMessage(){
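        return Flux.create(stringFluxSink -> {
            // Forward only String payloads; application events may carry sources of other types
            MessageHandler messageHandler = message -> {
                Object payload = message.getPayload();
                if (payload instanceof String) {
                    stringFluxSink.next((String) payload);
                }
            };
            // Unsubscribe when the client disconnects
            stringFluxSink.onCancel(() -> subscribableChannel.unsubscribe(messageHandler));
            // Subscribe to the channel
            subscribableChannel.subscribe(messageHandler);
        }, FluxSink.OverflowStrategy.LATEST);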
    }


    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        subscribableChannel.send(new GenericMessage<>(event.getSource()));
    }
    @PostMapping("/publish")
    public void publish(@RequestParam String message){
        subscribableChannel.send(new GenericMessage<>(message));

    }
}
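The `FluxSink.OverflowStrategy.LATEST` argument passed to `Flux.create` decides what happens when messages arrive faster than the client consumes them: only the most recent pending message is kept. The following is a conceptual, JDK-only model of that "keep latest" behaviour. It is not Reactor's implementation (Reactor also tracks downstream demand and only drops when there is none), and `LatestSlot` is a hypothetical name used purely for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class LatestSlot<T> {

    // Holds at most one pending value
    private final AtomicReference<T> slot = new AtomicReference<>();

    /** Producer side: store the value, silently replacing any value not yet consumed. */
    public void offer(T value) {
        slot.set(value);
    }

    /** Consumer side: take the pending value, or null if nothing arrived since the last poll. */
    public T poll() {
        return slot.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestSlot<String> slot = new LatestSlot<>();
        // Three announcements arrive before the slow consumer polls once
        slot.offer("announcement 1");
        slot.offer("announcement 2");
        slot.offer("announcement 3");
        System.out.println(slot.poll()); // announcement 3
        System.out.println(slot.poll()); // null
    }
}
```

For announcements where every message matters, a buffering strategy may be a better fit than `LATEST`; which one to choose depends on whether a slow client may skip messages.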

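The MessageController approach has one limitation: messages only reach clients connected to the same service instance. How can messages be delivered across a cluster of several instances?
The code below uses Redis publish/subscribe (through Redisson) to make WebFlux SSE work across a cluster.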

import indi.houhaoran.webflux.domian.MessageDTO;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * @author haoran
 */
@RestController
@RequestMapping("/flux")
@RequiredArgsConstructor
public class FluxMessageController {
    private final RedissonClient redissonClient;
    public static final String USER_TOPIC = "user:";
    public static final String BROADCAST_TOPIC = "broadcast_topic";

    @GetMapping(path = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<MessageDTO> getFolderWatch(@PathVariable String userId) {
        return Flux.create(sink -> {
            // Subscribe to the broadcast topic; addListener returns the id of this listener
            int broadcastListenerId = redissonClient.getTopic(BROADCAST_TOPIC)
                    .addListener(MessageDTO.class, (c, m) -> sink.next(m));
            // Subscribe to this user's own topic
            int userListenerId = redissonClient.getTopic(USER_TOPIC + userId)
                    .addListener(MessageDTO.class, (c, m) -> sink.next(m));
            // When the client disconnects, remove only the listeners registered for this connection
            sink.onCancel(() -> {
                System.out.println("Disconnected, userId: " + userId);
                redissonClient.getTopic(USER_TOPIC + userId).removeListener(userListenerId);
                redissonClient.getTopic(BROADCAST_TOPIC).removeListener(broadcastListenerId);
            });
        }, FluxSink.OverflowStrategy.LATEST);
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageDTO messageDTO) {
        redissonClient.getTopic(BROADCAST_TOPIC).publish(messageDTO);
    }
}
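One detail worth care in the disconnect handling is which listeners get removed. Redisson's `addListener` returns an integer listener id, and removing by that id affects only one connection, whereas removing all listeners of a user topic would also cut off other open connections of the same user on that node. The sketch below shows the difference with a hypothetical in-memory `InMemoryTopic` class; it is a stand-in written for illustration, not Redisson's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class InMemoryTopic<T> {

    private final Map<Integer, Consumer<T>> listeners = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    /** Registers a listener and returns the id needed to remove exactly this listener later. */
    public int addListener(Consumer<T> listener) {
        int id = nextId.incrementAndGet();
        listeners.put(id, listener);
        return id;
    }

    /** Removes a single listener, leaving all others subscribed. */
    public void removeListener(int id) {
        listeners.remove(id);
    }

    /** Delivers the message to every listener that is still registered. */
    public void publish(T message) {
        listeners.values().forEach(listener -> listener.accept(message));
    }

    public int listenerCount() {
        return listeners.size();
    }

    public static void main(String[] args) {
        InMemoryTopic<String> userTopic = new InMemoryTopic<>();
        List<String> tabA = new ArrayList<>();
        List<String> tabB = new ArrayList<>();

        // The same user has two open connections (for example two browser tabs)
        int idA = userTopic.addListener(tabA::add);
        userTopic.addListener(tabB::add);

        // Tab A disconnects: remove only its listener
        userTopic.removeListener(idA);
        userTopic.publish("hello");

        System.out.println("tab A received " + tabA + ", tab B received " + tabB);
    }
}
```

Keeping the id in a local variable captured by the cancel callback, as opposed to looking it up from shared state, also avoids any bookkeeping that has to be cleaned up separately.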

Redisson configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // Do not use a JSON serializer here; it causes
        // java.lang.IllegalArgumentException: Value must not be null!
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }
}

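import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RedissonConfigure {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://localhost:6379");
        // Serialize messages as JSON, with java.time values written as ISO strings
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        config.setCodec(new JsonJacksonCodec(objectMapper));
        return Redisson.create(config);
    }
}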

Other classes

import lombok.Data;

import java.io.Serializable;

/**
 * @author haoran
 */
@Data
public class MessageDTO implements Serializable {
    private String message;
}

Testing:

[Screenshots: the Postman request, the output of the service on port 8080, and the output of the service on port 8081]

As the output shows, when a message is sent through the service on port 8080, both the 8080 and the 8081 services receive it.
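The same check can be done without Postman. The following is a minimal sketch of a command-line subscriber built on the JDK 11 `java.net.http.HttpClient`. The path `/flux/connect/{userId}` comes from `FluxMessageController`; the port 8081 and the user id `42` are example values, and the class and helper names are made up for illustration. It assumes the service writes each `MessageDTO` as a `data:` line containing JSON.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Optional;
import java.util.stream.Stream;

public class SseClientDemo {

    /** Builds the GET request that opens the event stream for one user. */
    static HttpRequest connectRequest(String baseUrl, String userId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/flux/connect/" + userId))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
    }

    /** Extracts the payload of a "data:" line; other lines (ids, blank separators) yield empty. */
    static Optional<String> dataOf(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String value = line.substring("data:".length());
        // The SSE format allows one optional space after the colon
        return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(3))
                .build();
        try {
            HttpResponse<Stream<String>> response = client.send(
                    connectRequest("http://localhost:8081", "42"),
                    HttpResponse.BodyHandlers.ofLines());
            // Print the first three messages, then stop reading
            try (Stream<String> lines = response.body()) {
                lines.map(SseClientDemo::dataOf)
                        .flatMap(Optional::stream)
                        .limit(3)
                        .forEach(System.out::println);
            }
        } catch (IOException e) {
            System.out.println("Could not reach the service: " + e.getMessage());
        }
    }
}
```

With this running against one instance, publishing a message through the other instance's `/flux/publish` endpoint should make the JSON payload appear on the console.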

Maven dependencies

    <parent>
        <artifactId>webfluxdemo</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>server</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <dependency>
            <groupId>de.ruedigermoeller</groupId>
            <artifactId>fst</artifactId>
            <version>2.57</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Parent POM

    <groupId>org.example</groupId>
    <artifactId>webfluxdemo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
    </parent>
    <modules>
        <module>client</module>
        <module>server</module>
        <module>RxJava</module>
    </modules>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>

    </dependencyManagement>
    <!-- ... -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <url>https://repo.spring.io/snapshot</url>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <url>https://repo.spring.io/milestone</url>
        </pluginRepository>
    </pluginRepositories>

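Reference 1: Reactor 3 Reference Guide (htmlpreview.github.io)
Reference 2: https://www.lefer.cn/posts/30624/
Closing notes: Baidu was of little help here; after a long and fruitless search I ended up on Google. This article only implements SSE in a simple way and has many shortcomings for real-world use. For example, the Redis subscriptions are registered through lambda expressions; it would be better to have a dedicated class that implements the subscribe-and-forward logic.
Aside: how would one implement reactive reports with WebFlux?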
