RabbitMQ Reliable Delivery (Part 1): Project Setup

First, let's look at the flow diagram for our reliable delivery.

[Image: reliable delivery flow diagram]

The flow is shown in the diagram above. Suppose I place an order successfully. Step 1 persists my business data. Once the business data is stored (pay special attention here: the business data must be guaranteed to be persisted), the message to be sent is persisted as well. The diagram uses two databases; whether to use two depends on your actual business scenario. With two databases, some people might think of using distributed transactions to keep the data consistent, but large internet systems rarely use transactions for this and rely on compensation mechanisms instead.
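The ordering in step 1 can be sketched in plain Java as follows. This is only a minimal in-memory illustration, not the author's implementation: the class `OrderStep1Sketch`, the `SENDING` status name, the 60-second timeout, and the two maps standing in for the business database and the msg database are all assumptions made for the example.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: two maps stand in for the business DB and the msg DB.
class OrderStep1Sketch {
    static final String SENDING = "SENDING";   // assumed status name
    static final long TIMEOUT_SECONDS = 60;    // assumed confirmation timeout

    static final class MessageRecord {
        final String messageId;
        final String payload;
        final String status;
        final Instant nextRetry;

        MessageRecord(String messageId, String payload, String status, Instant nextRetry) {
            this.messageId = messageId;
            this.payload = payload;
            this.status = status;
            this.nextRetry = nextRetry;
        }
    }

    final Map<String, String> orderDb = new LinkedHashMap<>();
    final Map<String, MessageRecord> msgDb = new LinkedHashMap<>();

    /** Step 1: store the business data first, then the record of the message to send. */
    MessageRecord createOrder(String orderId, String orderName, Instant now) {
        orderDb.put(orderId, orderName); // business data goes in first
        MessageRecord record =
                new MessageRecord(orderId, orderName, SENDING, now.plusSeconds(TIMEOUT_SECONDS));
        msgDb.put(record.messageId, record); // message record follows
        return record;
    }
}
```

The `nextRetry` timestamp is what a later scheduled task could compare against the current time to find messages that were never confirmed.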

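Once the business data and the message are both persisted, we move on to step 2 and send the message to the MQ service. In the normal flow, the consumer receives the message, updates the message's status to consumed using its unique id, and returns an ack to the Listener. If something unexpected happens, for example the consumer never receives the message, or a brief network outage prevents the Listener from receiving the confirmation, we need a distributed scheduled task that fetches from the msg database the messages that have timed out without being consumed and sends them again. The retry mechanism must cap the number of retries: when sending keeps failing for external reasons, retrying too many times would drag down the whole service. For example, if a message still fails after three retries, set its status to "send failed" and let the compensation mechanism hand it over for manual handling. In real production this case is fairly rare, but you cannot do without the compensation mechanism, otherwise delivery is not reliable.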
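The retry logic described above can be sketched as a single pass of the scheduled task. This is a hedged illustration rather than the project's code: `RetryTaskSketch`, the `Status` values, and the 60-second retry interval are assumptions, the limit of three comes from the example in the text, and the actual publish to MQ is reduced to collecting ids.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of one pass of the resend task over the msg table.
class RetryTaskSketch {
    static final int MAX_TRY = 3;             // retry limit used in the text's example
    static final long RETRY_SECONDS = 60;     // assumed interval between retries

    enum Status { SENDING, CONSUMED, SEND_FAILED }

    static final class Msg {
        final String id;
        Status status = Status.SENDING;
        int tryCount = 0;
        Instant nextRetry;

        Msg(String id, Instant nextRetry) {
            this.id = id;
            this.nextRetry = nextRetry;
        }
    }

    /** Runs one pass and returns the ids of the messages that were re-sent. */
    static List<String> runOnce(List<Msg> msgDb, Instant now) {
        List<String> resent = new ArrayList<>();
        for (Msg m : msgDb) {
            // Only messages still unconfirmed after their timeout are candidates.
            if (m.status != Status.SENDING || m.nextRetry.isAfter(now)) {
                continue;
            }
            if (m.tryCount >= MAX_TRY) {
                m.status = Status.SEND_FAILED; // left for manual compensation
            } else {
                m.tryCount++;
                m.nextRetry = now.plusSeconds(RETRY_SECONDS);
                resent.add(m.id); // a real task would publish to MQ here
            }
        }
        return resent;
    }
}
```

A message that is never confirmed is re-sent on three passes and is marked `SEND_FAILED` on the fourth, which bounds the load a permanently failing message can put on the service.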

In this section, we walk through the basic project setup.

First, the project dependencies:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> 
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
         <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
        
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        
        <!-- Add JDBC jars -->
        <dependency>
          <groupId>org.mybatis.spring.boot</groupId>
          <artifactId>mybatis-spring-boot-starter</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>tk.mybatis</groupId>
          <artifactId>mapper-spring-boot-starter</artifactId>
          <version>1.1.0</version>
        </dependency>    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.24</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- MyBatis pagination plugin -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>  
            <groupId>com.github.miemiedev</groupId>  
            <artifactId>mybatis-paginator</artifactId>  
            <version>1.2.17</version>  
            <exclusions>
                <exclusion>
                    <groupId>org.mybatis</groupId>
                    <artifactId>mybatis</artifactId>
                </exclusion>
            </exclusions>            
        </dependency>               
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
             <version>3.7</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
         <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>   
         <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>
         <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope> 
        </dependency>   
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.hmily.dubbo</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        
    </dependencies>

Now let's look at the package structure.


[Image: package structure]

Next is the application.properties configuration:

server.port=8030

server.servlet.context-path=/

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

db.driverLocation=G:/test/MySQL/mysql-connector-java-5.1.6-bin.jar
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://130.67.151.179:3306/rabbitmq_common?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=admin

mybatis.type-aliases-package=com.hmily.rabbitmq.rabbitmqcommon
mybatis.mapper-locations=classpath:mapping/*.xml

logging.level.tk.mybatis=TRACE

##########################  server setting for LongID Gene  ####################################
snowFlake.workerId = 1
snowFlake.datacenterId = 1

# Dubbo Config properties
dubbo.application.id=rabbitmq-common
dubbo.application.name=rabbitmq-common
dubbo.application.qosPort=22212
dubbo.application.qosEnable=true
dubbo.scan.basePackages=com.hmily.rabbitmq.rabbitmqcommon.*
dubbo.protocol.id=dubbo
dubbo.protocol.name=dubbo
dubbo.protocol.port=12343
dubbo.registry.id=rabbitmq-common-registry
dubbo.registry.address=zookeeper://130.67.151.179:2181

# Enables Dubbo All Endpoints
management.endpoint.dubbo.enabled = true
management.endpoint.dubbo-shutdown.enabled = true
management.endpoint.dubbo-configs.enabled = true
management.endpoint.dubbo-services.enabled = true
management.endpoint.dubbo-references.enabled = true
management.endpoint.dubbo-properties.enabled = true

# Dubbo Health
## StatusChecker Name defaults (default : "memory", "load" )
management.health.dubbo.status.defaults = memory
## StatusChecker Name extras (default : empty )
management.health.dubbo.status.extras = load,threadpool

spring.rabbitmq.addresses=130.67.151.179:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true


snowFlakeServiceApi.version=1.0.0

order.rabbitmq.listener.order.queue.name=order-create
order.rabbitmq.listener.order.queue.durable=true
order.rabbitmq.listener.order.exchange.name=order
order.rabbitmq.listener.order.exchange.durable=true
order.rabbitmq.listener.order.exchange.type=topic
order.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
order.rabbitmq.listener.order.key=order.*
order.rabbitmq.send.create.key=order.create
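The last two properties pair a topic binding key (`order.*`) with the routing key used when sending (`order.create`). As a rough illustration of why that routing key reaches the queue, here is a small plain-Java sketch of AMQP topic matching, where `*` stands for exactly one word and `#` for zero or more words. The class `TopicMatchSketch` is hypothetical and is not part of RabbitMQ or Spring AMQP; the broker performs this matching itself.

```java
// Hypothetical sketch of AMQP topic-exchange matching on dot-separated words.
class TopicMatchSketch {
    /** Returns true if routingKey matches the topic binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key.
            for (int n = j; n <= k.length; n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        // '*' stands for exactly one word; anything else must match literally.
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

With the values above, `matches("order.*", "order.create")` is true, while a three-word key such as `order.create.done` would need a binding like `order.#`.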

The druid.properties configuration is as follows:

## Supplementary connection pool settings, applied to all data sources above
# Initial size, minimum idle, maximum active
druid.initialSize=5
druid.minIdle=10
druid.maxActive=300
# Maximum time to wait when acquiring a connection, in milliseconds
druid.maxWait=60000
# Interval between checks for idle connections that should be closed, in milliseconds
druid.timeBetweenEvictionRunsMillis=60000
# Minimum time a connection stays in the pool before it can be evicted, in milliseconds
druid.minEvictableIdleTimeMillis=300000
druid.validationQuery=SELECT 1 FROM DUAL
druid.testWhileIdle=true
druid.testOnBorrow=false
druid.testOnReturn=false
# Enable PSCache and set the PSCache size per connection
druid.poolPreparedStatements=true
druid.maxPoolPreparedStatementPerConnectionSize=20
# Filters for monitoring statistics; without them the monitoring page cannot collect SQL statistics. 'wall' is the SQL firewall
druid.filters=stat,wall,log4j
# Use connectionProperties to enable mergeSql and slow SQL logging
druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# Merge the monitoring data of multiple DruidDataSource instances
druid.useGlobalDataSourceStat=true

Next, expand the config package to see what it contains.


[Image: contents of the config package]

First, the main configuration class:

@Configuration
@MapperScan(basePackages = "com.hmily.rabbitmq.rabbitmqcommon.mapper")
@ComponentScan(basePackages = {"com.hmily.rabbitmq.rabbitmqcommon.*", "com.hmily.rabbitmq.rabbitmqcommon.config.*"})
public class MainConfig {

}

Next is the class that reads the DruidDataSource settings from druid.properties:

@Component
@ConfigurationProperties(prefix="spring.datasource") 
@PropertySource("classpath:druid.properties")
public class DruidDataSourceSettings {

    private String driverClassName;
    private String url;
    private String username;
    private String password;
    
    @Value("${druid.initialSize}")
    private int initialSize;
    
    @Value("${druid.minIdle}")
    private int minIdle;
    
    @Value("${druid.maxActive}")
    private int maxActive;
    
    @Value("${druid.timeBetweenEvictionRunsMillis}")
    private long timeBetweenEvictionRunsMillis;
    
    @Value("${druid.minEvictableIdleTimeMillis}")
    private long minEvictableIdleTimeMillis;
    
    @Value("${druid.validationQuery}")
    private String validationQuery;
    
    @Value("${druid.testWhileIdle}")
    private boolean testWhileIdle;
    
    @Value("${druid.testOnBorrow}")
    private boolean testOnBorrow;
    
    @Value("${druid.testOnReturn}")
    private boolean testOnReturn;
    
    @Value("${druid.poolPreparedStatements}")
    private boolean poolPreparedStatements;
    
    @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")
    private int maxPoolPreparedStatementPerConnectionSize;
    
    @Value("${druid.filters}")
    private String filters;
    
    @Value("${druid.connectionProperties}")
    private String connectionProperties;
    
    @Bean
    public static PropertySourcesPlaceholderConfigurer properdtyConfigure(){
        return new PropertySourcesPlaceholderConfigurer();
    }
    
    public String getDriverClassName() {
        return driverClassName;
    }
    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public int getInitialSize() {
        return initialSize;
    }
    public void setInitialSize(int initialSize) {
        this.initialSize = initialSize;
    }
    public int getMinIdle() {
        return minIdle;
    }
    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }
    public int getMaxActive() {
        return maxActive;
    }
    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }
    public long getTimeBetweenEvictionRunsMillis() {
        return timeBetweenEvictionRunsMillis;
    }
    public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }
    public long getMinEvictableIdleTimeMillis() {
        return minEvictableIdleTimeMillis;
    }
    public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }
    public String getValidationQuery() {
        return validationQuery;
    }
    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }
    public boolean isTestWhileIdle() {
        return testWhileIdle;
    }
    public void setTestWhileIdle(boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }
    public boolean isTestOnBorrow() {
        return testOnBorrow;
    }
    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }
    public boolean isTestOnReturn() {
        return testOnReturn;
    }
    public void setTestOnReturn(boolean testOnReturn) {
        this.testOnReturn = testOnReturn;
    }
    public boolean isPoolPreparedStatements() {
        return poolPreparedStatements;
    }
    public void setPoolPreparedStatements(boolean poolPreparedStatements) {
        this.poolPreparedStatements = poolPreparedStatements;
    }
    public int getMaxPoolPreparedStatementPerConnectionSize() {
        return maxPoolPreparedStatementPerConnectionSize;
    }
    public void setMaxPoolPreparedStatementPerConnectionSize(
            int maxPoolPreparedStatementPerConnectionSize) {
        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
    }
    public String getFilters() {
        return filters;
    }
    public void setFilters(String filters) {
        this.filters = filters;
    }
    public String getConnectionProperties() {
        return connectionProperties;
    }
    public void setConnectionProperties(String connectionProperties) {
        this.connectionProperties = connectionProperties;
    }
    
}

Then comes the DruidDataSource configuration:

@Configuration
@EnableTransactionManagement
public class DruidDataSourceConfig {
    
    private static Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.class);
    
    @Autowired
    private DruidDataSourceSettings druidSettings;
    
    public static String DRIVER_CLASSNAME ;
    
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyConfigure(){
        return new PropertySourcesPlaceholderConfigurer();
    }   
    
    @Bean
    public ServletRegistrationBean druidServlet() {
        
        ServletRegistrationBean reg = new ServletRegistrationBean();
        reg.setServlet(new StatViewServlet());
//        reg.setAsyncSupported(true);
        reg.addUrlMappings("/druid/*");
        reg.addInitParameter("allow", "localhost");
        reg.addInitParameter("deny","/deny");
//        reg.addInitParameter("loginUsername", "bhz");
//        reg.addInitParameter("loginPassword", "bhz");
        logger.info(" druid console manager init : {} ", reg);
        return reg;
    }

    @Bean
    public FilterRegistrationBean filterRegistrationBean() {
        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
        filterRegistrationBean.setFilter(new WebStatFilter());
        filterRegistrationBean.addUrlPatterns("/*");
        filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico, /druid/*");
        logger.info(" druid filter register : {} ", filterRegistrationBean);
        return filterRegistrationBean;
    }
    
    @Bean
    public DataSource dataSource() throws SQLException {
        DruidDataSource ds = new DruidDataSource();
        ds.setDriverClassName(druidSettings.getDriverClassName());
        DRIVER_CLASSNAME = druidSettings.getDriverClassName();
        ds.setUrl(druidSettings.getUrl());
        ds.setUsername(druidSettings.getUsername());
        ds.setPassword(druidSettings.getPassword());
        ds.setInitialSize(druidSettings.getInitialSize());
        ds.setMinIdle(druidSettings.getMinIdle());
//      ds.setMaxIdle(druidSettings.getMaxIdle());
        ds.setMaxActive(druidSettings.getMaxActive());
        ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
        ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
        ds.setValidationQuery(druidSettings.getValidationQuery());
        ds.setTestWhileIdle(druidSettings.isTestWhileIdle());
        ds.setTestOnBorrow(druidSettings.isTestOnBorrow());
        ds.setTestOnReturn(druidSettings.isTestOnReturn());
        ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
        ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());
        ds.setFilters(druidSettings.getFilters());
        ds.setConnectionProperties(druidSettings.getConnectionProperties());
        
        // proxyFilters ===> causes problems, so it is left commented out
//      WallFilter wallFilter = new WallFilter();
//      WallConfig wallConfig = new WallConfig();
//      wallConfig.setMultiStatementAllow(true);
//      wallFilter.setConfig(wallConfig);
//      List<Filter> wallFilterList = new ArrayList<Filter>();
//      wallFilterList.add(wallFilter);
//      ds.setProxyFilters(wallFilterList);
        logger.info(" druid datasource config : {} ", ds);
        return ds;
    }

    @Bean
    public PlatformTransactionManager transactionManager() throws Exception {
        DataSourceTransactionManager txManager = new DataSourceTransactionManager();
        txManager.setDataSource(dataSource());
        return txManager;
    }
    
}

After that comes the MyBatis configuration:

@Configuration
@AutoConfigureAfter(MybatisDataSourceConfig.class)
public class MybatisMapperScanerConfig {
    
    @Bean
    public MapperScannerConfigurer mapperScannerConfigurer() {
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");
        mapperScannerConfigurer.setBasePackage("com.hmily.rabbitmq.rabbitmqcommon.mapper");
        return mapperScannerConfigurer;
    }

}

@Configuration
public class MybatisDataSourceConfig {
    
    @Autowired
    private DataSource dataSource;
    
    @Bean(name="sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactoryBean() {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // Add the mapper XML directory
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        try {
            // When the project is first set up and mapping/ contains no XML files yet, comment out the next line, otherwise startup fails
            bean.setMapperLocations(resolver.getResources("classpath:mapping/*.xml"));
            SqlSessionFactory sqlSessionFactory = bean.getObject();
            sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
            
            return sqlSessionFactory;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}

public interface BaseMapper<T> extends Mapper<T>, MySqlMapper<T> {

}

Finally, the configuration for our scheduled tasks:

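import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@Configuration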
@EnableScheduling
public class TaskSchedulerConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskScheduler());
    }
    
    @Bean(destroyMethod="shutdown")
    public Executor taskScheduler(){
        return Executors.newScheduledThreadPool(100);
    }

}
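To show what the scheduled thread pool in this configuration does underneath Spring, here is a small standalone sketch using only `java.util.concurrent`. The class `SchedulerSketch`, the pool size of 2, and the 5-second wait limit are assumptions for illustration; in the project itself Spring drives the pool through `@Scheduled` methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: run a periodic task on a scheduled pool, then shut it down.
class SchedulerSketch {
    /** Returns true if the task fired `times` times within five seconds. */
    static boolean runPeriodically(int times, long delayMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        CountDownLatch latch = new CountDownLatch(times);
        try {
            // Each run starts delayMillis after the previous run finished.
            pool.scheduleWithFixedDelay(latch::countDown, 0, delayMillis, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        } finally {
            // Without this call the pool's threads keep the JVM alive; this is the
            // cleanup that destroyMethod="shutdown" asks Spring to perform.
            pool.shutdown();
        }
    }
}
```

Calling `SchedulerSketch.runPeriodically(3, 10)` runs the task three times about 10 ms apart and then releases the pool's threads.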

That's it. Start the project; if it runs without errors and a simple test endpoint can be called without errors, the project setup is complete.

Full code:
https://github.com/hmilyos/common.git 
https://github.com/hmilyos/snowFlakeDemo.git
https://github.com/hmilyos/rabbitmq-common.git (available branch)