org.apache.thrift.transport.TTransportException when creating a new RDD during an update
Problem description:
I have a DataFrame with two columns: id and value. I want to update value based on another map (the mapper).
df.collect.foreach({
df[value] = if (df[id] != 'unknown') mapper.value(df[id]) else df[value]
})
Is this the right way to do it?
What I tried:
import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo
val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)
The input is a list of JSON strings, one JSON per line, which I map to the POJO class CountryInfo using MappingUtils, which takes care of the JSON parsing and conversion:
val MappingsList = input.map(x => {
  val countryInfo = MappingUtils.getCountryInfoString(x);
  (countryInfo.getItemId(), countryInfo)
}).collectAsMap
MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]
def showCountryInfo(x: Option[CountryInfo]) = x match {
  case Some(s) => s
}
val events = sqlContext.sql("select itemId EventList")
val itemList = events.map(row => {
  val itemId = row.getAs[String](1);
  val countryInfo = showCountryInfo(MappingsList.get(itemId));
  val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry()
  val language = countryInfo.getLanguage()
  Row(itemId, country, language)
})
But I keep getting this error:
org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:362)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:284)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:191)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:211)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:207)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I'm using Spark 1.6.
Answer
Your question is a bit ambiguous.
Don't collect large RDDs unnecessarily.
When a collect operation is issued on an RDD, the dataset is copied to the driver, i.e. the master node. If the dataset is too large to fit in memory, an out-of-memory exception is thrown; take or takeSample can be used instead to retrieve only a capped number of elements.
The way you are doing it with collect is not correct (on a large DataFrame it can lead to an OOM).
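For example, to inspect only a bounded number of rows on the driver instead of collecting everything (a minimal sketch; df stands for your DataFrame):
// at most 100 rows are brought back to the driver
val firstRows = df.take(100)
// or a random sample of at most 100 rows
val randomRows = df.rdd.takeSample(withReplacement = false, num = 100)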
1) To update any column or add a new column, you can use withColumn:
DataFrame withColumn(java.lang.String colName, Column col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
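A minimal sketch of replacing an existing column this way (df and the column name "value" are placeholders, not from your code):
import org.apache.spark.sql.functions._
// returns a new DataFrame in which "value" is replaced; df itself is not modified
val updated = df.withColumn("value", upper(col("value")))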
2) For a condition that depends on another data structure, you can use the when / otherwise syntax, like the following.
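A minimal sketch of when / otherwise applied to your scenario (the column names id and value are assumptions from the question, and lit("mapped") is just a stand-in for whatever the mapped value would be):
import org.apache.spark.sql.functions._
// keep the existing value when id is "unknown", otherwise use the mapped value
val updated = df.withColumn("value",
  when(col("id") === "unknown", col("value")).otherwise(lit("mapped")))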
Apache Spark, add a "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame example:
import org.apache.spark.sql.functions._
val sqlcont = new org.apache.spark.sql.SQLContext(sc)
val df1 = sqlcont.jsonRDD(sc.parallelize(Array(
  """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": ""}""",
  """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": ""}""",
  """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": ""}"""
)))

val makeSIfTesla = udf { (make: String) =>
  if (make == "Tesla") "S" else make
}

df1.withColumn("make", makeSIfTesla(df1("make"))).show
The above can also be achieved like this:
import org.apache.spark.sql.{Row, SQLContext}

val rdd = sc.parallelize(
  List((2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt"))
)
val sqlContext = new SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val dataframe = rdd.toDF()
dataframe.foreach(println)

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
}).collect().foreach(println)
//[2012,S,S]
//[1997,Ford,E350]
//[2015,Chevy,Volt]
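To tie this back to your case, one way to apply the same pattern to the CountryInfo lookup is to broadcast the collected map and wrap the lookup in a udf (a hedged sketch; it assumes the MappingsList map and the events DataFrame from your question, that CountryInfo is serializable, and that missing or "unknown" countries should default to "US"):
import org.apache.spark.sql.functions._

// ship the small lookup map to the executors once instead of closing over it
val mappingsBc = sc.broadcast(MappingsList)

val countryUdf = udf { (itemId: String) =>
  mappingsBc.value.get(itemId)
    .map(_.getCountry())
    .filter(_ != "unknown")
    .getOrElse("US")                 // default when the item is missing or "unknown"
}

val languageUdf = udf { (itemId: String) =>
  mappingsBc.value.get(itemId).map(_.getLanguage()).orNull
}

// adds country and language columns without collecting anything to the driver
val result = events
  .withColumn("country", countryUdf(col("itemId")))
  .withColumn("language", languageUdf(col("itemId")))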
You should explain a little more! –
@AlbertoBonsanto I want to update the column 'value' based on the mapper – Swetha
I don't get it: what mapper? What is the id used for? What is the schema of the DF? –