日前遇到在Java上Thread之間要傳遞資料的問題,於似乎就找到了PipedInputStream與PipedOutputStream。今天就讓我們來聊聊看似方便卻又有需要小心的地方的PipedInputStream與PipedOutputStream。
最原本的工作是要從網路上抓資料然後再把資料重新包裝之後再透過網路送到另外一個地方。
於是我設計了兩個Thread,一個負責抓資料(DownloadThread),另外一個負責送資料(UploadThread),以避免互相耽誤對方時間。這應該是一個很常見的設計?接下的問題就是要解決兩個Thread傳遞資料的方法。Java博大精深,想必一定有現成的Class可以負責這種基礎的工作。果不其然,找到了PipedInputStream跟PipedOutputStream,文件中開宗明義就說了:"Receives information from a communications pipe. When two threads want to pass data back and forth, one creates a piped output stream and the other one creates a piped input stream."。用法上也很簡單,只要把兩個Class各自的Instance關聯起來,然後一個Thread呼叫write,另外一個Thread呼叫read就搞定了Thread間傳遞資料Thread-safety的問題,從此之後就過著幸福快樂的日子。
一段時日之後,秉著我對工作的熱愛,心血來潮用Traceview來看一下這段實作的表現。結果令人意外!發現寫入端(DownloadThread)花不少時間在等待。但是這現象從實際條件看來並不合理,因為DownloadThread是從Internet下載資料,而UploadThread是把資料送到LAN內的Server,理應UploadThread送出的網路速度會比較快。於是我將Android Source打開(這就是Android開發的少數好處之一,有些不懂的、文件沒寫或寫不清楚的就自己看Source就對了),看了一下Source:
先從源頭開始,PipedOutputStream的write(byte[] buffer, int offset, int count):
public void write(byte[] buffer, int offset, int count) throws IOException { super.write(buffer, offset, count); }
而OutputStream的實作是把byte array一個byte一個byte地呼叫write(int oneByte):
public void write(byte[] buffer, int offset, int count) throws IOException { Arrays.checkOffsetAndCount(buffer.length, offset, count); for (int i = offset; i < offset + count; i++) { write(buffer[i]); } }所以再看回PipedOutputStream的write(int oneByte):
public void write(int oneByte) throws IOException { PipedInputStream stream = target; if (stream == null) { throw new IOException("Pipe not connected"); } stream.receive(oneByte); }終於輪到PipedInputStream登場:
protected synchronized void receive(int oneByte) throws IOException { if (buffer == null || isClosed) { throw new IOException("Pipe is closed"); } /* * Set the last thread to be writing on this PipedInputStream. If * lastWriter dies while someone is waiting to read an IOException of * "Pipe broken" will be thrown in read() */ lastWriter = Thread.currentThread(); try { while (buffer != null && out == in) { if (lastReader != null && !lastReader.isAlive()) { throw new IOException("Pipe broken"); } notifyAll(); wait(1000); } } catch (InterruptedException e) { throw new InterruptedIOException(); } if (buffer == null) { throw new IOException("Pipe is closed"); } if (in == -1) { in = 0; } buffer[in++] = (byte) oneByte; if (in == buffer.length) { in = 0; } // let blocked readers read the newly available data notifyAll(); }再看一下PipedInputStream的read(byte[] bytes, int offset, int byteCount):
public synchronized int read(byte[] bytes, int offset, int byteCount) throws IOException { Arrays.checkOffsetAndCount(bytes.length, offset, byteCount); if (byteCount == 0) { return 0; } if (!isConnected) { throw new IOException("Not connected"); } if (buffer == null) { throw new IOException("InputStream is closed"); } /* * Set the last thread to be reading on this PipedInputStream. If * lastReader dies while someone is waiting to write an IOException of * "Pipe broken" will be thrown in receive() */ lastReader = Thread.currentThread(); try { int attempts = 3; while (in == -1) { // Are we at end of stream? if (isClosed) { return -1; } if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) { throw new IOException("Pipe broken"); } // Notify callers of receive() notifyAll(); wait(1000); } } catch (InterruptedException e) { throw new InterruptedIOException(); } int totalCopied = 0; // copy bytes from out thru the end of buffer if (out >= in) { int leftInBuffer = buffer.length - out; int length = leftInBuffer < byteCount ? leftInBuffer : byteCount; System.arraycopy(buffer, out, bytes, offset, length); out += length; if (out == buffer.length) { out = 0; } if (out == in) { // empty buffer in = -1; out = 0; } totalCopied += length; } // copy bytes from out thru in if (totalCopied < byteCount && in != -1) { int leftInBuffer = in - out; int leftToCopy = byteCount - totalCopied; int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer; System.arraycopy(buffer, out, bytes, offset + totalCopied, length); out += length; if (out == in) { // empty buffer in = -1; out = 0; } totalCopied += length; } // let blocked writers write to the newly available buffer space notifyAll(); return totalCopied; }
有沒有看到可以解釋我遇到的現象的原因?
對!重點就在:
- PipedOutputStream會把寫入的Byte Array拆成一個Byte一個Byte「傳給」PipedInputStream
- PipedInputStream沒有像文件說的一樣:"…Blocks until at least one byte has been read,…"
所以UploadThread會在PipedOutputStream把Byte Array拆成Byte「傳送」的迴圈內的每個Iteration都有機會搶到read的「權利」,所以可能會讀到很零散的片段,不只造成Context Switch得很頻繁,也有可能因為Thread排程造成「挨餓」。而且PipedInputStream的read的作法是半Polling的作法,所以如果DownloadThread遲遲沒資料進來的話,UploadThread也會花不少CPU時間在Polling。
基於以上種種問題,決定不用PipedOutputStream與PipedInputStream了。
改用BlockingQueue,自己把Byte Array放進BlockingQueue內,一個Thread呼叫add/offer,另外一個Thread呼叫poll。當初沒用BlockingQueue是怕Memory Footprints會失控,因為先入為主地不太相信GC。(但是不相信是因為不懂>///<")相信配上其他設計就可控制Memory Footprints。不過目前觀察下並沒有看到Memory Footprints失控的狀況,所以就先這樣吧。
不重新發明輪子固然重要,但是挑對輪子也是一門學問。有更好的輪子,請好心推薦我。
補充:
- SO上面也有在討論這個問題 http://stackoverflow.com/questions/5152087/android-pipedoutputstream-pipedinputstream-transfer-byte-by-byte-seems-wrong。有人去查Oracle的實作發現比較有效率,㖽㖽㖽。
沒有留言:
張貼留言