为什么xml /立即发送流,而直接写入流不会

问题描述:

我试图以小块写入文本到流中,当我将XML流写入器指向输出流时它正常工作(它开始立即发送数据),然后我尝试写入一些文本,然后刷新它直到流关闭才发送任何内容。为什么xml /立即发送流,而直接写入流不会

(defn data 
    "Download a 5MB file and parse it" 
    [] 
    (-> "http://www.cs.washington.edu/research/xmldatasets/data/tpc-h/orders.xml" 
     URL. 
     .openStream 
     xml/parse)) 

(defn send-stuff [request] 
    (condp = (:uri request) 
    "/text" (response/response 
      (ring-io/piped-input-stream 
       #(let [w (io/make-writer % {:encoding "UTF-8"})] 
       (.write w "start\n") 
       (.flush w) 
       (Thread/sleep 1000) 
       (.write w "done\n") 
       (.flush w)))) 
    "/xml" (response/response 
      (ring-io/piped-input-stream 
       #(->> (io/make-writer % {:encoding "UTF-8"}) 
        (xml/emit (data)) 
        .flush))))) 

(comment 
    (def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false})) 
    (.stop server)) 

,卷曲测试这个像这样:

curl localhost:8888/text 

坐在那里默默地整整一秒钟,然后返回

start 
done 

我期望看到“开始”,然后一秒钟后“完成“,而不是延迟一秒钟,然后是两者。

,并使用

curl localhost:8888/xml 

开始流眼欺诈XML立刻(对不起个人偏见在里面悄悄;-)

- 编辑 我已确认该问题是否与码头输出缓冲区,因为如果我将缓冲区设置得很小,它会消失:

(def server (jetty/run-jetty #'send-stuff {:output-buffer-size 1 :port 8888 :join? false})) 

当然,将输出缓冲区设置为1在很多情况下是坏主意

+0

它开始管道后?我会测试什么? –

您所调用的.flush不在用于HTTP响应的流上,而是在output stream of the piped streams pair中。

当你看看source code of PipedOutputStream.flush()你会注意到它只会通知所有等待从连接的PipedInputStream读取的线程,并不意味着刷新到底层的HTTP响应流。

行为上的差异是由响应数据大小引起的。如果你改变你的例子,使用一个小的XML数据的行为将是相同的:

(defn data 
    [] 
    (-> "<?xml version=\"1.0\" encoding=\"UTF-8\"?><a>1</a>" 
     (.getBytes) 
     (ByteArrayInputStream.) 
     (xml/parse))) 


(defn send-stuff [request] 
    (condp = (:uri request) 
    "/text" (response/response 
       (ring-io/piped-input-stream 
       #(let [w (io/make-writer % {:encoding "UTF-8"})] 
        (.write w "start\n") 
        (.flush w) 
        (Thread/sleep 1000) 
        (.write w "done\n") 
        (.flush w)))) 
    "/xml" (response/response 
       (ring-io/piped-input-stream 
       #(let [w (io/make-writer % {:encoding "UTF-8"})] 
        (xml/emit (data) w) 
        (.flush w) 
        (Thread/sleep 1000) 
        (xml/emit (data) w) 
        (.flush w)))))) 

调用curl localhost:8888/xml将只显示后,整个1秒的:

<?xml version="1.0" encoding="UTF-8"?><a>1</a><?xml version="1.0" encoding="UTF-8"?><a>1</a>

可以使用不同流化机构,其中你将不得不控制与阻塞队列冲洗例如HTTP响应流:之前或

(ns so43769408 
    (:require [ring.adapter.jetty :as jetty] 
      [clojure.java.io :as io] 
      [ring.util.response :as response] 
      [ring.core.protocols :as protocols]) 
    (:import (java.io OutputStream) 
      (java.util.concurrent LinkedBlockingQueue))) 

(extend-protocol protocols/StreamableResponseBody 
    LinkedBlockingQueue 
    (write-body-to-stream [output-queue _ ^OutputStream output-stream] 
    (with-open [out (io/writer output-stream)] 
     (loop [chunk (.take output-queue)] 
     (when-not (= chunk ::EOF) 
      (.write out (str chunk)) 
      (.flush out) 
      (recur (.take output-queue))))))) 


(defn send-stuff [request] 
    (response/response 
    (let [output-queue (LinkedBlockingQueue.)] 
     (future 
     (.put output-queue "start\n") 
     (Thread/sleep 1000) 
     (.put output-queue "end\n") 
     (.put output-queue ::EOF)) 
     output-queue))) 

(comment 
    (def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false})) 
    (.stop server)) 
+0

嗯如何进入环和刷新输出缓冲区的任何想法? –

+0

我认为你可以使用你自己的流机制(而不是管道输入/输出流),并通过扩展'ring.core.protocols/StreamableResponseBody'协议来插入它,因为你可以访问'write-body中的HTTP响应输出流按流功能。 –

+0

我最近提到过你有多棒? –