具有二级索引的MapR-DB Spark连接器
与大数据空间上的任何其他工具相比,MapR数据平台具有明显的优势。 MapR-DB是该平台的核心组件之一,它提供了先进的功能,可以摧毁大多数NoSQL数据库。
MapR-DB的一个重要附加功能是能够通过Connector for Apache Spark来使用Apache Spark进行编写和查询。 使用此连接器非常方便,因为它可以使用不同的Spark API(例如RDD,DataFrame和Streams)从Spark读取和写入MapR-DB。
使用连接器,我们可以发出如下查询。
生成的类型是一个数据Dataframe
,我们可以将其用作其他来源的任何其他数据框,就像我们通常在Spark中一样。
如果我们随后过滤掉数据集,就会开始出现问题。 例如,让我们看下面的查询。
过滤器被下推,因此MapR-DB进行过滤,并且仅将符合过滤器的数据发回,从而减少了MapR-DB和Spark之间传输的数据量。 但是,如果在名字字段上创建了索引,则该索引将被忽略并且表将被完全扫描,以尝试查找符合过滤条件的行。
通过在字段上建立索引,我们希望使用该索引,以便对该字段上的查询进行优化,从而最终加快计算速度。 提供的连接器根本没有使用此功能。
必要性
我们的团队MapR Professional Services知道使用MapR-DB二级索引进行过滤对于性能而言是巨大的,并且由于我们的许多客户实际上都在尝试利用此功能(二级索引),因此我们采取了不同的方法来强制使用使用Spark时的索引数量。
下一篇文章是由一位同事撰写的, 如何在OJAI中使用Spark中的二级索引 ,他在其中解释了解决现有问题的一些方法。
即使我们可以采取一些捷径,我们也必须放弃默认连接器具有的一些不错的构造,例如.loadFromMapRDB(...)
。
独立连接器
过去,我以太多方式扩展了Apache Spark。 我已经编写了自己的自定义数据源 ,最近又编写了一个用于Spark结构化流的自定义流源 。
再次,我冒险尝试编写自己的Spark数据源,但是这次是针对MapR-DB,因此我们在利用二级索引的全部优势的同时,保留了与Apache Spark的当前MapR-DB连接器相同的API。
在本文的结尾,我们将能够在完全使用二级索引的同时,通过以下方式编写查询。
Spark数据源版本2
以下数据源实现使用spark 2.3.1并使用数据源API V2。
让我们开始看我们需要的东西。
- ReadSupportWithSchema ,使我们可以创建一个DataSourceReader。
- DataSourceReader允许我们获取数据的架构,同时需要指定如何创建DataReaderFactory 。
- SupportsPushDownFilters ,使我们能够拦截查询过滤器,以便将其下推到MapR-DB。
- SupportsPushDownRequiredColumns ,使我们可以拦截查询投影,因此可以将其下推到MapR-DB。
让我们从实现ReadSupportWithSchema开始。
如我们所见,从MapR-DB读取数据时,我们只需获取要使用的表路径和架构。 然后,将它们传递给MapRDBDataSourceReader 。
MapRDBDataSourceReader
MapRDBDataSourceReader实现了DataSourceReader
,我们还将在SupportsPushDownFilters
和SupportsPushDownRequiredColumns
进行混合,以表明我们希望将过滤器和投影向下推到MapR-DB。
projections
变量将保存我们要projections
的架构(如果有)。 如果我们没有通过.select
显式地投影字段,则将所有字段投影在schema
变量上。
readSchema
与projections
和pruneColumns
结合使用。 如果在我们的Spark查询中指定一个select
则所选字段将传递给pruneColumns
而这是我们将从MapR-DB中获得的唯一字段。
pushFilters
指示我们在Spark查询的where
或filter
子句中指定的filter
。 基本上,我们必须决定要向下推送到MapR-DB的哪些,其余的将在数据存储在内存中后由Spark应用。
在上面的代码段中,我们表明我们将仅下推两种类型的过滤器,即EqualTo
和 GreaterThan
。 从MapR-DB加载数据后,除这两个过滤器之外的任何其他过滤器都不会下推,并且过滤将在内存(火花内存)中进行。
我们正在努力添加更多过滤器以匹配当前的MapR-DB连接器。
createDataReaderFactories
创建一个数据读取器列表,这些数据读取器实际上在从我们的来源MapR-DB中进行大量读取工作。 在本例中,我们仅创建一个数据读取器,但理想情况下,每个MapR-DB区域/分区都拥有一个读取器,因此我们可以利用MapR-DB提供的并行性。
MapRDBDataReaderFactory
我们几乎完成了,但是最重要的部分即将到来。
MapRDBDataReaderFactory
是我们实际构建MapR-DB查询并再次执行MapR-DB表的地方。 请注意,我们正在传递要读取的表格,要向下推的过滤器和投影。
现在,我们需要通过打开连接并创建文档存储对象来连接到MapR-DB。
createFilterCondition
构建我们要针对MapR-DB执行的查询条件。 这是我们整个实施过程中最重要的部分。
在这里,我们将合并所有过滤器。 如我们所见,我们仅针对两种数据类型实现了两个受支持的过滤器,但我们正在努力扩展此实现以匹配当前的MapR-DB连接器。
query
创建要发送到MapR-DB的最终命令。 该任务是将查询条件和投影应用于我们的connection
对象的问题。
请务必注意,由于我们使用的是OJAI ,它将自动使用字段的任何二级索引作为我们正在应用的过滤器的一部分。 确保在本文结尾处检查输出。
documents
是基于query
来自MapR-DB的数据流。
createDataReader
使用我们创建的流( documents
)进行实际的读取并将数据返回给Spark。
使用我们的连接器
此时,我们准备通过以下方式将自定义数据源插入spark。
这使我们能够使用自己的方式从MapR-DB进行读取,以便所应用的任何作为物理表二级索引一部分的过滤器都将用于优化读取。
句法
为了保持与默认MapR-DB连接器提供的API类似的API,我们通过以下方式向我们的库中添加了一些语法。
现在,我们可以像使用默认连接器一样使用连接器。
使用MapR-DB二级索引
当我们运行上面的代码时, OJAI的TRACE输出看起来类似于以下内容。
请注意,它会自动使用称为uid_idx的索引,该索引是字段uid
的索引,而该字段uid
是同时在火花过滤器中使用的字段。
结论
MapR-DB是功能强大的工具,可作为MapR数据平台的一部分运行。 Spark连接器提供了一种与MapR-DB交互的有趣方式,因为它允许我们在使用此NoSQL系统时大规模使用所有Spark结构。 但是,有时默认连接器不够用,因为当我们最需要它们时,它不使用MapR-DB的二级索引功能。
另一方面,我们的实现模仿了Connector API,并确保实现的Spark数据源使用MapR-DB二级索引,因为它依赖能够直接支持二级索引的纯OJAI查询。
我们的库代码可以在这里 MapRDBConnector 找到 。
免责声明:这是一项改进查询MapR-DB的独立工作。 该库不能替代 Apache Spark 的官方 连接器 由MapR提供,作为其发行的一部分。
From: https://hackernoon.com/mapr-db-spark-connector-with-secondary-indexes-df41909f28ea