2012年12月7日 星期五





於是我設計了兩個Thread,一個負責抓資料(DownloadThread),另外一個負責送資料(UploadThread),以避免互相耽誤對方時間。這應該是一個很常見的設計?接下的問題就是要解決兩個Thread傳遞資料的方法。Java博大精深,想必一定有現成的Class可以負責這種基礎的工作。果不其然,找到了PipedInputStream跟PipedOutputStream,文件中開宗明義就說了:"Receives information from a communications pipe. When two threads want to pass data back and forth, one creates a piped output stream and the other one creates a piped input stream."。用法上也很簡單,只要把兩個Class各自的Instance關聯起來,然後一個Thread呼叫write,另外一個Thread呼叫read就搞定了Thread間傳遞資料Thread-safety的問題,從此之後就過著幸福快樂的日子。

一段時日之後,秉著我對工作的熱愛,心血來潮用Traceview來看一下這段實作的表現。結果令人意外!發現寫入端(DownloadThread)花不少時間在等待。但是這現象從實際條件看來並不合理,因為DownloadThread是從Internet下載資料,而UploadThread是把資料送到LAN內的Server,理應UploadThread送出的網路速度會比較快。於是我將Android Source打開(這就是Android開發的少數好處之一,有些不懂的、文件沒寫或寫不清楚的就自己看Source就對了),看了一下Source:

先從源頭開始,PipedOutputStream的write(byte[] buffer, int offset, int count)
public void write(byte[] buffer, int offset, int count) throws IOException {
    super.write(buffer, offset, count);

OutputStream的實作是把byte array一個byte一個byte地呼叫write(int oneByte)
public void write(byte[] buffer, int offset, int count) throws IOException {
    Arrays.checkOffsetAndCount(buffer.length, offset, count);
    for (int i = offset; i < offset + count; i++) {
所以再看回PipedOutputStream的write(int oneByte)
public void write(int oneByte) throws IOException {
    PipedInputStream stream = target;
    if (stream == null) {
        throw new IOException("Pipe not connected");
    protected synchronized void receive(int oneByte) throws IOException {
        if (buffer == null || isClosed) {
            throw new IOException("Pipe is closed");

         * Set the last thread to be writing on this PipedInputStream. If
         * lastWriter dies while someone is waiting to read an IOException of
         * "Pipe broken" will be thrown in read()
        lastWriter = Thread.currentThread();
        try {
            while (buffer != null && out == in) {
                if (lastReader != null && !lastReader.isAlive()) {
                    throw new IOException("Pipe broken");
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        if (buffer == null) {
            throw new IOException("Pipe is closed");
        if (in == -1) {
            in = 0;
        buffer[in++] = (byte) oneByte;
        if (in == buffer.length) {
            in = 0;

        // let blocked readers read the newly available data
再看一下PipedInputStream的read(byte[] bytes, int offset, int byteCount)
    public synchronized int read(byte[] bytes, int offset, int byteCount) throws IOException {
        Arrays.checkOffsetAndCount(bytes.length, offset, byteCount);
        if (byteCount == 0) {
            return 0;

        if (!isConnected) {
            throw new IOException("Not connected");

        if (buffer == null) {
            throw new IOException("InputStream is closed");

         * Set the last thread to be reading on this PipedInputStream. If
         * lastReader dies while someone is waiting to write an IOException of
         * "Pipe broken" will be thrown in receive()
        lastReader = Thread.currentThread();
        try {
            int attempts = 3;
            while (in == -1) {
                // Are we at end of stream?
                if (isClosed) {
                    return -1;
                if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
                    throw new IOException("Pipe broken");
                // Notify callers of receive()
        } catch (InterruptedException e) {
            throw new InterruptedIOException();

        int totalCopied = 0;

        // copy bytes from out thru the end of buffer
        if (out >= in) {
            int leftInBuffer = buffer.length - out;
            int length = leftInBuffer < byteCount ? leftInBuffer : byteCount;
            System.arraycopy(buffer, out, bytes, offset, length);
            out += length;
            if (out == buffer.length) {
                out = 0;
            if (out == in) {
                // empty buffer
                in = -1;
                out = 0;
            totalCopied += length;

        // copy bytes from out thru in
        if (totalCopied < byteCount && in != -1) {
            int leftInBuffer = in - out;
            int leftToCopy = byteCount - totalCopied;
            int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer;
            System.arraycopy(buffer, out, bytes, offset + totalCopied, length);
            out += length;
            if (out == in) {
                // empty buffer
                in = -1;
                out = 0;
            totalCopied += length;

        // let blocked writers write to the newly available buffer space

        return totalCopied;



所以UploadThread會在PipedOutputStream把Byte Array拆成Byte「傳送」的迴圈內的每個Iteration都有機會搶到read的「權利」,所以可能會讀到很零散的片段,不只造成Context Switch得很頻繁,也有可能因為Thread排程造成「挨餓」。而且PipedInputStream的read的作法是半Polling的作法,所以如果DownloadThread遲遲沒資料進來的話,UploadThread也會花不少CPU時間在Polling。

改用BlockingQueue,自己把Byte Array放進BlockingQueue內,一個Thread呼叫add/offer,另外一個Thread呼叫poll。當初沒用BlockingQueue是怕Memory Footprints會失控,因為先入為主地不太相信GC。(但是不相信是因為不懂>///<")相信配上其他設計就可控制Memory Footprints。不過目前觀察下並沒有看到Memory Footprints失控的狀況,所以就先這樣吧。



