AWS数据管道CSV数据DynamoDB
我试图使用AWS管道从S3斗CSV数据传输到DynamoDB,以下是我管行脚本,它不能正常工作,AWS数据管道CSV数据DynamoDB
CSV文件结构
Name, Designation,Company
A,TL,C1
B,Prog, C2
DynamoDb:N_Table,与名称的哈希值
{
"objects": [
{
"id": "Default",
"scheduleType": "cron",
"name": "Default",
"role": "DataPipelineDefaultRole",
"resourceRole": "DataPipelineDefaultResourceRole"
},
{
"id": "DynamoDBDataNodeId635",
"schedule": {
"ref": "ScheduleId639"
},
"tableName": "N_Table",
"name": "MyDynamoDBData",
"type": "DynamoDBDataNode"
},
{
"emrLogUri": "s3://onlycsv/error",
"id": "EmrClusterId636",
"schedule": {
"ref": "ScheduleId639"
},
"masterInstanceType": "m1.small",
"coreInstanceType": "m1.xlarge",
"enableDebugging": "true",
"installHive": "latest",
"name": "ImportCluster",
"coreInstanceCount": "1",
"logUri": "s3://onlycsv/error1",
"type": "EmrCluster"
},
{
"id": "S3DataNodeId643",
"schedule": {
"ref": "ScheduleId639"
},
"directoryPath": "s3://onlycsv/data.csv",
"name": "MyS3Data",
"dataFormat": {
"ref": "DataFormatId1"
},
"type": "S3DataNode"
},
{
"id": "ScheduleId639",
"startDateTime": "2013-08-03T00:00:00",
"name": "ImportSchedule",
"period": "1 Hours",
"type": "Schedule",
"endDateTime": "2013-08-04T00:00:00"
},
{
"id": "EmrActivityId637",
"input": {
"ref": "S3DataNodeId643"
},
"schedule": {
"ref": "ScheduleId639"
},
"name": "MyImportJob",
"runsOn": {
"ref": "EmrClusterId636"
},
"maximumRetries": "0",
"myDynamoDBWriteThroughputRatio": "0.25",
"attemptTimeout": "24 hours",
"type": "EmrActivity",
"output": {
"ref": "DynamoDBDataNodeId635"
},
"step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com"
},
{
"id": "DataFormatId1",
"name": "DefaultDataFormat1",
"column": [
"Name",
"Designation",
"Company"
],
"columnSeparator": ",",
"recordSeparator": "\n",
"type": "Custom"
}
]
}
出的四个步骤在执行管道,两者得到完成,但它并没有完全执行
我会建议使用datapipeline,而不是定制提供的CSV数据格式。
为了调试集群中的错误,你可以查找在EMR控制台jobflow并查看失败任务的日志文件。
如果您正在使用从S3导入数据到DynamoDB的模板数据管道,这些dataformats将无法正常工作。请使用下面链接中的格式来存储输入S3数据文件http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-pipelinejson-verifydata2.html
此格式的模板数据管道生成的输出文件将数据从DynamoDB导出到S3。
希望有所帮助。
目前(2015-04)默认进口管道模板不支持导入CSV文件。
如果您的CSV文件不是太大(不到1GB左右),您可以创建一个ShellCommandActivity,将CSV转换为DynamoDB JSON格式,并将EmrActivity将其导入到您的表格中。
正如你可以创建样品DynamoDB表,包括你所需要的,填充虚拟值,然后用管道(导出/导入按钮DynamoDB控制台)导出记录的所有字段类型的第一步。这将为您提供有关导入管道预期格式的想法。类型名称不明显,导入活动对于正确的大小写非常敏感(例如,您应该为boolean字段使用bool)。
之后,应该很容易创建一个awk脚本(或任何其他文本转换器,至少在awk中,您可以使用默认的AMI映像作为您的shell活动),您可以将它们提供给shellCommandActivity。不要忘记启用“暂存”标记,因此您的输出将上传回S3以供导入活动提取。
见下面链接的作品(问题部分)的解决方案,尽管EMR 3.x的只需将分隔符更改为"columnSeparator": ","
即可。就个人而言,除非您确定数据正确清理,否则我不会做CSV。
How to upgrade Data Pipeline definition from EMR 3.x to 4.x/5.x?