(5) Apache Ignite Persistence (MongoDB)

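The main benefit of using a NoSQL database such as MongoDB as the persistent store is that it scales well. With MongoDB you can store cold or historical data in the persistence layer directly as JSON, without any data transformation. The persistence approach for MongoDB is very similar to the RDBMS approach from the previous section.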


[Figure: conceptual architecture of using MongoDB as the persistent store for Ignite]

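The figure above shows the conceptual architecture of using MongoDB as the persistent store.

About MongoDB: MongoDB is an open-source, schemaless, document-oriented database that stores all data in BSON (binary JSON) format. This makes its functionality easy to access from JavaScript and makes certain kinds of data integration easier and faster. MongoDB also supports data sharding through a sharded cluster configuration, which means it can scale horizontally as well as vertically. MongoDB is typically used for real-time analytics, where low latency and very high availability are required.

To persist cached data records in MongoDB, we first install MongoDB and prepare a Maven project. If MongoDB is already installed and configured on your system, you can skip Steps 1 to 3 and continue from Step 4.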

Step 1:

Download the MongoDB distribution from here (https://www.mongodb.com/download-center?jmp=nav#community). I will use the Community Edition and download the macOS distribution.

Step 2:

Extract the distribution to a location on your system, as shown below.

tar -xvf mongodb-osx-ssl-x86_64-4.0.3.tgz

Create the local data directory /data/db (for example with mkdir -p /data/db).

Step 3:

From the bin directory of the extracted distribution, start the MongoDB server with the following command.

./mongod --dbpath /data/db

MongoDB is now up and running on port 27017.
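Before moving on, it can be useful to confirm that something is actually listening on port 27017. The following is a minimal sketch using only the Java standard library; the class name MongoPortCheck and the method isListening are hypothetical helpers introduced for illustration, and the check only verifies that a TCP connection can be opened, not that the listener is really mongod.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class MongoPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat as "not listening".
            return false;
        }
    }

    public static void main(String[] args) {
        // 27017 is the port mentioned in Step 3.
        System.out.println("mongod reachable: " + isListening("localhost", 27017, 1000));
    }
}
```

If the check reports false, make sure the mongod process from Step 3 is still running and that /data/db is writable.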

Step 4:

Create a Maven project and add all the dependencies to the pom.xml configuration file.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycookcode.bigData.ignite</groupId>
  <artifactId>ignite-persistence</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>ignite-persistence</name>
  <url>http://maven.apache.org</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <ignite.version>2.6.0</ignite.version>
    <spring.version>5.1.1.RELEASE</spring.version>
  </properties>


  <dependencies>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-core</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-spring</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-indexing</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-slf4j</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-log4j</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>9.1-901-1.jdbc4</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>

    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>


    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-mongodb</artifactId>
      <version>2.1.1.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-jpa</artifactId>
      <version>2.1.1.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-expression</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-framework-bom</artifactId>
      <version>${spring.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>


  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>com.jolira</groupId>
        <artifactId>onejar-maven-plugin</artifactId>
        <version>1.4.4</version>
        <executions>
          <execution>
            <id>build-query</id>
            <configuration>
              <mainClass>com.mycookcode.bigData.ignite.App</mainClass>
              <attachToBuild>true</attachToBuild>
              <classifier>onejar</classifier>
              <filename>cache-store-runnable.jar</filename>
            </configuration>
            <goals>
              <goal>one-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

Step 5:

Create the class com.mycookcode.bigData.ignite.nosql.model.MongoPost, which corresponds to the data model in MongoDB.

package com.mycookcode.bigData.ignite.nosql.model;


import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDate;
import java.util.Objects;

@Document
public class MongoPost {

    @Id
    private String id;

    private String title;

    private String description;

    private LocalDate creationDate;

    private String author;

    public MongoPost() {
    }

    public MongoPost(String id, String title, String description, LocalDate creationDate, String author) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.creationDate = creationDate;
        this.author = author;
    }


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public LocalDate getCreationDate() {
        return creationDate;
    }

    public void setCreationDate(LocalDate creationDate) {
        this.creationDate = creationDate;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        MongoPost post = (MongoPost)o;
        return Objects.equals(id, post.id) &&
                Objects.equals(title, post.title) &&
                Objects.equals(description, post.description) &&
                Objects.equals(creationDate, post.creationDate) &&
                Objects.equals(author, post.author);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, title, description, creationDate, author);
    }

    @Override
    public String toString() {
        return "MongoPost{" +
                "id='" + id + '\'' +
                ", title='" + title + '\'' +
                ", description='" + description + '\'' +
                ", creationDate=" + creationDate +
                ", author='" + author + '\'' +
                '}';
    }
}

Step 6:

We will use the spring-data-mongodb library from the Spring framework to work with MongoDB. We also create a Spring context file, mongo-context.xml, to configure the MongoDB factory.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:annotation-config />

    <mongo:mongo-client id="mongo" host="localhost" port="27017"/>

    <mongo:db-factory id="mongoDbFactory" dbname="test" mongo-ref="mongo" />

    <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
        <constructor-arg ref="mongoDbFactory"/>
    </bean>


    <mongo:repositories base-package="com.mycookcode.bigData.ignite.nosql"  mongo-template-ref="mongoTemplate"/>

    <bean class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor"/>

    </beans>

The XML file above instructs Spring to do the following:

  • Specify the MongoDB host and port.
  • Set the database name of the MongoDB factory to test.
  • Convert MongoPost DTOs to MongoDB BSON.
  • Set up the MongoDB template (MongoTemplate).

Step 7:

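Create a new Java class named MongoDBStore that extends Ignite's CacheStoreAdapter and implements the LifecycleAware interface. The class relies on a PostRepository type that this article does not list; it is presumably a Spring Data repository interface for MongoPost (for example, one extending MongoRepository<MongoPost, String>) in the com.mycookcode.bigData.ignite.nosql package, where the mongo:repositories element in mongo-context.xml can find it.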

package com.mycookcode.bigData.ignite.nosql;

import com.mycookcode.bigData.ignite.nosql.model.MongoPost;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;


import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.mongodb.core.MongoOperations;

import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;


public class MongoDBStore extends CacheStoreAdapter<String,MongoPost> implements LifecycleAware {

    private static final Logger logger = LoggerFactory.getLogger(MongoDBStore.class);

    // Ignite creates this store through a factory, outside the Spring container,
    // so the beans are looked up manually in start() instead of being injected.
    private ConfigurableApplicationContext context;

    private PostRepository postRepository;

    private MongoOperations mongoOperations;

    @Override
    public MongoPost load(String key) throws CacheLoaderException
    {
        logger.info("Loading key {} from MongoDB", key);
        // Return null when no document exists instead of throwing NoSuchElementException.
        return postRepository.findById(key).orElse(null);
    }

    @Override
    public void write(Cache.Entry<? extends String,? extends MongoPost> entry) throws CacheWriterException
    {
        MongoPost post = entry.getValue();
        logger.info("Writing key {} to MongoDB", entry.getKey());
        postRepository.save(post);
    }

    @Override
    public void delete(Object key) throws CacheWriterException
    {
        logger.info("Deleting key {} from MongoDB", key);
        postRepository.deleteById((String) key);
    }

    @Override
    public void start() throws IgniteException
    {
        // Load the Spring context and look up the repository and template beans.
        context = new ClassPathXmlApplicationContext("mongo-context.xml");
        postRepository = context.getBean(PostRepository.class);
        mongoOperations = context.getBean(MongoOperations.class);
        // Create the collection for MongoPost on first start.
        if (!mongoOperations.collectionExists(MongoPost.class))
            mongoOperations.createCollection(MongoPost.class);
    }

    @Override
    public void stop() throws IgniteException
    {
        // Close the Spring context to release the resources created in start().
        if (context != null)
            context.close();
    }

}

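The code above overrides three CacheStore methods: load(), write(), and delete(). The load() method looks up the document in MongoDB by the given key and returns a MongoPost instance. The write() method saves a single cached data record to the MongoDB test database. The delete() method removes the BSON document with the given key from MongoDB. The start() method, which comes from LifecycleAware, loads the Spring context and creates the collection if it does not exist yet.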
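To make the roles of load(), write(), and delete() more concrete, here is a toy model of read-through and write-through written with plain Java collections. It is only a sketch under simplifying assumptions: SimpleStore, MapStore, and ThroughCache are hypothetical names, the store is an in-memory map standing in for MongoDB, and none of this is Ignite's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the three CacheStore callbacks used by MongoDBStore.
interface SimpleStore<K, V> {
    V load(K key);
    void write(K key, V value);
    void delete(K key);
}

// In-memory stand-in for the MongoDB collection.
class MapStore<K, V> implements SimpleStore<K, V> {
    final Map<K, V> documents = new HashMap<>();

    public V load(K key) { return documents.get(key); }
    public void write(K key, V value) { documents.put(key, value); }
    public void delete(K key) { documents.remove(key); }
}

// Toy cache that forwards operations to the store.
class ThroughCache<K, V> {
    private final Map<K, V> memory = new HashMap<>();
    private final SimpleStore<K, V> store;

    ThroughCache(SimpleStore<K, V> store) { this.store = store; }

    void put(K key, V value) {
        memory.put(key, value);
        store.write(key, value);          // write-through: persist on every put
    }

    V get(K key) {
        V value = memory.get(key);
        if (value == null) {
            value = store.load(key);      // read-through: load on a cache miss
            if (value != null)
                memory.put(key, value);
        }
        return value;
    }

    void remove(K key) {
        memory.remove(key);
        store.delete(key);                // removal is propagated to the store
    }

    // Drops the entry from memory only; the store is left untouched.
    void clear(K key) { memory.remove(key); }

    // Looks at memory only and never touches the store.
    V localPeek(K key) { return memory.get(key); }
}
```

In this model, clearing a key and then calling get() brings the value back from the store, which is the behaviour the example application in Step 8 is meant to demonstrate with clear(), localPeek(), and get().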

Step 8:

To store data in MongoDB and load it back, we configure the Ignite CacheConfiguration programmatically. We write a class, com.mycookcode.bigData.ignite.App, that stores and loads data records through the cache.

package com.mycookcode.bigData.ignite;

import com.mycookcode.bigData.ignite.jdbc.PostgresDBStore;
import com.mycookcode.bigData.ignite.jdbc.model.Post;
import com.mycookcode.bigData.ignite.nosql.MongoDBStore;
import com.mycookcode.bigData.ignite.nosql.model.MongoPost;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.cache.Cache;
import javax.cache.configuration.FactoryBuilder;

import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;


public class App
{

    private static final String POST_CACHE_NAME = App.class.getSimpleName() + "-post";
    private static Logger LOGGER = LoggerFactory.getLogger(App.class);
    private static final String POSTGRESQL = "postgresql";
    private static final String MONGODB = "mongodb";

    public static void main( String[] args ) throws Exception
    {

        if(args.length <= 0 ){
            LOGGER.error("Usage: java -jar ./target/cache-store-runnable.jar postgresql|mongodb");
            // Exit with a non-zero status because the required argument is missing.
            System.exit(1);
        }
        if(args[0].equalsIgnoreCase(POSTGRESQL)){
            jdbcStoreExample();
        } else if (args[0].equalsIgnoreCase(MONGODB)){
            nosqlStore();
        }
    }


    private static void nosqlStore() throws Exception
    {
        IgniteConfiguration cfg = new IgniteConfiguration();

        CacheConfiguration<String,MongoPost> configuration = new CacheConfiguration<>();
        configuration.setName("mongoDynamicCache");
        configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        // Use MongoDBStore as the backing store for this cache.
        configuration.setCacheStoreFactory(FactoryBuilder.factoryOf(MongoDBStore.class));
        configuration.setReadThrough(true);
        configuration.setWriteThrough(true);
        // Write-behind batches store updates and flushes them asynchronously.
        configuration.setWriteBehindEnabled(true);

        log("Start. PersistenceStore example.");
        cfg.setCacheConfiguration(configuration);

        try (Ignite ignite = Ignition.start(cfg)){
            int count = 10;
            try(IgniteCache<String,MongoPost> igniteCache = ignite.getOrCreateCache(configuration))
            {
                // Write keys _1 to _10 in a single transaction.
                try(Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ))
                {
                    for(int i = 1;i <= count;i++)
                    {
                        igniteCache.put("_"+i,new MongoPost("_" + i, "title-" + i, "description-" + i, LocalDate.now().plus(i, ChronoUnit.DAYS), "author-" + i));
                    }
                    tx.commit();
                }

                // Clear after the commit: clear() skips entries that are still part of a transaction.
                for (int i = 1; i < count; i += 2) {
                    igniteCache.clear("_" + i);
                    log("Clear every odd key: " + i);
                }

                for (long i = 1; i <= count; i++)
                    log("Local peek at [key=_" + i + ", val=" + igniteCache.localPeek("_" + i) + ']');

                // get() reloads the cleared entries through the cache store (read-through).
                for (long i = 1; i <= count; i++)
                    log("Got [key=_" + i + ", val=" + igniteCache.get("_" + i) + ']');
            }
            log("PersistenceStore example finished.");
            ignite.destroyCache("mongoDynamicCache");
        }
    }


    private static void jdbcStoreExample() throws Exception
    {
        // Build a dynamic cache that is distributed across all running nodes.
        // The same configuration can also be declared in an XML configuration file.
        IgniteConfiguration cfg = new IgniteConfiguration();

        CacheConfiguration configuration = new CacheConfiguration();
        configuration.setName("dynamicCache");
        configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        configuration.setCacheStoreFactory(FactoryBuilder.factoryOf(PostgresDBStore.class));
        configuration.setReadThrough(true);
        configuration.setWriteThrough(true);

        configuration.setWriteBehindEnabled(true);

        log("Start. PersistenceStore example.");
        cfg.setCacheConfiguration(configuration);

        try(Ignite ignite = Ignition.start(cfg))
        {
            int count = 10;
            try(IgniteCache<String,Post> igniteCache = ignite.getOrCreateCache(configuration))
            {

                // Write the entries in a single transaction and commit exactly once.
                try (Transaction tx =  ignite.transactions().txStart(PESSIMISTIC,REPEATABLE_READ)){
                    for(int i = 1;i <= count;i++)
                    {
                        igniteCache.put("_"+i,new Post("_" + i, "title-" + i, "description-" + i, LocalDate.now().plus(i, ChronoUnit.DAYS),"author-" + i));
                    }
                    tx.commit();
                }

                // The remaining operations run after the transaction has been committed.
                for (int i = 1;i < count;i+=2)
                {
                    igniteCache.clear("_"+i);
                    log("Clear every odd key: " + i);
                }

                for (long i = 1;i <= count;i++)
                {
                    log("Local peek at [key=_" + i + ", val=" + igniteCache.localPeek("_" + i) + ']');
                }

                for (long i = 1;i <= count;i++)
                {
                    log("Got [key=_" + i + ", val=" + igniteCache.get("_" + i) + ']');
                }
            }
            log("PersistenceStore example finished.");
            //ignite.destroyCache("dynamicCache");
            Thread.sleep(Integer.MAX_VALUE);
        }

    }


    private static void log(String msg) {
        LOGGER.info("\t" + msg);
    }
}

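The nosqlStore() method first creates an Ignite CacheConfiguration instance and sets the cache name to mongoDynamicCache. It sets the atomicity mode to transactional, then registers MongoDBStore as the cache store factory and enables read-through and write-through. It also enables write-behind, so updates are passed to the store asynchronously.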
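Because the configuration also enables write-behind, writes do not necessarily reach MongoDB at the moment of each put. The following single-threaded sketch illustrates the general idea of write-behind buffering, assuming a size-based flush only. WriteBehindBuffer is a hypothetical class for illustration; Ignite's real write-behind store is concurrent and also flushes on a timer, so treat this as a simplified model rather than a description of its internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of a write-behind buffer.
class WriteBehindBuffer<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();
    private final int flushSize;

    // Each flushed batch stands in for one bulk write to the database.
    final List<Map<K, V>> flushedBatches = new ArrayList<>();

    WriteBehindBuffer(int flushSize) { this.flushSize = flushSize; }

    void write(K key, V value) {
        // Repeated writes to the same key are coalesced into one pending entry.
        pending.put(key, value);
        if (pending.size() >= flushSize)
            flush();
    }

    // Returns a value that is still waiting to be flushed, or null if none is pending.
    V load(K key) { return pending.get(key); }

    void flush() {
        if (pending.isEmpty())
            return;
        flushedBatches.add(new LinkedHashMap<>(pending));
        pending.clear();
    }
}
```

The trade-off is the usual one: batching reduces the number of database round trips, but entries that are still pending can be lost if the node fails before a flush.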

Step 9:

Build the project with Maven, then run the application to store some entries from the Ignite cache in MongoDB.

mvn clean install

Run the application with the following command.

java -jar ./target/cache-store-runnable.jar mongodb