IDEA 中 用spark 1.6.x 实现 不同数据源join (hive join MySQL )
hive 服务端配置
[[email protected] conf]$ pwd
/home/hadoop/app/hive-1.1.0-cdh5.7.0/conf
[[email protected] conf]$ cat hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.137.251:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>192.168.137.251</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
启动 hive metastore
[[email protected] ~]$ cp -rp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/
将 SPARK_HOME/conf/hive-site.xml 文件 添加如下的内容
<property>
<name>hive.metastore.uris</name>
<value>thrift:/192.168.137.251:9083</value>
</property>
hive --service metastore 2>&1 >> metastore1.log&
idea 项目的pom 文件 添加 hive 和 MySQL的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<!-- MySQL --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.40</version> </dependency>
idea 中 的创建于Scala 同一目录的 resources 文件夹 复制 hive-site.xml文件 ,只需要保留 如下的内容即可
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.137.251:9083</value>
</property>
需要注意的地方:Windows平台必须能够解析 192.168.137.251 这个IP地址
package com.wjl7813 import java.util.Properties import org.apache.spark.sql.SaveMode import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkContext, SparkConf} /** * Created by 92421 on 2018/3/8. */ object HiveJoinMysql { def main(args: Array[String]): Unit = { //1、构建上下文 val conf = new SparkConf() .setAppName("HiveJoinMysql") .setMaster("local[*]") val sc = SparkContext.getOrCreate(conf) val sqlContext = new HiveContext(sc) //Mysql连接信息 val url = "jdbc:mysql://192.168.137.251:3306/spark" val password = "oracle" val user = "root" val props = new Properties() props.put("user", user) props.put("password", password) // 需求一:将hive中的数据同步到mysql中 // 通过write.jdbc将数据写出到RDBMs有一个小问题,一般情况下不使用该方式将数据写出到关系型数据库中 // 没法实现Insert Or Update的操作(基于数据记录进行更新和插入的操作) // TODO: 考虑一下到底如何将数据写入RDBMs比较好==>考虑RDD写出数据的方式 //2、将dept数据同步到mysql中 sqlContext .read .table("comm.dept") .write .mode(SaveMode.Overwrite) .jdbc(url, "tb_dept", props) // 需求二:hive表和mysql中的表进行join操作 /** * 步骤: * 1. 读取mysql中的数据形成DataFrame * 2. 将DataFrame注册成为临时表(tmp_tb_dept) * 注意:在注册临时表的时候,临时表中不能出现"."这个符号 * 3. 将tmp_tb_dept和common.emp进行数据join */ sqlContext .read .jdbc(url, "tb_dept", Array("deptno < 25", "deptno >=25 AND deptno < 28", "deptno >= 28"), props) .registerTempTable("tmp_tb_dept") val joinResultDF = sqlContext.sql("select a.*,b.dname,b.loc from comm.emp a join tmp_tb_dept b on a.deptno = b.deptno") // 需求三:将数据保存到HDFS上格式为parquet joinResultDF .write .format("parquet") .mode(SaveMode.Overwrite) .partitionBy("deptno") // .save("result/sql/parquet/01") // 指定的是hdfs上的文件,如果没有给定fs.defaultFS,默认是保存到本地的
.save("hdfs://192.168.137.251:8020/data/parquet/01")
// 存储到hive中==>将数据存储到Hive中,数据格式为Parquet
joinResultDF
.write
.format("parquet")
.mode(SaveMode.Overwrite)
.partitionBy("deptno")
.saveAsTable("tb_result_join_emp_dept")
val joinResultshow = sqlContext.sql("select * from tb_result_join_emp_dept where comm is not null")
joinResultshow.show()
sc.stop()
}
}
Windows平台上执行过程中可能出现的异常:
1. Exception in thread "main" java.lang.OutOfMemoryError: PermGen space
at java.lang.ClassLoader.defineClass1(Native Method)
解决方案:run->edit configxxxx中给定运行的JVM参数-XX:PermSize=128M -XX:MaxPermSize=256M
2. 可能出现的问题:由于hadoop在windows上和linux上的执行方式不一样,在使用hiveContext对象的时候,需要应用到hadoop的底层mapreduce的一些相关代码,如果环境和源码之间存在着兼容问题的话,有可能出现NullPointException异常
解决方案:直接修改hadoop的底层源码,然后将修改好的源码放到spark项目中
3. 如果配置了HADOOP_USER_NAME,个人建议删除(windows)
4. ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\ibf\AppData\Local\Temp\spark-d1d77acb-0a02-4db8-b2ba-217796d96207
java.io.IOException: Failed to delete: C:\Users\ibf\AppData\Local\Temp\spark-d1d77acb-0a02-4db8-b2ba-217796d96207
at =====> 不解决,删除临时文件失败导致的,不会影响正式的业务代码,只会在windows上产生