SNS(Simple Notification Service),SQS(Simple Queue Service)都是AWS非常重要的部分,它允許軟件的不通部分通過消息傳遞彼此進行交流。在SNS中可以創(chuàng)建主題并為其訂閱資源。訂閱的資源可以是HTTP端點,Lambda函數(shù),移動應(yīng)用,發(fā)送郵件,向SQS發(fā)送隊列消息,發(fā)送SMS短信等。在SQS中可以創(chuàng)建隊列并發(fā)送消息到隊列中,然后消費者可以輪詢這些消息進行消費,類似Rabbit MQ。
- 創(chuàng)建SNS主題
- Lambda發(fā)布SNS通知
- Lambda訂閱SNS消息
- SQS訂閱SNS消息
源代碼
代碼下載地址:https://pan.baidu.com/s/1rjK5Pm-UQM8OnPRUUBjuMg
提取碼:j9ak
工程說明

1. 創(chuàng)建SNS主題
創(chuàng)建SNS主題比較簡單,可以通過控制臺創(chuàng)建https://console.aws.amazon.com/sns/v3/home或通過CloudFormation模版創(chuàng)建。
當(dāng)前通過cloudformation模版創(chuàng)建:
"UserRegistrationSnsTopic": {
"Type": "AWS::SNS::Topic",
"Properties": {
"Subscription": [
//此主題的 SNS 訂閱(終端節(jié)點)
//下面會在這邊添加Lambda和SQS訂閱
]
}
}
發(fā)布工程./gradlew deploy成功后,我們可以在控制臺https://console.aws.amazon.com/sns/v3/home中看到發(fā)布的主題。

2. Lambda發(fā)布SNS通知
在Lambda里面發(fā)布SNS消息,需要應(yīng)用SNS SDK。發(fā)布消息很簡單,只需要調(diào)用AmazonSNSClient.publish(topicArn,message)即可發(fā)布信息。參數(shù)topicArn可以通過環(huán)境變量獲取。
- 在cloudformation.template中添加Arn的環(huán)境變量
- 在Lambda中添發(fā)布消息代碼
- 在cloudformation.template中添加Arn的環(huán)境變量
"UserRegistrationLambda": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "com.serverlessbook.lambda.userregistration.Handler",
"Description": "User registration Lambda",
"Role": {
"Fn::GetAtt": [
]
},
"Environment": {
//添加環(huán)境變量,后續(xù)UserRegistrationLambda的Handle中使用到。
"UserRegistrationSnsTopic": {
"Ref": "UserRegistrationSnsTopic"
}
}
}
}
}
- 在Lambda中添發(fā)布消息代碼
代碼中省略內(nèi)部類和注冊成功跳轉(zhuǎn)地址的代碼片段,重點展示SNS的信息發(fā)布。
//用戶注冊Lambda
public class Handler extends LambdaHandler<Handler.RegistrationInput, Handler.RegistrationOutput> {
//SNS Client
private AmazonSNSClient amazonSNSClient;
//在構(gòu)造器中我們使用 INJECTOR.injectMembers(this);
//這樣我們可以通過New方法,將AmazonSNSClient注入進來。不需要在DependencyInjectionModule的config()中配置。
@Inject
public Handler setAmazonSNSClient(AmazonSNSClient amazonSNSClient) {
this.amazonSNSClient = amazonSNSClient;
return this;
}
//獲取用戶信息發(fā)布消息
private void notifySnsSubscribers(User user) {
try {
amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), user.getEmail());
LOGGER.info("SNS notification sent for "+user.getEmail());
} catch (Exception anyException) {
LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
}
}
public Handler() {
INJECTOR.injectMembers(this);
Objects.requireNonNull(userService);
}
@Override
public RegistrationOutput handleRequest(RegistrationInput input, Context context) {
User createdUser = userService.registerNewUser(input.username, input.email);
//發(fā)布該主題的消息
notifySnsSubscribers(createdUser);
//返回生成user的原始URL
return new RegistrationOutput(createdUser);
}
}
3. Lambda訂閱SNS消息
接下來需要訂閱消息,當(dāng)訂閱到消息后可以觸發(fā)一些動作,例如接受到用戶注冊功能信息,給用戶發(fā)送電子郵件。我們創(chuàng)建一個lambda-userregistration-welcomemail工程,在這個工程中訂閱“UserRegistrationSnsTopic”主題發(fā)布的信息。
創(chuàng)建Lambda工程,獲取訂閱消息
在cloudformation中配置lambda
在cloudformation中添加消息訂閱
-
創(chuàng)建Lambda工程,獲取訂閱消息
創(chuàng)建lambda-userregistration-welcomemail工程,將該工程加入settings.gradle,并在build.gradle文件中添加SNS Event的SDK,添加Lambda獲取訂閱消息的邏輯代碼。
如圖創(chuàng)建工程和目錄結(jié)構(gòu)。
SNS主題內(nèi)容
將工程加入settings.gradle中:
include 'lambda-userregistration-welcomemail'
在當(dāng)前工程的build.gradle中添加依賴:
dependencies {
compile group: 'com.amazonaws', name: 'aws-lambda-java-events', version: '1.3.0'
compile group: 'com.google.inject', name: 'guice', version: guiceVersion
}
添加Lambda代碼,代碼中SNSEvent對象的getRecords()方法的值,大多數(shù)情況下,返回的是一個元素的列表,但也有可能是多個消息,所以這邊需要通過循環(huán)遍歷來獲得每條信息,并進行處理。
public class Handler implements RequestHandler<SNSEvent, Void> {
private static final Injector INJECTOR = Guice.createInjector();
private AmazonSimpleEmailServiceClient simpleEmailServiceClient;
//直接注入new的AmazonSimpleEmailServiceClient對象
@Inject
public Handler setSimpleEmailServiceClient(
AmazonSimpleEmailServiceClient simpleEmailServiceClient) {
this.simpleEmailServiceClient = simpleEmailServiceClient;
return this;
}
public Handler() {
INJECTOR.injectMembers(this);
Objects.nonNull(simpleEmailServiceClient);
}
private void sendEmail(final String emailAddress) {
//可以發(fā)送SES郵件
LOGGER.debug("Sending welcome mail to " + emailAddress + " succeeded");
}
@Override
public Void handleRequest(SNSEvent input, Context context) {
//收到的是標(biāo)準(zhǔn)的SNSEvent事件
//getRecords()返回的是一個列表,表示Lambda可能一次收多條SNS消息。
input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
return null;
}
}
- 在cloudformation中配置lambda
"UserRegistrationWelcomeMailLambda": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "com.serverlessbook.lambda.userregistration.welcomemail.Handler",
"Runtime": "java8",
"Timeout": "300",
"MemorySize": "1024",
"Description": "User registration welcome mail Lambda",
"Role": {
"Fn::GetAtt": [
"LambdaExecutionRole",
"Arn"
]
},
"Code": {
"S3Bucket": {
"Ref": "DeploymentBucket"
},
"S3Key": {
"Fn::Sub": "artifacts/lambda-userregistration-welcomemail/${ProjectVersion}/${DeploymentTime}.jar"
}
}
}
},
//授權(quán)器
"UserRegistrationWelcomeMailLambdaPermission": {
"Type": "AWS::Lambda::Permission",
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "UserRegistrationWelcomeMailLambda"
},
"Principal": "sns.amazonaws.com",
"SourceArn": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:*"
}
}
},
- 在cloudformation中添加消息訂閱
這邊需要在上面的"UserRegistrationSnsTopic"中添加消息訂閱。注意,后續(xù)還會添加SQS的訂閱。
"UserRegistrationSnsTopic": {
"Type": "AWS::SNS::Topic",
"Properties": {
"Subscription": [
//Lambda訂閱
{
"Endpoint": {
"Fn::GetAtt": [
"UserRegistrationWelcomeMailLambda",
"Arn"
]
},
"Protocol": "lambda"
}
]
}
}
第二階段./gradlew deploy 在用戶注冊成功后,可以在CloudWatch控制臺看到一行日志"Sending welcome mail to xxx succeeded"的日志。到此整個信息的發(fā)布和訂閱就完成了。后續(xù)我們在這個SNS消息中添加SQS的訂閱。
4. SQS訂閱SNS消息
在SNS的Topic中添加SQS的訂閱和Lambda的訂閱是一樣的。
- 創(chuàng)建一個消息隊列
- 添加消息訂閱
- 創(chuàng)建一個消息隊列
詳細(xì)內(nèi)容標(biāo)注在注釋中。
//創(chuàng)建標(biāo)準(zhǔn)SQS隊列
"UserRegistrationQueue": {
"Type": "AWS::SQS::Queue"
},
SQS隊列的策略,應(yīng)用與上面的SQS
"UserRegistrationQueuePolicy": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Version": "2012-10-17",
//權(quán)限設(shè)置,所有人允許在隊列UserRegistrationQueue上發(fā)送消息
//(只是發(fā)送,接受消息可以添加SQS:ReceiveMessage)
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "SQS:SendMessage",
"Resource": {
"Fn::GetAtt": [
"UserRegistrationQueue",
"Arn"
]
},
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "UserRegistrationSnsTopic"
}
}
}
}
]
},
//策略作用于UserRegistrationQueue隊列
"Queues": [
{
"Ref": "UserRegistrationQueue"
}
]
}
}
- 添加消息訂閱
需要繼續(xù)在“UserRegistrationSnsTopic”繼續(xù)添加訂閱消息。完整的Topic和訂閱如下:
"UserRegistrationSnsTopic": {
"Type": "AWS::SNS::Topic",
"Properties": {
"Subscription": [
//Lambda訂閱
{
"Endpoint": {
"Fn::GetAtt": [
"UserRegistrationWelcomeMailLambda",
"Arn"
]
},
"Protocol": "lambda"
},
//SQS訂閱
{
"Endpoint": {
"Fn::GetAtt": [
"UserRegistrationQueue",
"Arn"
]
},
"Protocol": "sqs"
}
]
}
}
第三階段./gradlew deploy 在用戶注冊成功后,可以登陸SQS控制臺https://console.aws.amazon.com/sqs/v2/home
已經(jīng)有一條信息。

源代碼
代碼下載地址:https://pan.baidu.com/s/1rjK5Pm-UQM8OnPRUUBjuMg
提取碼:j9ak
