/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation. For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.http.nio.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.http.annotation.ThreadingBehavior;
import org.apache.http.annotation.Contract;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.util.Args;
import org.apache.http.util.Asserts;

/**
 * Implementation of the {@link ContentOutputBuffer} interface that can be
 * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
 * a worker thread.
 * <p>
 * The I/O dispatch thread is expected to transfer data from the buffer to
 * {@link ContentEncoder} by calling {@link #produceContent(ContentEncoder)}.
 * <p>
 * The worker thread is expected to write data to the buffer by calling
 * {@link #write(int)}, {@link #write(byte[], int, int)} or {@link #writeCompleted()}.
 * <p>
 * In case of an abnormal situation or when no longer needed the buffer must be
 * shut down using {@link #shutdown()} method.
 *
 * @since 4.0
 */
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {

    /** Guards every access to the inherited buffer state. */
    private final ReentrantLock lock;
    /** Signalled by {@link #produceContent} and {@link #shutdown()} to wake blocked writers. */
    private final Condition condition;

    /** I/O control used to request / suspend output events; may be {@code null}. */
    private volatile IOControl ioctrl;
    /** Set once by {@link #shutdown()}; never cleared. */
    private volatile boolean shutdown = false;
    /** Set when the writer has signalled completion or the encoder reports completion. */
    private volatile boolean endOfStream = false;

    /**
     * Creates a buffer bound to the given I/O control.
     *
     * @param buffersize size passed to the superclass constructor.
     * @param ioctrl I/O control; must not be {@code null}.
     * @param allocator allocator passed to the superclass constructor.
     *
     * @deprecated (4.3) use {@link SharedOutputBuffer#SharedOutputBuffer(int, ByteBufferAllocator)}
     */
    @Deprecated
    public SharedOutputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
        super(buffersize, allocator);
        Args.notNull(ioctrl, "I/O content control");
        this.ioctrl = ioctrl;
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
    }

    /**
     * Creates a buffer without an I/O control. One can be supplied later via
     * {@link #produceContent(ContentEncoder, IOControl)}.
     *
     * @param buffersize size passed to the superclass constructor.
     * @param allocator allocator passed to the superclass constructor.
     *
     * @since 4.3
     */
    public SharedOutputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
        super(buffersize, allocator);
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
    }

    /**
     * Creates a buffer using {@link HeapByteBufferAllocator#INSTANCE}.
     *
     * @param buffersize size passed to the superclass constructor.
     *
     * @since 4.3
     */
    public SharedOutputBuffer(final int buffersize) {
        this(buffersize, HeapByteBufferAllocator.INSTANCE);
    }

    /**
     * Clears the buffer and resets the end-of-stream flag so the buffer can be
     * written to again. Does nothing if the buffer has been shut down.
     */
    @Override
    public void reset() {
        if (this.shutdown) {
            return;
        }
        this.lock.lock();
        try {
            clear();
            this.endOfStream = false;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the result of the superclass {@code hasData()} while holding the lock.
     */
    @Override
    public boolean hasData() {
        this.lock.lock();
        try {
            return super.hasData();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the result of the superclass {@code available()} while holding the lock.
     */
    @Override
    public int available() {
        this.lock.lock();
        try {
            return super.available();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the result of the superclass {@code capacity()} while holding the lock.
     */
    @Override
    public int capacity() {
        this.lock.lock();
        try {
            return super.capacity();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the result of the superclass {@code length()} while holding the lock.
     */
    @Override
    public int length() {
        this.lock.lock();
        try {
            return super.length();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Equivalent to {@code produceContent(encoder, null)}: the currently
     * stored I/O control, if any, is left unchanged.
     *
     * @deprecated (4.3) use {@link #produceContent(ContentEncoder, IOControl)}
     */
    @Override
    @Deprecated
    public int produceContent(final ContentEncoder encoder) throws IOException {
        return produceContent(encoder, null);
    }

    /**
     * Transfers buffered content to the given encoder and wakes up any
     * thread blocked waiting for the buffer to drain.
     * <p>
     * When the buffer holds no more data after the transfer, the encoder is
     * completed if the end of the stream has been reached; otherwise output
     * events are suspended through the I/O control (when one is known).
     *
     * @param encoder the encoder to write buffered content to.
     * @param ioctrl I/O control to remember for subsequent output requests;
     *   if {@code null} the previously stored one is kept.
     * @return the value returned by {@code encoder.write(...)}, {@code 0} if
     *   the buffer held no data, or {@code -1} if the buffer has been shut down.
     * @throws IOException if thrown by the encoder.
     *
     * @since 4.3
     */
    public int produceContent(final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
        if (this.shutdown) {
            return -1;
        }
        this.lock.lock();
        try {
            if (ioctrl != null) {
                this.ioctrl = ioctrl;
            }
            setOutputMode();
            int bytesWritten = 0;
            if (super.hasData()) {
                bytesWritten = encoder.write(this.buffer);
                if (encoder.isCompleted()) {
                    this.endOfStream = true;
                }
            }
            if (!super.hasData()) {
                // No more buffered content
                // If at the end of the stream, terminate
                if (this.endOfStream && !encoder.isCompleted()) {
                    encoder.complete();
                }
                if (!this.endOfStream) {
                    // suspend output events
                    if (this.ioctrl != null) {
                        this.ioctrl.suspendOutput();
                    }
                }
            }
            // Wake writers blocked in flushContent() so they can re-check the buffer.
            this.condition.signalAll();
            return bytesWritten;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Same as {@link #shutdown()}.
     */
    public void close() {
        shutdown();
    }

    /**
     * Marks the buffer as shut down and wakes up all threads waiting on it.
     * Subsequent calls have no effect.
     */
    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        this.lock.lock();
        try {
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Copies {@code len} bytes from {@code b}, starting at {@code off}, into
     * the buffer. Whenever the buffer has no room left, the call blocks until
     * the buffered content has been drained by
     * {@link #produceContent(ContentEncoder, IOControl)}.
     * <p>
     * A {@code null} array is silently ignored. The call is rejected through
     * {@link Asserts#check(boolean, String)} if the buffer has been shut down
     * or the end of the stream has already been signalled.
     *
     * @param b source array; may be {@code null}.
     * @param off offset of the first byte to copy.
     * @param len number of bytes to copy.
     * @throws IOException if the buffer is shut down, or the calling thread
     *   is interrupted, while waiting for the buffer to drain.
     */
    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        if (b == null) {
            return;
        }
        int pos = off;
        this.lock.lock();
        try {
            Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
            setInputMode();
            int remaining = len;
            while (remaining > 0) {
                if (!this.buffer.hasRemaining()) {
                    // Buffer is full: wait for the I/O thread to drain it,
                    // then switch back to input mode before writing more.
                    flushContent();
                    setInputMode();
                }
                final int chunk = Math.min(remaining, this.buffer.remaining());
                this.buffer.put(b, pos, chunk);
                remaining -= chunk;
                pos += chunk;
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Writes the whole array by delegating to {@link #write(byte[], int, int)}.
     * A {@code null} array is silently ignored.
     *
     * @param b source array; may be {@code null}.
     * @throws IOException see {@link #write(byte[], int, int)}.
     */
    public void write(final byte[] b) throws IOException {
        if (b == null) {
            return;
        }
        write(b, 0, b.length);
    }

    /**
     * Writes a single byte (the value is narrowed with a {@code (byte)} cast),
     * blocking until the buffer has been drained if it has no room left.
     * <p>
     * The call is rejected through {@link Asserts#check(boolean, String)} if
     * the buffer has been shut down or the end of the stream has already been
     * signalled.
     *
     * @param b the byte to write.
     * @throws IOException if the buffer is shut down, or the calling thread
     *   is interrupted, while waiting for the buffer to drain.
     */
    @Override
    public void write(final int b) throws IOException {
        this.lock.lock();
        try {
            Asserts.check(!this.shutdown && !this.endOfStream, "Buffer already closed for writing");
            setInputMode();
            if (!this.buffer.hasRemaining()) {
                flushContent();
                setInputMode();
            }
            this.buffer.put((byte)b);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Does nothing.
     */
    @Override
    public void flush() throws IOException {
    }

    /**
     * Blocks the calling thread until the buffer reports no more data,
     * requesting output through the I/O control (when one is known) before
     * each wait.
     *
     * @throws InterruptedIOException if the buffer is shut down while data
     *   is still pending.
     * @throws IOException if the calling thread is interrupted while waiting.
     *   The thread's interrupt status is restored and the original
     *   {@link InterruptedException} is attached as the cause.
     */
    private void flushContent() throws IOException {
        this.lock.lock();
        try {
            try {
                while (super.hasData()) {
                    if (this.shutdown) {
                        throw new InterruptedIOException("Output operation aborted");
                    }
                    if (this.ioctrl != null) {
                        this.ioctrl.requestOutput();
                    }
                    this.condition.await();
                }
            } catch (final InterruptedException ex) {
                // Catching InterruptedException clears the interrupt flag; restore it
                // so that code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while flushing the content buffer", ex);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Signals that no more content will be written. On the first call the
     * end-of-stream flag is set and output is requested through the I/O
     * control (when one is known); later calls return without doing anything.
     *
     * @throws IOException declared by the interface; not thrown by the code
     *   visible in this method.
     */
    @Override
    public void writeCompleted() throws IOException {
        this.lock.lock();
        try {
            if (this.endOfStream) {
                return;
            }
            this.endOfStream = true;
            if (this.ioctrl != null) {
                this.ioctrl.requestOutput();
            }
        } finally {
            this.lock.unlock();
        }
    }

}