001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027package org.apache.http.conn;
028
029import java.io.IOException;
030import java.io.InputStream;
031
032import org.apache.http.util.Args;
033
034/**
035 * A stream wrapper that triggers actions on {@link #close close()} and EOF.
036 * Primarily used to auto-release an underlying managed connection when the response
037 * body is consumed or no longer needed.
038 *
039 * @see EofSensorWatcher
040 *
041 * @since 4.0
042 */
043// don't use FilterInputStream as the base class, we'd have to
044// override markSupported(), mark(), and reset() to disable them
045public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {
046
047    /**
048     * The wrapped input stream, while accessible.
049     * The value changes to {@code null} when the wrapped stream
050     * becomes inaccessible.
051     */
052    protected InputStream wrappedStream;
053
054    /**
055     * Indicates whether this stream itself is closed.
056     * If it isn't, but {@link #wrappedStream wrappedStream}
057     * is {@code null}, we're running in EOF mode.
058     * All read operations will indicate EOF without accessing
059     * the underlying stream. After closing this stream, read
060     * operations will trigger an {@link IOException IOException}.
061     *
062     * @see #isReadAllowed isReadAllowed
063     */
064    private boolean selfClosed;
065
066    /** The watcher to be notified, if any. */
067    private final EofSensorWatcher eofWatcher;
068
069    /**
070     * Creates a new EOF sensor.
071     * If no watcher is passed, the underlying stream will simply be
072     * closed when EOF is detected or {@link #close close} is called.
073     * Otherwise, the watcher decides whether the underlying stream
074     * should be closed before detaching from it.
075     *
076     * @param in        the wrapped stream
077     * @param watcher   the watcher for events, or {@code null} for
078     *                  auto-close behavior without notification
079     */
080    public EofSensorInputStream(final InputStream in,
081                                final EofSensorWatcher watcher) {
082        Args.notNull(in, "Wrapped stream");
083        wrappedStream = in;
084        selfClosed = false;
085        eofWatcher = watcher;
086    }
087
088    boolean isSelfClosed() {
089        return selfClosed;
090    }
091
092    InputStream getWrappedStream() {
093        return wrappedStream;
094    }
095
096    /**
097     * Checks whether the underlying stream can be read from.
098     *
099     * @return  {@code true} if the underlying stream is accessible,
100     *          {@code false} if this stream is in EOF mode and
101     *          detached from the underlying stream
102     *
103     * @throws IOException      if this stream is already closed
104     */
105    protected boolean isReadAllowed() throws IOException {
106        if (selfClosed) {
107            throw new IOException("Attempted read on closed stream.");
108        }
109        return (wrappedStream != null);
110    }
111
112    @Override
113    public int read() throws IOException {
114        int l = -1;
115
116        if (isReadAllowed()) {
117            try {
118                l = wrappedStream.read();
119                checkEOF(l);
120            } catch (final IOException ex) {
121                checkAbort();
122                throw ex;
123            }
124        }
125
126        return l;
127    }
128
129    @Override
130    public int read(final byte[] b, final int off, final int len) throws IOException {
131        int l = -1;
132
133        if (isReadAllowed()) {
134            try {
135                l = wrappedStream.read(b,  off,  len);
136                checkEOF(l);
137            } catch (final IOException ex) {
138                checkAbort();
139                throw ex;
140            }
141        }
142
143        return l;
144    }
145
146    @Override
147    public int read(final byte[] b) throws IOException {
148        return read(b, 0, b.length);
149    }
150
151    @Override
152    public int available() throws IOException {
153        int a = 0; // not -1
154
155        if (isReadAllowed()) {
156            try {
157                a = wrappedStream.available();
158                // no checkEOF() here, available() can't trigger EOF
159            } catch (final IOException ex) {
160                checkAbort();
161                throw ex;
162            }
163        }
164
165        return a;
166    }
167
168    @Override
169    public void close() throws IOException {
170        // tolerate multiple calls to close()
171        selfClosed = true;
172        checkClose();
173    }
174
175    /**
176     * Detects EOF and notifies the watcher.
177     * This method should only be called while the underlying stream is
178     * still accessible. Use {@link #isReadAllowed isReadAllowed} to
179     * check that condition.
180     * <p>
181     * If EOF is detected, the watcher will be notified and this stream
182     * is detached from the underlying stream. This prevents multiple
183     * notifications from this stream.
184     * </p>
185     *
186     * @param eof       the result of the calling read operation.
187     *                  A negative value indicates that EOF is reached.
188     *
189     * @throws IOException
190     *          in case of an IO problem on closing the underlying stream
191     */
192    protected void checkEOF(final int eof) throws IOException {
193
194        final InputStream toCheckStream = wrappedStream;
195        if ((toCheckStream != null) && (eof < 0)) {
196            try {
197                boolean scws = true; // should close wrapped stream?
198                if (eofWatcher != null) {
199                    scws = eofWatcher.eofDetected(toCheckStream);
200                }
201                if (scws) {
202                    toCheckStream.close();
203                }
204            } finally {
205                wrappedStream = null;
206            }
207        }
208    }
209
210    /**
211     * Detects stream close and notifies the watcher.
212     * There's not much to detect since this is called by {@link #close close}.
213     * The watcher will only be notified if this stream is closed
214     * for the first time and before EOF has been detected.
215     * This stream will be detached from the underlying stream to prevent
216     * multiple notifications to the watcher.
217     *
218     * @throws IOException
219     *          in case of an IO problem on closing the underlying stream
220     */
221    protected void checkClose() throws IOException {
222
223        final InputStream toCloseStream = wrappedStream;
224        if (toCloseStream != null) {
225            try {
226                boolean scws = true; // should close wrapped stream?
227                if (eofWatcher != null) {
228                    scws = eofWatcher.streamClosed(toCloseStream);
229                }
230                if (scws) {
231                    toCloseStream.close();
232                }
233            } finally {
234                wrappedStream = null;
235            }
236        }
237    }
238
239    /**
240     * Detects stream abort and notifies the watcher.
241     * There's not much to detect since this is called by
242     * {@link #abortConnection abortConnection}.
243     * The watcher will only be notified if this stream is aborted
244     * for the first time and before EOF has been detected or the
245     * stream has been {@link #close closed} gracefully.
246     * This stream will be detached from the underlying stream to prevent
247     * multiple notifications to the watcher.
248     *
249     * @throws IOException
250     *          in case of an IO problem on closing the underlying stream
251     */
252    protected void checkAbort() throws IOException {
253
254        final InputStream toAbortStream = wrappedStream;
255        if (toAbortStream != null) {
256            try {
257                boolean scws = true; // should close wrapped stream?
258                if (eofWatcher != null) {
259                    scws = eofWatcher.streamAbort(toAbortStream);
260                }
261                if (scws) {
262                    toAbortStream.close();
263                }
264            } finally {
265                wrappedStream = null;
266            }
267        }
268    }
269
270    /**
271     * Same as {@link #close close()}.
272     */
273    @Override
274    public void releaseConnection() throws IOException {
275        close();
276    }
277
278    /**
279     * Aborts this stream.
280     * This is a special version of {@link #close close()} which prevents
281     * re-use of the underlying connection, if any. Calling this method
282     * indicates that there should be no attempt to read until the end of
283     * the stream.
284     */
285    @Override
286    public void abortConnection() throws IOException {
287        // tolerate multiple calls
288        selfClosed = true;
289        checkAbort();
290    }
291
292}
293