从AWS Kinesis fireshose写入实木复合地板到AWS S3
我想从kinesis firehose中将数据接收到s3中,格式化为实木复合地板。到目前为止,我刚刚找到一个解决方案,意味着创建一个EMR,但我正在寻找更便宜和更快的东西,比如将接收的json直接从firehose存储为parquet或使用Lambda函数。从AWS Kinesis fireshose写入实木复合地板到AWS S3
非常感谢你, 哈维。
对付AWS支持服务之后,不同的实现百,我想解释一下我所取得的成就。
最后,我已经创建了一个处理由室壁运动流水生成的每个文件lambda函数,根据有效载荷分类我的活动,并将结果存储在S3木地板的文件。
这样做,是不是很容易的:
你应该创建一个Python虚拟ENV,包括所有需要的库文件(在我的情况熊猫,NumPy的,Fastparquet等)的-
第一。 由于结果文件(包括所有库和我的Lambda函数很重,需要启动一个EC2实例,我使用了免费层中包含的实例)。要创建虚拟ENV请按照下列步骤操作:
- 登录在EC2
- 创建一个文件夹名为拉姆达(或任何其他名称)
- 须藤荫-y更新
- 须藤荫-y升级
- 须藤荫-y groupinstall “开发工具”
- 须藤荫-y安装BLAS
- 须藤荫-y安装LAPACK
- 须藤荫-y安装atlas-SSE3-devel的
- 须藤荫安装python27-devel的python27-PIP GCC
- VIRTUALENV ENV
- 源ENV /斌/激活
- PIP安装boto3
- PIP安装fastparquet
- PIP安装熊猫
- PIP安装thriftpy
- PIP安装s3fs
- pip install(任何其他需要的库)
- find〜/ lambda/env/lib */python2.7/site-packages/-name“* .so”| xargs的剥离
- PUSHD ENV/LIB/python2.7 /站点包/
- 拉链-r -9 -q〜/ lambda.zip *
- Popd的
- PUSHD ENV/lib64下/ python2.7 /站点包/
- 拉链-r -9 -q〜/ lambda.zip *
- Popd的
-
创建lambda_function propertly:
import json import boto3 import datetime as dt import urllib import zlib import s3fs from fastparquet import write import pandas as pd import numpy as np import time def _send_to_s3_parquet(df): s3_fs = s3fs.S3FileSystem() s3_fs_open = s3_fs.open # FIXME add something else to the key or it will overwrite the file key = 'mybeautifullfile.parquet.gzip' # Include partitions! key1 and key2 write('ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df, compression='GZIP',open_with=s3_fs_open) def lambda_handler(event, context): # Get the object from the event and show its content type bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']) try: s3 = boto3.client('s3') response = s3.get_object(Bucket=bucket, Key=key) data = response['Body'].read() decoded = data.decode('utf-8') lines = decoded.split('\n') # Do anything you like with the dataframe (Here what I do is to classify them # and write to different folders in S3 according to the values of # the columns that I want df = pd.DataFrame(lines) _send_to_s3_parquet(df) except Exception as e: print('Error getting object {} from bucket {}.'.format(key, bucket)) raise e
-
复制lambda函数的lambda.zip和部署lambda_function:
- 回到你的EC2实例,并添加所需的拉链lambda函数:拉链-9 lambda.zip lambda_function.py (lambda_function.py是在步骤2中生成的文件)
- 将生成的zip文件复制到S3,因为它非常繁重,无法通过S3进行部署。 aws s3 cp lambda.zip s3:// support-bucket/lambda_packages/
- 部署lambda函数:aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages /lambda.zip
触发时你喜欢的要执行,例如,每一个新的文件在S3创建,甚至你可以在lambda函数到流水相关联的时间。 (我没有选择这个选项,因为'lambda'限制低于Firehose限制,您可以配置Firehose每个128Mb或15分钟写入一个文件,但是如果将此lambda函数与Firehose关联,则将执行lambda函数每3分钟或5MB,在我的情况下,我产生了很多小木地板文件的问题,因为每次启动lambda函数时,我都会生成至少10个文件)。
亚马逊室壁运动流水接收流式传输的记录,并且可以将它们存储在亚马逊S3(或Amazon红移或Amazon Elasticsearch服务)。
每条记录最大可达1000KB。
然而,记录被一起追加到一个文本文件中,有基于时间或大小配料。传统上,记录是JSON格式。
您将是无法发送实木复合地板文件,因为它不符合此文件格式。
是可能的触发LAMBDA数据变换功能,但这将不能够或者输出一个拼花文件。
实际上,考虑到镶木地板文件的性质,您不可能一次构建一个记录。作为一种列式存储格式,我怀疑它们确实需要在批处理中创建,而不是每个记录附加数据。
底线:都能跟得上。
非常感谢你。 – bracana
Hi @Javi,如果这个或任何答案已解决您的问题,请点击复选标记考虑[接受它](http://meta.stackexchange.com/q/5234/179419)。这向更广泛的社区表明,您已经找到了解决方案,并为答复者和您自己提供了一些声誉。没有义务这样做。 –
@JohnRotenstein您可以让lambda对Firehose的每个缓冲时间/大小的批处理进行转换,并且稍后每隔几个小时将Parquet文件拼接成更大的大小?这可让您通过FireHose将JSON流式传输到Parquet,以便在Athena中获得接近实时的数据,并且仍然可以获得Parquet的性能优势。 – earmouse