OLAP 数据分析系统实现方案
什么是OLAP
数据处理大致可以分成两大类:联机事务处理OLTP(on-line transaction processing)、联机分析处理OLAP(On-Line Analytical Processing);
-
OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,例如银行交易。通俗的讲,就是对数据的增删改查等操作。
-
OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。通俗的讲,就是对数据按不同维度的聚合,维度的上钻,下卷等
这里我们聊一聊OLAP,OLAP系统对数据的处理也有多种:MPP、搜索引擎和预处理。这里讲的是预处理的场景。
一个预处理的OLAP系统在处理数据时总是分成如下几个阶段:数据收集,数据清洗,数据计算,数据输出。根据业务需要部分阶段会重复。
数据收集
数据收集阶段负责对接第三方系统,从第三方系统接收数据。有如下方式
-
API方式:通过程序发布一个接口,一般使用restful api,这个方式比较灵活。
-
共享数据库:通过约定一个数据库表,给第三方提供一个数据库账号,第三方直接写数据进去。
-
消息队列:将数据直接发送进消息队列,之后通过API将数据转到数据库。
各方式的比较
API | 数据库 | 消息队列 | |
---|---|---|---|
安全认证 | 可以自定义 | 依赖数据库 | 依赖消息队列 |
接收复杂数据 | 复杂JSON | 关系数据 | 复杂JSON |
数据校验 | 个性化校验 | 字段长度和类型的校验 | 不校验 |
发布协议 | 任意协议 | 依赖数据库 | 依赖消息队列 |
开发工作 | 需要 | 不需要 | 不需要 |
维护难度 | 高,需要修改Bug | 低 | 高,需要修改Bug |
可用性 | 中,部署服务会中断业务 | 中,增加字段会中断业务 | 高,不中断业务 |
部署设备 | 数据库+API服务器 | 数据库 | 数据库+API服务器+消息队列 |
此阶段有两个注意事项:
-
无论使用什么方式,这些接口都是一个事件接口。
事件接口的意思是这个接口接收的信息不是名词,是动词。类似一个接收物流运单的接口,这个接口接收的不是物流运单,接收的是物流运单变化。所以接口需要知道运单是新建的、修改的、删除的。
-
保存原始数据 任何逻辑都会有bug,如果保存数据时对数据做了加工,一旦发现逻辑有bug,将没有修复的机会了。所以要求数据一旦接收了,就原封不动的保存。所以这里的数据库只保存原始数据,尽可能的原封不动。
对于接收到的数据我们可以称之为Data Event。这些数据一般有时间戳,或者其他的确定顺序的字段,便于事件回放。
数据清洗
数据清洗阶段的主要作用是对接收到的数据按照业务调整格式。
-
将事件转换为业务数据。 收到的数据可能是好多条:新建运输单;修改运输方式为海运;修改重量为1吨。经过清洗变成一条业务数据:运输单,运输方式为海运,重量1吨。
-
业务模型转换 上游第三方系统发送的数据一般会按照他们的业务来发送数据,类似第三方可能会跟踪一个一个的小箱子,而我们的新系统需要将这些小箱子当成一个运输单来跟踪。
-
数据格式转换 类似将字符串的2020-02-20,调整为时间格式,将各种时间统一到一个时区,将字符串的男女转换为枚举值
-
记录变化 这个是一个可选项,有些业务会统计变化次数,此时这个记录就很重要了。但是这个信息的记录会比较繁琐,根据业务实际情况酌情处理。
经过数据清洗,我们得到了具有业务含义的基础数据,这些数据会根据第三方提供的数据不断更新,我们称其为Base Data。这部分数据的特点是
-
和Data Event基本一致
-
各种转换是非常确定的业务,未来几乎不变化
-
这类数据也建议放一个时间戳,标记什么时候更新过,方便定位问题
数据计算
此阶段的逻辑是:加载Base Data,根据配置和业务需要计算KPI。类似计算每个运输单从发货到收货持续了多长时间。这个阶段计算内容有如下要求:
-
幂等性。相同的数据多次计算结果应该是一样的。
-
计算的KPI的逻辑之间彼此独立。这是为了便于维护
-
如果KPI的逻辑是配置的,需要有version,每一条数据在计算的时候都可以找到固定的version,保证第一条幂等性。
-
建议增加更新时间信息,方便定位问题
这个阶段的产物是可以直接输出或者经过简单的汇聚就可以输出的数据,基本到达了最后的可视数据阶段,我们称其为View Data。在一个复杂的系统中,一个业务的View Data可能成为另一个业务的Base Data。此时会多出来一个阶段:计算关联数据。
通过计算关联数据,让数据计算过程只被一种数据触发计算,在实际实施时可以有效防止数据冲突。
数据输出
代表整个平台将计算后数据对外输出,可以有如下形式
-
监控数据变化,给下游系统发送变化内容
-
对下游系统提供查询接口
-
定期发送符合条件的数据到服务器。类似ftp服务器。
-
做简单的汇聚,类似统计指定时间段中运输单耗时为1天2天3天的量分别为多少。
对于主动给下游发送数据的接口,在设计的时候要提供重复发送的功能。重复发送功能对定位问题和实际线上生产问题的数据恢复很有帮助。发送数据接口最好提供指定模拟发送时间功能,这样在数据遗漏,补充发送时会更为简单。
Demo
这里举一个复杂点的物流系统的例子,如图:
图中实线是数据的流向,虚线是消息队列的流向,Kafka代表Kafka的一个topic。
这里有两类数据,运单和发票。
-
运单
运单的场景是一个普通场景。
-
采用数据库接收了数据。
-
通过"运单Processor"将数据清洗了一下,结果保存到运单Base库中,同时发送运单单号到kafaka。
-
运单Integrator在接到Kafka中运单号任务后,从运单Base库中读取运单信息计算需要的KPI,之后保存结果到运单View。
-
运单Service负责对外提供数据查询和统计运单的Data API。
-
-
发票
发票的场景是一个复杂场景。标准流程部分
-
采用数据库接收了数据。
-
通过"发票Processor"将数据清洗了一下,结果保存到运单Base库中,同时发送运单单号到kafaka。
-
发票Integrator在接到Kafka中运单号任务后,从发票Base库和运单View库两个中读取信息计算需要的KPI,之后保存结果到发票View。
-
发票Service负责对外提供数据查询和统计发票的Data API。
附加流程部分
-
为了监控运单的变化,运单Integrator会将更新后运单号发送到发票Discovery的Kafka中。
-
发票Discovery服务从发票View中可以找到所有曾经分摊过的发票数据,这部分需要重新计算。
-
发票Discovery服务将找到的发票号重新发送到发票Integrator的Kafka中
-
其他
-
归档
一个完备的系统应该也要提供数据归档机制,不过一般数据在运行1,2年内不会涉及这个功能。可以延迟一年开发。
-
重跑
任何程序都会出问题,所以每个阶段都要提供从跑的机制。