CloudSearch支持SDK方式上傳數(shù)據(jù),可通過創(chuàng)建一個Lambda函數(shù)來讀取SNS通知消息,并將值寫入CloudSearch中。那么實現(xiàn)通過/search?q=newuser搜索到新注冊的用戶信息的功能,就可以在新用戶注冊發(fā)布到SNS中信息以JSON格式寫入,再用一個Lambda訂閱JSON格式數(shù)據(jù)并寫入CloudSearch實現(xiàn)。
- User對象序列化為JSON格式
- 新用戶注冊Lambda,發(fā)布Json格式的User到SNS
- 訂閱SNS消息并反序列化User對象發(fā)郵件
- 訂閱SNS消息并上傳Json格式User到CloudSearch
工程說明
工程主要是通過SNS發(fā)布訂閱,lambda-userregistration-cloudsearch(工程)接受訂閱信息并寫入CloudSearch中,lambda-userregistration-welcomemail(工程)接受訂閱信息并發(fā)送郵件給新注冊用戶。

1. User對象序列化為JSON格式
為方便Lambda接收SNS的用戶數(shù)據(jù)為JSON,所以在新用戶注冊成功后發(fā)布到SNS,就需要將User對象格式化為JSON。具體需要將service-user工程中的User類添加Json注釋(@JsonProperty)。
/*
User POJO類
*/
public class User {
@DynamoDBHashKey(attributeName = "UserId")
@JsonProperty("userid")
private String id;
@DynamoDBIndexHashKey(globalSecondaryIndexName = "UsernameIndex", attributeName = "Username")
@JsonProperty("username")
private String username;
@DynamoDBIndexHashKey(globalSecondaryIndexName = "EmailIndex", attributeName = "Email")
@JsonProperty("email")
private String email;
//getter/setter 方法
}
2. 新用戶注冊Lambda,發(fā)布Json格式的User到SNS
lambda-userregistration工程的Handler類,在用戶注冊成功后,將User對象格式為JSON格式,并發(fā)布到SNS中。重點關注notifySnsSubscribers的new ObjectMapper().writeValueAsString(user)代碼,將序列化的User對象發(fā)布到UserRegistrationSnsTopic的主題中。
private void notifySnsSubscribers(User user) {
try {
//發(fā)布UserRegistrationSnsTopic主題,內(nèi)容序列化的User對象
amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), new ObjectMapper().writeValueAsString(user));
LOGGER.info("SNS notification sent for "+user.getEmail());
} catch (Exception anyException) {
LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
}
}
3. 訂閱SNS消息并反序列化User對象發(fā)郵件
lambda-userregistration-welcomemail訂閱SNS消息,并反序列化User對象,為方便后續(xù)其他Lambda(如:lambda-userregistration-cloudsearch工程的Lambda)接受SNS消息并反序列化的重復操作,我們創(chuàng)建一個接受SNS消息并反序列化的Lambda基類(SnsLambdaHandler)。
public abstract class SnsLambdaHandler<I> implements RequestHandler<SNSEvent, Void> {
private static final Logger LOGGER = Logger.getLogger(SnsLambdaHandler.class);
private final ObjectMapper objectMapper;
protected SnsLambdaHandler() {
objectMapper=new ObjectMapper();
}
//需要子類實現(xiàn)的方法
public abstract void handleSnsRequest(I input, Context context);
@SuppressWarnings("unchecked")
private Class<I> getJsonType() {
return (Class<I>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
@Override
public Void handleRequest(SNSEvent input, Context context) {
//接受SNS消息
input.getRecords().forEach(snsMessage -> {
try {
//接受消息,并反序列化
I deserializedPayload = objectMapper.readValue(snsMessage.getSNS().getMessage(), getJsonType());
handleSnsRequest(deserializedPayload, context);
} catch (IOException anyException) {
LOGGER.error("JSON could not be deserialized", anyException);
}
});
return null;
}
}
訂閱SNS的Lambda類修改為繼承SnsLambdaHandler類,并在類中實現(xiàn)public abstract void handleSnsRequest(I input, Context context);方法。由于之前是Eamil文本,現(xiàn)將User格式化為JSON,影響之前發(fā)送郵件業(yè)務邏輯,稍作修改,如果不關注發(fā)郵件邏輯可以直接跳過。
public class Handler extends SnsLambdaHandler<User> {
//.....
private void sendEmail(final User user) {
final String emailAddress = user.getEmail();
//收件地址
Destination destination = new Destination().withToAddresses(emailAddress);
Message message = new Message()
.withBody(new Body().withText(new Content("Welcome to our forum!")))
.withSubject(new Content("Welcome!"));
//發(fā)送郵件,發(fā)件地址從配置的環(huán)境變量中獲取
//......
}
@Override
public void handleSnsRequest(User input, Context context){
//收到的是標準的SNSEvent事件
//getRecords()返回的是一個列表,表示Lambda可能一次收多條SNS消息。
//input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
//return null;
sendEmail(input);
}
}
4. 訂閱SNS消息并上傳Json格式User到CloudSearch
寫入CloudSearch的Lambda同樣需要訂閱SNS并且需要將User反序列化,所以繼承SnsLambdaHandler減少重復接受訂閱消息和反序列化。
JSON格式數(shù)據(jù)寫入CloudSearch,我們需要CloudSearch的AmazonCloudSearchDomainClient類幫忙,創(chuàng)建該類的同時需要設置Endpoint(即某個CloudSearch的Search Endpoint)的值。

]
還有一點需要注意,在uploadDocument時JSON格式需要如下方式(支持批量),id,type,fields都是必填字段。id是用來表示唯一性字段,type字段有add和delete分別是用來新增和刪除文檔內(nèi)容。fields則是需要搜索的字段內(nèi)容。下面類的
uploadDocument方法主要功能是組裝并寫入CloudSearch。
[{
"id": "1234-1234-1234",
"type": "add",
"fields": {
"userid": "1234-1234-1234",
"eamil": "abc@abc.com",
"username": "testtest"
}
}]
寫入CloudSearch的Lambda函數(shù):
public class Handler extends SnsLambdaHandler<User> {
private static final Injector INJECTOR = Guice.createInjector();
private static final Logger LOGGER = Logger.getLogger(Handler.class);
private AmazonCloudSearchDomainClient amazonCloudSearchDomainClient;
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public Handler setAmazonCloudSearchDomainClient(AmazonCloudSearchDomainClient amazonCloudSearchDomainClient) {
//獲取CloudSearch的端點
this.amazonCloudSearchDomainClient = amazonCloudSearchDomainClient;
this.amazonCloudSearchDomainClient.setEndpoint(System.getenv("CloudSearchDomain"));
return this;
}
public Handler() {
INJECTOR.injectMembers(this);
Objects.nonNull(amazonCloudSearchDomainClient);
}
//更新CloudSearch文檔
private void uploadDocument(User user) {
try {
//創(chuàng)建CloudSearchAPI需要的數(shù)據(jù)格式,add,id,fields鍵是必須的。
final Map<String, Object> documentRequest = new HashMap<>();
documentRequest.put("type", "add");
documentRequest.put("id", user.getId());
documentRequest.put("fields", user);
LOGGER.info("User with id " + user.getId() + " is being uploaded to CloudSearch");
//documentRequest對象轉(zhuǎn)為byte數(shù)組
byte[] jsonAsByteStream = objectMapper.writeValueAsBytes(new Map[]{documentRequest});
if (jsonAsByteStream != null) {
ByteArrayInputStream document = new ByteArrayInputStream(jsonAsByteStream);
amazonCloudSearchDomainClient.uploadDocuments(new UploadDocumentsRequest()
.withDocuments(document)
.withContentLength((long) document.available())
.withContentType(ContentType.Applicationjson)
);
}
} catch (JsonProcessingException jsonProcessingException) {
LOGGER.error("Object could not be converted to JSON", jsonProcessingException);
} catch (Exception anyException) {
LOGGER.error("Upload was failing", anyException);
}
}
@Override
public void handleSnsRequest(User input, Context context) {
uploadDocument(input);
}
}
該類中重點的依賴包:aws-java-sdk-cloudsearch是cloudsearch的SDK,具體依賴關系配置詳見該工程下的build.gradle文件。
同樣配置Lambda的cloudformation與其他Lambda配置相同,詳見cloudformation.template中的UserRegistrationCloudSearchLambda,UserRegistrationCloudSearchLambdaPermission。
最后一步:./gradlew deploy部署工程,部署成功后。
通過https://<youdomain>/users,body數(shù)據(jù)
{"username":"testuser24","email":"lazy24@163.com"}提交注冊信息。
通過https://<youdomain>/search?q=testuser24檢索到新注冊用戶數(shù)據(jù)。

異常一
com.amazonaws.services.cloudsearchdomain.model.AmazonCloudSearchDomainException: User: arn:aws:sts::083845954160:assumed-role/serverlessbook-LambdaExecutionRole-1CQQ1SF5ASHEB/serverlessbook-UserRegistrationCloudSearchLambda-WI941096GZTW is not authorized to perform: cloudsearch:document on resource: serverlessbook (Service: AmazonCloudSearchDomain; Status Code: 403; Error Code: AccessDenied; Request ID: ebb327dc-6ff3-4a3f-8e92-65986e76babd; Proxy: null)
提示主要是Lambda在uploaddocument的時候沒有權(quán)限。
解決方案:在對應的Lambda的Role中添加"arn:aws:iam::aws:policy/CloudSearchFullAccess"。該工程cloudformation.tempalte中涉及的是LambdaExecutionRole。
"LambdaExecutionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"Path": "/",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com",
"apigateway.amazonaws.com"
]
},
"Action": [
"sts:AssumeRole"
]
}]
},
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AWSLambdaFullAccess",
"arn:aws:iam::aws:policy/CloudSearchFullAccess"
]
}
}
Github代碼地址:https://github.com/zhujinhuant/serverlessbook/tree/master/serverlessbook-15