OGG同步数据到Hadoop平台(Kafka)
需求
需求很简单,就是从数据中心利用OGG同步数据到应用系统中的kafka。整理如下
名称 | 系统版本 | IP地址 | OGG版本 | 其他服务 |
---|---|---|---|---|
源端 | Linux version 3.8.13-68.3.4.el6uek.x86_64 | 192.168.1.41 |
Version 12.3.0.1.0 OGGCORE_12.3.0.1.0_PLATFORMS_170721.0154_FBO |
Oracle Release 12.2.0.1.0 Production |
目的端 | Linux version 3.10.0-514.el7.x86_64 | 192.168.3.63 |
Version 12.3.0.1.0 OGGCORE_OGGADP.12.3.0.1.0GA_PLATFORMS_170828.1608 |
kafka_2.12-2.0.0 zookeeper-3.4.13 |
准备
下载
zookeeper:http://zookeeper.apache.org/releases.html#download
kafka:http://kafka.apache.org/downloads
ogg:http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html
文件自行下载,需说明的是源端和目标端的文件不一样,源端需要下载Oracle GoldenGate for Oracle,目标端需要下载Oracle GoldenGate for Big Data,
安装
根据说明安装相关的软件,并做好相关的配置,如环境变量、网络防火墙等,不在细说,不明白的问度娘
这里列举一下自己的安装路径:
名称 | OGG | Oracle | zookeeper | kafka |
---|---|---|---|---|
源端 | /oracle/app/ogg123 | /oracle/app/oracle/product/12.1.0/db_1/bin | -- | -- |
目的端 | /usr/local/ogg | -- | /usr/local/zookeeper-3.4.13 | /usr/local/kafka_2.12-2.0.0 |
配置
名称 | 源端 | 目的端 |
---|---|---|
OGG环境变量 |
1.编辑profile [[email protected] bin]$ vim /etc/profile 2.添加如下 export OGG_HOME=/oracle/app/ogg123 export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib export PATH=$OGG_HOME:$PATH 3.使之生效 [[email protected] bin]$ source /etc/profile 4.测试 [[email protected] bin]$ ggsci |
1.编辑profile [[email protected]]# $ vim /etc/profile 2.添加如下 export OGG_HOME=/usr/local/ogg $JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so: $JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib 3.使之生效 [[email protected]]$ source /etc/profile 4.测试 [[email protected]]$ ggsci |
Oracle服务配置 |
3.启用最小补充日志记录与强制日志记录模式(必须开启) [OGG基于辅助日志等进行实时传输,故需要打开相关日志确保可获取事务内容] 3.1登录sql plus SQL> ALTER DATABASE FORCE LOGGING;
4.1.tnsnames.ora配置PDB连接地址 |
|
Manager配置 |
GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 42> edit params mgr --默认监听端口 PORT 7809 --动态端口列表,当默认监听端口不可用时,会在端口列表中选择一个,范围为256个 --重启参数设置表示重启所有EXTRACT进程,最多3次,每次间隔5分钟 --TRAIL文件的定期清理 |
GGSCI (localhost.localdomain) 54> edit params mgr PORT 7809 |
Extract进程配置 |
GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 49> edit param extkafka --进程名称 extract extkafka --设置环境变量,这里分别设置了Oracle数据库以及字符集 --SETENV (ORACLE_SID = "zoehdc") --OGG连接Oracle数据库的帐号密码 --定义trail文件的保存位置以及文件名,文件名只能是2个字母,其余部分OGG会补齐 SOURCECATALOG ZOEHDC --复制表的表名,支持*通配,必须以;结尾
使用PDB用户登录 GGSCI (zoedb41) 1>DBLOGIN USERID C##CDCCONNECT password CDCCONNECT 添加进程 GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 2>add extract extkafka,tranlog,begin now EXTRACT added. 添加trail文件与extract进程绑定 GGSCI (zoedb41 as C##[email protected]/ZOEHDC)3>add exttrail ./dirdat/to,extract extkafka EXTTRAIL added. |
|
Pump进程配置 |
GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 60> edit params pumkafka --指定extract进程名称 SETENV(NLS_LANG ="AMERICAN_AMERICA.ZHS16GBK") --OGG连接Oracle数据库的帐号密码 --目标端(kafka)OGG的mgr服务的地址以及监听端口 --目标端trail文件存储位置以及名称 --复制表的表名,支持*通配,必须以;结尾
同extract进程一样: GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 1> add extract pumkafka,exttrailsource ./dirdat/to EXTRACT added. GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 2> add rmttrail ./dirdat/to,extract pumkafka RMTTRAIL added. |
|
映射表 |
GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 79> edit params ormkafka --Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输, --故需要定义表之间的关系映射 --同步的表信息 table ods.PAT_BASIC_INFO;
在OGG主目录下执行(oracle用户): [[email protected] ogg123]$ ./defgen paramfile ./dirprm/ormkafka.prm 检查生成的define文件是否存在 [[email protected] dirdef]$ ll -l /oracle/app/ogg123/dirdef 将生成的/oracle/app/ogg123/dirdef/ods.PAT_BASIC_INFO发送的目标端ogg目录下的dirdef里: [email protected] dirdef]$ scp -r /oracle/app/ogg123/dirdef/ods.PAT_BASIC_INFO [email protected]:/usr/local/ogg/dirdef/ |
检查从源端生成的define文件是否存在 [[email protected]]# ll -l /usr/local/ogg/dirdef/
|
Replicat进程配置 |
GGSCI (localhost.localdomain) 1> edit params rekafka --rep进程名称 --在源服务器上定义的表映射文件ormkafka --定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props --复制任务的报告生成频率 --以事务传输时,事务合并的单位,减少IO操作 --源端与目标端的映射关系
添加trail文件与replicat进程绑定 GGSCI (localhost.localdomain) 2> add replicat rekafka exttrail ./dirdat/to,checkpointtable ormkafka.checkpoint REPLICAT added. |
|
kafka服务配置 |
进入config/server.properties [[email protected] kafka_2.12-2.0.0]# vi config/server.properties 编辑一下属性 #设置监听地址 listeners=PLAINTEXT://192.168.3.63:9092
配置kafka.props [[email protected] ogg]# cd /usr/local/ogg/dirprm/ [[email protected] ogg]# vim ./dirprm/kafka.props
配置custom_kafka_producer.properties [[email protected] ogg]# vim ./dirprm/custom_kafka_producer.properties
注意:ogg不识别注释,所以properties文件中不能有注释 |
运行
启动
名称 | 命令(按顺序) | 验证 |
---|---|---|
源端 |
start mgr start extkafka start pumkafka |
GGSCI (zoedb41 as C##[email protected]/ZOEHDC) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING |
目的端 |
start mgr start rekafka |
GGSCI (localhost.localdomain) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING |
异常
如果有不是RUNNING可通过查看日志的方法检查解决问题,通过下面两种方法
1.系统
[[email protected] ogg]# vim ggser.log
[[email protected] ogg]# cat ggser.log
[[email protected] ogg]# tail -n 100 ggser.log
2.ogg命令行,以rekafka进程为例
GGSCI (localhost.localdomain) 1> view report rekafka
测试
1.现在源端执行sql语句
conn ods/ods
update PAT_BASIC_INFO set PATIENT_NAME='农行333' where PATIENT_ID=262;
commit;
2.查看源端trail文件状态
ls -l /oralce/app/ogg123/dirdat/to*
-rw-rw-rw- 1 oracle oinstall 1464 May 23 10:31 /opt/ogg/dirdat/to000000
3.查看目标端trail文件状态
ls -l /usr/local/ogg/dirdat/to*
-rw-r----- 1 root root 1504 May 23 10:31 /opt/ogg/dirdat/to000000
4.查看kafka是否自动建立对应的主题,在列表中显示有test_ogg则表示没问题
[[email protected] kafka_2.12-2.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
e
test-ogg
test_ogg
topic-qc
5.通过消费者看是否有同步消息
[[email protected] kafka_2.12-2.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.63:9092 --topic test_ogg --from-beginning
{"table":"ODS.PAT_BASIC_INFO","op_type":"U","op_ts":"2018-09-04 11:39:56.317593","current_ts":"2018-09-04T12:39:04.642000","pos":"00000000010000004077","before":{"PATIENT_ID":"260","PATIENT_NAME":"建行xx1"},"after":{"PATIENT_ID":"260","PATIENT_NAME":"建行xx12"}}
{"table":"ODS.PAT_BASIC_INFO","op_type":"U","op_ts":"2018-09-04 12:40:30.402956","current_ts":"2018-09-04T12:40:36.882000","pos":"00000000010000004271","before":{"PATIENT_ID":"262","PATIENT_NAME":"农行"},"after":{"PATIENT_ID":"262","PATIENT_NAME":"农行333"}}