001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.io;
029
030import java.io.IOException;
031import java.io.InputStream;
032
033import org.apache.http.ConnectionClosedException;
034import org.apache.http.io.BufferInfo;
035import org.apache.http.io.SessionInputBuffer;
036import org.apache.http.util.Args;
037
038/**
039 * Input stream that cuts off after a defined number of bytes. This class
040 * is used to receive content of HTTP messages where the end of the content
041 * entity is determined by the value of the {@code Content-Length header}.
042 * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
043 * long.
044 * <p>
045 * Note that this class NEVER closes the underlying stream, even when close
046 * gets called.  Instead, it will read until the "end" of its limit on
047 * close, which allows for the seamless execution of subsequent HTTP 1.1
048 * requests, while not requiring the client to remember to read the entire
049 * contents of the response.
050 *
051 *
052 * @since 4.0
053 */
054public class ContentLengthInputStream extends InputStream {
055
056    private static final int BUFFER_SIZE = 2048;
057    /**
058     * The maximum number of bytes that can be read from the stream. Subsequent
059     * read operations will return -1.
060     */
061    private final long contentLength;
062
063    /** The current position */
064    private long pos = 0;
065
066    /** True if the stream is closed. */
067    private boolean closed = false;
068
069    /**
070     * Wrapped input stream that all calls are delegated to.
071     */
072    private SessionInputBuffer in = null;
073
074    /**
075     * Wraps a session input buffer and cuts off output after a defined number
076     * of bytes.
077     *
078     * @param in The session input buffer
079     * @param contentLength The maximum number of bytes that can be read from
080     * the stream. Subsequent read operations will return -1.
081     */
082    public ContentLengthInputStream(final SessionInputBuffer in, final long contentLength) {
083        super();
084        this.in = Args.notNull(in, "Session input buffer");
085        this.contentLength = Args.notNegative(contentLength, "Content length");
086    }
087
088    /**
089     * <p>Reads until the end of the known length of content.</p>
090     *
091     * <p>Does not close the underlying socket input, but instead leaves it
092     * primed to parse the next response.</p>
093     * @throws IOException If an IO problem occurs.
094     */
095    @Override
096    public void close() throws IOException {
097        if (!closed) {
098            try {
099                if (pos < contentLength) {
100                    final byte buffer[] = new byte[BUFFER_SIZE];
101                    while (read(buffer) >= 0) {
102                    }
103                }
104            } finally {
105                // close after above so that we don't throw an exception trying
106                // to read after closed!
107                closed = true;
108            }
109        }
110    }
111
112    @Override
113    public int available() throws IOException {
114        if (this.in instanceof BufferInfo) {
115            final int len = ((BufferInfo) this.in).length();
116            return Math.min(len, (int) (this.contentLength - this.pos));
117        } else {
118            return 0;
119        }
120    }
121
122    /**
123     * Read the next byte from the stream
124     * @return The next byte or -1 if the end of stream has been reached.
125     * @throws IOException If an IO problem occurs
126     * @see java.io.InputStream#read()
127     */
128    @Override
129    public int read() throws IOException {
130        if (closed) {
131            throw new IOException("Attempted read from closed stream.");
132        }
133
134        if (pos >= contentLength) {
135            return -1;
136        }
137        final int b = this.in.read();
138        if (b == -1) {
139            if (pos < contentLength) {
140                throw new ConnectionClosedException(
141                        "Premature end of Content-Length delimited message body (expected: "
142                        + contentLength + "; received: " + pos);
143            }
144        } else {
145            pos++;
146        }
147        return b;
148    }
149
150    /**
151     * Does standard {@link InputStream#read(byte[], int, int)} behavior, but
152     * also notifies the watcher when the contents have been consumed.
153     *
154     * @param b     The byte array to fill.
155     * @param off   Start filling at this position.
156     * @param len   The number of bytes to attempt to read.
157     * @return The number of bytes read, or -1 if the end of content has been
158     *  reached.
159     *
160     * @throws java.io.IOException Should an error occur on the wrapped stream.
161     */
162    @Override
163    public int read (final byte[] b, final int off, final int len) throws java.io.IOException {
164        if (closed) {
165            throw new IOException("Attempted read from closed stream.");
166        }
167
168        if (pos >= contentLength) {
169            return -1;
170        }
171
172        int chunk = len;
173        if (pos + len > contentLength) {
174            chunk = (int) (contentLength - pos);
175        }
176        final int count = this.in.read(b, off, chunk);
177        if (count == -1 && pos < contentLength) {
178            throw new ConnectionClosedException(
179                    "Premature end of Content-Length delimited message body (expected: "
180                    + contentLength + "; received: " + pos);
181        }
182        if (count > 0) {
183            pos += count;
184        }
185        return count;
186    }
187
188
189    /**
190     * Read more bytes from the stream.
191     * @param b The byte array to put the new data in.
192     * @return The number of bytes read into the buffer.
193     * @throws IOException If an IO problem occurs
194     * @see java.io.InputStream#read(byte[])
195     */
196    @Override
197    public int read(final byte[] b) throws IOException {
198        return read(b, 0, b.length);
199    }
200
201    /**
202     * Skips and discards a number of bytes from the input stream.
203     * @param n The number of bytes to skip.
204     * @return The actual number of bytes skipped. &le; 0 if no bytes
205     * are skipped.
206     * @throws IOException If an error occurs while skipping bytes.
207     * @see InputStream#skip(long)
208     */
209    @Override
210    public long skip(final long n) throws IOException {
211        if (n <= 0) {
212            return 0;
213        }
214        final byte[] buffer = new byte[BUFFER_SIZE];
215        // make sure we don't skip more bytes than are
216        // still available
217        long remaining = Math.min(n, this.contentLength - this.pos);
218        // skip and keep track of the bytes actually skipped
219        long count = 0;
220        while (remaining > 0) {
221            final int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining));
222            if (l == -1) {
223                break;
224            }
225            count += l;
226            remaining -= l;
227        }
228        return count;
229    }
230}