001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027package org.apache.http.nio.util;
028
029import java.io.IOException;
030import java.io.InterruptedIOException;
031import java.util.concurrent.locks.Condition;
032import java.util.concurrent.locks.ReentrantLock;
033
034import org.apache.http.annotation.ThreadingBehavior;
035import org.apache.http.annotation.Contract;
036import org.apache.http.nio.ContentDecoder;
037import org.apache.http.nio.IOControl;
038
039/**
040 * Implementation of the {@link ContentInputBuffer} interface that can be
041 * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
042 * a worker thread.
043 * <p>
044 * The I/O dispatch thread is expect to transfer data from {@link ContentDecoder} to the buffer
045 *   by calling {@link #consumeContent(ContentDecoder)}.
046 * <p>
047 * The worker thread is expected to read the data from the buffer by calling
048 *   {@link #read()} or {@link #read(byte[], int, int)} methods.
049 * <p>
050 * In case of an abnormal situation or when no longer needed the buffer must be shut down
051 * using {@link #shutdown()} method.
052 *
053 * @since 4.0
054 */
055@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
056public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
057
058    private final ReentrantLock lock;
059    private final Condition condition;
060
061    private volatile IOControl ioctrl;
062    private volatile boolean shutdown = false;
063    private volatile boolean endOfStream = false;
064
065    /**
066     * @deprecated (4.3) use {@link SharedInputBuffer#SharedInputBuffer(int, ByteBufferAllocator)}
067     */
068    @Deprecated
069    public SharedInputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
070        super(buffersize, allocator);
071        this.ioctrl = ioctrl;
072        this.lock = new ReentrantLock();
073        this.condition = this.lock.newCondition();
074    }
075
076    /**
077     * @since 4.3
078     */
079    public SharedInputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
080        super(buffersize, allocator);
081        this.lock = new ReentrantLock();
082        this.condition = this.lock.newCondition();
083    }
084
085    /**
086     * @since 4.3
087     */
088    public SharedInputBuffer(final int buffersize) {
089        this(buffersize, HeapByteBufferAllocator.INSTANCE);
090    }
091
092    @Override
093    public void reset() {
094        if (this.shutdown) {
095            return;
096        }
097        this.lock.lock();
098        try {
099            clear();
100            this.endOfStream = false;
101        } finally {
102            this.lock.unlock();
103        }
104    }
105
106    /**
107     * @deprecated (4.3) use {@link #consumeContent(ContentDecoder, IOControl)}
108     */
109    @Override
110    @Deprecated
111    public int consumeContent(final ContentDecoder decoder) throws IOException {
112        return consumeContent(decoder, null);
113    }
114
115    /**
116     * @since 4.3
117     */
118    public int consumeContent(final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
119        if (this.shutdown) {
120            return -1;
121        }
122        this.lock.lock();
123        try {
124            if (ioctrl != null) {
125                this.ioctrl = ioctrl;
126            }
127            setInputMode();
128            int totalRead = 0;
129            int bytesRead;
130            while ((bytesRead = decoder.read(this.buffer)) > 0) {
131                totalRead += bytesRead;
132            }
133            if (bytesRead == -1 || decoder.isCompleted()) {
134                this.endOfStream = true;
135            }
136            if (!this.buffer.hasRemaining()) {
137                if (this.ioctrl != null) {
138                    this.ioctrl.suspendInput();
139                }
140            }
141            this.condition.signalAll();
142
143            if (totalRead > 0) {
144                return totalRead;
145            } else {
146                if (this.endOfStream) {
147                    return -1;
148                } else {
149                    return 0;
150                }
151            }
152        } finally {
153            this.lock.unlock();
154        }
155    }
156
157    @Override
158    public boolean hasData() {
159        this.lock.lock();
160        try {
161            return super.hasData();
162        } finally {
163            this.lock.unlock();
164        }
165    }
166
167    @Override
168    public int available() {
169        this.lock.lock();
170        try {
171            return super.available();
172        } finally {
173            this.lock.unlock();
174        }
175    }
176
177    @Override
178    public int capacity() {
179        this.lock.lock();
180        try {
181            return super.capacity();
182        } finally {
183            this.lock.unlock();
184        }
185    }
186
187    @Override
188    public int length() {
189        this.lock.lock();
190        try {
191            return super.length();
192        } finally {
193            this.lock.unlock();
194        }
195    }
196
197    protected void waitForData() throws IOException {
198        this.lock.lock();
199        try {
200            try {
201                while (!super.hasData() && !this.endOfStream) {
202                    if (this.shutdown) {
203                        throw new InterruptedIOException("Input operation aborted");
204                    }
205                    if (this.ioctrl != null) {
206                        this.ioctrl.requestInput();
207                    }
208                    this.condition.await();
209                }
210            } catch (final InterruptedException ex) {
211                throw new IOException("Interrupted while waiting for more data");
212            }
213        } finally {
214            this.lock.unlock();
215        }
216    }
217
218    public void close() {
219        if (this.shutdown) {
220            return;
221        }
222        this.endOfStream = true;
223        this.lock.lock();
224        try {
225            this.condition.signalAll();
226        } finally {
227            this.lock.unlock();
228        }
229    }
230
231    public void shutdown() {
232        if (this.shutdown) {
233            return;
234        }
235        this.shutdown = true;
236        this.lock.lock();
237        try {
238            this.condition.signalAll();
239        } finally {
240            this.lock.unlock();
241        }
242    }
243
244    protected boolean isShutdown() {
245        return this.shutdown;
246    }
247
248    protected boolean isEndOfStream() {
249        return this.shutdown || (!hasData() && this.endOfStream);
250    }
251
252    @Override
253    public int read() throws IOException {
254        if (this.shutdown) {
255            return -1;
256        }
257        this.lock.lock();
258        try {
259            if (!hasData()) {
260                waitForData();
261            }
262            if (isEndOfStream()) {
263                return -1;
264            }
265            return this.buffer.get() & 0xff;
266        } finally {
267            this.lock.unlock();
268        }
269    }
270
271    @Override
272    public int read(final byte[] b, final int off, final int len) throws IOException {
273        if (this.shutdown) {
274            return -1;
275        }
276        if (b == null) {
277            return 0;
278        }
279        this.lock.lock();
280        try {
281            if (!hasData()) {
282                waitForData();
283            }
284            if (isEndOfStream()) {
285                return -1;
286            }
287            setOutputMode();
288            int chunk = len;
289            if (chunk > this.buffer.remaining()) {
290                chunk = this.buffer.remaining();
291            }
292            this.buffer.get(b, off, chunk);
293            return chunk;
294        } finally {
295            this.lock.unlock();
296        }
297    }
298
299    public int read(final byte[] b) throws IOException {
300        if (this.shutdown) {
301            return -1;
302        }
303        if (b == null) {
304            return 0;
305        }
306        return read(b, 0, b.length);
307    }
308
309}