akka-http发送连续分块http响应(流)
问题描述:
我有一个使用 akka-http 编写的粗略测试示例,
包含客户端和服务器两部分。
Server.scala:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import scala.concurrent.Future
/** Minimal Akka HTTP server that answers `GET /stream` with a single plain-text "test" body.
  *
  * It is a `Runnable` so that a caller can start it on a thread of its own.
  */
class Server extends Runnable {

  /** Creates the "server" actor system, binds to localhost:8200 and serves every
    * incoming connection with a synchronous request handler.
    */
  def run(): Unit = {
    implicit val system = ActorSystem("server")
    implicit val materializer = ActorMaterializer()
    // Execution context for the failure callback registered on the binding future below.
    import system.dispatcher

    val host = "localhost"
    val port = 8200
    val serverSource = Http().bind(interface = host, port = port)

    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
        HttpResponse(entity = HttpEntity(MediaTypes.`text/plain`, "test"))
      case unmatched =>
        // Without this fallback any other method or path raises a MatchError inside the handler.
        // Drain the request body so an unread entity cannot stall the connection.
        unmatched.entity.dataBytes.runWith(Sink.ignore)
        HttpResponse(status = StatusCodes.NotFound, entity = "Unknown resource!")
    }

    val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
      connection handleWithSyncHandler requestHandler
    }).run()

    // A failed bind (for example, the port is already taken) would otherwise go unnoticed.
    bindingFuture.failed.foreach { cause =>
      system.log.error(cause, s"Could not bind to $host:$port")
    }
  }
}
Client.scala:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{Uri, HttpRequest}
import akka.stream.ActorMaterializer
/** Test client: starts a [[Server]] on a background thread, requests `/stream` from it
  * and prints every received chunk of the response body to standard output.
  */
object Client extends App {
  implicit val system = ActorSystem("client")
  import system.dispatcher

  // The server runs inside this JVM on its own thread. Nothing here waits for its
  // bind to complete, so the request below may race with server startup.
  new Thread(new Server).start()

  implicit val materializer = ActorMaterializer()

  val source = Uri("http://localhost:8200/stream")

  // Completes when the response body stream ends; it never completes for an endless stream.
  val finished = Http().singleRequest(HttpRequest(uri = source)).flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
      println(chunk.utf8String)
    }
  }

  // Report a failed request or stream instead of dropping the failure silently.
  finished.failed.foreach { cause =>
    Console.err.println(s"Request to $source failed: $cause")
  }
}
目前 Server 只会返回一次 "test" 作为响应。
如何修改 Server 中的 HttpResponse,
使其以分块(流)的方式,每隔 1 秒无限循环地发送 "test"?
回答:
我找到了答案:把响应实体改为 HttpEntity.Chunked,并用一个每隔 1 秒发出 "test" 的 Source 作为其数据源。
Server.scala:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.duration._
/** Akka HTTP server that answers `GET /stream` with a chunked plain-text response
  * whose chunks come from a periodic source emitting "test".
  *
  * It is a `Runnable` so that a caller can start it on a thread of its own.
  */
class Server extends Runnable {

  /** Creates the "server" actor system, binds to localhost:8200 and serves every
    * incoming connection with a synchronous request handler.
    */
  def run(): Unit = {
    implicit val system = ActorSystem("server")
    implicit val materializer = ActorMaterializer()
    // Execution context for the failure callback registered on the binding future below.
    import system.dispatcher

    val host = "localhost"
    val port = 8200
    val serverSource = Http().bind(interface = host, port = port)

    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
        // Initial delay of 0 seconds, then one "test" element every second.
        // Dot notation avoids the postfix-operator form `0 seconds`.
        // NOTE(review): newer Akka Streams releases appear to spell this factory
        // `Source.tick(...)` -- confirm against the Akka version in use.
        HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, Source(0.seconds, 1.second, "test")))
      case unmatched =>
        // Without this fallback any other method or path raises a MatchError inside the handler.
        // Drain the request body so an unread entity cannot stall the connection.
        unmatched.entity.dataBytes.runWith(Sink.ignore)
        HttpResponse(status = StatusCodes.NotFound, entity = "Unknown resource!")
    }

    val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
      connection handleWithSyncHandler requestHandler
    }).run()

    // A failed bind (for example, the port is already taken) would otherwise go unnoticed.
    bindingFuture.failed.foreach { cause =>
      system.log.error(cause, s"Could not bind to $host:$port")
    }
  }
}