org.apache.thrift.transport.TTransportException when creating a new RDD during an update

Problem description:

I have a DataFrame with two columns: id and value. I want to update value based on another map.

df.collect.foreach({ 
    df[value] = if (df[id] != 'unknown') mapper.value(df[id]) else df[value] 
    }) 

Is this the right way to do it?

What I tried:

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../"  
val input = sc.textFile(mappingPath) 

The input is a list of JSONs, where each line is a JSON that I map to the POJO class CountryInfo using MappingUtils, which handles the JSON parsing and conversion:

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match { 
    case Some(s) => s 
} 


val events = sqlContext.sql("select itemId from EventList") 

val itemList = events.map(row => { 
    val itemId = row.getAs[String](0); 
    val countryInfo = showCountryInfo(MappingsList.get(itemId)); 
    val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry() 
    val language = countryInfo.getLanguage() 

    Row(itemId, country, language) 
    }) 

But I keep getting this error:

org.apache.thrift.transport.TTransportException 
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) 
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) 
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:362) 
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:284) 
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:191) 
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69) 
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220) 
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205) 
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:211) 
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) 
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:207) 
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170) 
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

I am using Spark 1.6.

+0

You should explain a bit more! –

+0

@AlbertoBonsanto I want to update the column 'value' based on the mapper – Swetha

+1

I don't understand. What mapper? What is id used for? What is the schema of the df? –

Your question is a bit ambiguous.

Don't collect large RDDs unnecessarily.

When a collect operation is issued on an RDD, the dataset is copied to the driver, i.e. the master node. If the dataset is too large to fit in memory, an out-of-memory exception will be thrown; take or takeSample can be used instead to retrieve only a bounded number of elements.
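For example (a minimal sketch; it assumes the usual spark-shell/Zeppelin SparkContext sc, and the data is illustrative):

val rdd = sc.parallelize(1 to 1000000) 

// collect() would copy the whole dataset to the driver and can OOM: 
// val all = rdd.collect() 

// take(n) fetches only the first n elements to the driver: 
val first10 = rdd.take(10) 

// takeSample fetches a bounded random sample (here without replacement): 
val sample10 = rdd.takeSample(false, 10) 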

The way you are doing it with the collect method is not correct (on a large DataFrame it can cause an OOM).

1) To update any column or add a new column, you can use withColumn:

DataFrame withColumn(java.lang.String colName, Column col) 
Returns a new DataFrame by adding a column or replacing the existing column that has the same name. 
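
For example, a minimal sketch assuming a DataFrame df with the question's id and value columns:

import org.apache.spark.sql.functions.lit 

// withColumn returns a *new* DataFrame; the original df is not modified. 
// Replace the existing "value" column: 
val replaced = df.withColumn("value", lit("US")) 

// Adding a brand-new column works the same way: 
val tagged = df.withColumn("source", lit("mapper")) 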

2) To apply a condition that depends on another data structure:

You can use the when/otherwise syntax, or a UDF, as in the examples below (the when/otherwise variant follows the UDF example).

Apache Spark, add an "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame example

import org.apache.spark.sql.functions._ 
val sqlcont = new org.apache.spark.sql.SQLContext(sc) 
val df1 = sqlcont.jsonRDD(sc.parallelize(Array(
     """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": ""}""", 
     """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": ""}""", 
     """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": ""}""" 
    ))) 

val makeSIfTesla = udf {(make: String) => 
    if(make == "Tesla") "S" else make 
} 
df1.withColumn("make", makeSIfTesla(df1("make"))).show 
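
The same rule can also be written with when/otherwise instead of a UDF, against the df1 defined above:

import org.apache.spark.sql.functions.{col, when} 

// Tesla -> "S", all other makes unchanged; no UDF needed, so the 
// expression stays visible to the Catalyst optimizer. 
df1.withColumn("make", 
    when(col("make") === "Tesla", "S").otherwise(col("make")) 
).show 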

The above can also be achieved like this:

import org.apache.spark.sql.{Row, SQLContext} 

val rdd = sc.parallelize( 
    List((2012,"Tesla","S"), (1997,"Ford","E350"), (2015,"Chevy","Volt")) 
) 
val sqlContext = new SQLContext(sc) 

// this is used to implicitly convert an RDD to a DataFrame. 
import sqlContext.implicits._ 

val dataframe = rdd.toDF() 

dataframe.foreach(println) 

dataframe.map(row => { 
    val row1 = row.getAs[String](1) 
    val make = if (row1.toLowerCase == "tesla") "S" else row1 
    Row(row(0), make, row(2)) 
    }).collect().foreach(println) 

//[2012,S,S] 
//[1997,Ford,E350] 
//[2015,Chevy,Volt] 
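
Applied back to your data, a hedged sketch: it broadcasts the small map and assumes "US"/"unknown" defaults for unmapped ids; MappingsList, events, and the CountryInfo getters are taken from your question.

import org.apache.spark.sql.Row 

// Broadcast the small lookup map once per executor instead of 
// capturing it in every task closure. 
val mappingsBc = sc.broadcast(MappingsList) 

val itemList = events.map { row => 
    val itemId = row.getAs[String](0) 
    val (country, language) = mappingsBc.value.get(itemId) match { 
        case Some(info) => 
            val c = if (info.getCountry() == "unknown") "US" else info.getCountry() 
            (c, info.getLanguage()) 
        // Assumed fallback for unmapped ids, instead of a non-exhaustive 
        // match that would throw a MatchError: 
        case None => ("US", "unknown") 
    } 
    Row(itemId, country, language) 
} 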
+0

It works for small queries, but when I make them more complex I always get the error – Swetha

+0

Do you mean that with 'collect' you get the error on big data? –

+0

Updated the question with the error – Swetha