无法在远程计算机上使用asynio和aiohttp创建大量的帖子vs本地

问题描述:

我编写了一个程序,该程序将使用asyncioaiohttp来发布事件。当我在本地运行它时,该程序起作用。我可以发布10k事件没有问题。不过,我SCPed整个代码库到远程机器和机器内我不能没有收到此错误后超过15个事件:无法在远程计算机上使用asynio和aiohttp创建大量的帖子vs本地

RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a53989410> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a5397ffc0> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 

如何调试这个或找到这个问题的根源?

这里是我创建的类和我使用的方法post()运行:

import uuid 
import os 
import asyncio 
import time 
import random 
import json 
import aiohttp 
from tracer.utils.phase import Phase 

class Poster(Phase): 
    def __init__(self, log, endpoint, num_post, topic, datafile, timeout, oracles, secure=False, thru_proxy=True): 
     Phase.__init__(self, log, "post", oracles, secure, thru_proxy) 
     self.log = log 
     self.num_post = int(num_post) 
     self.datafile = datafile.readlines() 
     self.topic = topic 
     self.endpoint = self.set_endpoint(endpoint, self.topic) 
     self.response = None 
     self.timeout = timeout 

    def random_line(self): 
     """ Returns random line from file and converts it to JSON """ 
     return json.loads(random.choice(self.datafile)) 

    @staticmethod 
    def change_uuid(event): 
     """ Creates new UUID for event_id """ 
     new_uuid = str(uuid.uuid4()) 
     event["event_header"]["event_id"] = new_uuid 
     return event 

    @staticmethod 
    def wrapevent(event): 
     """ Wrap event with metadata for analysis later on """ 
     return { 
      "tracer": { 
       "post": { 
        "statusCode": None, 
        "timestamp": None, 
       }, 
       "awsKafkaTimestamp": None, 
       "qdcKakfaTimestamp": None, 
       "hdfsTimestamp": None 
      }, 
      "event": event 
     } 

    def gen_random_event(self): 
     random_event = self.random_line() 
     event = self.change_uuid(random_event) 
     dataspec = self.wrapevent(event) 
     return dataspec 

    async def async_post_event(self, event, session): 
     async with session.post(self.endpoint, data=event, proxy=self.proxy) as resp: 
      event["tracer"]["post"]["timestamp"] = time.time() * 1000.0 
      event["tracer"]["post"]["statusCode"] = resp.status 
      unique_id = event["event"]["event_header"]["event_id"] 
      oracle_endpoint = os.path.join(self.oracle, unique_id) 
     async with session.put(oracle_endpoint, data=json.dumps(event), proxy=self.proxy) as resp: 
      if resp.status != 200: 
       self.log.debug("Post to ElasticSearch not 200") 
       self.log.debug(event["event"]["event_header"]["event_id"]) 
       self.log.debug("Status code: " + str(resp.status)) 
      return event["event"]["event_header"]["event_id"], resp.status 

    async def async_post_events(self, events): 
     coros = [] 
     conn = aiohttp.TCPConnector(verify_ssl=self.secure) 
     async with aiohttp.ClientSession(connector=conn) as session: 
      for event in events: 
       coros.append(self.async_post_event(event, session)) 
      return await asyncio.gather(*coros) 

    def post(self): 
     event_loop = asyncio.get_event_loop() 
     try: 
      events = [self.gen_random_event() for i in range(self.num_post)] 
      start_time = time.time() 
      results = event_loop.run_until_complete(self.async_post_events(events)) 
      print("Time taken: " + str(time.time() - start_time)) 
     finally: 
      event_loop.close() 

你不能,一旦它的关闭,重新使用循环。从AbstractEventLoop.close文档:

This is idempotent and irreversible. No other methods should be called after this one.

要么删除loop.close来电或为每一个岗位一个新的循环。

我的建议是通过运行循环中的所有内容并在需要时等待async_post_events来避免这些问题。