001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027package org.apache.http.nio.util;
028
029import java.io.IOException;
030import java.io.InterruptedIOException;
031import java.util.concurrent.locks.Condition;
032import java.util.concurrent.locks.ReentrantLock;
033
034import org.apache.http.annotation.ThreadingBehavior;
035import org.apache.http.annotation.Contract;
036import org.apache.http.nio.ContentEncoder;
037import org.apache.http.nio.IOControl;
038import org.apache.http.util.Args;
039import org.apache.http.util.Asserts;
040
041/**
042 * Implementation of the {@link ContentOutputBuffer} interface that can be
043 * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
044 * a worker thread.
045 * <p>
046 * The I/O dispatch thread is expected to transfer data from the buffer to
047 *   {@link ContentEncoder} by calling {@link #produceContent(ContentEncoder)}.
048 * <p>
049 * The worker thread is expected to write data to the buffer by calling
050 * {@link #write(int)}, {@link #write(byte[], int, int)} or {@link #writeCompleted()}
051 * <p>
052 * In case of an abnormal situation or when no longer needed the buffer must be
053 * shut down using {@link #shutdown()} method.
054 *
055 * @since 4.0
056 */
057@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
058public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
059
060    private final ReentrantLock lock;
061    private final Condition condition;
062
063    private volatile IOControl ioctrl;
064    private volatile boolean shutdown = false;
065    private volatile boolean endOfStream = false;
066
067    /**
068     * @deprecated (4.3) use {@link SharedOutputBuffer#SharedOutputBuffer(int, ByteBufferAllocator)}
069     */
070    @Deprecated
071    public SharedOutputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
072        super(buffersize, allocator);
073        Args.notNull(ioctrl, "I/O content control");
074        this.ioctrl = ioctrl;
075        this.lock = new ReentrantLock();
076        this.condition = this.lock.newCondition();
077    }
078
079    /**
080     * @since 4.3
081     */
082    public SharedOutputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
083        super(buffersize, allocator);
084        this.lock = new ReentrantLock();
085        this.condition = this.lock.newCondition();
086    }
087
088    /**
089     * @since 4.3
090     */
091    public SharedOutputBuffer(final int buffersize) {
092        this(buffersize, HeapByteBufferAllocator.INSTANCE);
093    }
094
095    @Override
096    public void reset() {
097        if (this.shutdown) {
098            return;
099        }
100        this.lock.lock();
101        try {
102            clear();
103            this.endOfStream = false;
104        } finally {
105            this.lock.unlock();
106        }
107    }
108
109    @Override
110    public boolean hasData() {
111        this.lock.lock();
112        try {
113            return super.hasData();
114        } finally {
115            this.lock.unlock();
116        }
117    }
118
119    @Override
120    public int available() {
121        this.lock.lock();
122        try {
123            return super.available();
124        } finally {
125            this.lock.unlock();
126        }
127    }
128
129    @Override
130    public int capacity() {
131        this.lock.lock();
132        try {
133            return super.capacity();
134        } finally {
135            this.lock.unlock();
136        }
137    }
138
139    @Override
140    public int length() {
141        this.lock.lock();
142        try {
143            return super.length();
144        } finally {
145            this.lock.unlock();
146        }
147    }
148
149    /**
150     * @deprecated (4.3) use {@link #produceContent(ContentEncoder, IOControl)}
151     */
152    @Override
153    @Deprecated
154    public int produceContent(final ContentEncoder encoder) throws IOException {
155        return produceContent(encoder, null);
156    }
157
158    /**
159     * @since 4.3
160     */
161    public int produceContent(final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
162        if (this.shutdown) {
163            return -1;
164        }
165        this.lock.lock();
166        try {
167            if (ioctrl != null) {
168                this.ioctrl = ioctrl;
169            }
170            setOutputMode();
171            int bytesWritten = 0;
172            if (super.hasData()) {
173                bytesWritten = encoder.write(this.buffer);
174                if (encoder.isCompleted()) {
175                    this.endOfStream = true;
176                }
177            }
178            if (!super.hasData()) {
179                // No more buffered content
180                // If at the end of the stream, terminate
181                if (this.endOfStream && !encoder.isCompleted()) {
182                    encoder.complete();
183                }
184                if (!this.endOfStream) {
185                    // suspend output events
186                    if (this.ioctrl != null) {
187                        this.ioctrl.suspendOutput();
188                    }
189                }
190            }
191            this.condition.signalAll();
192            return bytesWritten;
193        } finally {
194            this.lock.unlock();
195        }
196    }
197
198    public void close() {
199        shutdown();
200    }
201
202    public void shutdown() {
203        if (this.shutdown) {
204            return;
205        }
206        this.shutdown = true;
207        this.lock.lock();
208        try {
209            this.condition.signalAll();
210        } finally {
211            this.lock.unlock();
212        }
213    }
214
215    @Override
216    public void write(final byte[] b, final int off, final int len) throws IOException {
217        if (b == null) {
218            return;
219        }
220        int pos = off;
221        this.lock.lock();
222        try {
223            Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
224            setInputMode();
225            int remaining = len;
226            while (remaining > 0) {
227                if (!this.buffer.hasRemaining()) {
228                    flushContent();
229                    setInputMode();
230                }
231                final int chunk = Math.min(remaining, this.buffer.remaining());
232                this.buffer.put(b, pos, chunk);
233                remaining -= chunk;
234                pos += chunk;
235            }
236        } finally {
237            this.lock.unlock();
238        }
239    }
240
241    public void write(final byte[] b) throws IOException {
242        if (b == null) {
243            return;
244        }
245        write(b, 0, b.length);
246    }
247
248    @Override
249    public void write(final int b) throws IOException {
250        this.lock.lock();
251        try {
252            Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
253            setInputMode();
254            if (!this.buffer.hasRemaining()) {
255                flushContent();
256                setInputMode();
257            }
258            this.buffer.put((byte)b);
259        } finally {
260            this.lock.unlock();
261        }
262    }
263
264    @Override
265    public void flush() throws IOException {
266    }
267
268    private void flushContent() throws IOException {
269        this.lock.lock();
270        try {
271            try {
272                while (super.hasData()) {
273                    if (this.shutdown) {
274                        throw new InterruptedIOException("Output operation aborted");
275                    }
276                    if (this.ioctrl != null) {
277                        this.ioctrl.requestOutput();
278                    }
279                    this.condition.await();
280                }
281            } catch (final InterruptedException ex) {
282                throw new IOException("Interrupted while flushing the content buffer");
283            }
284        } finally {
285            this.lock.unlock();
286        }
287    }
288
289    @Override
290    public void writeCompleted() throws IOException {
291        this.lock.lock();
292        try {
293            if (this.endOfStream) {
294                return;
295            }
296            this.endOfStream = true;
297            if (this.ioctrl != null) {
298                this.ioctrl.requestOutput();
299            }
300        } finally {
301            this.lock.unlock();
302        }
303    }
304
305}