如何使用aiobotocore模拟AWS S3
问题描述:
我有一个项目使用aiohttp和aiobotocore来处理AWS中的资源。我正在测试适用于AWS S3的类,并且我正在使用moto来模拟AWS。惩戒工作得与使用同步代码(例如,从摩托文档)的实例细如何使用aiobotocore模拟AWS S3
import boto3
from moto import mock_s3
class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value
def save(self):
s3 = boto3.client('s3', region_name='us-east-1')
s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)
def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')
model_instance = MyModel('steve', 'is awesome')
model_instance.save()
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
assert body == 'is awesome'
然而,改写这个使用aiobotocore嘲讽不工作后 - 它连接到真正的AWS S3在我的例子。
import aiobotocore
import asyncio
import boto3
from moto import mock_s3
class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value
async def save(self, loop):
session = aiobotocore.get_session(loop=loop)
s3 = session.create_client('s3', region_name='us-east-1')
await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)
def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')
loop = asyncio.get_event_loop()
model_instance = MyModel('steve', 'is awesome')
loop.run_until_complete(model_instance.save(loop=loop))
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
assert body == 'is awesome'
所以我的假设是,摩托车不能正常工作与aiobotocore。如果我的源代码如第二个示例中所示,我如何有效地模拟AWS资源?
答
来自moto
的嘲笑不起作用,因为它们使用的是同步API。 但您可以启动moto服务器并配置aiobotocore以连接到此测试服务器。 看看aiobotocore测试的灵感。
答
下面是aiobotocore mock_server.py没有pytest:
# Initially from https://raw.githubusercontent.com/aio-libs/aiobotocore/master/tests/mock_server.py
import shutil
import signal
import subprocess as sp
import sys
import time
import requests
_proxy_bypass = {
"http": None,
"https": None,
}
def start_service(service_name, host, port):
moto_svr_path = shutil.which("moto_server")
args = [sys.executable, moto_svr_path, service_name, "-H", host,
"-p", str(port)]
process = sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.DEVNULL)
url = "http://{host}:{port}".format(host=host, port=port)
for _ in range(30):
if process.poll() is not None:
break
try:
# we need to bypass the proxies due to monkeypatches
requests.get(url, timeout=0.1, proxies=_proxy_bypass)
break
except requests.exceptions.RequestException:
time.sleep(0.1)
else:
stop_process(process)
raise AssertionError("Can not start service: {}".format(service_name))
return process
def stop_process(process, timeout=20):
try:
process.send_signal(signal.SIGTERM)
process.communicate(timeout=timeout/2)
except sp.TimeoutExpired:
process.kill()
outs, errors = process.communicate(timeout=timeout/2)
exit_code = process.returncode
msg = "Child process finished {} not in clean way: {} {}" \
.format(exit_code, outs, errors)
raise RuntimeError(msg)