AWS Lambda筆記-SNS/SQS事件通知-13【含源代碼】

SNS(Simple Notification Service),SQS(Simple Queue Service)都是AWS非常重要的部分,它允許軟件的不通部分通過消息傳遞彼此進行交流。在SNS中可以創(chuàng)建主題并為其訂閱資源。訂閱的資源可以是HTTP端點,Lambda函數(shù),移動應(yīng)用,發(fā)送郵件,向SQS發(fā)送隊列消息,發(fā)送SMS短信等。在SQS中可以創(chuàng)建隊列并發(fā)送消息到隊列中,然后消費者可以輪詢這些消息進行消費,類似Rabbit MQ。

  1. 創(chuàng)建SNS主題
  2. Lambda發(fā)布SNS通知
  3. Lambda訂閱SNS消息
  4. SQS訂閱SNS消息

源代碼

代碼下載地址:https://pan.baidu.com/s/1rjK5Pm-UQM8OnPRUUBjuMg
提取碼:j9ak

工程說明

工程關(guān)系配置

1. 創(chuàng)建SNS主題

創(chuàng)建SNS主題比較簡單,可以通過控制臺創(chuàng)建https://console.aws.amazon.com/sns/v3/home或通過CloudFormation模版創(chuàng)建。
當(dāng)前通過cloudformation模版創(chuàng)建:

"UserRegistrationSnsTopic": {
  "Type": "AWS::SNS::Topic",
  "Properties": {
    "Subscription": [
      //此主題的 SNS 訂閱(終端節(jié)點)
      //下面會在這邊添加Lambda和SQS訂閱
    ]
  }
}

發(fā)布工程./gradlew deploy成功后,我們可以在控制臺https://console.aws.amazon.com/sns/v3/home中看到發(fā)布的主題。

SNS主題內(nèi)容

2. Lambda發(fā)布SNS通知

在Lambda里面發(fā)布SNS消息,需要應(yīng)用SNS SDK。發(fā)布消息很簡單,只需要調(diào)用AmazonSNSClient.publish(topicArn,message)即可發(fā)布信息。參數(shù)topicArn可以通過環(huán)境變量獲取。

  1. 在cloudformation.template中添加Arn的環(huán)境變量
  2. 在Lambda中添發(fā)布消息代碼
  1. 在cloudformation.template中添加Arn的環(huán)境變量
 "UserRegistrationLambda": {
      "Type": "AWS::Lambda::Function",
      "Properties": {
        "Handler": "com.serverlessbook.lambda.userregistration.Handler",
        "Description": "User registration Lambda",
        "Role": {
          "Fn::GetAtt": [
          ]
        },
        "Environment": {
             //添加環(huán)境變量,后續(xù)UserRegistrationLambda的Handle中使用到。
            "UserRegistrationSnsTopic": {
              "Ref": "UserRegistrationSnsTopic"
            }
          }
        }
      }
    }
  1. 在Lambda中添發(fā)布消息代碼
    代碼中省略內(nèi)部類和注冊成功跳轉(zhuǎn)地址的代碼片段,重點展示SNS的信息發(fā)布。
//用戶注冊Lambda
public class Handler extends LambdaHandler<Handler.RegistrationInput, Handler.RegistrationOutput> {
    //SNS Client 
    private AmazonSNSClient amazonSNSClient;
    
    //在構(gòu)造器中我們使用 INJECTOR.injectMembers(this);
    //這樣我們可以通過New方法,將AmazonSNSClient注入進來。不需要在DependencyInjectionModule的config()中配置。
    @Inject
    public Handler setAmazonSNSClient(AmazonSNSClient amazonSNSClient) {
        this.amazonSNSClient = amazonSNSClient;
        return this;
    }
    //獲取用戶信息發(fā)布消息
    private void notifySnsSubscribers(User user) {
      try {
        amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), user.getEmail());
        LOGGER.info("SNS notification sent for "+user.getEmail());
      } catch (Exception anyException) {
        LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
      }
    }

    public Handler() {
        INJECTOR.injectMembers(this);
        Objects.requireNonNull(userService);
    }

    @Override
    public RegistrationOutput handleRequest(RegistrationInput input, Context context) {
        User createdUser = userService.registerNewUser(input.username, input.email);
        //發(fā)布該主題的消息
        notifySnsSubscribers(createdUser);
        //返回生成user的原始URL
        return new RegistrationOutput(createdUser);
    }
}

3. Lambda訂閱SNS消息

接下來需要訂閱消息,當(dāng)訂閱到消息后可以觸發(fā)一些動作,例如接受到用戶注冊功能信息,給用戶發(fā)送電子郵件。我們創(chuàng)建一個lambda-userregistration-welcomemail工程,在這個工程中訂閱“UserRegistrationSnsTopic”主題發(fā)布的信息。

  1. 創(chuàng)建Lambda工程,獲取訂閱消息

  2. 在cloudformation中配置lambda

  3. 在cloudformation中添加消息訂閱

  4. 創(chuàng)建Lambda工程,獲取訂閱消息
    創(chuàng)建lambda-userregistration-welcomemail工程,將該工程加入settings.gradle,并在build.gradle文件中添加SNS Event的SDK,添加Lambda獲取訂閱消息的邏輯代碼。
    如圖創(chuàng)建工程和目錄結(jié)構(gòu)。


    SNS主題內(nèi)容

將工程加入settings.gradle中:

include 'lambda-userregistration-welcomemail'

在當(dāng)前工程的build.gradle中添加依賴:

dependencies {
    compile group: 'com.amazonaws', name: 'aws-lambda-java-events', version: '1.3.0'
    compile group: 'com.google.inject', name: 'guice', version: guiceVersion
}

添加Lambda代碼,代碼中SNSEvent對象的getRecords()方法的值,大多數(shù)情況下,返回的是一個元素的列表,但也有可能是多個消息,所以這邊需要通過循環(huán)遍歷來獲得每條信息,并進行處理。

public class Handler implements RequestHandler<SNSEvent, Void> {
  private static final Injector INJECTOR = Guice.createInjector();
  private AmazonSimpleEmailServiceClient simpleEmailServiceClient;

  //直接注入new的AmazonSimpleEmailServiceClient對象
  @Inject
  public Handler setSimpleEmailServiceClient(
      AmazonSimpleEmailServiceClient simpleEmailServiceClient) {
    this.simpleEmailServiceClient = simpleEmailServiceClient;
    return this;
  }

  public Handler() {
    INJECTOR.injectMembers(this);
    Objects.nonNull(simpleEmailServiceClient);
  }

  private void sendEmail(final String emailAddress) {
     //可以發(fā)送SES郵件
     LOGGER.debug("Sending welcome mail to " + emailAddress + " succeeded");
  }

  @Override
  public Void handleRequest(SNSEvent input, Context context) {
    //收到的是標(biāo)準(zhǔn)的SNSEvent事件
    //getRecords()返回的是一個列表,表示Lambda可能一次收多條SNS消息。
    input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
    return null;
  }
}
  1. 在cloudformation中配置lambda
    "UserRegistrationWelcomeMailLambda": {
      "Type": "AWS::Lambda::Function",
      "Properties": {
        "Handler": "com.serverlessbook.lambda.userregistration.welcomemail.Handler",
        "Runtime": "java8",
        "Timeout": "300",
        "MemorySize": "1024",
        "Description": "User registration welcome mail Lambda",
        "Role": {
          "Fn::GetAtt": [
            "LambdaExecutionRole",
            "Arn"
          ]
        },
        "Code": {
          "S3Bucket": {
            "Ref": "DeploymentBucket"
          },
          "S3Key": {
            "Fn::Sub": "artifacts/lambda-userregistration-welcomemail/${ProjectVersion}/${DeploymentTime}.jar"
          }
        }
      }
    },
    //授權(quán)器
    "UserRegistrationWelcomeMailLambdaPermission": {
      "Type": "AWS::Lambda::Permission",
      "Properties": {
        "Action": "lambda:InvokeFunction",
        "FunctionName": {
          "Ref": "UserRegistrationWelcomeMailLambda"
        },
        "Principal": "sns.amazonaws.com",
        "SourceArn": {
          "Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:*"
        }
      }
    },
  1. 在cloudformation中添加消息訂閱
    這邊需要在上面的"UserRegistrationSnsTopic"中添加消息訂閱。注意,后續(xù)還會添加SQS的訂閱。
"UserRegistrationSnsTopic": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
        "Subscription": [
            //Lambda訂閱
            {
                "Endpoint": {
                    "Fn::GetAtt": [
                        "UserRegistrationWelcomeMailLambda",
                        "Arn"
                    ]
                },
                "Protocol": "lambda"
            }
        ]
    }
}

第二階段./gradlew deploy 在用戶注冊成功后,可以在CloudWatch控制臺看到一行日志"Sending welcome mail to xxx succeeded"的日志。到此整個信息的發(fā)布和訂閱就完成了。后續(xù)我們在這個SNS消息中添加SQS的訂閱。

4. SQS訂閱SNS消息

在SNS的Topic中添加SQS的訂閱和Lambda的訂閱是一樣的。

  1. 創(chuàng)建一個消息隊列
  2. 添加消息訂閱
  1. 創(chuàng)建一個消息隊列
    詳細(xì)內(nèi)容標(biāo)注在注釋中。
//創(chuàng)建標(biāo)準(zhǔn)SQS隊列
"UserRegistrationQueue": {
    "Type": "AWS::SQS::Queue"
},
SQS隊列的策略,應(yīng)用與上面的SQS
"UserRegistrationQueuePolicy": {
    "Type": "AWS::SQS::QueuePolicy",
    "Properties": {
        "PolicyDocument": {
            "Version": "2012-10-17",
            //權(quán)限設(shè)置,所有人允許在隊列UserRegistrationQueue上發(fā)送消息
            //(只是發(fā)送,接受消息可以添加SQS:ReceiveMessage)
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": "*",
                    "Action": "SQS:SendMessage",
                    "Resource": {
                        "Fn::GetAtt": [
                            "UserRegistrationQueue",
                            "Arn"
                        ]
                    },
                    "Condition": {
                        "ArnEquals": {
                            "aws:SourceArn": {
                                "Ref": "UserRegistrationSnsTopic"
                            }
                        }
                    }
                }
            ]
        },
        //策略作用于UserRegistrationQueue隊列
        "Queues": [
            {
                "Ref": "UserRegistrationQueue"
            }
        ]
    }
}
  1. 添加消息訂閱
    需要繼續(xù)在“UserRegistrationSnsTopic”繼續(xù)添加訂閱消息。完整的Topic和訂閱如下:
"UserRegistrationSnsTopic": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
        "Subscription": [
            //Lambda訂閱
            {
                "Endpoint": {
                    "Fn::GetAtt": [
                        "UserRegistrationWelcomeMailLambda",
                        "Arn"
                    ]
                },
                "Protocol": "lambda"
            },
            //SQS訂閱
            {
                "Endpoint": {
                    "Fn::GetAtt": [
                        "UserRegistrationQueue",
                        "Arn"
                    ]
                },
                "Protocol": "sqs"
            }
        ]
    }
}

第三階段./gradlew deploy 在用戶注冊成功后,可以登陸SQS控制臺https://console.aws.amazon.com/sqs/v2/home
已經(jīng)有一條信息。

SQS隊列信息

源代碼

代碼下載地址:https://pan.baidu.com/s/1rjK5Pm-UQM8OnPRUUBjuMg
提取碼:j9ak

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容