AWS Lambda筆記-Lambda Upload CloudSearch-15【含源代碼】

CloudSearch支持SDK方式上傳數(shù)據(jù),可通過創(chuàng)建一個Lambda函數(shù)來讀取SNS通知消息,并將值寫入CloudSearch中。那么實現(xiàn)通過/search?q=newuser搜索到新注冊的用戶信息的功能,就可以在新用戶注冊發(fā)布到SNS中信息以JSON格式寫入,再用一個Lambda訂閱JSON格式數(shù)據(jù)并寫入CloudSearch實現(xiàn)。

  1. User對象序列化為JSON格式
  2. 新用戶注冊Lambda,發(fā)布Json格式的User到SNS
  3. 訂閱SNS消息并反序列化User對象發(fā)郵件
  4. 訂閱SNS消息并上傳Json格式User到CloudSearch

工程說明

工程主要是通過SNS發(fā)布訂閱,lambda-userregistration-cloudsearch(工程)接受訂閱信息并寫入CloudSearch中,lambda-userregistration-welcomemail(工程)接受訂閱信息并發(fā)送郵件給新注冊用戶。


工程關系配置

1. User對象序列化為JSON格式

為方便Lambda接收SNS的用戶數(shù)據(jù)為JSON,所以在新用戶注冊成功后發(fā)布到SNS,就需要將User對象格式化為JSON。具體需要將service-user工程中的User類添加Json注釋(@JsonProperty)。

/*
 User POJO類
*/
public class User {

    @DynamoDBHashKey(attributeName = "UserId")
    @JsonProperty("userid")
    private String id;

    @DynamoDBIndexHashKey(globalSecondaryIndexName = "UsernameIndex", attributeName = "Username")
    @JsonProperty("username")
    private String username;

    @DynamoDBIndexHashKey(globalSecondaryIndexName = "EmailIndex", attributeName = "Email")
    @JsonProperty("email")
    private String email;
    //getter/setter 方法
}

2. 新用戶注冊Lambda,發(fā)布Json格式的User到SNS

lambda-userregistration工程的Handler類,在用戶注冊成功后,將User對象格式為JSON格式,并發(fā)布到SNS中。重點關注notifySnsSubscribers的new ObjectMapper().writeValueAsString(user)代碼,將序列化的User對象發(fā)布到UserRegistrationSnsTopic的主題中。

    private void notifySnsSubscribers(User user) {
      try {
        //發(fā)布UserRegistrationSnsTopic主題,內(nèi)容序列化的User對象
        amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), new ObjectMapper().writeValueAsString(user));
        LOGGER.info("SNS notification sent for "+user.getEmail());
      } catch (Exception anyException) {
        LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
      }
    }

3. 訂閱SNS消息并反序列化User對象發(fā)郵件

lambda-userregistration-welcomemail訂閱SNS消息,并反序列化User對象,為方便后續(xù)其他Lambda(如:lambda-userregistration-cloudsearch工程的Lambda)接受SNS消息并反序列化的重復操作,我們創(chuàng)建一個接受SNS消息并反序列化的Lambda基類(SnsLambdaHandler)。

public abstract class SnsLambdaHandler<I> implements RequestHandler<SNSEvent, Void> {

    private static final Logger LOGGER = Logger.getLogger(SnsLambdaHandler.class);

    private final ObjectMapper objectMapper;

    protected SnsLambdaHandler() {
        objectMapper=new ObjectMapper();
    }
    //需要子類實現(xiàn)的方法
    public abstract void handleSnsRequest(I input, Context context);

    @SuppressWarnings("unchecked")
    private Class<I> getJsonType() {
        return (Class<I>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    @Override
    public Void handleRequest(SNSEvent input, Context context) {
        //接受SNS消息
        input.getRecords().forEach(snsMessage -> {
            try {
                //接受消息,并反序列化
                I deserializedPayload = objectMapper.readValue(snsMessage.getSNS().getMessage(), getJsonType());
                handleSnsRequest(deserializedPayload, context);
            } catch (IOException anyException) {
                LOGGER.error("JSON could not be deserialized", anyException);
            }
        });
        return null;
    }
}

訂閱SNS的Lambda類修改為繼承SnsLambdaHandler類,并在類中實現(xiàn)public abstract void handleSnsRequest(I input, Context context);方法。由于之前是Eamil文本,現(xiàn)將User格式化為JSON,影響之前發(fā)送郵件業(yè)務邏輯,稍作修改,如果不關注發(fā)郵件邏輯可以直接跳過。

public class Handler extends SnsLambdaHandler<User> {
  //.....
  private void sendEmail(final User user) {
    final String emailAddress = user.getEmail();
    //收件地址
    Destination destination = new Destination().withToAddresses(emailAddress);
    Message message = new Message()
        .withBody(new Body().withText(new Content("Welcome to our forum!")))
        .withSubject(new Content("Welcome!"));
    //發(fā)送郵件,發(fā)件地址從配置的環(huán)境變量中獲取
    //......
  }

  @Override
  public void handleSnsRequest(User input, Context context){
    //收到的是標準的SNSEvent事件
    //getRecords()返回的是一個列表,表示Lambda可能一次收多條SNS消息。
    //input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
    //return null;
    sendEmail(input);
  }
}

4. 訂閱SNS消息并上傳Json格式User到CloudSearch

寫入CloudSearch的Lambda同樣需要訂閱SNS并且需要將User反序列化,所以繼承SnsLambdaHandler減少重復接受訂閱消息和反序列化。
JSON格式數(shù)據(jù)寫入CloudSearch,我們需要CloudSearch的AmazonCloudSearchDomainClient類幫忙,創(chuàng)建該類的同時需要設置Endpoint(即某個CloudSearch的Search Endpoint)的值。

Search Endpoint的值

]
還有一點需要注意,在uploadDocument時JSON格式需要如下方式(支持批量),id,type,fields都是必填字段。id是用來表示唯一性字段,type字段有add和delete分別是用來新增和刪除文檔內(nèi)容。fields則是需要搜索的字段內(nèi)容。下面類的uploadDocument方法主要功能是組裝并寫入CloudSearch。

[{
    "id": "1234-1234-1234",
    "type": "add",
    "fields": {
        "userid": "1234-1234-1234",
        "eamil": "abc@abc.com",
        "username": "testtest"
    }
}]

寫入CloudSearch的Lambda函數(shù):

public class Handler extends SnsLambdaHandler<User> {
    private static final Injector INJECTOR = Guice.createInjector();
    private static final Logger LOGGER = Logger.getLogger(Handler.class);
    private AmazonCloudSearchDomainClient amazonCloudSearchDomainClient;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Inject
    public Handler setAmazonCloudSearchDomainClient(AmazonCloudSearchDomainClient amazonCloudSearchDomainClient) {
        //獲取CloudSearch的端點
        this.amazonCloudSearchDomainClient = amazonCloudSearchDomainClient;
        this.amazonCloudSearchDomainClient.setEndpoint(System.getenv("CloudSearchDomain"));
        return this;
    }

    public Handler() {
        INJECTOR.injectMembers(this);
        Objects.nonNull(amazonCloudSearchDomainClient);
    }

    //更新CloudSearch文檔
    private void uploadDocument(User user) {
        try {
            //創(chuàng)建CloudSearchAPI需要的數(shù)據(jù)格式,add,id,fields鍵是必須的。
            final Map<String, Object> documentRequest = new HashMap<>();
            documentRequest.put("type", "add");
            documentRequest.put("id", user.getId());
            documentRequest.put("fields", user);
            LOGGER.info("User with id " + user.getId() + " is being uploaded to CloudSearch");
            //documentRequest對象轉(zhuǎn)為byte數(shù)組
            byte[] jsonAsByteStream = objectMapper.writeValueAsBytes(new Map[]{documentRequest});
            if (jsonAsByteStream != null) {
                ByteArrayInputStream document = new ByteArrayInputStream(jsonAsByteStream);
                amazonCloudSearchDomainClient.uploadDocuments(new UploadDocumentsRequest()
                        .withDocuments(document)
                        .withContentLength((long) document.available())
                        .withContentType(ContentType.Applicationjson)
                );
            }
        } catch (JsonProcessingException jsonProcessingException) {
            LOGGER.error("Object could not be converted to JSON", jsonProcessingException);
        } catch (Exception anyException) {
            LOGGER.error("Upload was failing", anyException);
        }
    }

    @Override
    public void handleSnsRequest(User input, Context context) {
        uploadDocument(input);
    }
}

該類中重點的依賴包:aws-java-sdk-cloudsearch是cloudsearch的SDK,具體依賴關系配置詳見該工程下的build.gradle文件。
同樣配置Lambda的cloudformation與其他Lambda配置相同,詳見cloudformation.template中的UserRegistrationCloudSearchLambda,UserRegistrationCloudSearchLambdaPermission。
最后一步:./gradlew deploy部署工程,部署成功后。
通過https://<youdomain>/users,body數(shù)據(jù)
{"username":"testuser24","email":"lazy24@163.com"}提交注冊信息。
通過https://<youdomain>/search?q=testuser24檢索到新注冊用戶數(shù)據(jù)。

檢索到的數(shù)據(jù)

異常一

com.amazonaws.services.cloudsearchdomain.model.AmazonCloudSearchDomainException: User: arn:aws:sts::083845954160:assumed-role/serverlessbook-LambdaExecutionRole-1CQQ1SF5ASHEB/serverlessbook-UserRegistrationCloudSearchLambda-WI941096GZTW is not authorized to perform: cloudsearch:document on resource: serverlessbook (Service: AmazonCloudSearchDomain; Status Code: 403; Error Code: AccessDenied; Request ID: ebb327dc-6ff3-4a3f-8e92-65986e76babd; Proxy: null)

提示主要是Lambda在uploaddocument的時候沒有權(quán)限。
解決方案:在對應的Lambda的Role中添加"arn:aws:iam::aws:policy/CloudSearchFullAccess"。該工程cloudformation.tempalte中涉及的是LambdaExecutionRole

"LambdaExecutionRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
        "Path": "/",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "lambda.amazonaws.com",
                        "apigateway.amazonaws.com"
                    ]
                },
                "Action": [
                    "sts:AssumeRole"
                ]
            }]
        },
        "ManagedPolicyArns": [
            "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
            "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
            "arn:aws:iam::aws:policy/AWSLambdaFullAccess",
            "arn:aws:iam::aws:policy/CloudSearchFullAccess"
        ]
    }
}

Github代碼地址:https://github.com/zhujinhuant/serverlessbook/tree/master/serverlessbook-15

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容