001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027package org.apache.http.conn; 028 029import java.io.IOException; 030import java.io.InputStream; 031 032import org.apache.http.util.Args; 033 034/** 035 * A stream wrapper that triggers actions on {@link #close close()} and EOF. 036 * Primarily used to auto-release an underlying managed connection when the response 037 * body is consumed or no longer needed. 038 * 039 * @see EofSensorWatcher 040 * 041 * @since 4.0 042 */ 043// don't use FilterInputStream as the base class, we'd have to 044// override markSupported(), mark(), and reset() to disable them 045public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger { 046 047 /** 048 * The wrapped input stream, while accessible. 049 * The value changes to {@code null} when the wrapped stream 050 * becomes inaccessible. 051 */ 052 protected InputStream wrappedStream; 053 054 /** 055 * Indicates whether this stream itself is closed. 056 * If it isn't, but {@link #wrappedStream wrappedStream} 057 * is {@code null}, we're running in EOF mode. 058 * All read operations will indicate EOF without accessing 059 * the underlying stream. After closing this stream, read 060 * operations will trigger an {@link IOException IOException}. 061 * 062 * @see #isReadAllowed isReadAllowed 063 */ 064 private boolean selfClosed; 065 066 /** The watcher to be notified, if any. */ 067 private final EofSensorWatcher eofWatcher; 068 069 /** 070 * Creates a new EOF sensor. 071 * If no watcher is passed, the underlying stream will simply be 072 * closed when EOF is detected or {@link #close close} is called. 073 * Otherwise, the watcher decides whether the underlying stream 074 * should be closed before detaching from it. 075 * 076 * @param in the wrapped stream 077 * @param watcher the watcher for events, or {@code null} for 078 * auto-close behavior without notification 079 */ 080 public EofSensorInputStream(final InputStream in, 081 final EofSensorWatcher watcher) { 082 Args.notNull(in, "Wrapped stream"); 083 wrappedStream = in; 084 selfClosed = false; 085 eofWatcher = watcher; 086 } 087 088 boolean isSelfClosed() { 089 return selfClosed; 090 } 091 092 InputStream getWrappedStream() { 093 return wrappedStream; 094 } 095 096 /** 097 * Checks whether the underlying stream can be read from. 098 * 099 * @return {@code true} if the underlying stream is accessible, 100 * {@code false} if this stream is in EOF mode and 101 * detached from the underlying stream 102 * 103 * @throws IOException if this stream is already closed 104 */ 105 protected boolean isReadAllowed() throws IOException { 106 if (selfClosed) { 107 throw new IOException("Attempted read on closed stream."); 108 } 109 return (wrappedStream != null); 110 } 111 112 @Override 113 public int read() throws IOException { 114 int l = -1; 115 116 if (isReadAllowed()) { 117 try { 118 l = wrappedStream.read(); 119 checkEOF(l); 120 } catch (final IOException ex) { 121 checkAbort(); 122 throw ex; 123 } 124 } 125 126 return l; 127 } 128 129 @Override 130 public int read(final byte[] b, final int off, final int len) throws IOException { 131 int l = -1; 132 133 if (isReadAllowed()) { 134 try { 135 l = wrappedStream.read(b, off, len); 136 checkEOF(l); 137 } catch (final IOException ex) { 138 checkAbort(); 139 throw ex; 140 } 141 } 142 143 return l; 144 } 145 146 @Override 147 public int read(final byte[] b) throws IOException { 148 return read(b, 0, b.length); 149 } 150 151 @Override 152 public int available() throws IOException { 153 int a = 0; // not -1 154 155 if (isReadAllowed()) { 156 try { 157 a = wrappedStream.available(); 158 // no checkEOF() here, available() can't trigger EOF 159 } catch (final IOException ex) { 160 checkAbort(); 161 throw ex; 162 } 163 } 164 165 return a; 166 } 167 168 @Override 169 public void close() throws IOException { 170 // tolerate multiple calls to close() 171 selfClosed = true; 172 checkClose(); 173 } 174 175 /** 176 * Detects EOF and notifies the watcher. 177 * This method should only be called while the underlying stream is 178 * still accessible. Use {@link #isReadAllowed isReadAllowed} to 179 * check that condition. 180 * <p> 181 * If EOF is detected, the watcher will be notified and this stream 182 * is detached from the underlying stream. This prevents multiple 183 * notifications from this stream. 184 * </p> 185 * 186 * @param eof the result of the calling read operation. 187 * A negative value indicates that EOF is reached. 188 * 189 * @throws IOException 190 * in case of an IO problem on closing the underlying stream 191 */ 192 protected void checkEOF(final int eof) throws IOException { 193 194 final InputStream toCheckStream = wrappedStream; 195 if ((toCheckStream != null) && (eof < 0)) { 196 try { 197 boolean scws = true; // should close wrapped stream? 198 if (eofWatcher != null) { 199 scws = eofWatcher.eofDetected(toCheckStream); 200 } 201 if (scws) { 202 toCheckStream.close(); 203 } 204 } finally { 205 wrappedStream = null; 206 } 207 } 208 } 209 210 /** 211 * Detects stream close and notifies the watcher. 212 * There's not much to detect since this is called by {@link #close close}. 213 * The watcher will only be notified if this stream is closed 214 * for the first time and before EOF has been detected. 215 * This stream will be detached from the underlying stream to prevent 216 * multiple notifications to the watcher. 217 * 218 * @throws IOException 219 * in case of an IO problem on closing the underlying stream 220 */ 221 protected void checkClose() throws IOException { 222 223 final InputStream toCloseStream = wrappedStream; 224 if (toCloseStream != null) { 225 try { 226 boolean scws = true; // should close wrapped stream? 227 if (eofWatcher != null) { 228 scws = eofWatcher.streamClosed(toCloseStream); 229 } 230 if (scws) { 231 toCloseStream.close(); 232 } 233 } finally { 234 wrappedStream = null; 235 } 236 } 237 } 238 239 /** 240 * Detects stream abort and notifies the watcher. 241 * There's not much to detect since this is called by 242 * {@link #abortConnection abortConnection}. 243 * The watcher will only be notified if this stream is aborted 244 * for the first time and before EOF has been detected or the 245 * stream has been {@link #close closed} gracefully. 246 * This stream will be detached from the underlying stream to prevent 247 * multiple notifications to the watcher. 248 * 249 * @throws IOException 250 * in case of an IO problem on closing the underlying stream 251 */ 252 protected void checkAbort() throws IOException { 253 254 final InputStream toAbortStream = wrappedStream; 255 if (toAbortStream != null) { 256 try { 257 boolean scws = true; // should close wrapped stream? 258 if (eofWatcher != null) { 259 scws = eofWatcher.streamAbort(toAbortStream); 260 } 261 if (scws) { 262 toAbortStream.close(); 263 } 264 } finally { 265 wrappedStream = null; 266 } 267 } 268 } 269 270 /** 271 * Same as {@link #close close()}. 272 */ 273 @Override 274 public void releaseConnection() throws IOException { 275 close(); 276 } 277 278 /** 279 * Aborts this stream. 280 * This is a special version of {@link #close close()} which prevents 281 * re-use of the underlying connection, if any. Calling this method 282 * indicates that there should be no attempt to read until the end of 283 * the stream. 284 */ 285 @Override 286 public void abortConnection() throws IOException { 287 // tolerate multiple calls 288 selfClosed = true; 289 checkAbort(); 290 } 291 292} 293