将一个特定的PCollection写入BigQuery
假设我通过SideOutputs创建了两个输出PCollections,并且取决于某些条件,我只想将其中一个写入BigQuery。这个怎么做?将一个特定的PCollection写入BigQuery
基本上我的用例是我试图使Write_Append和Write_Truncate动态。我从我在BigQuery中维护的配置表中获取信息(append/truncate)。所以根据我在配置表中的内容,我必须应用截断或追加。
因此,使用SideOutputs我可以创建两个PCollections(分别为Append和Truncate),其中一个将为空。具有所有行的那个必须写入BigQuery。这种方法是否正确?
,我正在使用的代码:
final TupleTag<TableRow> truncate =
new TupleTag<TableRow>(){};
// Output that contains word lengths.
final TupleTag<TableRow> append =
new TupleTag<TableRow>(){};
PCollectionTuple results = read.apply("convert to table row",ParDo.of(new DoFn<String,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c)
{
String value = c.sideInput(configView).get(0).toString();
LOG.info("config: "+value);
if(value.equals("truncate")){
LOG.info("outputting to truncate");
c.output(new TableRow().set("color", c.element()));
}
else
{
LOG.info("outputting to append");
c.output(append,new TableRow().set("color", c.element()));
}
//c.output(new TableRow().set("color", c.element()));
}
}).withSideInputs(configView).withOutputTags(truncate,
TupleTagList.of(append)));
results.get(truncate).apply("truncate",BigQueryIO.writeTableRows()
.to("projectid:datasetid.tableid")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
results.get(append).apply("append",BigQueryIO.writeTableRows()
.to("projectid:datasetid.tableid")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
我需要执行一出两个。如果我做这两个表将会被截断反正。
P.S.我使用的Java SDK(Apache的梁2.1)
我相信你是正确的,如果你的管道包括在所有的BigQuery资料表WRITE_TRUNCATE写,目前该表将变得更截断如果没有数据。在这种情况下,请拨打file a JIRA以支持更多可配置的行为。
所以,如果你想它有条件地不被截断,你需要有条件地不包括写转换。是否有办法将条件推到该级别,还是实际上必须从流水线中的其他数据计算条件?
(我能想到的唯一解决方法是使用DynamicDestinations动态选择要截断的表的名称,并截断一些其他的虚拟空表 - 相反,我可以在回答上一段后详细说明这一点)
嗨@jkff ...一种将条件推到该级别的方法是我期待的一种解决方案,可以在这里找到......并且是必须从我从配置表中检索的数据计算得出的,该配置表表明APPEND或TRUNCATE ...所以如果有一个使用DynamicDestinations的解决方法,我真的很想知道... – rish0097
你可以在你的主程序中说if(condition){p.apply(... APPEND ...) }其他{p.apply(... TRUNCATE ...)}?或者,条件本身是否依赖于管道计算的数据,并且无法通过构建管道的主程序进行评估? – jkff
它依赖于通过流水线计算的数据 – rish0097
这是一个普遍的问题,“这种方法是否正确?”或者你想要一些代码解决方案? –
@Marcin Zablocki好吧,我也想要一些代码解决方案 – rish0097
你说你有两个PCollections,那么问题是什么呢?分裂和写作的方法似乎没问题。 –