为什么xml /立即发送流,而直接写入流不会
我试图以小块写入文本到流中,当我将XML流写入器指向输出流时它正常工作(它开始立即发送数据),然后我尝试写入一些文本,然后刷新它直到流关闭才发送任何内容。为什么xml /立即发送流,而直接写入流不会
(defn data
"Download a 5MB file and parse it"
[]
(-> "http://www.cs.washington.edu/research/xmldatasets/data/tpc-h/orders.xml"
URL.
.openStream
xml/parse))
(defn send-stuff [request]
(condp = (:uri request)
"/text" (response/response
(ring-io/piped-input-stream
#(let [w (io/make-writer % {:encoding "UTF-8"})]
(.write w "start\n")
(.flush w)
(Thread/sleep 1000)
(.write w "done\n")
(.flush w))))
"/xml" (response/response
(ring-io/piped-input-stream
#(->> (io/make-writer % {:encoding "UTF-8"})
(xml/emit (data))
.flush)))))
(comment
(def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
(.stop server))
,卷曲测试这个像这样:
curl localhost:8888/text
坐在那里默默地整整一秒钟,然后返回
start
done
我期望看到“开始”,然后一秒钟后“完成“,而不是延迟一秒钟,然后是两者。
,并使用
curl localhost:8888/xml
开始流眼欺诈XML立刻(对不起个人偏见在里面悄悄;-)
- 编辑 我已确认该问题是否与码头输出缓冲区,因为如果我将缓冲区设置得很小,它会消失:
(def server (jetty/run-jetty #'send-stuff {:output-buffer-size 1 :port 8888 :join? false}))
当然,将输出缓冲区设置为1在很多情况下是坏主意。
您所调用的.flush
不在用于HTTP响应的流上,而是在output stream of the piped streams pair中。
当你看看source code of PipedOutputStream.flush()
你会注意到它只会通知所有等待从连接的PipedInputStream读取的线程,并不意味着刷新到底层的HTTP响应流。
行为上的差异是由响应数据大小引起的。如果你改变你的例子,使用一个小的XML数据的行为将是相同的:
(defn data
[]
(-> "<?xml version=\"1.0\" encoding=\"UTF-8\"?><a>1</a>"
(.getBytes)
(ByteArrayInputStream.)
(xml/parse)))
(defn send-stuff [request]
(condp = (:uri request)
"/text" (response/response
(ring-io/piped-input-stream
#(let [w (io/make-writer % {:encoding "UTF-8"})]
(.write w "start\n")
(.flush w)
(Thread/sleep 1000)
(.write w "done\n")
(.flush w))))
"/xml" (response/response
(ring-io/piped-input-stream
#(let [w (io/make-writer % {:encoding "UTF-8"})]
(xml/emit (data) w)
(.flush w)
(Thread/sleep 1000)
(xml/emit (data) w)
(.flush w))))))
调用curl localhost:8888/xml
将只显示后,整个1秒的:
<?xml version="1.0" encoding="UTF-8"?><a>1</a><?xml version="1.0" encoding="UTF-8"?><a>1</a>
可以使用不同流化机构,其中你将不得不控制与阻塞队列冲洗例如HTTP响应流:之前或
(ns so43769408
(:require [ring.adapter.jetty :as jetty]
[clojure.java.io :as io]
[ring.util.response :as response]
[ring.core.protocols :as protocols])
(:import (java.io OutputStream)
(java.util.concurrent LinkedBlockingQueue)))
(extend-protocol protocols/StreamableResponseBody
LinkedBlockingQueue
(write-body-to-stream [output-queue _ ^OutputStream output-stream]
(with-open [out (io/writer output-stream)]
(loop [chunk (.take output-queue)]
(when-not (= chunk ::EOF)
(.write out (str chunk))
(.flush out)
(recur (.take output-queue)))))))
(defn send-stuff [request]
(response/response
(let [output-queue (LinkedBlockingQueue.)]
(future
(.put output-queue "start\n")
(Thread/sleep 1000)
(.put output-queue "end\n")
(.put output-queue ::EOF))
output-queue)))
(comment
(def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
(.stop server))
嗯如何进入环和刷新输出缓冲区的任何想法? –
我认为你可以使用你自己的流机制(而不是管道输入/输出流),并通过扩展'ring.core.protocols/StreamableResponseBody'协议来插入它,因为你可以访问'write-body中的HTTP响应输出流按流功能。 –
我最近提到过你有多棒? –
它开始管道后?我会测试什么? –