将一个特定的PCollection写入BigQuery

问题描述:

假设我通过SideOutputs创建了两个输出PCollections,并且取决于某些条件,我只想将其中一个写入BigQuery。这个怎么做?将一个特定的PCollection写入BigQuery

基本上我的用例是我试图使Write_Append和Write_Truncate动态。我从我在BigQuery中维护的配置表中获取信息(append/truncate)。所以根据我在配置表中的内容,我必须应用截断或追加。

因此,使用SideOutputs我可以创建两个PCollections(分别为Append和Truncate),其中一个将为空。具有所有行的那个必须写入BigQuery。这种方法是否正确?

,我正在使用的代码:

final TupleTag<TableRow> truncate = 
        new TupleTag<TableRow>(){}; 
       // Output that contains word lengths. 
       final TupleTag<TableRow> append = 
        new TupleTag<TableRow>(){}; 

       PCollectionTuple results = read.apply("convert to table row",ParDo.of(new DoFn<String,TableRow>(){ 
       @ProcessElement 
       public void processElement(ProcessContext c) 
       { 
        String value = c.sideInput(configView).get(0).toString(); 
        LOG.info("config: "+value); 
        if(value.equals("truncate")){ 
         LOG.info("outputting to truncate"); 
         c.output(new TableRow().set("color", c.element())); 
        } 
        else 
        { 
         LOG.info("outputting to append"); 
         c.output(append,new TableRow().set("color", c.element())); 
        } 
        //c.output(new TableRow().set("color", c.element())); 
       } 
      }).withSideInputs(configView).withOutputTags(truncate, 
        TupleTagList.of(append))); 

       results.get(truncate).apply("truncate",BigQueryIO.writeTableRows() 
         .to("projectid:datasetid.tableid") 
         .withSchema(schema) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

       results.get(append).apply("append",BigQueryIO.writeTableRows() 
         .to("projectid:datasetid.tableid") 
         .withSchema(schema) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

我需要执行一出两个。如果我做这两个表将会被截断反正。

P.S.我使用的Java SDK(Apache的梁2.1)

+0

这是一个普遍的问题,“这种方法是否正确?”或者你想要一些代码解决方案? –

+0

@Marcin Zablocki好吧,我也想要一些代码解决方案 – rish0097

+0

你说你有两个PCollections,那么问题是什么呢?分裂和写作的方法似乎没问题。 –

我相信你是正确的,如果你的管道包括在所有的BigQuery资料表WRITE_TRUNCATE写,目前该表将变得更截断如果没有数据。在这种情况下,请拨打file a JIRA以支持更多可配置的行为。

所以,如果你想它有条件地不被截断,你需要有条件地不包括写转换。是否有办法将条件推到该级别,还是实际上必须从流水线中的其他数据计算条件?

(我能想到的唯一解决方法是使用DynamicDestinations动态选择要截断的表的名称,并截断一些其他的虚拟空表 - 相反,我可以在回答上一段后详细说明这一点)

+0

嗨@jkff ...一种将条件推到该级别的方法是我期待的一种解决方案,可以在这里找到......并且是必须从我从配置表中检索的数据计算得出的,该配置表表明APPEND或TRUNCATE ...所以如果有一个使用DynamicDestinations的解决方法,我真的很想知道... – rish0097

+0

你可以在你的主程序中说if(condition){p.apply(... APPEND ...) }其他{p.apply(... TRUNCATE ...)}?或者,条件本身是否依赖于管道计算的数据,并且无法通过构建管道的主程序进行评估? – jkff

+0

它依赖于通过流水线计算的数据 – rish0097