如何有效地过滤任意长度的元组作为键的字典?
TL; DR如何有效地过滤任意长度的元组作为键的字典?
什么是实现具有可变维数键的字典的过滤函数的最有效方法?过滤器应采用与字典键相同尺寸的元组,并输出字典中与过滤器匹配的所有键,例如filter[i] is None or filter[i] == key[i]
的所有尺寸为i
。
在我当前的项目,我需要大量的数据处理字典。字典的一般结构是这样的,它包含具有2到4个整数作为键和整数作为值的元组。字典中的所有键具有相同的尺寸。为了说明,字典的例子如下我需要处理:
{(1, 2): 1, (1, 5): 2}
{(1, 5, 3): 2}
{(5, 2, 5, 2): 8}
这些字典在约20 000项中含有大量的条目,其中最大的。我经常需要过滤这些条目,但通常只查看关键元组的某些索引。理想情况下,我想要一个函数,我可以提供一个过滤器元组。该函数然后应该返回匹配过滤器元组的所有键。如果过滤器元组包含None
条目,则这将匹配该索引处字典的关键元组中的任何值。功能应该字典做什么用2维方向键
例子:
>>> dict = {(1, 2): 1, (1, 5): 2, (2, 5): 1, (3, 9): 5}
>>> my_filter_fn((1, None))
{(1, 2), (1, 5)}
>>> my_filter_fn((None, 5))
{(1, 5), (2, 5)}
>>> my_filter_fn((2, 4))
set()
>>> my_filter_fn((None, None))
{(1, 2), (1, 5), (2, 5), (3, 9)}
正如我的词典有自己的元组不同尺寸,我试图通过编写一个生成器表达式这需要解决这个问题该元组的尺寸考虑在内:
def my_filter_fn(entries: dict, match: tuple):
return (x for x in entries.keys() if all(match[i] is None or match[i] == x[i]
for i in range(len(key))))
不幸的是,这是一个相当慢的与通过手((match[0] is None or match[0] === x[0]) and (match[1] is None or match[1] == x[1]
)完全写出条件;对于4维而言,这大约慢了10倍。这对我来说是一个问题,因为我需要经常进行这种过滤。
以下代码演示了性能问题。代码仅用于说明问题并能够复制测试。您可以跳过代码部分,结果如下。
import random
import timeit
def access_variable_length():
for key in entry_keys:
for k in (x for x in all_entries.keys() if all(key[i] is None or key[i] == x[i]
for i in range(len(key)))):
pass
def access_static_length():
for key in entry_keys:
for k in (x for x in all_entries.keys() if
(key[0] is None or x[0] == key[0])
and (key[1] is None or x[1] == key[1])
and (key[2] is None or x[2] == key[2])
and (key[3] is None or x[3] == key[3])):
pass
def get_rand_or_none(start, stop):
number = random.randint(start-1, stop)
if number == start-1:
number = None
return number
entry_keys = set()
for h in range(100):
entry_keys.add((get_rand_or_none(1, 200), get_rand_or_none(1, 10), get_rand_or_none(1, 4), get_rand_or_none(1, 7)))
all_entries = dict()
for l in range(13000):
all_entries[(random.randint(1, 200), random.randint(1, 10), random.randint(1, 4), random.randint(1, 7))] = 1
variable_time = timeit.timeit("access_variable_length()", "from __main__ import access_variable_length", number=10)
static_time = timeit.timeit("access_static_length()", "from __main__ import access_static_length", number=10)
print("variable length time: {}".format(variable_time))
print("static length time: {}".format(static_time))
结果:
variable length time: 9.625867042849316 static length time: 1.043319165662158
我想避免创建三个不同的功能my_filter_fn2
,my_filter_fn3
和my_filter_fn4
覆盖我的字典中所有可能的尺寸,然后用静态维度筛选。我知道对可变维度进行过滤总是比对固定维度进行过滤要慢,但希望它不会慢10倍。由于我不是Python专家,我希望有一种巧妙的方式可以重构我的可变维度生成器表达式,以提供更好的性能。
以我描述的方式过滤巨大词典的最有效方法是什么?
感谢您有机会思考集合和词典中的元组。这是一个非常有用和强大的Python角落。
Python被解释,所以如果你来自一种编译语言,一个很好的经验法则是避免复杂的嵌套迭代,你可以。如果你正在编写复杂的循环或理解,总是值得怀疑是否有更好的方法来做到这一点。
列表下标(stuff[i]
)和range (len(stuff))
在Python中效率低下且冗长,而且很少有必要。这是更有效的(和更自然)迭代:
for item in stuff:
do_something(item)
下面的代码是快,因为它使用了一些Python的优势:内涵,字典,集和元组拆包。
有迭代,但它们很简单和浅。 整个代码中只有一条if语句,并且每个过滤操作只执行4次。这也有助于提高性能 - 并使代码更易于阅读。
的方法的说明...
从原始数据的每个键:
{
(0, 1): (1, 4, 5),
(1, 4): (1, 4, 5),
(2, 5): (1, 4, 5)
}
(从零的Python数字元素:
{(1, 4, 5): 1}
由位置和索引值)
索引被整理成一个大的查找字典,由集的元组:
{
(0, 1): {(1, 4, 5), (1, 6, 7), (1, 2), (1, 8), (1, 4, 2, 8), ...}
(0, 2): {(2, 1), (2, 2), (2, 4, 1, 8), ...}
(1, 4): {(1, 4, 5), (1, 4, 2, 8), (2, 4, 1, 8), ...}
...
}
一旦该查找是建立(和它被非常有效地构建)滤波只设置交点和字典查找,这两者都是疾如。即使是大型字典,过滤也需要几微秒。
该方法使用元数据2,3或4(或任何其他)处理数据,但arity_filtered()
只返回具有与过滤元组相同数量成员的键。所以这个类可以让你选择一起过滤所有数据,或者单独处理不同大小的元组,在性能方面很少选择它们。
大型随机数据集(11500个元组)的计时结果为0.30秒来建立查找,100次查找为0.007秒。
from collections import defaultdict
import random
import timeit
class TupleFilter:
def __init__(self, data):
self.data = data
self.lookup = self.build_lookup()
def build_lookup(self):
lookup = defaultdict(set)
for data_item in self.data:
for member_ref, data_key in tuple_index(data_item).items():
lookup[member_ref].add(data_key)
return lookup
def filtered(self, tuple_filter):
# initially unfiltered
results = self.all_keys()
# reduce filtered set
for position, value in enumerate(tuple_filter):
if value is not None:
match_or_empty_set = self.lookup.get((position, value), set())
results = results.intersection(match_or_empty_set)
return results
def arity_filtered(self, tuple_filter):
tf_length = len(tuple_filter)
return {match for match in self.filtered(tuple_filter) if tf_length == len(match)}
def all_keys(self):
return set(self.data.keys())
def tuple_index(item_key):
member_refs = enumerate(item_key)
return {(pos, val): item_key for pos, val in member_refs}
data = {
(1, 2): 1,
(1, 5): 2,
(1, 5, 3): 2,
(5, 2, 5, 2): 8
}
tests = {
(1, 5): 2,
(1, None, 3): 1,
(1, None): 3,
(None, 5): 2,
}
tf = TupleFilter(data)
for filter_tuple, expected_length in tests.items():
result = tf.filtered(filter_tuple)
print("Filter {0} => {1}".format(filter_tuple, result))
assert len(result) == expected_length
# same arity filtering
filter_tuple = (1, None)
print('Not arity matched: {0} => {1}'
.format(filter_tuple, tf.filtered(filter_tuple)))
print('Arity matched: {0} => {1}'
.format(filter_tuple, tf.arity_filtered(filter_tuple)))
# check unfiltered results return original data set
assert tf.filtered((None, None)) == tf.all_keys()
>>> python filter.py
Filter (1, 5) finds {(1, 5), (1, 5, 3)}
Filter (1, None, 3) finds {(1, 5, 3)}
Filter (1, None) finds {(1, 2), (1, 5), (1, 5, 3)}
Filter (None, 5) finds {(1, 5), (1, 5, 3)}
Arity filtering: note two search results only: (1, None) => {(1, 2), (1, 5)}
我没有一个漂亮的答案,但这种优化通常会使代码更难阅读。但如果您只需要更快的速度,那么您可以做两件事。首先,我们可以直接从循环内部消除重复计算。你说每个字典中的所有条目都有相同的长度,所以你可以计算一次,而不是在循环中重复。这对我来说削减了大约20%:
不漂亮,我同意。但通过使用eval
构建固定长度函数,我们可以使其更快(甚至更丑)。像这样:
def access_variable_length_new():
try:
length = len(iter(entry_keys).next())
except KeyError:
return
func_l = ["(key[{0}] is None or x[{0}] == key[{0}])".format(i) for i in range(length)]
func_s = "lambda x,key: " + " and ".join(func_l)
func = eval(func_s)
for key in entry_keys:
for k in (x for x in all_entries.keys() if func(x,key)):
pass
对我来说,这几乎和静态版本一样快。
让我们[在聊天中继续讨论](http://chat.stackoverflow.com/rooms/145973/discussion-between-chris-and-strubbly)。 – Chris
我做了一些修改:
你不需要使用
dict.keys
方法,通过按键进行迭代,通过dict
对象迭代本身会给我们的钥匙,-
创建独立模块,它有助于阅读和修改:
-
preparations.py
带助手产生测试数据:import random left_ends = [200, 10, 4, 7] def generate_all_entries(count): return {tuple(random.randint(1, num) for num in left_ends): 1 for _ in range(count)} def generate_entry_keys(count): return [tuple(get_rand_or_none(1, num) for num in left_ends) for _ in range(count)] def get_rand_or_none(start, stop): number = random.randint(start - 1, stop) if number == start - 1: number = None return number
-
functions.py
用于测试功能, -
main.py
为基准。
-
-
参数传递给函数,而不是从全球范围内让他们,所以给出的静态&可变长度的版本使用上的
timeit.repeat
代替timeit.timeit
结果min
得到大多数表示的结果(成为def access_static_length(all_entries, entry_keys): for key in entry_keys: for k in (x for x in all_entries if (key[0] is None or x[0] == key[0]) and (key[1] is None or x[1] == key[1]) and (key[2] is None or x[2] == key[2]) and (key[3] is None or x[3] == key[3])): pass def access_variable_length(all_entries, entry_keys): for key in entry_keys: for k in (x for x in all_entries if all(key[i] is None or key[i] == x[i] for i in range(len(key)))): pass
更多在this answer),
变化
entries_keys
元素数从10
至100
(包括端部)与步骤10
,改变
all_entries
元件从10000
到15000
(包括端部)与计数步骤500
。
但要回点。
改进
-
我们可以跳过与
None
值指标检查中键def access_variable_length_with_skipping_none(all_entries, entry_keys): for key in entry_keys: non_none_indexes = {i for i, value in enumerate(key) if value is not None} for k in (x for x in all_entries.keys() if all(key[i] == x[i] for i in non_none_indexes)): pass
-
下一个建议提高过滤是使用
numpy
:import numpy as np def access_variable_length_numpy(all_entries, entry_keys): keys_array = np.array(list(all_entries)) for entry_key in entry_keys: non_none_indexes = [i for i, value in enumerate(entry_key) if value is not None] non_none_values = [value for i, value in enumerate(entry_key) if value is not None] mask = keys_array[:, non_none_indexes] == non_none_values indexes, _ = np.where(mask) for k in map(tuple, keys_array[indexes]): pass
基准
内容的main.py
:
import timeit
from itertools import product
number = 5
repeat = 10
for all_entries_count, entry_keys_count in product(range(10000, 15001, 500),
range(10, 101, 10)):
print('all entries count: {}'.format(all_entries_count))
print('entry keys count: {}'.format(entry_keys_count))
preparation_part = ("from preparation import (generate_all_entries,\n"
" generate_entry_keys)\n"
"all_entries = generate_all_entries({all_entries_count})\n"
"entry_keys = generate_entry_keys({entry_keys_count})\n"
.format(all_entries_count=all_entries_count,
entry_keys_count=entry_keys_count))
static_time = min(timeit.repeat(
"access_static_length(all_entries, entry_keys)",
preparation_part + "from functions import access_static_length",
repeat=repeat,
number=number))
variable_time = min(timeit.repeat(
"access_variable_length(all_entries, entry_keys)",
preparation_part + "from functions import access_variable_length",
repeat=repeat,
number=number))
variable_time_with_skipping_none = min(timeit.repeat(
"access_variable_length_with_skipping_none(all_entries, entry_keys)",
preparation_part +
"from functions import access_variable_length_with_skipping_none",
repeat=repeat,
number=number))
variable_time_numpy = min(timeit.repeat(
"access_variable_length_numpy(all_entries, entry_keys)",
preparation_part +
"from functions import access_variable_length_numpy",
repeat=repeat,
number=number))
print("static length time: {}".format(static_time))
print("variable length time: {}".format(variable_time))
print("variable length time with skipping `None` keys: {}"
.format(variable_time_with_skipping_none))
print("variable length time with numpy: {}"
.format(variable_time_numpy))
这在我的机器有的Python 3.6。1给出:
all entries count: 10000
entry keys count: 10
static length time: 0.06314293399918824
variable length time: 0.5234129569980723
variable length time with skipping `None` keys: 0.2890012050011137
variable length time with numpy: 0.22945181500108447
all entries count: 10000
entry keys count: 20
static length time: 0.12795891799760284
variable length time: 1.0610534609986644
variable length time with skipping `None` keys: 0.5744297259989253
variable length time with numpy: 0.5105678180007089
all entries count: 10000
entry keys count: 30
static length time: 0.19210158399801003
variable length time: 1.6491422000035527
variable length time with skipping `None` keys: 0.8566724129996146
variable length time with numpy: 0.7363859869983571
all entries count: 10000
entry keys count: 40
static length time: 0.2561357790000329
variable length time: 2.08878050599742
variable length time with skipping `None` keys: 1.1256247100027394
variable length time with numpy: 1.0066140279996034
all entries count: 10000
entry keys count: 50
static length time: 0.32130833200062625
variable length time: 2.6166040710013476
variable length time with skipping `None` keys: 1.4147321179989376
variable length time with numpy: 1.1700750320014777
all entries count: 10000
entry keys count: 60
static length time: 0.38276188999952865
variable length time: 3.153736616997776
variable length time with skipping `None` keys: 1.7147898039984284
variable length time with numpy: 1.4533947029995034
all entries count: 10000
entry keys count: 70
...
all entries count: 15000
entry keys count: 80
static length time: 0.7141444490007416
variable length time: 6.186657476999244
variable length time with skipping `None` keys: 3.376506028998847
variable length time with numpy: 3.1577993860009883
all entries count: 15000
entry keys count: 90
static length time: 0.8115685330012639
variable length time: 7.14327938399947
variable length time with skipping `None` keys: 3.7462387939995097
variable length time with numpy: 3.6140603050007485
all entries count: 15000
entry keys count: 100
static length time: 0.8950150890013902
variable length time: 7.829741768000531
variable length time with skipping `None` keys: 4.1662235900003
variable length time with numpy: 3.914334102999419
恢复
我们可以看到如预期numpy
版本也不是那么好,它似乎不numpy
的错。
如果我们删除转换滤波阵列记录tuple
s的map
和刚刚离开
for k in keys_array[indexes]:
...
那么这将是非常快(比静态长度版本速度更快),所以这个问题是转换,从numpy.ndarray
反对到tuple
。
过滤掉None
入门钥匙给我们提供了大约50%的速度增益,所以随时添加它。
比方说,你有一本字典 - d
d = {(1,2):3,(1,4):5,(2,4):2,(1,3):4,(2,3):6,(5,1):5,(3,8):5,(3,6):9}
首先,你可以得到字典键 -
keys = d.keys()
=>
dict_keys([(1, 2), (3, 8), (1, 3), (2, 3), (3, 6), (5, 1), (2, 4), (1, 4)])
现在,让我们定义一个函数is_match
能判定给出两个元,如果他们等于或不是基于你的条件 - is_match((1,7),(1,None))
,is_match((1,5),(None,5))
和is_match((1,4),(1,4))
将返回True
而is_match((1,7),(1,8))
,is_match((4,7),(6,12))
将返回False
。
def if_equal(a, b):
if a is None or b is None:
return True
else:
if a==b:
return True
else:
return False
is_match = lambda a,b: False not in list(map(if_equal, a, b))
tup = (1, None)
matched_keys = [key for key in keys if is_match(key, tup)]
=>
[(1, 2), (1, 3), (1, 4)]
谢谢,但是这个解决方案的性能比我已经提出的更差(见“可变长度”下的问题)。 – Chris
它是不好用的内置插件名称为对象,所以'filter'应该改名(以'filter_entries'例如) –
@AzatIbrakov谢谢你,我改变了它。 – Chris