

TL; DR如何有效地过滤任意长度的元组作为键的字典?

什么是实现具有可变维数键的字典的过滤函数的最有效方法?过滤器应采用与字典键相同尺寸的元组,并输出字典中与过滤器匹配的所有键,例如filter[i] is None or filter[i] == key[i]的所有尺寸为i


{(1, 2): 1, (1, 5): 2} 
{(1, 5, 3): 2} 
{(5, 2, 5, 2): 8} 

这些字典在约20 000项中含有大量的条目,其中最大的。我经常需要过滤这些条目,但通常只查看关键元组的某些索引。理想情况下,我想要一个函数,我可以提供一个过滤器元组。该函数然后应该返回匹配过滤器元组的所有键。如果过滤器元组包含None条目,则这将匹配该索引处字典的关键元组中的任何值。功能应该字典做什么用2维方向键


>>> dict = {(1, 2): 1, (1, 5): 2, (2, 5): 1, (3, 9): 5} 
>>> my_filter_fn((1, None)) 
{(1, 2), (1, 5)} 
>>> my_filter_fn((None, 5)) 
{(1, 5), (2, 5)} 
>>> my_filter_fn((2, 4)) 
>>> my_filter_fn((None, None)) 
{(1, 2), (1, 5), (2, 5), (3, 9)} 


def my_filter_fn(entries: dict, match: tuple): 
    return (x for x in entries.keys() if all(match[i] is None or match[i] == x[i] 
              for i in range(len(key)))) 

不幸的是,这是一个相当慢的与通过手((match[0] is None or match[0] === x[0]) and (match[1] is None or match[1] == x[1])完全写出条件;对于4维而言,这大约慢了10倍。这对我来说是一个问题,因为我需要经常进行这种过滤。


import random 
import timeit 

def access_variable_length(): 
    for key in entry_keys: 
     for k in (x for x in all_entries.keys() if all(key[i] is None or key[i] == x[i] 
                 for i in range(len(key)))): 

def access_static_length(): 
    for key in entry_keys: 
     for k in (x for x in all_entries.keys() if 
        (key[0] is None or x[0] == key[0]) 
        and (key[1] is None or x[1] == key[1]) 
        and (key[2] is None or x[2] == key[2]) 
        and (key[3] is None or x[3] == key[3])): 

def get_rand_or_none(start, stop): 
    number = random.randint(start-1, stop) 
    if number == start-1: 
     number = None 
    return number 

entry_keys = set() 
for h in range(100): 
    entry_keys.add((get_rand_or_none(1, 200), get_rand_or_none(1, 10), get_rand_or_none(1, 4), get_rand_or_none(1, 7))) 
all_entries = dict() 
for l in range(13000): 
    all_entries[(random.randint(1, 200), random.randint(1, 10), random.randint(1, 4), random.randint(1, 7))] = 1 

variable_time = timeit.timeit("access_variable_length()", "from __main__ import access_variable_length", number=10) 
static_time = timeit.timeit("access_static_length()", "from __main__ import access_static_length", number=10) 

print("variable length time: {}".format(variable_time)) 
print("static length time: {}".format(static_time)) 


variable length time: 9.625867042849316 
static length time: 1.043319165662158




它是不好用的内置插件名称为对象,所以'filter'应该改名(以'filter_entries'例如) –


@AzatIbrakov谢谢你,我改变了它。 – Chris



列表下标(stuff[i])和range (len(stuff))在Python中效率低下且冗长,而且很少有必要。这是更有效的(和更自然)迭代:

for item in stuff: 


有迭代,但它们很简单和浅。 整个代码中只有一条if语句,并且每个过滤操作只执行4次。这也有助于提高性能 - 并使代码更易于阅读。



    (0, 1): (1, 4, 5), 
    (1, 4): (1, 4, 5), 
    (2, 5): (1, 4, 5) 


{(1, 4, 5): 1} 



    (0, 1): {(1, 4, 5), (1, 6, 7), (1, 2), (1, 8), (1, 4, 2, 8), ...} 
    (0, 2): {(2, 1), (2, 2), (2, 4, 1, 8), ...} 
    (1, 4): {(1, 4, 5), (1, 4, 2, 8), (2, 4, 1, 8), ...} 




from collections import defaultdict 
import random 
import timeit 

class TupleFilter: 
    def __init__(self, data): 
     self.data = data 
     self.lookup = self.build_lookup() 

    def build_lookup(self): 
     lookup = defaultdict(set) 
     for data_item in self.data: 
      for member_ref, data_key in tuple_index(data_item).items(): 
     return lookup 

    def filtered(self, tuple_filter): 
     # initially unfiltered 
     results = self.all_keys() 
     # reduce filtered set 
     for position, value in enumerate(tuple_filter): 
      if value is not None: 
       match_or_empty_set = self.lookup.get((position, value), set()) 
       results = results.intersection(match_or_empty_set) 
     return results 

    def arity_filtered(self, tuple_filter): 
     tf_length = len(tuple_filter) 
     return {match for match in self.filtered(tuple_filter) if tf_length == len(match)} 

    def all_keys(self): 
     return set(self.data.keys()) 

def tuple_index(item_key): 
    member_refs = enumerate(item_key) 
    return {(pos, val): item_key for pos, val in member_refs} 

data = { 
    (1, 2): 1, 
    (1, 5): 2, 
    (1, 5, 3): 2, 
    (5, 2, 5, 2): 8 

tests = { 
    (1, 5): 2, 
    (1, None, 3): 1, 
    (1, None): 3, 
    (None, 5): 2, 

tf = TupleFilter(data) 
for filter_tuple, expected_length in tests.items(): 
    result = tf.filtered(filter_tuple) 
    print("Filter {0} => {1}".format(filter_tuple, result)) 
    assert len(result) == expected_length 
# same arity filtering 
filter_tuple = (1, None) 
print('Not arity matched: {0} => {1}' 
     .format(filter_tuple, tf.filtered(filter_tuple))) 
print('Arity matched: {0} => {1}' 
     .format(filter_tuple, tf.arity_filtered(filter_tuple))) 
# check unfiltered results return original data set 
assert tf.filtered((None, None)) == tf.all_keys() 

>>> python filter.py 
Filter (1, 5) finds {(1, 5), (1, 5, 3)} 
Filter (1, None, 3) finds {(1, 5, 3)} 
Filter (1, None) finds {(1, 2), (1, 5), (1, 5, 3)} 
Filter (None, 5) finds {(1, 5), (1, 5, 3)} 
Arity filtering: note two search results only: (1, None) => {(1, 2), (1, 5)} 

谢谢,非常好的主意。不幸的是,实际上我的字典会不时变化。我需要检查重建查找表的成本是否被查找的加速所抵消。 – Chris


首先,构建查找词典比串行搜索更快,那么为什么不重新构建呢?其次,没有什么能阻止你向查阅词典添加新的数据项。如果有删除,可以通过检查数据的过滤结果来轻松处理。更新比较棘手,并且通过查找重建最直接地处理 – Nick


嘿,祝你好运。随意标记选定的答案,如果倾向 – Nick




def access_variable_length_new(): 
     length = len(iter(entry_keys).next()) 
    except KeyError: 
    func_l = ["(key[{0}] is None or x[{0}] == key[{0}])".format(i) for i in range(length)] 
    func_s = "lambda x,key: " + " and ".join(func_l) 
    func = eval(func_s) 
    for key in entry_keys: 
     for k in (x for x in all_entries.keys() if func(x,key)): 



让我们[在聊天中继续讨论](http://chat.*.com/rooms/145973/discussion-between-chris-and-strubbly)。 – Chris


  • 你不需要使用dict.keys方法,通过按键进行迭代,通过dict对象迭代本身会给我们的钥匙,

  • 创建独立模块,它有助于阅读和修改:

    • preparations.py带助手产生测试数据:

      import random 
      left_ends = [200, 10, 4, 7] 
      def generate_all_entries(count): 
          return {tuple(random.randint(1, num) 
              for num in left_ends): 1 
            for _ in range(count)} 
      def generate_entry_keys(count): 
          return [tuple(get_rand_or_none(1, num) 
              for num in left_ends) 
            for _ in range(count)] 
      def get_rand_or_none(start, stop): 
          number = random.randint(start - 1, stop) 
          if number == start - 1: 
           number = None 
          return number 
    • functions.py用于测试功能,
    • main.py为基准。
  • 参数传递给函数,而不是从全球范围内让他们,所以给出的静态&可变长度的版本使用上的timeit.repeat代替timeit.timeit结果min得到大多数表示的结果(成为

    def access_static_length(all_entries, entry_keys): 
        for key in entry_keys: 
         for k in (x 
            for x in all_entries 
            if (key[0] is None or x[0] == key[0]) 
            and (key[1] is None or x[1] == key[1]) 
            and (key[2] is None or x[2] == key[2]) 
            and (key[3] is None or x[3] == key[3])): 
    def access_variable_length(all_entries, entry_keys): 
        for key in entry_keys: 
         for k in (x 
            for x in all_entries 
            if all(key[i] is None or key[i] == x[i] 
             for i in range(len(key)))): 
  • 更多在this answer),

  • 变化entries_keys元素数从10100(包括端部)与步骤10

  • 改变all_entries元件从1000015000(包括端部)与计数步骤500



  1. 我们可以跳过与None值指标检查中键

    def access_variable_length_with_skipping_none(all_entries, entry_keys): 
        for key in entry_keys: 
         non_none_indexes = {i 
              for i, value in enumerate(key) 
              if value is not None} 
         for k in (x 
            for x in all_entries.keys() 
            if all(key[i] == x[i] 
             for i in non_none_indexes)): 
  2. 下一个建议提高过滤是使用numpy

    import numpy as np 
    def access_variable_length_numpy(all_entries, entry_keys): 
        keys_array = np.array(list(all_entries)) 
        for entry_key in entry_keys: 
         non_none_indexes = [i 
              for i, value in enumerate(entry_key) 
              if value is not None] 
         non_none_values = [value 
              for i, value in enumerate(entry_key) 
              if value is not None] 
         mask = keys_array[:, non_none_indexes] == non_none_values 
         indexes, _ = np.where(mask) 
         for k in map(tuple, keys_array[indexes]): 



import timeit 
from itertools import product 

number = 5 
repeat = 10 
for all_entries_count, entry_keys_count in product(range(10000, 15001, 500), 
                range(10, 101, 10)): 
    print('all entries count: {}'.format(all_entries_count)) 
    print('entry keys count: {}'.format(entry_keys_count)) 
    preparation_part = ("from preparation import (generate_all_entries,\n" 
         "       generate_entry_keys)\n" 
         "all_entries = generate_all_entries({all_entries_count})\n" 
         "entry_keys = generate_entry_keys({entry_keys_count})\n" 
    static_time = min(timeit.repeat(
     "access_static_length(all_entries, entry_keys)", 
     preparation_part + "from functions import access_static_length", 
    variable_time = min(timeit.repeat(
     "access_variable_length(all_entries, entry_keys)", 
     preparation_part + "from functions import access_variable_length", 
    variable_time_with_skipping_none = min(timeit.repeat(
     "access_variable_length_with_skipping_none(all_entries, entry_keys)", 
     preparation_part + 
     "from functions import access_variable_length_with_skipping_none", 
    variable_time_numpy = min(timeit.repeat(
     "access_variable_length_numpy(all_entries, entry_keys)", 
     preparation_part + 
     "from functions import access_variable_length_numpy", 

    print("static length time: {}".format(static_time)) 
    print("variable length time: {}".format(variable_time)) 
    print("variable length time with skipping `None` keys: {}" 
    print("variable length time with numpy: {}" 

这在我的机器有的Python 3.6。1给出:

all entries count: 10000 
entry keys count: 10 
static length time: 0.06314293399918824 
variable length time: 0.5234129569980723 
variable length time with skipping `None` keys: 0.2890012050011137 
variable length time with numpy: 0.22945181500108447 
all entries count: 10000 
entry keys count: 20 
static length time: 0.12795891799760284 
variable length time: 1.0610534609986644 
variable length time with skipping `None` keys: 0.5744297259989253 
variable length time with numpy: 0.5105678180007089 
all entries count: 10000 
entry keys count: 30 
static length time: 0.19210158399801003 
variable length time: 1.6491422000035527 
variable length time with skipping `None` keys: 0.8566724129996146 
variable length time with numpy: 0.7363859869983571 
all entries count: 10000 
entry keys count: 40 
static length time: 0.2561357790000329 
variable length time: 2.08878050599742 
variable length time with skipping `None` keys: 1.1256247100027394 
variable length time with numpy: 1.0066140279996034 
all entries count: 10000 
entry keys count: 50 
static length time: 0.32130833200062625 
variable length time: 2.6166040710013476 
variable length time with skipping `None` keys: 1.4147321179989376 
variable length time with numpy: 1.1700750320014777 
all entries count: 10000 
entry keys count: 60 
static length time: 0.38276188999952865 
variable length time: 3.153736616997776 
variable length time with skipping `None` keys: 1.7147898039984284 
variable length time with numpy: 1.4533947029995034 
all entries count: 10000 
entry keys count: 70 
all entries count: 15000 
entry keys count: 80 
static length time: 0.7141444490007416 
variable length time: 6.186657476999244 
variable length time with skipping `None` keys: 3.376506028998847 
variable length time with numpy: 3.1577993860009883 
all entries count: 15000 
entry keys count: 90 
static length time: 0.8115685330012639 
variable length time: 7.14327938399947 
variable length time with skipping `None` keys: 3.7462387939995097 
variable length time with numpy: 3.6140603050007485 
all entries count: 15000 
entry keys count: 100 
static length time: 0.8950150890013902 
variable length time: 7.829741768000531 
variable length time with skipping `None` keys: 4.1662235900003 
variable length time with numpy: 3.914334102999419 



如果我们删除转换滤波阵列记录tuple s的map和刚刚离开

for k in keys_array[indexes]: 



比方说,你有一本字典 - d

d = {(1,2):3,(1,4):5,(2,4):2,(1,3):4,(2,3):6,(5,1):5,(3,8):5,(3,6):9} 

首先,你可以得到字典键 -

keys = d.keys() 
dict_keys([(1, 2), (3, 8), (1, 3), (2, 3), (3, 6), (5, 1), (2, 4), (1, 4)]) 

现在,让我们定义一个函数is_match能判定给出两个元,如果他们等于或不是基于你的条件 -

def if_equal(a, b): 
    if a is None or b is None: 
     return True 
     if a==b: 
      return True 
      return False 

is_match = lambda a,b: False not in list(map(if_equal, a, b)) 

tup = (1, None) 
matched_keys = [key for key in keys if is_match(key, tup)] 
[(1, 2), (1, 3), (1, 4)] 

谢谢,但是这个解决方案的性能比我已经提出的更差(见“可变长度”下的问题)。 – Chris