PipedOutStream和PipedInputStream管道流
以JDK1.8源码为例。
管道流的主要作用是进行两个线程之间的通信,如图:
管道流分为输出管道流(PipedOutStream)和输入管道流(PipedInputStream)。如果要进行管道输出,必须把输出管道流连接到输入管道流上。输出流管道PipedOutStream通过public synchronized void connect(PipedInputStream snk) throws IOException方法与输入管道流管道建立连接,当然也可以反过来,通过输入管道流的public void connect(PipedOutputStream src) throws IOException方法与输出管道流建立连接,其本质也是调用了输出流管道的connect。线程1通过PipedOutStream类调用PipedInputStream的recieve方法将字节流数据写入PipedInputStream的循环缓冲区(或者环形缓冲区)buffer数组中,线程2通过PipedInputStream的read方法从缓冲区的读取数据。PipedInputStream类中定义了两个私有变量in和out用于表示缓冲区存储字节的索引位置和读取字节的索引位置,如果in<-1,表明缓冲区为空,如果in等于out表明缓冲区已经存满,那么线程1会阻塞,等待线程2从缓冲区中读取数据。
源码如下:
PipedOutputStream:
package java.io;
import java.io.*;
//输出流管道类,它是数据的发送端,将数据发送到输入流管道
public
class PipedOutputStream extends OutputStream {
//输入流管道
private PipedInputStream sink;
//构造具有输入流管道的输出流管道
public PipedOutputStream(PipedInputStream snk) throws IOException {
//连接输入流管道
connect(snk);
}
public PipedOutputStream() {
}
//连接输入流管道
public synchronized void connect(PipedInputStream snk) throws IOException {
if (snk == null) {//输入流管道未实例化
throw new NullPointerException();
} else if (sink != null || snk.connected) {//已经与该输入流管道建立连接
throw new IOException("Already connected");
}
sink = snk;
//输入流管道中环形缓冲的数据索引
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
//向输入流管道中写入字节
public void write(int b) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
}
//输入流管道接收数据
sink.receive(b);
}
//向输入流管道中写入字节数组从指定位置开始的len长度的字节
public void write(byte b[], int off, int len) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
} else if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
//输入管道接收数据
sink.receive(b, off, len);
}
/**
* Flushes this output stream and forces any buffered output bytes
* to be written out.
* This will notify any readers that bytes are waiting in the pipe.
*
* @exception IOException if an I/O error occurs.
*/
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
/**
* Closes this piped output stream and releases any system resources
* associated with this stream. This stream may no longer be used for
* writing bytes.
*
* @exception IOException if an I/O error occurs.
*/
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
}
PipedInputStream :
package java.io;
public class PipedInputStream extends InputStream {
//关闭输入管道向其他地方输出:false表示未关闭,true表示可以向其他地方输出
boolean closedByWriter = false;
//关闭从输出管道读取数据:false表示未关闭
volatile boolean closedByReader = false;
//是否有连接的输出管道
boolean connected = false;
//从输出管道读取数据到输入管道的线程
Thread readSide;
//从输入管道写数据到其他地方的写入线程
Thread writeSide;
//默认输入管道大小
private static final int DEFAULT_PIPE_SIZE = 1024;
//管道大小
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
//存放数据的环形缓冲字节数组
protected byte buffer[];
//从已连接的输出管道接收到的数据字节将存储在循环缓冲的位置索引
//in小于0表示缓冲是空的,in等于out表示缓冲是满的
protected int in = -1;
//输入管道读取的下一个字节在循环缓冲中的位置索引
protected int out = 0;
//构造指定输出管道和大小1024的输入管道
public PipedInputStream(PipedOutputStream src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}
//构造指定输出管道和大小的输入管道
public PipedInputStream(PipedOutputStream src, int pipeSize)
throws IOException {
initPipe(pipeSize);
//连接输出管道
connect(src);
}
//构造默认大小1024的输入管道,使用前需要连接输出管道
public PipedInputStream() {
initPipe(DEFAULT_PIPE_SIZE);
}
//初始化一个具有一定大小的输入管道
public PipedInputStream(int pipeSize) {
initPipe(pipeSize);
}
//初始化输入管道
private void initPipe(int pipeSize) {
if (pipeSize <= 0) {
throw new IllegalArgumentException("Pipe Size <= 0");
}
buffer = new byte[pipeSize];
}
//连接输出管道
public void connect(PipedOutputStream src) throws IOException {
src.connect(this);
}
//由输出流管道调用此方法,将数据写入输入流管道
protected synchronized void receive(int b) throws IOException {
//检查接收端状态
checkStateForReceive();
//写入端:实际是输出流管道实例线程
writeSide = Thread.currentThread();
//如果环形缓冲已经写满,等待空白空间
if (in == out)
awaitSpace();
if (in < 0) {
in = 0;
out = 0;
}
//将输出流管道字节数据放入缓冲中
buffer[in++] = (byte)(b & 0xFF);
//写到缓冲最后,从头开始写,所谓环形缓冲或循环缓冲
if (in >= buffer.length) {
in = 0;
}
}
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread();
int bytesToTransfer = len;
while (bytesToTransfer > 0) {
if (in == out)
awaitSpace();
int nextTransferAmount = 0;
if (out < in) {
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) {
in = 0;
}
}
}
//检查输入管道状态
private void checkStateForReceive() throws IOException {
if (!connected) {
//未与输出管道连接,抛异常
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
//向输入流管道写入数据已经关闭,或者从输出流管道读取数据已经关闭,抛出异常
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
//读取端线程已经死亡
throw new IOException("Read end dead");
}
}
private void awaitSpace() throws IOException {
while (in == out) {
checkStateForReceive();
/* full: kick any waiting readers */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
//最后一个字节已经接收到后,标记向输入流管道的写入已经关闭。唤醒所有等待的线程
synchronized void receivedLast() {
closedByWriter = true;
notifyAll();
}
public synchronized int read() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/* possibly wait on the first character */
int c = read();
if (c < 0) {
return -1;
}
b[off] = (byte) c;
int rlen = 1;
while ((in >= 0) && (len > 1)) {
int available;
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;
}
// A byte is read beforehand outside the loop
if (available > (len - 1)) {
available = len - 1;
}
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;
rlen += available;
len -= available;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
}
//输入流中可读取的字节数
public synchronized int available() throws IOException {
if(in < 0)
return 0;
else if(in == out)
return buffer.length;
else if (in > out)
return in - out;
else
return in + buffer.length - out;
}
public void close() throws IOException {
closedByReader = true;
synchronized (this) {
in = -1;
}
}
}
下面我们看一个线程通信的例子:
Sender:
package com.leboop;
import java.io.IOException;
import java.io.PipedOutputStream;
public class Sender implements Runnable {
private PipedOutputStream pos = null;
private byte[] bytes = null;
public Sender(String data) {
this.pos = new PipedOutputStream();
this.bytes = data.getBytes();
}
public void connect(Reciever reciever){
try {
this.pos.connect(reciever.getPis());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run() {
try {
pos.write(bytes);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
pos.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Reciever:
package com.leboop;
import java.io.IOException;
import java.io.PipedInputStream;
public class Reciever implements Runnable {
private PipedInputStream pis = new PipedInputStream();
private byte[] b = new byte[1024];
public Reciever() {
}
public PipedInputStream getPis() {
return pis;
}
public void setPis(PipedInputStream pis) {
this.pis = pis;
}
@Override
public void run() {
try {
int len = this.pis.read(b);
System.out.println(new String(b,0,len));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
PipeTest:
package com.leboop;
public class PipeTest {
public static void main(String[] args) {
Sender sender = new Sender("Hello World!");
Reciever reciever = new Reciever();
sender.connect(reciever);
new Thread(sender).start();
new Thread(reciever).start();
}
}
结果输出Hello World!。