aws服務(wù)從入門到精通| Amazon Kinesis 服務(wù)之Firehose操作

kinesis簡介(什么是Kinesis)

Amazon Kinesis 可以輕松收集、處理和分析實時視頻和數(shù)據(jù)流

  • 1、使用 Kinesis可以捕獲,處理,存儲video stream 用來分析和機器學(xué)習(xí)。
  • 2、使用 Kinesis 構(gòu)建 自定義應(yīng)用程序分析數(shù)據(jù)流,或者使用流行的流處理框架
  • 3、使用 firehose 加載流,處理 流的存儲
  • 4,使用 Kinesis Data Analytics 與SQL數(shù)據(jù)流分析

由于作者英語太爛了,所以附上亞馬遜的原文的英文如下:

Amazon Kinesis makes it easy to collect, process, and analyze video and data streams in real time.

  • 1、Use Kinesis Video Streams to capture, process, and store video streams for analytics and machine learning.
  • 2、Use Kinesis Data Streams to build custom applications that analyze data streams using popular stream processing frameworks.
  • 3、Use Kinesis Data Firehose to load data streams into AWS data stores.
  • 4、Use Kinesis Data Analytics to analyze data streams with SQL.

kinesis 操作界面簡介:

控制面板

很顯然kinesis 只有四大部分組成:

  • 1、Data Streams:
    借助 Amazon Kinesis Data Streams,您可以構(gòu)建用于處理或分析流數(shù)據(jù)的自定義應(yīng)用程序,以滿足特定需求。您可以配置數(shù)以萬計的數(shù)據(jù)創(chuàng)建器,連續(xù)不斷地將數(shù)據(jù)輸入 Kinesis 數(shù)據(jù)流。例如,來自網(wǎng)站點擊流、應(yīng)用程序日志和社交媒體饋送內(nèi)容的數(shù)據(jù)。在不到一秒時間里,您的 Amazon Kinesis 應(yīng)用程序便可以從數(shù)據(jù)流讀取和處理數(shù)據(jù)。
圖片
  • 2、Data Firehose:
    Amazon Kinesis Data Firehose 是將流數(shù)據(jù)加載到數(shù)據(jù)存儲和分析工具的最簡單方式。Kinesis Data Firehose 是一項完全托管的服務(wù),讓您可以輕松地從數(shù)十萬個來源捕獲、轉(zhuǎn)換大量流數(shù)據(jù)并將其加載到 Amazon S3、Amazon Redshift、Amazon Elasticsearch Service、Kinesis Data Analytics 和 Splunk 中,從而實現(xiàn)近乎實時的分析與見解

  • 3、Data Analytics:
    Amazon Kinesis Data Analytics 是使用 ANSI 標(biāo)準(zhǔn) SQL 實時處理和分析流數(shù)據(jù)最簡單的方法。借助該產(chǎn)品,您能夠從 Amazon Kinesis Data Streams 和 Amazon Kinesis Data Firehose 中讀取數(shù)據(jù),且可以使用 SQL 構(gòu)建流處理查詢或整個應(yīng)用程序,從而在收到數(shù)據(jù)后持續(xù)進行篩選、轉(zhuǎn)換和整合。Amazon Kinesis Data Analytics 自動識別標(biāo)準(zhǔn)數(shù)據(jù)格式、解析數(shù)據(jù)并推薦架構(gòu),這樣您便可以使用交互式架構(gòu)編輯器進行編輯。它提供交互式 SQL 編輯器和流處理模板,這樣您就可以在幾分鐘內(nèi)構(gòu)建復(fù)雜的流處理應(yīng)用程序。Amazon Kinesis Data Analytics 會在您的流式處理應(yīng)用程序中持續(xù)運行查詢,并將處理結(jié)果寫入輸出目標(biāo) (如 Amazon Kinesis Data Streams 和 Amazon Kinesis Data Firehose),從而將數(shù)據(jù)傳輸至 Amazon S3、Amazon Redshift 和 Amazon Elasticsearch Service。Amazon Kinesis Data Analytics 可以自動預(yù)置、部署和擴展運行您的流式處理應(yīng)用程序所需的資源

  • 4、Video Streams:
    Amazon Kinesis Video Streams 是一項完全托管的視頻提取和存儲服務(wù)。借助它,您可以為支持機器人、智能城市、工業(yè)自動化、安全監(jiān)控、機器學(xué)習(xí) (ML) 等功能的應(yīng)用程序安全地提取、處理和存儲任意規(guī)模的視頻。Kinesis Video Streams 還可以接收其他類型的時間編碼數(shù)據(jù),例如音頻、RADAR 和 LIDAR 信號。Kinesis Video Streams 為您提供了可安裝在您設(shè)備上的軟件開發(fā)工具包,從而可以輕松安全地將視頻流式傳輸?shù)?AWS。Kinesis Video Streams 可以自動預(yù)置和彈性擴展從數(shù)百萬臺設(shè)備中提取視頻流所需的所有基礎(chǔ)設(shè)施。它還持久地存儲視頻流并對其進行加密和編制索引,而且提供了易于使用的 API,因此應(yīng)用程序可以基于標(biāo)簽和時間戳來訪問和檢索已編制索引的視頻片段。Kinesis Video Streams 提供了一個庫來將 ML 框架 (例如 Apache MxNet、Tensorflow 和 OpenCV) 與視頻流集成以構(gòu)建機器學(xué)習(xí)應(yīng)用程序。Kinesis Video Streams 與 Amazon Rekognition Video 集成,從而使您能夠構(gòu)建用于檢測和識別流視頻中的人臉的計算機視覺應(yīng)用程序。

以上就是Kinesis 的四個功能所有的服務(wù),讀者可以根據(jù)實際場景進行選取對應(yīng)的服務(wù)進行操作。我今天主要講的操作服務(wù)是Firehose

Firehose的界面操作部分:

  • 1 、打開Firehose的操作界面。會出現(xiàn) 存在的delivery stream


    image.png

-2、點擊 create delivery stream 按鈕,出現(xiàn)以下界面:

image.png
  • 3.存儲數(shù)據(jù)之前可以選擇是否用lamda函數(shù)處理。符合的數(shù)據(jù)存儲。
image.png
  • 4.選擇存儲的工具。s3、redshift、elasticsearch、splunk
image.png
  • 5、存儲文件的格式,緩存多大的時候進行寫入,錯誤日志收集,IAM角色的設(shè)置
界面

image.png
  • 6、確認(rèn)信息界面:


    image.png
  • 7、查看結(jié)果


    image.png
image.png

Firehose代碼操作:

  • 1.導(dǎo)入pom.xml文件:
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.7.5</version>
        </dependency>
  • 2.初始化流操作
 /**
     * 初始化流
     */
    @SuppressWarnings("deprecation")
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }
  • 3、單個流的寫入
    /**
     * 
     * 單個寫流
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }
  • 4、批量流的寫入
  /**
     * 批量添加數(shù)據(jù)到流里面
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("寫流錯誤" + e);
        }
    }


    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

  • 4、更新流配置
    /**
     * 更新流配置
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }

完整 代碼:

package com.sdk.wifi.util.aws.firehose;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.DeliveryStreamDescription;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamRequest;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisfirehose.model.S3DestinationUpdate;
import com.amazonaws.services.kinesisfirehose.model.UpdateDestinationRequest;

public class FireHoseUtil {

    private static final Log LOGGER = LogFactory.getLog(FireHoseUtil.class);

    // DeliveryStream properties
    private static AmazonKinesisFirehoseClient firehoseClient;
    private static final String FIRE_HOSE_REGION = "us-west-2";

    /**
     * 批量添加數(shù)據(jù)到流里面
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("寫流錯誤" + e);
        }
    }

    /**
     * 
     * 單個寫流
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }

    /**
     * 字符串邊record
     * 
     * @param data
     * @return
     * @throws UnsupportedEncodingException 
     */
    public static Record createRecord(String data) throws UnsupportedEncodingException {
        return new Record().withData(ByteBuffer.wrap(data.getBytes("UTF-8")));
    }

    /**
     * 初始化流
     */
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }

    /**
     * 批量寫流
     * 
     * @param recordList
     * @return
     */
    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

    /**
     * 更新流配置
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容