如何有效地过滤任意长度的元组作为键的字典?

问题描述:

TL; DR如何有效地过滤任意长度的元组作为键的字典?

什么是实现具有可变维数键的字典的过滤函数的最有效方法?过滤器应采用与字典键相同尺寸的元组,并输出字典中与过滤器匹配的所有键,例如filter[i] is None or filter[i] == key[i]的所有尺寸为i


在我当前的项目,我需要大量的数据处理字典。字典的一般结构是这样的,它包含具有2到4个整数作为键和整数作为值的元组。字典中的所有键具有相同的尺寸。为了说明,字典的例子如下我需要处理:

{(1, 2): 1, (1, 5): 2} 
{(1, 5, 3): 2} 
{(5, 2, 5, 2): 8} 

这些字典在约20 000项中含有大量的条目,其中最大的。我经常需要过滤这些条目,但通常只查看关键元组的某些索引。理想情况下,我想要一个函数,我可以提供一个过滤器元组。该函数然后应该返回匹配过滤器元组的所有键。如果过滤器元组包含None条目,则这将匹配该索引处字典的关键元组中的任何值。功能应该字典做什么用2维方向键

例子:

>>> dict = {(1, 2): 1, (1, 5): 2, (2, 5): 1, (3, 9): 5} 
>>> my_filter_fn((1, None)) 
{(1, 2), (1, 5)} 
>>> my_filter_fn((None, 5)) 
{(1, 5), (2, 5)} 
>>> my_filter_fn((2, 4)) 
set() 
>>> my_filter_fn((None, None)) 
{(1, 2), (1, 5), (2, 5), (3, 9)} 

正如我的词典有自己的元组不同尺寸,我试图通过编写一个生成器表达式这需要解决这个问题该元组的尺寸考虑在内:

def my_filter_fn(entries: dict, match: tuple): 
    return (x for x in entries.keys() if all(match[i] is None or match[i] == x[i] 
              for i in range(len(key)))) 

不幸的是,这是一个相当慢的与通过手((match[0] is None or match[0] === x[0]) and (match[1] is None or match[1] == x[1])完全写出条件;对于4维而言,这大约慢了10倍。这对我来说是一个问题,因为我需要经常进行这种过滤。

以下代码演示了性能问题。代码仅用于说明问题并能够复制测试。您可以跳过代码部分,结果如下。

import random 
import timeit 


def access_variable_length(): 
    for key in entry_keys: 
     for k in (x for x in all_entries.keys() if all(key[i] is None or key[i] == x[i] 
                 for i in range(len(key)))): 
      pass 


def access_static_length(): 
    for key in entry_keys: 
     for k in (x for x in all_entries.keys() if 
        (key[0] is None or x[0] == key[0]) 
        and (key[1] is None or x[1] == key[1]) 
        and (key[2] is None or x[2] == key[2]) 
        and (key[3] is None or x[3] == key[3])): 
      pass 


def get_rand_or_none(start, stop): 
    number = random.randint(start-1, stop) 
    if number == start-1: 
     number = None 
    return number 


entry_keys = set() 
for h in range(100): 
    entry_keys.add((get_rand_or_none(1, 200), get_rand_or_none(1, 10), get_rand_or_none(1, 4), get_rand_or_none(1, 7))) 
all_entries = dict() 
for l in range(13000): 
    all_entries[(random.randint(1, 200), random.randint(1, 10), random.randint(1, 4), random.randint(1, 7))] = 1 

variable_time = timeit.timeit("access_variable_length()", "from __main__ import access_variable_length", number=10) 
static_time = timeit.timeit("access_static_length()", "from __main__ import access_static_length", number=10) 

print("variable length time: {}".format(variable_time)) 
print("static length time: {}".format(static_time)) 

结果:

variable length time: 9.625867042849316 
static length time: 1.043319165662158

我想避免创建三个不同的功能my_filter_fn2my_filter_fn3my_filter_fn4覆盖我的字典中所有可能的尺寸,然后用静态维度筛​​选。我知道对可变维度进行过滤总是比对固定维度进行过滤要慢,但希望它不会慢10倍。由于我不是Python专家,我希望有一种巧妙的方式可以重构我的可变维度生成器表达式,以提供更好的性能。

以我描述的方式过滤巨大词典的最有效方法是什么?

+1

它是不好用的内置插件名称为对象,所以'filter'应该改名(以'filter_entries'例如) –

+0

@AzatIbrakov谢谢你,我改变了它。 – Chris

感谢您有机会思考集合和词典中的元组。这是一个非常有用和强大的Python角落。

Python被解释,所以如果你来自一种编译语言,一个很好的经验法则是避免复杂的嵌套迭代,你可以。如果你正在编写复杂的循环或理解,总是值得怀疑是否有更好的方法来做到这一点。

列表下标(stuff[i])和range (len(stuff))在Python中效率低下且冗长,而且很少有必要。这是更有效的(和更自然)迭代:

for item in stuff: 
    do_something(item) 

下面的代码是快,因为它使用了一些Python的优势:内涵,字典,集和元组拆包。

有迭代,但它们很简单和浅。 整个代码中只有一条if语句,并且每个过滤操作只执行4次。这也有助于提高性能 - 并使代码更易于阅读。

的方法的说明...

从原始数据的每个键:

{ 
    (0, 1): (1, 4, 5), 
    (1, 4): (1, 4, 5), 
    (2, 5): (1, 4, 5) 
} 

(从零的Python数字元素:

{(1, 4, 5): 1} 

由位置和索引值)

索引被整理成一个大的查找字典,由集的元组:

{ 
    (0, 1): {(1, 4, 5), (1, 6, 7), (1, 2), (1, 8), (1, 4, 2, 8), ...} 
    (0, 2): {(2, 1), (2, 2), (2, 4, 1, 8), ...} 
    (1, 4): {(1, 4, 5), (1, 4, 2, 8), (2, 4, 1, 8), ...} 
    ... 
} 

一旦该查找是建立(和它被非常有效地构建)滤波只设置交点和字典查找,这两者都是疾如。即使是大型字典,过滤也需要几微秒。

该方法使用元数据2,3或4(或任何其他)处理数据,但arity_filtered()只返回具有与过滤元组相同数量成员的键。所以这个类可以让你选择一起过滤所有数据,或者单独处理不同大小的元组,在性能方面很少选择它们。

大型随机数据集(11500个元组)的计时结果为0.30秒来建立查找,100次查找为0.007秒。

from collections import defaultdict 
import random 
import timeit 


class TupleFilter: 
    def __init__(self, data): 
     self.data = data 
     self.lookup = self.build_lookup() 

    def build_lookup(self): 
     lookup = defaultdict(set) 
     for data_item in self.data: 
      for member_ref, data_key in tuple_index(data_item).items(): 
       lookup[member_ref].add(data_key) 
     return lookup 

    def filtered(self, tuple_filter): 
     # initially unfiltered 
     results = self.all_keys() 
     # reduce filtered set 
     for position, value in enumerate(tuple_filter): 
      if value is not None: 
       match_or_empty_set = self.lookup.get((position, value), set()) 
       results = results.intersection(match_or_empty_set) 
     return results 

    def arity_filtered(self, tuple_filter): 
     tf_length = len(tuple_filter) 
     return {match for match in self.filtered(tuple_filter) if tf_length == len(match)} 

    def all_keys(self): 
     return set(self.data.keys()) 


def tuple_index(item_key): 
    member_refs = enumerate(item_key) 
    return {(pos, val): item_key for pos, val in member_refs} 


data = { 
    (1, 2): 1, 
    (1, 5): 2, 
    (1, 5, 3): 2, 
    (5, 2, 5, 2): 8 
} 

tests = { 
    (1, 5): 2, 
    (1, None, 3): 1, 
    (1, None): 3, 
    (None, 5): 2, 
} 

tf = TupleFilter(data) 
for filter_tuple, expected_length in tests.items(): 
    result = tf.filtered(filter_tuple) 
    print("Filter {0} => {1}".format(filter_tuple, result)) 
    assert len(result) == expected_length 
# same arity filtering 
filter_tuple = (1, None) 
print('Not arity matched: {0} => {1}' 
     .format(filter_tuple, tf.filtered(filter_tuple))) 
print('Arity matched: {0} => {1}' 
     .format(filter_tuple, tf.arity_filtered(filter_tuple))) 
# check unfiltered results return original data set 
assert tf.filtered((None, None)) == tf.all_keys() 


>>> python filter.py 
Filter (1, 5) finds {(1, 5), (1, 5, 3)} 
Filter (1, None, 3) finds {(1, 5, 3)} 
Filter (1, None) finds {(1, 2), (1, 5), (1, 5, 3)} 
Filter (None, 5) finds {(1, 5), (1, 5, 3)} 
Arity filtering: note two search results only: (1, None) => {(1, 2), (1, 5)} 
+0

谢谢,非常好的主意。不幸的是,实际上我的字典会不时变化。我需要检查重建查找表的成本是否被查找的加速所抵消。 – Chris

+0

首先,构建查找词典比串行搜索更快,那么为什么不重新构建呢?其次,没有什么能阻止你向查阅词典添加新的数据项。如果有删除,可以通过检查数据的过滤结果来轻松处理。更新比较棘手,并且通过查找重建最直接地处理 – Nick

+0

嘿,祝你好运。随意标记选定的答案,如果倾向 – Nick

我没有一个漂亮的答案,但这种优化通常会使代码更难阅读。但如果您只需要更快的速度,那么您可以做两件事。首先,我们可以直接从循环内部消除重复计算。你说每个字典中的所有条目都有相同的长度,所以你可以计算一次,而不是在循环中重复。这对我来说削减了大约20%:

​​

不漂亮,我同意。但通过使用eval构建固定长度函数,我们可以使其更快(甚至更丑)。像这样:

def access_variable_length_new(): 
    try: 
     length = len(iter(entry_keys).next()) 
    except KeyError: 
     return 
    func_l = ["(key[{0}] is None or x[{0}] == key[{0}])".format(i) for i in range(length)] 
    func_s = "lambda x,key: " + " and ".join(func_l) 
    func = eval(func_s) 
    for key in entry_keys: 
     for k in (x for x in all_entries.keys() if func(x,key)): 
      pass 

对我来说,这几乎和静态版本一样快。

+0

让我们[在聊天中继续讨论](http://chat.*.com/rooms/145973/discussion-between-chris-and-strubbly)。 – Chris

我做了一些修改:

  • 你不需要使用dict.keys方法,通过按键进行迭代,通过dict对象迭代本身会给我们的钥匙,

  • 创建独立模块,它有助于阅读和修改:

    • preparations.py带助手产生测试数据:

      import random 
      
      left_ends = [200, 10, 4, 7] 
      
      
      def generate_all_entries(count): 
          return {tuple(random.randint(1, num) 
              for num in left_ends): 1 
            for _ in range(count)} 
      
      
      def generate_entry_keys(count): 
          return [tuple(get_rand_or_none(1, num) 
              for num in left_ends) 
            for _ in range(count)] 
      
      
      def get_rand_or_none(start, stop): 
          number = random.randint(start - 1, stop) 
          if number == start - 1: 
           number = None 
          return number 
      
    • functions.py用于测试功能,
    • main.py为基准。
  • 参数传递给函数,而不是从全球范围内让他们,所以给出的静态&可变长度的版本使用上的timeit.repeat代替timeit.timeit结果min得到大多数表示的结果(成为

    def access_static_length(all_entries, entry_keys): 
        for key in entry_keys: 
         for k in (x 
            for x in all_entries 
            if (key[0] is None or x[0] == key[0]) 
            and (key[1] is None or x[1] == key[1]) 
            and (key[2] is None or x[2] == key[2]) 
            and (key[3] is None or x[3] == key[3])): 
          pass 
    
    
    def access_variable_length(all_entries, entry_keys): 
        for key in entry_keys: 
         for k in (x 
            for x in all_entries 
            if all(key[i] is None or key[i] == x[i] 
             for i in range(len(key)))): 
          pass 
    
  • 更多在this answer),

  • 变化entries_keys元素数从10100(包括端部)与步骤10

  • 改变all_entries元件从1000015000(包括端部)与计数步骤500


但要回点。

改进

  1. 我们可以跳过与None值指标检查中键

    def access_variable_length_with_skipping_none(all_entries, entry_keys): 
        for key in entry_keys: 
         non_none_indexes = {i 
              for i, value in enumerate(key) 
              if value is not None} 
         for k in (x 
            for x in all_entries.keys() 
            if all(key[i] == x[i] 
             for i in non_none_indexes)): 
          pass 
    
  2. 下一个建议提高过滤是使用numpy

    import numpy as np 
    
    
    def access_variable_length_numpy(all_entries, entry_keys): 
        keys_array = np.array(list(all_entries)) 
        for entry_key in entry_keys: 
         non_none_indexes = [i 
              for i, value in enumerate(entry_key) 
              if value is not None] 
         non_none_values = [value 
              for i, value in enumerate(entry_key) 
              if value is not None] 
         mask = keys_array[:, non_none_indexes] == non_none_values 
         indexes, _ = np.where(mask) 
         for k in map(tuple, keys_array[indexes]): 
          pass 
    

基准

内容的main.py

import timeit 
from itertools import product 

number = 5 
repeat = 10 
for all_entries_count, entry_keys_count in product(range(10000, 15001, 500), 
                range(10, 101, 10)): 
    print('all entries count: {}'.format(all_entries_count)) 
    print('entry keys count: {}'.format(entry_keys_count)) 
    preparation_part = ("from preparation import (generate_all_entries,\n" 
         "       generate_entry_keys)\n" 
         "all_entries = generate_all_entries({all_entries_count})\n" 
         "entry_keys = generate_entry_keys({entry_keys_count})\n" 
         .format(all_entries_count=all_entries_count, 
           entry_keys_count=entry_keys_count)) 
    static_time = min(timeit.repeat(
     "access_static_length(all_entries, entry_keys)", 
     preparation_part + "from functions import access_static_length", 
     repeat=repeat, 
     number=number)) 
    variable_time = min(timeit.repeat(
     "access_variable_length(all_entries, entry_keys)", 
     preparation_part + "from functions import access_variable_length", 
     repeat=repeat, 
     number=number)) 
    variable_time_with_skipping_none = min(timeit.repeat(
     "access_variable_length_with_skipping_none(all_entries, entry_keys)", 
     preparation_part + 
     "from functions import access_variable_length_with_skipping_none", 
     repeat=repeat, 
     number=number)) 
    variable_time_numpy = min(timeit.repeat(
     "access_variable_length_numpy(all_entries, entry_keys)", 
     preparation_part + 
     "from functions import access_variable_length_numpy", 
     repeat=repeat, 
     number=number)) 

    print("static length time: {}".format(static_time)) 
    print("variable length time: {}".format(variable_time)) 
    print("variable length time with skipping `None` keys: {}" 
      .format(variable_time_with_skipping_none)) 
    print("variable length time with numpy: {}" 
      .format(variable_time_numpy)) 

这在我的机器有的Python 3.6。1给出:

all entries count: 10000 
entry keys count: 10 
static length time: 0.06314293399918824 
variable length time: 0.5234129569980723 
variable length time with skipping `None` keys: 0.2890012050011137 
variable length time with numpy: 0.22945181500108447 
all entries count: 10000 
entry keys count: 20 
static length time: 0.12795891799760284 
variable length time: 1.0610534609986644 
variable length time with skipping `None` keys: 0.5744297259989253 
variable length time with numpy: 0.5105678180007089 
all entries count: 10000 
entry keys count: 30 
static length time: 0.19210158399801003 
variable length time: 1.6491422000035527 
variable length time with skipping `None` keys: 0.8566724129996146 
variable length time with numpy: 0.7363859869983571 
all entries count: 10000 
entry keys count: 40 
static length time: 0.2561357790000329 
variable length time: 2.08878050599742 
variable length time with skipping `None` keys: 1.1256247100027394 
variable length time with numpy: 1.0066140279996034 
all entries count: 10000 
entry keys count: 50 
static length time: 0.32130833200062625 
variable length time: 2.6166040710013476 
variable length time with skipping `None` keys: 1.4147321179989376 
variable length time with numpy: 1.1700750320014777 
all entries count: 10000 
entry keys count: 60 
static length time: 0.38276188999952865 
variable length time: 3.153736616997776 
variable length time with skipping `None` keys: 1.7147898039984284 
variable length time with numpy: 1.4533947029995034 
all entries count: 10000 
entry keys count: 70 
... 
all entries count: 15000 
entry keys count: 80 
static length time: 0.7141444490007416 
variable length time: 6.186657476999244 
variable length time with skipping `None` keys: 3.376506028998847 
variable length time with numpy: 3.1577993860009883 
all entries count: 15000 
entry keys count: 90 
static length time: 0.8115685330012639 
variable length time: 7.14327938399947 
variable length time with skipping `None` keys: 3.7462387939995097 
variable length time with numpy: 3.6140603050007485 
all entries count: 15000 
entry keys count: 100 
static length time: 0.8950150890013902 
variable length time: 7.829741768000531 
variable length time with skipping `None` keys: 4.1662235900003 
variable length time with numpy: 3.914334102999419 

恢复

我们可以看到如预期numpy版本也不是那么好,它似乎不numpy的错。

如果我们删除转换滤波阵列记录tuple s的map和刚刚离开

for k in keys_array[indexes]: 
    ... 

那么这将是非常快(比静态长度版本速度更快),所以这个问题是转换,从numpy.ndarray反对到tuple

过滤掉None入门钥匙给我们提供了大约50%的速度增益,所以随时添加它。

比方说,你有一本字典 - d

d = {(1,2):3,(1,4):5,(2,4):2,(1,3):4,(2,3):6,(5,1):5,(3,8):5,(3,6):9} 

首先,你可以得到字典键 -

keys = d.keys() 
=> 
dict_keys([(1, 2), (3, 8), (1, 3), (2, 3), (3, 6), (5, 1), (2, 4), (1, 4)]) 

现在,让我们定义一个函数is_match能判定给出两个元,如果他们等于或不是基于你的条件 -
is_match((1,7),(1,None)),is_match((1,5),(None,5))is_match((1,4),(1,4))将返回Trueis_match((1,7),(1,8)),is_match((4,7),(6,12))将返回False

def if_equal(a, b): 
    if a is None or b is None: 
     return True 
    else: 
     if a==b: 
      return True 
     else: 
      return False 

is_match = lambda a,b: False not in list(map(if_equal, a, b)) 

tup = (1, None) 
matched_keys = [key for key in keys if is_match(key, tup)] 
=> 
[(1, 2), (1, 3), (1, 4)] 
+0

谢谢,但是这个解决方案的性能比我已经提出的更差(见“可变长度”下的问题)。 – Chris