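This article shows how to import CSV files stored in S3 into an Aurora database instance, and then send a notification to a specified email address through an Amazon Simple Notification Service (SNS) subscription.
Resource list
- A PostgreSQL database, MyDB, with one table, venues
- An EC2 instance that can connect to the database
- An SNS topic, myUpdatedSNS
- A Lambda function, myNotifyUpdate
- An S3 bucket, mys3-pg-test, containing InitialVenues.csv, AdditionalVenues.csv, and TestVenues.csv
Create the S3 bucket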
TestVenues.csv
201,Portico,Concert_hall,LosFake,MN,21008,USA,limited
202,Barn,Indoor_small,Faketown,FL,78966,USA,closed
203,Barn,Indoor_small,LakeFake,CA,85820,USA,closed
InitialVenues.csv
0,Grounds,Outdoor_large,Faketown,WA,35238,USA,limited
1,Center,Indoor_small,LosFake,MN,86411,USA,limited
2,Hall,Indoor_small,Fakespot,TN,35353,USA,limited
AdditionalVenues.csv
351,Amphitheater,Concert_hall,San Fake,FL,15560,USA,closed
352,Complex,Outdoor_large,Faketown,WA,65095,USA,open
353,Stadium,Outdoor_large,LakeFake,WA,92033,USA,open
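Before uploading, it can help to check that each file matches the layout of the venues table. The following is a minimal sketch, assuming the files have no header row and follow the column order of the venues table defined later in this article. `parse_venues` and `VENUE_COLUMNS` are illustrative names and not part of the original setup.

```python
import csv
import io

# Column order assumed from the venues table definition in this article.
VENUE_COLUMNS = [
    "venue_id", "venue_name", "venue_type", "venue_city",
    "venue_state", "venue_zip", "venue_country", "venue_status",
]


def parse_venues(csv_text):
    """Parse header-less venue CSV text into a list of dicts, one per row."""
    rows = []
    for line_no, fields in enumerate(csv.reader(io.StringIO(csv_text)), start=1):
        if not fields:
            continue  # skip blank lines
        if len(fields) != len(VENUE_COLUMNS):
            raise ValueError(
                f"line {line_no}: expected {len(VENUE_COLUMNS)} fields, got {len(fields)}"
            )
        row = dict(zip(VENUE_COLUMNS, fields))
        # venue_id and venue_zip are integer columns in the table.
        row["venue_id"] = int(row["venue_id"])
        row["venue_zip"] = int(row["venue_zip"])
        rows.append(row)
    return rows


# Two rows copied from AdditionalVenues.csv above.
SAMPLE = """351,Amphitheater,Concert_hall,San Fake,FL,15560,USA,closed
352,Complex,Outdoor_large,Faketown,WA,65095,USA,open
"""
venues = parse_venues(SAMPLE)
```

A row with the wrong number of fields raises `ValueError`, which surfaces a malformed file before the database import is attempted.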
Create the SNS topic
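The topic and its email subscription can also be created from code rather than the console. This is a rough sketch, assuming boto3 is available and the caller passes in an SNS client; the helper name `create_topic_with_email` and the example address are hypothetical.

```python
def create_topic_with_email(sns_client, topic_name, email):
    """Create (or look up) the topic and subscribe an email address to it."""
    # create_topic returns the existing ARN when a topic with this name already exists.
    topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]
    # SNS emails the recipient a confirmation link; delivery starts after it is confirmed.
    sns_client.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint=email)
    return topic_arn


# Hypothetical usage (requires boto3 and AWS credentials):
#   import boto3
#   sns = boto3.client("sns", region_name="cn-north-1")
#   arn = create_topic_with_email(sns, "myUpdatedSNS", "someone@example.com")
```

The returned ARN is the value the Lambda function later needs in order to publish to the topic.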
Create the Lambda function
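import json
import os

import boto3


def lambda_handler(event, context):
    client = boto3.client('sns')
    try:
        # Publish the incoming event to the topic whose ARN is stored in SNSarn.
        client.publish(TargetArn=os.environ['SNSarn'], Message=json.dumps(event))
    except Exception as exc:
        # Log the cause so that it shows up in CloudWatch Logs.
        print(f"an error occurred: {exc}")
    return {
        'statusCode': 200,
        'body': json.dumps(event)
    }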
Note: set the SNSarn environment variable of the function to the ARN of the SNS topic.
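One way to set that variable without the console is through the Lambda API. The sketch below assumes a boto3 Lambda client is passed in; `set_sns_arn` is an illustrative helper name. It reads the current variables first because the update call replaces the whole environment block.

```python
def set_sns_arn(lambda_client, function_name, topic_arn):
    """Store the topic ARN in the function's SNSarn environment variable."""
    current = lambda_client.get_function_configuration(FunctionName=function_name)
    # Keep any variables that are already defined on the function.
    variables = dict(current.get("Environment", {}).get("Variables", {}))
    variables["SNSarn"] = topic_arn
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Environment={"Variables": variables},
    )
    return variables


# Hypothetical usage (requires boto3 and AWS credentials):
#   import boto3
#   set_sns_arn(boto3.client("lambda", region_name="cn-north-1"),
#               "myNotifyUpdate", "<ARN of myUpdatedSNS>")
```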
In addition, the Lambda function needs permission to access SNS:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "logs:*",
                "SNS:*",
                "ec2:CreateNetworkInterface"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}
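The policy above grants every SNS and CloudWatch Logs action on every resource. For anything beyond a test setup, a narrower variant may be preferable. The following is only a sketch of one such variant, assuming the function needs nothing more than publishing to a single topic and writing its own logs; `build_lambda_policy` is an illustrative name and the account ID in the example ARN is a placeholder.

```python
import json


def build_lambda_policy(topic_arn):
    """Return a policy document limited to publishing to one topic and writing logs."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Only the publish call used by the handler, and only on this topic.
                "Effect": "Allow",
                "Action": "sns:Publish",
                "Resource": topic_arn,
            },
            {
                # Log actions a Lambda function typically needs for CloudWatch Logs.
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "*",
            },
            {
                # Kept from the original policy.
                "Effect": "Allow",
                "Action": "ec2:CreateNetworkInterface",
                "Resource": "*",
            },
        ],
    }


# Placeholder ARN; replace with the ARN of myUpdatedSNS in your account.
policy_json = json.dumps(
    build_lambda_policy("arn:aws-cn:sns:cn-north-1:123456789012:myUpdatedSNS"),
    indent=4,
)
```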
Create the role
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "rds.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
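The JSON above is the trust policy that lets RDS assume the role. The article does not list the permissions attached to the role, so the sketch below makes an assumption: because the database later calls the Lambda function, it grants `lambda:InvokeFunction` on that function. The helper name `create_rds_role`, the role name, and the inline policy name are hypothetical.

```python
import json

# Trust policy copied from the article: RDS may assume this role.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "rds.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def create_rds_role(iam_client, role_name, function_arn):
    """Create a role RDS can assume and allow it to invoke one Lambda function."""
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(TRUST_POLICY),
    )
    # Assumed permission set: invoke only the function named in this article.
    permissions = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "lambda:InvokeFunction",
                "Resource": function_arn,
            }
        ],
    }
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName="invoke-notify-lambda",  # hypothetical inline policy name
        PolicyDocument=json.dumps(permissions),
    )
    return role["Role"]["Arn"]


# Hypothetical usage (requires boto3 and AWS credentials):
#   import boto3
#   create_rds_role(boto3.client("iam"), "my-rds-lambda-role",
#                   "<ARN of the myNotifyUpdate function>")
```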
Create the PostgreSQL database
Leave the remaining settings at their defaults.
Attach the role to the database
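The role can be attached in the console or through the RDS API. The sketch below assumes an Aurora PostgreSQL cluster and a boto3 RDS client passed in by the caller. The cluster identifier is a placeholder, and the feature name `Lambda` is an assumption based on the database invoking a Lambda function later in this article.

```python
def attach_role(rds_client, cluster_id, role_arn, feature_name="Lambda"):
    """Associate an IAM role with an Aurora cluster for one feature."""
    rds_client.add_role_to_db_cluster(
        DBClusterIdentifier=cluster_id,
        RoleArn=role_arn,
        FeatureName=feature_name,
    )
    return (cluster_id, role_arn, feature_name)


# Hypothetical usage (requires boto3 and AWS credentials):
#   import boto3
#   attach_role(boto3.client("rds", region_name="cn-north-1"),
#               "<your cluster identifier>", "<ARN of the role created above>")
```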
Initialize the database
-- Table: public.venues
-- DROP TABLE IF EXISTS public.venues;
CREATE TABLE IF NOT EXISTS public.venues
(
    venue_id integer NOT NULL,
    venue_name text COLLATE pg_catalog."default",
    venue_type text COLLATE pg_catalog."default",
    venue_city text COLLATE pg_catalog."default",
    venue_state text COLLATE pg_catalog."default",
    venue_zip integer,
    venue_country text COLLATE pg_catalog."default",
    venue_status text COLLATE pg_catalog."default",
    CONSTRAINT venues_pkey PRIMARY KEY (venue_id)
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS public.venues
    OWNER to dbadmin;
Create the S3 extension
CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;
Import the InitialVenues data from S3
Note: replace AK and SK with the access key and secret key of your own account.
SELECT aws_s3.table_import_from_s3(
    'venues',
    '',
    '(format csv)',
    'mys3-pg-test',
    'InitialVenues.csv',
    'cn-north-1',
    'AK',
    'SK',
    ''
);
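When the import is run from a script, the same nine arguments can be passed as query parameters so that the keys never appear in the SQL text. This is a minimal sketch, assuming a DB-API driver that uses `%s` placeholders (psycopg2 is one such driver, and is an assumption here); `build_import` and the environment variable names are illustrative.

```python
import os

# Same nine-argument call as the statement above, with placeholders instead of literals.
IMPORT_SQL = "SELECT aws_s3.table_import_from_s3(%s, %s, %s, %s, %s, %s, %s, %s, %s)"


def build_import(bucket, key, region, access_key, secret_key,
                 table="venues", columns="", options="(format csv)", session_token=""):
    """Return the import statement and its parameters in positional order."""
    params = (table, columns, options, bucket, key, region,
              access_key, secret_key, session_token)
    return IMPORT_SQL, params


# The keys are read from the environment; "AK"/"SK" are the article's placeholders.
sql, params = build_import(
    "mys3-pg-test", "InitialVenues.csv", "cn-north-1",
    os.environ.get("AWS_ACCESS_KEY_ID", "AK"),
    os.environ.get("AWS_SECRET_ACCESS_KEY", "SK"),
)

# Hypothetical execution with an open DB-API cursor `cur`:
#   cur.execute(sql, params)
```

Swapping the `key` argument for AdditionalVenues.csv or TestVenues.csv reuses the same statement for the other files.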
Send the SNS message through the Lambda function
Create the Lambda extension
CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE;
Trigger the SNS message with the Lambda function
SELECT *
FROM aws_lambda.invoke(
    aws_commons.create_lambda_function_arn(
        'arn:aws-cn:lambda:cn-north-1:751318751053:function:myNotifyUpdate',
        'cn-north-1'
    ),
    '{"body": "Data ingested from s3 file"}'::json
);
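If the notification is sent from a script right after an import, the payload can be built as a Python dict and serialized, which avoids hand-written JSON inside the SQL string. The sketch below again assumes a DB-API driver with `%s` placeholders; `build_invoke` is an illustrative name and the function ARN in the example uses a placeholder account ID.

```python
import json

# Same call shape as the statement above, with placeholders for the ARN, region and payload.
INVOKE_SQL = (
    "SELECT * FROM aws_lambda.invoke("
    "aws_commons.create_lambda_function_arn(%s, %s), %s::json)"
)


def build_invoke(function_arn, region, payload):
    """Return the invoke statement and its parameters; payload is any JSON-serializable dict."""
    return INVOKE_SQL, (function_arn, region, json.dumps(payload))


sql, params = build_invoke(
    "arn:aws-cn:lambda:cn-north-1:123456789012:function:myNotifyUpdate",  # placeholder account ID
    "cn-north-1",
    {"body": "Data ingested from s3 file"},
)

# Hypothetical execution with an open DB-API cursor `cur`:
#   cur.execute(sql, params)
```

Because the handler publishes `json.dumps(event)`, the email delivered by SNS should contain the same JSON document that is passed as the payload here.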