Azure数据工厂复制

问题描述:

我有一个管道内的Azure数据工厂复制活动。复制活动正在运行 - 但数据被复制多次。我的数据源是Azure NoSQL数据库。如何将复制活动配置为不重复记录?Azure数据工厂复制

这里是我的活动

{ 
    "name": "Copy Usage Session Data", 
    "properties": 
    { 
    "description": "", 
    "activities": 
    [ 
     { 
     "type": "Copy", 
     "typeProperties": 
     { 
      "source": {"type": "DocumentDbCollectionSource"}, 
      "sink": 
      { 
      "type": "SqlSink", 
      "writeBatchSize": 0, 
      "writeBatchTimeout": "05:00:00", 
      "sliceIdentifierColumnName": "InstallationSliceIdentifier" 
      }, 
      "translator": 
      { 
      "type": "TabularTranslator", 
      "ColumnMappings": "machineKey: machineKey, product: product, softwareVersion: softwareVersion, id: DocumentDBId" 
      } 

     }, 
     "inputs": [{"name": "Machine Registration Input Data"}], 
     "outputs": [{"name": "Machine Registration Output Data"}], 
     "policy": 
     { 
      "timeout": "01:00:00", 
      "concurrency": 1, 
      "executionPriorityOrder": "OldestFirst" 
     }, 
     "scheduler": 
     { 
      "frequency": "Hour", 
      "interval": 1 
     }, 
     "name": "Machine Registration Data To History", 
     "description": "Copy Machine Registration Data To SQL Server DB Activity" 
     }, 
     { 
     "type": "Copy", 
     "typeProperties": 
     { 
      "source": {"type": "DocumentDbCollectionSource"}, 
      "sink": 
      { 
      "type": "SqlSink", 
      "writeBatchSize": 0, 
      "writeBatchTimeout": "05:00:00", 
      "sliceIdentifierColumnName": "UsageSessionSliceIdentifier" 
      }, 
      "translator": 
      { 
      "type": "TabularTranslator", 
      "ColumnMappings": "id: usageSessionId, usageInstallationId: usageInstallationId, startTime: startTime, stopTime: stopTime, currentVersion: currentVersion" 
      } 
     }, 
     "inputs": [{"name": "Usage Session Input Data"}], 
     "outputs": [{"name": "Usage Session Output Data"}], 
     "policy": 
     { 
      "timeout": "01:00:00", 
      "concurrency": 2, 
      "executionPriorityOrder": "OldestFirst" 
     }, 
     "scheduler": 
     { 
      "frequency": "Hour", 
      "interval": 1 
     }, 
     "name": "Usage Session Data To History", 
     "description": "Copy Usage Session Data To SQL Server DB Activity" 
     } 
    ], 
    "start": "2017-05-29T16:15:00Z", 
    "end": "2500-01-01T00:00:00Z", 
    "isPaused": false,   
    "pipelineMode": "Scheduled" 
    } 
} 

+2

你可以分享你的副本活动的JSON模板。没有这个,很难提供解决方案。 – Venky

+0

我想添加我的活动 - 但要等待评论 - 我无法编辑该问题。那么如何编辑问题 - 请参阅无编辑标签? 我阅读了关于基于查询技术进行过滤的sqlReaderQuery选项 - 他们只显示基于数据集/窗口时间片的过滤。但是我的源代码是NoSQL DB,我只能读取SQL源可以使用的数据库 - 最重要的是,我在数据库集合中的时间戳是从1970/1/1偏移的。我找不到sqlReaderQuery的明确说明。我在哪里可以找到如何使用? – Peter

+0

您需要添加查询以选择要复制的数据片。现在,因为您没有指定查询,两个活动都将整个数据从NoSql复制到Sink。因此它看起来像复制了两次。在查询中使用'SliceStart','SliceEnd'。 – Venky

将管道开始日期更改为当前日期。如果流水线开始日期在过去,那么从该日期到当前日期创建许多数据片段,并且它们将被复制。此外,您已设置Concurrency : 2。这意味着2次活动将一次运行。

例如,如果您的输出数据集可用性为1天,并且您的管道开始日期为29 - 05 - 2017年,那么直到今天16-06-2017总共将为每一天创建18个数据切片。如果将并发设置为2,则一次运行2个复制活动。如果Concurrency : 10那么10个复制活动并行运行。

请注意输出数据集的可用性,管道开始日期,并发性和源查询。

源查询的示例是$$Text.Format('select * from c where c.ModifiedDate >= \'{0:yyyy-MM-ddTHH:mm:ssZ}\' AND c.ModifiedDate < \'{1:yyyy-MM-ddTHH:mm:ssZ}\'', WindowStart, WindowEnd)其中,ModifiedDate是一个列,指示在该特定集合中创建的文档的时间。

更新时间:

{ 
    "name": "DocDbToBlobPipeline", 
    "properties": { 
    "activities": [ 
     { 
     "type": "Copy", 
     "typeProperties": { 
      "source": { 
      "type": "DocumentDbCollectionSource", 
      "query": "SELECT Person.Id, Person.Name.First AS FirstName, Person.Name.Middle as MiddleName, Person.Name.Last AS LastName FROM Person", 
      "nestingSeparator": "." 
      }, 
      "sink": { 
      "type": "BlobSink", 
      "blobWriterAddHeader": true, 
      "writeBatchSize": 1000, 
      "writeBatchTimeout": "00:00:59" 
      } 
     }, 
     "inputs": [ 
      { 
      "name": "PersonDocumentDbTable" 
      } 
     ], 
     "outputs": [ 
      { 
      "name": "PersonBlobTableOut" 
      } 
     ], 
     "policy": { 
      "concurrency": 1 
     }, 
     "name": "CopyFromDocDbToBlob" 
     } 
    ], 
    "start": "2015-04-01T00:00:00Z", 
    "end": "2015-04-02T00:00:00Z" 
    } 
} 

看一看Data Factory scheduling and execution

为了您Reference

+0

{知道我缺少查询参数。问题在于文档不清楚它的工作原理。另外,正如我所说的,我的数据源是一个Azure NoSQL DB,我想知道如何使用它作为源。文档给人的印象是无法使用的。 – Peter

+0

尝试使用“数据工厂复制数据”向导从Azure NoSQL(DocumentDB)复制到支持的其他接收器。它自动创建管道,活动,数据集,链接服务。如果您的数据源或接收器在“复制数据”向导中不可用,那么这意味着,这些还不受支持。 – Venky

+0

确定 - 查看复制数据向导DocumentDB NoSQL被指定为允许用作数据源。我想知道的是DocumentDB中是否有默认值选项 - 就像在SQL Server数据库中一样 – Peter

您可以使用查询与创建/修改日期(它应该在你的表存在),只挑记录当前日期。这将由切片开始或结束日期提供,并且这种方式您每天只能读取新创建的记录。