IDEA 中 用spark 1.6.x 实现 不同数据源join (hive join MySQL )

hive 服务端配置 

[[email protected] conf]$ pwd
/home/hadoop/app/hive-1.1.0-cdh5.7.0/conf

[[email protected] conf]$ cat hive-site.xml 

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://192.168.137.251:3306/hive?createDatabaseIfNotExist=true</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>

</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value> 
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
</property>


<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
</property>


<property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
  </property>

   <property>
    <name>hive.cli.print.header</name>
    <value>true</value>
   </property>

<property>
<name>hive.server2.thrift.port</name>
   <value>10000</value>

</property>

<property>
  <name>hive.server2.thrift.bind.host</name>
  <value>192.168.137.251</value>
</property>



<property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
</property>

</configuration>



 启动  hive metastore  

[[email protected] ~]$ cp -rp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/

将 SPARK_HOME/conf/hive-site.xml 文件 添加如下的内容 


  <property>
        <name>hive.metastore.uris</name>
        <value>thrift:/192.168.137.251:9083</value>
    </property>  




hive --service metastore 2>&1 >> metastore1.log& 


idea 项目的pom 文件  添加 hive 和 MySQL的依赖  
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>

<!-- MySQL -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.40</version>
</dependency>



idea 中 的创建于Scala 同一目录的 resources 文件夹 复制 hive-site.xml文件  ,只需要保留 如下的内容即可

  <property>
        <name>hive.metastore.uris</name>
        <value>thrift://192.168.137.251:9083</value>
    </property>


需要注意的地方:Windows平台必须能够解析 192.168.137.251 这个IP地址

package com.wjl7813

import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by 92421 on 2018/3/8.
  */
object HiveJoinMysql {

  def main(args: Array[String]): Unit = {


    //1、构建上下文
    val conf = new SparkConf()
      .setAppName("HiveJoinMysql")
      .setMaster("local[*]")

    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)

    //Mysql连接信息

    val url = "jdbc:mysql://192.168.137.251:3306/spark"
    val password = "oracle"
    val user = "root"
    val props = new Properties()
    props.put("user", user)
    props.put("password", password)

    // 需求一:将hive中的数据同步到mysql中
    // 通过write.jdbc将数据写出到RDBMs有一个小问题,一般情况下不使用该方式将数据写出到关系型数据库中
    // 没法实现Insert Or Update的操作(基于数据记录进行更新和插入的操作)
    // TODO: 考虑一下到底如何将数据写入RDBMs比较好==>考虑RDD写出数据的方式
    //2、将dept数据同步到mysql中

    sqlContext
      .read
      .table("comm.dept")
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(url, "tb_dept", props)


    // 需求二:hive表和mysql中的表进行join操作
    /**
      * 步骤:
      * 1. 读取mysql中的数据形成DataFrame
      * 2. 将DataFrame注册成为临时表(tmp_tb_dept)
      * 注意:在注册临时表的时候,临时表中不能出现"."这个符号
      * 3. 将tmp_tb_dept和common.emp进行数据join
      */
    sqlContext
      .read
      .jdbc(url, "tb_dept", Array("deptno < 25", "deptno >=25 AND deptno < 28", "deptno >= 28"), props)
      .registerTempTable("tmp_tb_dept")
    val joinResultDF = sqlContext.sql("select a.*,b.dname,b.loc from comm.emp a join tmp_tb_dept b on a.deptno = b.deptno")


    // 需求三:将数据保存到HDFS上格式为parquet
    joinResultDF
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("deptno")
     // .save("result/sql/parquet/01") // 指定的是hdfs上的文件,如果没有给定fs.defaultFS,默认是保存到本地的
    .save("hdfs://192.168.137.251:8020/data/parquet/01")

// 存储到hive中==>将数据存储到Hive中,数据格式为Parquet
joinResultDF
.write
.format("parquet")
.mode(SaveMode.Overwrite)
.partitionBy("deptno")
.saveAsTable("tb_result_join_emp_dept")
val joinResultshow = sqlContext.sql("select * from tb_result_join_emp_dept where comm is not null")
joinResultshow.show()
sc.stop()

}

}




Windows平台上执行过程中可能出现的异常:
  1. Exception in thread "main" java.lang.OutOfMemoryError: PermGen space
        at java.lang.ClassLoader.defineClass1(Native Method)
  解决方案:run->edit configxxxx中给定运行的JVM参数-XX:PermSize=128M -XX:MaxPermSize=256M



IDEA 中 用spark 1.6.x 实现 不同数据源join (hive join MySQL )


  2. 可能出现的问题:由于hadoop在windows上和linux上的执行方式不一样,在使用hiveContext对象的时候,需要应用到hadoop的底层mapreduce的一些相关代码,如果环境和源码之间存在着兼容问题的话,有可能出现NullPointException异常
    解决方案:直接修改hadoop的底层源码,然后将修改好的源码放到spark项目中


  3. 如果配置了HADOOP_USER_NAME,个人建议删除(windows)
  
  4.    ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\ibf\AppData\Local\Temp\spark-d1d77acb-0a02-4db8-b2ba-217796d96207
java.io.IOException: Failed to delete: C:\Users\ibf\AppData\Local\Temp\spark-d1d77acb-0a02-4db8-b2ba-217796d96207
at =====> 不解决,删除临时文件失败导致的,不会影响正式的业务代码,只会在windows上产生