001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027package org.apache.http.nio.util; 028 029import java.io.IOException; 030import java.io.InterruptedIOException; 031import java.util.concurrent.locks.Condition; 032import java.util.concurrent.locks.ReentrantLock; 033 034import org.apache.http.annotation.ThreadingBehavior; 035import org.apache.http.annotation.Contract; 036import org.apache.http.nio.ContentDecoder; 037import org.apache.http.nio.IOControl; 038 039/** 040 * Implementation of the {@link ContentInputBuffer} interface that can be 041 * shared by multiple threads, usually the I/O dispatch of an I/O reactor and 042 * a worker thread. 043 * <p> 044 * The I/O dispatch thread is expect to transfer data from {@link ContentDecoder} to the buffer 045 * by calling {@link #consumeContent(ContentDecoder)}. 046 * <p> 047 * The worker thread is expected to read the data from the buffer by calling 048 * {@link #read()} or {@link #read(byte[], int, int)} methods. 049 * <p> 050 * In case of an abnormal situation or when no longer needed the buffer must be shut down 051 * using {@link #shutdown()} method. 052 * 053 * @since 4.0 054 */ 055@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) 056public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer { 057 058 private final ReentrantLock lock; 059 private final Condition condition; 060 061 private volatile IOControl ioctrl; 062 private volatile boolean shutdown = false; 063 private volatile boolean endOfStream = false; 064 065 /** 066 * @deprecated (4.3) use {@link SharedInputBuffer#SharedInputBuffer(int, ByteBufferAllocator)} 067 */ 068 @Deprecated 069 public SharedInputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) { 070 super(buffersize, allocator); 071 this.ioctrl = ioctrl; 072 this.lock = new ReentrantLock(); 073 this.condition = this.lock.newCondition(); 074 } 075 076 /** 077 * @since 4.3 078 */ 079 public SharedInputBuffer(final int buffersize, final ByteBufferAllocator allocator) { 080 super(buffersize, allocator); 081 this.lock = new ReentrantLock(); 082 this.condition = this.lock.newCondition(); 083 } 084 085 /** 086 * @since 4.3 087 */ 088 public SharedInputBuffer(final int buffersize) { 089 this(buffersize, HeapByteBufferAllocator.INSTANCE); 090 } 091 092 @Override 093 public void reset() { 094 if (this.shutdown) { 095 return; 096 } 097 this.lock.lock(); 098 try { 099 clear(); 100 this.endOfStream = false; 101 } finally { 102 this.lock.unlock(); 103 } 104 } 105 106 /** 107 * @deprecated (4.3) use {@link #consumeContent(ContentDecoder, IOControl)} 108 */ 109 @Override 110 @Deprecated 111 public int consumeContent(final ContentDecoder decoder) throws IOException { 112 return consumeContent(decoder, null); 113 } 114 115 /** 116 * @since 4.3 117 */ 118 public int consumeContent(final ContentDecoder decoder, final IOControl ioctrl) throws IOException { 119 if (this.shutdown) { 120 return -1; 121 } 122 this.lock.lock(); 123 try { 124 if (ioctrl != null) { 125 this.ioctrl = ioctrl; 126 } 127 setInputMode(); 128 int totalRead = 0; 129 int bytesRead; 130 while ((bytesRead = decoder.read(this.buffer)) > 0) { 131 totalRead += bytesRead; 132 } 133 if (bytesRead == -1 || decoder.isCompleted()) { 134 this.endOfStream = true; 135 } 136 if (!this.buffer.hasRemaining()) { 137 if (this.ioctrl != null) { 138 this.ioctrl.suspendInput(); 139 } 140 } 141 this.condition.signalAll(); 142 143 if (totalRead > 0) { 144 return totalRead; 145 } else { 146 if (this.endOfStream) { 147 return -1; 148 } else { 149 return 0; 150 } 151 } 152 } finally { 153 this.lock.unlock(); 154 } 155 } 156 157 @Override 158 public boolean hasData() { 159 this.lock.lock(); 160 try { 161 return super.hasData(); 162 } finally { 163 this.lock.unlock(); 164 } 165 } 166 167 @Override 168 public int available() { 169 this.lock.lock(); 170 try { 171 return super.available(); 172 } finally { 173 this.lock.unlock(); 174 } 175 } 176 177 @Override 178 public int capacity() { 179 this.lock.lock(); 180 try { 181 return super.capacity(); 182 } finally { 183 this.lock.unlock(); 184 } 185 } 186 187 @Override 188 public int length() { 189 this.lock.lock(); 190 try { 191 return super.length(); 192 } finally { 193 this.lock.unlock(); 194 } 195 } 196 197 protected void waitForData() throws IOException { 198 this.lock.lock(); 199 try { 200 try { 201 while (!super.hasData() && !this.endOfStream) { 202 if (this.shutdown) { 203 throw new InterruptedIOException("Input operation aborted"); 204 } 205 if (this.ioctrl != null) { 206 this.ioctrl.requestInput(); 207 } 208 this.condition.await(); 209 } 210 } catch (final InterruptedException ex) { 211 throw new IOException("Interrupted while waiting for more data"); 212 } 213 } finally { 214 this.lock.unlock(); 215 } 216 } 217 218 public void close() { 219 if (this.shutdown) { 220 return; 221 } 222 this.endOfStream = true; 223 this.lock.lock(); 224 try { 225 this.condition.signalAll(); 226 } finally { 227 this.lock.unlock(); 228 } 229 } 230 231 public void shutdown() { 232 if (this.shutdown) { 233 return; 234 } 235 this.shutdown = true; 236 this.lock.lock(); 237 try { 238 this.condition.signalAll(); 239 } finally { 240 this.lock.unlock(); 241 } 242 } 243 244 protected boolean isShutdown() { 245 return this.shutdown; 246 } 247 248 protected boolean isEndOfStream() { 249 return this.shutdown || (!hasData() && this.endOfStream); 250 } 251 252 @Override 253 public int read() throws IOException { 254 if (this.shutdown) { 255 return -1; 256 } 257 this.lock.lock(); 258 try { 259 if (!hasData()) { 260 waitForData(); 261 } 262 if (isEndOfStream()) { 263 return -1; 264 } 265 return this.buffer.get() & 0xff; 266 } finally { 267 this.lock.unlock(); 268 } 269 } 270 271 @Override 272 public int read(final byte[] b, final int off, final int len) throws IOException { 273 if (this.shutdown) { 274 return -1; 275 } 276 if (b == null) { 277 return 0; 278 } 279 this.lock.lock(); 280 try { 281 if (!hasData()) { 282 waitForData(); 283 } 284 if (isEndOfStream()) { 285 return -1; 286 } 287 setOutputMode(); 288 int chunk = len; 289 if (chunk > this.buffer.remaining()) { 290 chunk = this.buffer.remaining(); 291 } 292 this.buffer.get(b, off, chunk); 293 return chunk; 294 } finally { 295 this.lock.unlock(); 296 } 297 } 298 299 public int read(final byte[] b) throws IOException { 300 if (this.shutdown) { 301 return -1; 302 } 303 if (b == null) { 304 return 0; 305 } 306 return read(b, 0, b.length); 307 } 308 309}