Connection refused when running a Spark Streaming program with Scala

Problem description:

I am trying to run a simple word-count Spark Streaming program in the Cloudera VM. I am using Scala in REPL mode rather than an IDE, and the connection is refused when the program runs.

Here is my code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY)
val wordsFlatMap = lines.flatMap(_.split(" "))
val wordsMap = wordsFlatMap.map(w => (w, 1))
val wordCount = wordsMap.reduceByKey((a, b) => a + b)
wordCount.print()
ssc.start()

I get a connection refused error when I run the program in the REPL. The error is below.

scala> ssc.start 
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Starting 1 receivers 
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: ReceiverTracker started 
17/04/19 03:06:43 INFO dstream.ForEachDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.MappedDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@...
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@...
17/04/19 03:06:43 INFO dstream.MappedDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.MappedDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.MappedDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@...
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@...
17/04/19 03:06:43 INFO dstream.ForEachDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@...
17/04/19 03:06:43 INFO util.RecurringTimer: Started timer for JobGenerator at time 1492596404000 
17/04/19 03:06:43 INFO scheduler.JobGenerator: Started JobGenerator at 1492596404000 ms 
17/04/19 03:06:43 INFO scheduler.JobScheduler: Started JobScheduler 
17/04/19 03:06:43 INFO streaming.StreamingContext: StreamingContext started 

scala> 17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Receiver 0 started 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Got job 0 (submitJob at ReceiverTracker.scala:557) with 1 output partitions 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(submitJob at ReceiverTracker.scala:557) 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Missing parents: List() 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554), which has no missing parents 
17/04/19 03:06:44 INFO scheduler.JobScheduler: Added jobs for time 1492596404000 ms 
17/04/19 03:06:44 INFO scheduler.JobScheduler: Starting job streaming job 1492596404000 ms.0 from job set of time 1492596404000 ms 
17/04/19 03:06:44 INFO spark.SparkContext: Starting job: print at <console>:47 
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(65984) called with curMem=0, maxMem=560497950 
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.4 KB, free 534.5 MB) 
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(22354) called with curMem=65984, maxMem=560497950 
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 21.8 KB, free 534.4 MB) 
17/04/19 03:06:44 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:41905 (size: 21.8 KB, free: 534.5 MB) 
17/04/19 03:06:44 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861 
17/04/19 03:06:44 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554) 
17/04/19 03:06:44 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:42) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Got job 1 (print at <console>:47) with 1 output partitions 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Final stage: ResultStage 2(print at <console>:47) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Missing parents: List() 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44), which has no missing parents 
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(2400) called with curMem=88338, maxMem=560497950 
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 534.4 MB) 
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(1429) called with curMem=90738, maxMem=560497950 
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1429.0 B, free 534.4 MB) 
17/04/19 03:06:45 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:41905 (size: 1429.0 B, free: 534.5 MB) 
17/04/19 03:06:45 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44) 
17/04/19 03:06:45 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 
17/04/19 03:06:45 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2644 bytes) 
17/04/19 03:06:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0) 
17/04/19 03:06:45 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1492596405400 
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started BlockGenerator 
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started block pushing thread 
17/04/19 03:06:45 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 10.0.2.15:50802 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Starting receiver 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped 
17/04/19 03:06:45 INFO dstream.SocketReceiver: Connecting to localhost:8585 
17/04/19 03:06:45 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:8585 
java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:579) 
    at java.net.Socket.connect(Socket.java:528) 
    at java.net.Socket.<init>(Socket.java:425) 
    at java.net.Socket.<init>(Socket.java:208) 
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73) 
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59) 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error connecting to localhost:8585: java.net.ConnectException: Connection refused 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0 
17/04/19 03:06:45 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:8585 - java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:579) 
    at java.net.Socket.connect(Socket.java:528) 
    at java.net.Socket.<init>(Socket.java:425) 
    at java.net.Socket.<init>(Socket.java:208) 
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73) 
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59) 

I get a different error when I change my code as shown below:

var sparkConf = new SparkConf().setAppName("Streaming Example").setMaster("local[2]").set("spark.drive.allowMultipleContexts","true") 
val ssc = new StreamingContext(sparkConf,Seconds(2)) 


17/04/19 03:18:52 INFO spark.SparkContext: Running Spark version 1.5.0-cdh5.5.0 
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing view acls to: cloudera 
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing modify acls to: cloudera 
    17/04/19 03:18:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera) 
    17/04/19 03:18:53 INFO slf4j.Slf4jLogger: Slf4jLogger started 
    17/04/19 03:18:53 INFO Remoting: Starting remoting 
    17/04/19 03:18:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42235]
    17/04/19 03:18:53 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:42235]
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'sparkDriver' on port 42235. 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering MapOutputTracker 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering BlockManagerMaster 
    17/04/19 03:18:53 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-b87051bc-5b7f-4c4f-975f-a0661b3ec29f 
    17/04/19 03:18:53 INFO storage.MemoryStore: MemoryStore started with capacity 534.5 MB 
    17/04/19 03:18:53 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a3c5d465-ca27-4aa0-ad43-47088abb7703/httpd-01babb12-0237-4faa-9917-394a768cbcaa 
    17/04/19 03:18:53 INFO spark.HttpServer: Starting HTTP Server 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 INFO server.AbstractConnector: Started [email protected]:52313
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'HTTP file server' on port 52313. 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering OutputCommitCoordinator 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED [email protected]:4040: java.net.BindException: Address already in use
    java.net.BindException: Address already in use 
     at sun.nio.ch.Net.bind0(Native Method) 
     at sun.nio.ch.Net.bind(Net.java:444) 
     at sun.nio.ch.Net.bind(Net.java:436) 
     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) 
     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) 
     at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.spark-project.jetty.server.Server.doStart(Server.java:293) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913) 
     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
     at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904) 
     at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246) 
     at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.SparkContext.<init>(SparkContext.scala:474) 
     at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854) 
     at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
     at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38) 
     at $line31.$read$$iwC$$iwC.<init>(<console>:40) 
     at $line31.$read$$iwC.<init>(<console>:42) 
     at $line31.$read.<init>(<console>:44) 
     at $line31.$read$.<init>(<console>:48) 
     at $line31.$read$.<clinit>(<console>) 
     at $line31.$eval$.<init>(<console>:7) 
     at $line31.$eval$.<clinit>(<console>) 
     at $line31.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@...: java.net.BindException: Address already in use
    java.net.BindException: Address already in use 
     at sun.nio.ch.Net.bind0(Native Method) 
     at sun.nio.ch.Net.bind(Net.java:444) 
     at sun.nio.ch.Net.bind(Net.java:436) 
     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) 
     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) 
     at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.spark-project.jetty.server.Server.doStart(Server.java:293) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913) 
     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
     at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904) 
     at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246) 
     at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.SparkContext.<init>(SparkContext.scala:474) 
     at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854) 
     at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
     at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38) 
     at $line31.$read$$iwC$$iwC.<init>(<console>:40) 
     at $line31.$read$$iwC.<init>(<console>:42) 
     at $line31.$read.<init>(<console>:44) 
     at $line31.$read$.<init>(<console>:48) 
     at $line31.$read$.<clinit>(<console>) 
     at $line31.$eval$.<init>(<console>:7) 
     at $line31.$eval$.<clinit>(<console>) 
     at $line31.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null} 
    17/04/19 03:18:53 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 INFO server.AbstractConnector: Started [email protected]:4041
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'SparkUI' on port 4041. 
    17/04/19 03:18:53 INFO ui.SparkUI: Started SparkUI at http://localhost:4041 
    17/04/19 03:18:53 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 
    17/04/19 03:18:53 INFO storage.BlockManagerMaster: Registered BlockManager 
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: 
org.apache.spark.SparkContext.<init>(SparkContext.scala:82) 
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017) 

Can anyone help me correct these errors?

+0

Do you actually have an open socket on port 8585? –

Approach 1

The error you are seeing is expected, because you have used socketTextStream(). Under the hood Spark creates a SocketInputDStream, which uses java.net.Socket.

A java.net.Socket is a client socket, which means it expects a server to be running on the address and port number you specify.

So you need to have some service running on port 8585 of your local machine.
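As a concrete illustration, here is a minimal sketch of such a server in plain Scala (a stand-in for netcat; `LineServer` is a name chosen here, and port 8585 is taken from the question):

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Minimal sketch of a server for socketTextStream to connect to.
// It accepts one client and writes a few newline-terminated lines,
// which is the format Spark's SocketReceiver expects.
object LineServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8585)   // listen on the port used in the question
    val client = server.accept()          // blocks until the Spark receiver connects
    val out = new PrintWriter(client.getOutputStream, true)
    Seq("hello world", "hello spark").foreach(line => out.println(line))
    out.close(); client.close(); server.close()
  }
}
```

Start something like this (or netcat) first, then run ssc.start() in the REPL; the receiver's connection will then be accepted instead of refused.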

To see what I mean, try the following (you may not need to set the master or appName in your environment):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyStream {
  def main(args: Array[String]): Unit = {
    // "local[2]": a socket receiver occupies one thread, so at least two are needed locally
    val ssc = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("socketstream"), Seconds(10))
    val mystreamRDD = ssc.socketTextStream("bbc.co.uk", 80)
    mystreamRDD.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

This will not return any content, because the application never sends an HTTP request to the BBC website, but it does not get a connection-refused exception either.

To run a local server on Linux, you can use netcat with a simple command such as:

cat data.txt | ncat -l -p 8585 

If the code above then still gives the same error, follow Approach 2.

Approach 2

That said, many things can cause this error:

  • You are trying to connect to the wrong IP/port.
  • You have not started the server.
  • Your server is not listening for connections.
  • Your server has too many pending connections waiting to be accepted.
  • A firewall is blocking the connection before it reaches the server.
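A quick way to narrow these down from the same REPL is to attempt a plain TCP connection yourself, independently of Spark (a diagnostic sketch; host and port are the ones from the question):

```scala
import java.net.Socket
import scala.util.Try

// Returns true if something accepts a TCP connection on host:port.
def portOpen(host: String, port: Int): Boolean =
  Try { new Socket(host, port).close(); true }.getOrElse(false)

println(if (portOpen("localhost", 8585)) "server is listening"
        else "connection refused - start the server first")
```

If this prints "connection refused", the problem is one of the causes above and has nothing to do with Spark itself.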

Hope this helps.

+1

Just to add, regarding the second error: the BindException is for the SparkUI, and it looks like the property name is misspelt. It is spark.drive.allowMultipleContexts; it should be spark.driver.allowMultipleContexts – sparker
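For reference, the configuration from the question with that property name corrected would look like this:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Same configuration as in the question, with the property spelled correctly
val sparkConf = new SparkConf()
  .setAppName("Streaming Example")
  .setMaster("local[2]")
  .set("spark.driver.allowMultipleContexts", "true")  // "driver", not "drive"
val ssc = new StreamingContext(sparkConf, Seconds(2))
```

Note that in spark-shell it is usually better to build the StreamingContext from the existing `sc`, as in the first snippet, rather than allowing a second SparkContext at all.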

+0

Thank you! :) It works fine now. – Swa