001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.io;
029
030import java.io.IOException;
031import java.io.InputStream;
032
033import org.apache.http.ConnectionClosedException;
034import org.apache.http.Header;
035import org.apache.http.HttpException;
036import org.apache.http.MalformedChunkCodingException;
037import org.apache.http.TruncatedChunkException;
038import org.apache.http.config.MessageConstraints;
039import org.apache.http.io.BufferInfo;
040import org.apache.http.io.SessionInputBuffer;
041import org.apache.http.util.Args;
042import org.apache.http.util.CharArrayBuffer;
043
044/**
045 * Implements chunked transfer coding. The content is received in small chunks.
046 * Entities transferred using this input stream can be of unlimited length.
047 * After the stream is read to the end, it provides access to the trailers,
048 * if any.
049 * <p>
050 * Note that this class NEVER closes the underlying stream, even when close
051 * gets called.  Instead, it will read until the "end" of its chunking on
052 * close, which allows for the seamless execution of subsequent HTTP 1.1
053 * requests, while not requiring the client to remember to read the entire
054 * contents of the response.
055 *
056 *
057 * @since 4.0
058 *
059 */
060public class ChunkedInputStream extends InputStream {
061
062    private static final int CHUNK_LEN               = 1;
063    private static final int CHUNK_DATA              = 2;
064    private static final int CHUNK_CRLF              = 3;
065    private static final int CHUNK_INVALID           = Integer.MAX_VALUE;
066
067    private static final int BUFFER_SIZE = 2048;
068
069    /** The session input buffer */
070    private final SessionInputBuffer in;
071    private final CharArrayBuffer buffer;
072    private final MessageConstraints constraints;
073
074    private int state;
075
076    /** The chunk size */
077    private long chunkSize;
078
079    /** The current position within the current chunk */
080    private long pos;
081
082    /** True if we've reached the end of stream */
083    private boolean eof = false;
084
085    /** True if this stream is closed */
086    private boolean closed = false;
087
088    private Header[] footers = new Header[] {};
089
090    /**
091     * Wraps session input stream and reads chunk coded input.
092     *
093     * @param in The session input buffer
094     * @param constraints Message constraints. If {@code null}
095     *   {@link MessageConstraints#DEFAULT} will be used.
096     *
097     * @since 4.4
098     */
099    public ChunkedInputStream(final SessionInputBuffer in, final MessageConstraints constraints) {
100        super();
101        this.in = Args.notNull(in, "Session input buffer");
102        this.pos = 0L;
103        this.buffer = new CharArrayBuffer(16);
104        this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT;
105        this.state = CHUNK_LEN;
106    }
107
108    /**
109     * Wraps session input stream and reads chunk coded input.
110     *
111     * @param in The session input buffer
112     */
113    public ChunkedInputStream(final SessionInputBuffer in) {
114        this(in, null);
115    }
116
117    @Override
118    public int available() throws IOException {
119        if (this.in instanceof BufferInfo) {
120            final int len = ((BufferInfo) this.in).length();
121            return (int) Math.min(len, this.chunkSize - this.pos);
122        } else {
123            return 0;
124        }
125    }
126
127    /**
128     * <p> Returns all the data in a chunked stream in coalesced form. A chunk
129     * is followed by a CRLF. The method returns -1 as soon as a chunksize of 0
130     * is detected.</p>
131     *
132     * <p> Trailer headers are read automatically at the end of the stream and
133     * can be obtained with the getResponseFooters() method.</p>
134     *
135     * @return -1 of the end of the stream has been reached or the next data
136     * byte
137     * @throws IOException in case of an I/O error
138     */
139    @Override
140    public int read() throws IOException {
141        if (this.closed) {
142            throw new IOException("Attempted read from closed stream.");
143        }
144        if (this.eof) {
145            return -1;
146        }
147        if (state != CHUNK_DATA) {
148            nextChunk();
149            if (this.eof) {
150                return -1;
151            }
152        }
153        final int b = in.read();
154        if (b != -1) {
155            pos++;
156            if (pos >= chunkSize) {
157                state = CHUNK_CRLF;
158            }
159        }
160        return b;
161    }
162
163    /**
164     * Read some bytes from the stream.
165     * @param b The byte array that will hold the contents from the stream.
166     * @param off The offset into the byte array at which bytes will start to be
167     * placed.
168     * @param len the maximum number of bytes that can be returned.
169     * @return The number of bytes returned or -1 if the end of stream has been
170     * reached.
171     * @throws IOException in case of an I/O error
172     */
173    @Override
174    public int read (final byte[] b, final int off, final int len) throws IOException {
175
176        if (closed) {
177            throw new IOException("Attempted read from closed stream.");
178        }
179
180        if (eof) {
181            return -1;
182        }
183        if (state != CHUNK_DATA) {
184            nextChunk();
185            if (eof) {
186                return -1;
187            }
188        }
189        final int bytesRead = in.read(b, off, (int) Math.min(len, chunkSize - pos));
190        if (bytesRead != -1) {
191            pos += bytesRead;
192            if (pos >= chunkSize) {
193                state = CHUNK_CRLF;
194            }
195            return bytesRead;
196        } else {
197            eof = true;
198            throw new TruncatedChunkException("Truncated chunk "
199                    + "( expected size: " + chunkSize
200                    + "; actual size: " + pos + ")");
201        }
202    }
203
204    /**
205     * Read some bytes from the stream.
206     * @param b The byte array that will hold the contents from the stream.
207     * @return The number of bytes returned or -1 if the end of stream has been
208     * reached.
209     * @throws IOException in case of an I/O error
210     */
211    @Override
212    public int read (final byte[] b) throws IOException {
213        return read(b, 0, b.length);
214    }
215
216    /**
217     * Read the next chunk.
218     * @throws IOException in case of an I/O error
219     */
220    private void nextChunk() throws IOException {
221        if (state == CHUNK_INVALID) {
222            throw new MalformedChunkCodingException("Corrupt data stream");
223        }
224        try {
225            chunkSize = getChunkSize();
226            if (chunkSize < 0L) {
227                throw new MalformedChunkCodingException("Negative chunk size");
228            }
229            state = CHUNK_DATA;
230            pos = 0L;
231            if (chunkSize == 0L) {
232                eof = true;
233                parseTrailerHeaders();
234            }
235        } catch (final MalformedChunkCodingException ex) {
236            state = CHUNK_INVALID;
237            throw ex;
238        }
239    }
240
241    /**
242     * Expects the stream to start with a chunksize in hex with optional
243     * comments after a semicolon. The line must end with a CRLF: "a3; some
244     * comment\r\n" Positions the stream at the start of the next line.
245     */
246    private long getChunkSize() throws IOException {
247        final int st = this.state;
248        switch (st) {
249        case CHUNK_CRLF:
250            this.buffer.clear();
251            final int bytesRead1 = this.in.readLine(this.buffer);
252            if (bytesRead1 == -1) {
253                throw new MalformedChunkCodingException(
254                    "CRLF expected at end of chunk");
255            }
256            if (!this.buffer.isEmpty()) {
257                throw new MalformedChunkCodingException(
258                    "Unexpected content at the end of chunk");
259            }
260            state = CHUNK_LEN;
261            //$FALL-THROUGH$
262        case CHUNK_LEN:
263            this.buffer.clear();
264            final int bytesRead2 = this.in.readLine(this.buffer);
265            if (bytesRead2 == -1) {
266                throw new ConnectionClosedException("Premature end of chunk coded message body: " +
267                        "closing chunk expected");
268            }
269            int separator = this.buffer.indexOf(';');
270            if (separator < 0) {
271                separator = this.buffer.length();
272            }
273            final String s = this.buffer.substringTrimmed(0, separator);
274            try {
275                return Long.parseLong(s, 16);
276            } catch (final NumberFormatException e) {
277                throw new MalformedChunkCodingException("Bad chunk header: " + s);
278            }
279        default:
280            throw new IllegalStateException("Inconsistent codec state");
281        }
282    }
283
284    /**
285     * Reads and stores the Trailer headers.
286     * @throws IOException in case of an I/O error
287     */
288    private void parseTrailerHeaders() throws IOException {
289        try {
290            this.footers = AbstractMessageParser.parseHeaders(in,
291                    constraints.getMaxHeaderCount(),
292                    constraints.getMaxLineLength(),
293                    null);
294        } catch (final HttpException ex) {
295            final IOException ioe = new MalformedChunkCodingException("Invalid footer: "
296                    + ex.getMessage());
297            ioe.initCause(ex);
298            throw ioe;
299        }
300    }
301
302    /**
303     * Upon close, this reads the remainder of the chunked message,
304     * leaving the underlying socket at a position to start reading the
305     * next response without scanning.
306     * @throws IOException in case of an I/O error
307     */
308    @Override
309    public void close() throws IOException {
310        if (!closed) {
311            try {
312                if (!eof && state != CHUNK_INVALID) {
313                    // read and discard the remainder of the message
314                    final byte buff[] = new byte[BUFFER_SIZE];
315                    while (read(buff) >= 0) {
316                    }
317                }
318            } finally {
319                eof = true;
320                closed = true;
321            }
322        }
323    }
324
325    public Header[] getFooters() {
326        return this.footers.clone();
327    }
328
329}