Kinesis Overview (What is Kinesis?)
Amazon Kinesis makes it easy to collect, process, and analyze video and data streams in real time.
- 1. Use Kinesis Video Streams to capture, process, and store video streams for analytics and machine learning.
- 2. Use Kinesis Data Streams to build custom applications that analyze data streams using popular stream processing frameworks.
- 3. Use Kinesis Data Firehose to load data streams into AWS data stores.
- 4. Use Kinesis Data Analytics to analyze data streams with SQL.
A quick tour of the Kinesis console:

The Kinesis console is organized around its four services:
- 1. Data Streams:
With Amazon Kinesis Data Streams, you can build custom applications that process or analyze streaming data for specialized needs. You can configure hundreds of thousands of data producers to continuously put data into a Kinesis data stream, for example data from website clickstreams, application logs, and social media feeds. Your Amazon Kinesis applications can then read and process data from the stream in well under a second.

- 2. Data Firehose:
Amazon Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It is a fully managed service that captures, transforms, and loads massive volumes of streaming data from hundreds of thousands of sources into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, Kinesis Data Analytics, and Splunk, enabling near-real-time analytics and insights.
- 3. Data Analytics:
Amazon Kinesis Data Analytics is the easiest way to process and analyze streaming data in real time with ANSI-standard SQL. It can read from Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose, and it lets you build stream processing queries, or entire applications, in SQL that continuously filter, transform, and aggregate data as it arrives. It automatically recognizes standard data formats, parses the data, and suggests a schema, which you can refine in an interactive schema editor. With its interactive SQL editor and stream processing templates, you can build sophisticated stream processing applications in minutes. It runs your queries continuously, writes the results to output destinations such as Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose, and through them delivers the data to Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service. It also automatically provisions, deploys, and scales the resources your streaming applications need.
- 4. Video Streams:
Amazon Kinesis Video Streams is a fully managed video ingestion and storage service. It lets you securely ingest, process, and store video at any scale for applications that power robotics, smart cities, industrial automation, security monitoring, machine learning (ML), and more. Kinesis Video Streams also accepts other kinds of time-encoded data, such as audio, RADAR, and LIDAR signals. It provides SDKs you can install on your devices to stream video securely to AWS, and it automatically provisions and elastically scales all the infrastructure needed to ingest video streams from millions of devices. It durably stores, encrypts, and indexes the streams, and its easy-to-use APIs let applications access and retrieve indexed video segments by tag and timestamp. A companion library integrates ML frameworks such as Apache MXNet, TensorFlow, and OpenCV with video streams for building machine learning applications, and the integration with Amazon Rekognition Video lets you build computer vision applications that detect and recognize faces in streaming video.
Those are the four services that make up Kinesis; pick whichever matches your scenario. The rest of this post focuses on Firehose.
Working with Firehose in the console:
- 1. Open the Firehose console. Any existing delivery streams are listed there.
- 2. Click the Create delivery stream button to start the creation wizard.
- 3. Before the data is stored, optionally transform incoming records with a Lambda function so that only conforming data is delivered.
- 4. Choose the destination: S3, Redshift, Elasticsearch, or Splunk.
- 5. Configure the output file format, the buffer thresholds that trigger writes, error logging, and the IAM role.
- 6. Review the configuration.
- 7. Check the result.
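The buffer settings in step 5 act as an either/or trigger: Firehose flushes the buffered data to the destination when the accumulated bytes reach the buffer size hint, or when the buffer has been open for the configured interval, whichever comes first. A toy illustration of that rule (pure Java; the class and the thresholds are invented for illustration, not part of any SDK):

```java
// Illustrative sketch of Firehose's buffering rule: flush when the buffered
// bytes reach the size hint OR the buffer has been open longer than the
// interval hint, whichever happens first.
class BufferHint {
    private final long sizeHintBytes;
    private final long intervalMillis;
    private long bufferedBytes = 0;
    private long bufferOpenedAt;

    BufferHint(long sizeHintBytes, long intervalMillis, long now) {
        this.sizeHintBytes = sizeHintBytes;
        this.intervalMillis = intervalMillis;
        this.bufferOpenedAt = now;
    }

    /** Record arrives; returns true if it triggers a flush, then resets the buffer. */
    boolean add(int recordBytes, long now) {
        bufferedBytes += recordBytes;
        if (bufferedBytes >= sizeHintBytes || now - bufferOpenedAt >= intervalMillis) {
            bufferedBytes = 0;
            bufferOpenedAt = now;
            return true;
        }
        return false;
    }
}
```

In the real service the size hint and interval are set per delivery stream in the console (for S3 the size is specified in MB and the interval in seconds).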

Working with Firehose from code:
- 1. Add the Kinesis client dependency to pom.xml:
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.7.5</version>
</dependency>
- 2. Initialize the client:
/**
 * Initialize the Firehose client.
 */
@SuppressWarnings("deprecation")
public static void initClients() {
    AWSCredentials credentials = null;
    try {
        credentials = new ProfileCredentialsProvider().getCredentials();
    } catch (Exception e) {
        throw new AmazonClientException(
                "Cannot load the credentials from the credential profiles file. "
                        + "Please make sure that your credentials file is at the correct "
                        + "location (~/.aws/credentials), and is in valid format.",
                e);
    }
    // Firehose client
    firehoseClient = new AmazonKinesisFirehoseClient(credentials);
    firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));
}
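The ProfileCredentialsProvider above reads the AWS shared credentials file. A minimal ~/.aws/credentials looks like this (the key values are placeholders):

```
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
```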
- 3. Put a single record:
/**
 * Put a single record into the delivery stream.
 */
public static void addFireHose(Record record, String deliveryStreamName) {
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setDeliveryStreamName(deliveryStreamName);
    putRecordRequest.setRecord(record);
    firehoseClient.putRecord(putRecordRequest);
}
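One thing to keep in mind when building the Record payload: Firehose concatenates record payloads at the destination without inserting any delimiter, so for S3 delivery it is common to append a newline to each record yourself. A small SDK-free sketch of building such a payload (the class name is just for illustration):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class RecordPayload {
    // Build the byte payload for one Firehose record, appending a trailing
    // '\n' so records stay line-delimited once concatenated in S3 objects.
    static ByteBuffer toLineDelimited(String json) {
        return ByteBuffer.wrap((json + "\n").getBytes(StandardCharsets.UTF_8));
    }
}
```

The resulting ByteBuffer is what you would pass to `new Record().withData(...)`.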
- 4. Put records in batches:
/**
 * Put a batch of records into the delivery stream.
 */
public static void addBatchFireHose(List<Record> records,
        String deliveryStreamName) {
    try {
        putRecordBatch(records, deliveryStreamName);
    } catch (Exception e) {
        LOGGER.error("Failed to put record batch: " + e);
    }
}

private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
        String deliveryStreamName) {
    PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
    putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
    putRecordBatchRequest.setRecords(recordList);
    return firehoseClient.putRecordBatch(putRecordBatchRequest);
}
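One caveat the helper above does not handle: PutRecordBatch accepts at most 500 records per call (and the call as a whole is also size-limited; check the current service quotas), so larger lists need to be split before calling putRecordBatch. An SDK-free sketch of the count-based split:

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitter {
    static final int MAX_RECORDS_PER_BATCH = 500; // PutRecordBatch record limit

    // Split a list into consecutive chunks of at most maxPerBatch elements,
    // preserving order; each chunk can then be sent as one PutRecordBatch call.
    static <T> List<List<T>> split(List<T> records, int maxPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxPerBatch) {
            batches.add(records.subList(i, Math.min(i + maxPerBatch, records.size())));
        }
        return batches;
    }
}
```

A production version would also inspect PutRecordBatchResult for partially failed records and retry them, since PutRecordBatch can succeed while individual records fail.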
- 5. Update the delivery stream configuration:
/**
 * Update the delivery stream's destination configuration.
 *
 * @throws Exception
 */
public static void updateDeliveryStream(String deliveryOpenStreamName,
        String s3DestinationUpdateName) throws Exception {
    DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
    LOGGER.info("Updating DeliveryStream Destination: "
            + deliveryOpenStreamName + " with new configuration options");
    // get(0) -> a delivery stream currently supports only one destination
    UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
            .withDeliveryStreamName(deliveryOpenStreamName)
            .withCurrentDeliveryStreamVersionId(
                    deliveryStreamDescription.getVersionId())
            .withDestinationId(
                    deliveryStreamDescription.getDestinations().get(0)
                            .getDestinationId());
    S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
    s3DestinationUpdate.withPrefix(s3DestinationUpdateName);
    updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);
    firehoseClient.updateDestination(updateDestinationRequest);
}
/**
 * Describe the delivery stream.
 *
 * @param deliveryStreamName
 *            the delivery stream name
 * @return the delivery stream description
 */
private static DeliveryStreamDescription describeDeliveryStream(
        String deliveryStreamName) {
    DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
    describeDeliveryStreamRequest.withDeliveryStreamName(deliveryStreamName);
    DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
            .describeDeliveryStream(describeDeliveryStreamRequest);
    return describeDeliveryStreamResponse.getDeliveryStreamDescription();
}
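For context on what withPrefix controls: with this generation of Firehose, delivered S3 objects land under the configured prefix followed by a UTC date path of the form YYYY/MM/DD/HH. The helper below is a hypothetical utility (not part of the SDK) that reproduces that layout, so you can predict where a given hour's objects go:

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class S3KeyLayout {
    // Mirror Firehose's S3 object layout: configured prefix followed by a
    // UTC "YYYY/MM/DD/HH/" path, under which the delivered objects are named.
    static String keyPrefixFor(String prefix, ZonedDateTime when) {
        return prefix + when.withZoneSameInstant(ZoneOffset.UTC)
                .format(DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")) + "/";
    }
}
```

For example, objects delivered at 09:05 UTC on 2018-03-07 with prefix "logs/" end up under "logs/2018/03/07/09/".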
Complete code:
package com.sdk.wifi.util.aws.firehose;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.DeliveryStreamDescription;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamRequest;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisfirehose.model.S3DestinationUpdate;
import com.amazonaws.services.kinesisfirehose.model.UpdateDestinationRequest;
public class FireHoseUtil {

    private static final Log LOGGER = LogFactory.getLog(FireHoseUtil.class);

    // DeliveryStream properties
    private static AmazonKinesisFirehoseClient firehoseClient;
    private static final String FIRE_HOSE_REGION = "us-west-2";

    /**
     * Put a batch of records into the delivery stream.
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("Failed to put record batch: " + e);
        }
    }

    /**
     * Put a single record into the delivery stream.
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }

    /**
     * Convert a string into a Record.
     *
     * @param data
     * @return
     * @throws UnsupportedEncodingException
     */
    public static Record createRecord(String data) throws UnsupportedEncodingException {
        return new Record().withData(ByteBuffer.wrap(data.getBytes("UTF-8")));
    }

    /**
     * Initialize the Firehose client.
     */
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));
    }

    /**
     * Put a batch of records.
     *
     * @param recordList
     * @return
     */
    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

    /**
     * Update the delivery stream's destination configuration.
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> a delivery stream currently supports only one destination
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());
        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);
        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);
        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream name
     * @return the delivery stream description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest.withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }
}


