StreamDelegate停止接收事件后迅速读取数量

问题描述:

我有一个通过套接字连接接收消息的iOS聊天应用程序。StreamDelegate停止接收事件后迅速读取数量

当用户长时间打开应用程序并且有超过50条未读消息时,服务器会通过套接字发送一条消息,告诉未读消息的数量,此时应用程序将显示一个带有进度条的警报,然后服务器发送每一条消息。

因此,应用程序获取StreamDelegate方法中的每条消息stream(_ stream:Stream,handle eventCode:Stream.Event)并更新进度条,直到消息结束。

问题是,当我有大量未读消息(大约300+)时,StreamDelegate停止接收带有消息的事件,并且没有显示任何错误消息。

我呼吁在全球队列中的连接方法:

DispatchQueue.global().async { 
    self.connect(host, port: port) 
} 

这是我的插座连接代码:

fileprivate func connect(_ host: String, port: Int) { 

     postStatus(.connecting) 

     self.host = NSString(string: host) 
     self.port = UInt32(port) 

     self.log("connect to \(host):\(port)") 

     var readStream : Unmanaged<CFReadStream>? 
     var writeStream : Unmanaged<CFWriteStream>? 

     CFStreamCreatePairWithSocketToHost(nil, self.host, self.port, &readStream, &writeStream) 

     self.inOk = false 
     self.outOk = false 
     self.inputStream = readStream!.takeRetainedValue() 
     self.outputStream = writeStream!.takeRetainedValue() 

     self.inputStream.delegate = self 
     self.outputStream.delegate = self 


     let mainThread = Thread.isMainThread; 

     let loop = mainThread ? RunLoop.main : RunLoop.current 

     self.inputStream.schedule(in: loop, forMode: RunLoopMode.defaultRunLoopMode) 
     self.outputStream.schedule(in: loop, forMode: RunLoopMode.defaultRunLoopMode) 

     self.inputStream.open() 
     self.outputStream.open() 

     self.timer = Timer.scheduledTimer(timeInterval: 5, target: self, selector: #selector(connectionTimeout), userInfo: nil, repeats: false) 

     if(!mainThread) { 
      loop.run() 
     } 

    } 

在StreamDelegate方法流(_流:物流,拉手EVENTCODE: Stream.Event)我收到消息事件并在方法上读取(字符串)

case Stream.Event.hasBytesAvailable: 

     if let timer = timer { 
      timer.invalidate() 
      self.timer = nil 
     } 

     let json = ChatLibSwift.readMessage(self.inputStream) 

     do { 
      if StringUtils.isNotEmpty(json) { 
       try self.read(json) 
      } 
     } catch let ex as NSError { 
      LogUtils.log("ERROR: \(ex.description)") 
     } 

     break 
    case Stream.Event.hasSpaceAvailable: 
     break 

,上面写着每一个消息的方法:

static func readMessage(_ inputStream: InputStream) -> String { 

    do { 
     var lenBytes = [UInt8](repeating: 0, count: 4) 


     inputStream.read(&lenBytes, maxLength: 4) 

     // header 

     let i32: Int = Int(UInt32.init(lenBytes[3]) | UInt32.init(lenBytes[2]) << 8 | UInt32.init(lenBytes[1]) << 16 | UInt32.init(lenBytes[0]) << 24) 

     var msg = [UInt8](repeating: 0, count: (MemoryLayout<UInt8>.size * Int(i32))) 

     let bytesRead = inputStream.read(&msg, maxLength: Int(i32)) 

     if bytesRead == -1 { 
      print("<< ChatLib ERROR -1") 
      return "" 
     } 

     let s = NSString(bytes: msg, length: bytesRead, encoding: String.Encoding.utf8.rawValue) as String? 

     if let s = s { 
      if bytesRead == Int(i32) { 
       return s 
      } 
      else { 
       print("Error: readMessage \(s)") 
      } 
      return s 
     } 
     return "" 
    } catch { 

     return "" 
    } 
} 

任何人知道如何解决呢?

+0

在哪个线程出现问题?在主线程还是在其他线程中? – DisableR

+0

在其他线程中... –

+0

在函数connect()中,您正在运行其他线程的运行循环,但是从上面的代码片断中您正在从全局队列运行connect()。检查这个答案https://stackoverflow.com/a/38001438/321542,它描述了为什么在自定义NSThread而不是全局队列中进行流处理更好。也许它可以帮助你。 – DisableR

主要思想是强制的时间表流的读取成功读取操作之后:

let _preallocatedBufferSize = 64 * 1024 
var _preallocatedBuffer = [UInt8](repeating: 0, count: MemoryLayout<UInt8>.size * Int(_preallocatedBufferSize)) 

var message : .... 

func readMessage(_ inputStream: InputStream) { 

    if !inputStream.hasBytesAvailable || message.isCompleted { 
     return 
    } 

    var theBuffer : UnsafeMutablePointer<UInt8>? 
    var theLength : Int = 0 

    // try to get buffer from the stream otherwise use the preallocated buffer 
    if !inputStream.getBuffer(&theBuffer, length:&theLength) || nil == theBuffer 
    { 
     memset(&_preallocatedBuffer, 0, _preallocatedBufferSize) 

     let theReadCount = inputStream.read(&_preallocatedBuffer, maxLength:_preallocatedBufferSize) 
     if theReadCount > 0 { 
      theBuffer = _preallocatedBuffer; 
      theLength = theReadCount; 
     } else { 
      theBuffer = nil; 
      theLength = 0; 
     } 
    } 

    if nil != theBuffer && theLength > 0 { 
     _message.appendData(theBuffer, length:theLength) 

     self.perform(#selector(readMessage), with:inputStream, afterDelay:0.0, inModes:[RunLoopMode.defaultRunLoopMode]) 
    } 
}