001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.impl.io; 029 030import java.io.IOException; 031import java.io.InputStream; 032 033import org.apache.http.ConnectionClosedException; 034import org.apache.http.io.BufferInfo; 035import org.apache.http.io.SessionInputBuffer; 036import org.apache.http.util.Args; 037 038/** 039 * Input stream that cuts off after a defined number of bytes. This class 040 * is used to receive content of HTTP messages where the end of the content 041 * entity is determined by the value of the {@code Content-Length header}. 042 * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE} 043 * long. 044 * <p> 045 * Note that this class NEVER closes the underlying stream, even when close 046 * gets called. Instead, it will read until the "end" of its limit on 047 * close, which allows for the seamless execution of subsequent HTTP 1.1 048 * requests, while not requiring the client to remember to read the entire 049 * contents of the response. 050 * 051 * 052 * @since 4.0 053 */ 054public class ContentLengthInputStream extends InputStream { 055 056 private static final int BUFFER_SIZE = 2048; 057 /** 058 * The maximum number of bytes that can be read from the stream. Subsequent 059 * read operations will return -1. 060 */ 061 private final long contentLength; 062 063 /** The current position */ 064 private long pos = 0; 065 066 /** True if the stream is closed. */ 067 private boolean closed = false; 068 069 /** 070 * Wrapped input stream that all calls are delegated to. 071 */ 072 private SessionInputBuffer in = null; 073 074 /** 075 * Wraps a session input buffer and cuts off output after a defined number 076 * of bytes. 077 * 078 * @param in The session input buffer 079 * @param contentLength The maximum number of bytes that can be read from 080 * the stream. Subsequent read operations will return -1. 081 */ 082 public ContentLengthInputStream(final SessionInputBuffer in, final long contentLength) { 083 super(); 084 this.in = Args.notNull(in, "Session input buffer"); 085 this.contentLength = Args.notNegative(contentLength, "Content length"); 086 } 087 088 /** 089 * <p>Reads until the end of the known length of content.</p> 090 * 091 * <p>Does not close the underlying socket input, but instead leaves it 092 * primed to parse the next response.</p> 093 * @throws IOException If an IO problem occurs. 094 */ 095 @Override 096 public void close() throws IOException { 097 if (!closed) { 098 try { 099 if (pos < contentLength) { 100 final byte buffer[] = new byte[BUFFER_SIZE]; 101 while (read(buffer) >= 0) { 102 } 103 } 104 } finally { 105 // close after above so that we don't throw an exception trying 106 // to read after closed! 107 closed = true; 108 } 109 } 110 } 111 112 @Override 113 public int available() throws IOException { 114 if (this.in instanceof BufferInfo) { 115 final int len = ((BufferInfo) this.in).length(); 116 return Math.min(len, (int) (this.contentLength - this.pos)); 117 } else { 118 return 0; 119 } 120 } 121 122 /** 123 * Read the next byte from the stream 124 * @return The next byte or -1 if the end of stream has been reached. 125 * @throws IOException If an IO problem occurs 126 * @see java.io.InputStream#read() 127 */ 128 @Override 129 public int read() throws IOException { 130 if (closed) { 131 throw new IOException("Attempted read from closed stream."); 132 } 133 134 if (pos >= contentLength) { 135 return -1; 136 } 137 final int b = this.in.read(); 138 if (b == -1) { 139 if (pos < contentLength) { 140 throw new ConnectionClosedException( 141 "Premature end of Content-Length delimited message body (expected: " 142 + contentLength + "; received: " + pos); 143 } 144 } else { 145 pos++; 146 } 147 return b; 148 } 149 150 /** 151 * Does standard {@link InputStream#read(byte[], int, int)} behavior, but 152 * also notifies the watcher when the contents have been consumed. 153 * 154 * @param b The byte array to fill. 155 * @param off Start filling at this position. 156 * @param len The number of bytes to attempt to read. 157 * @return The number of bytes read, or -1 if the end of content has been 158 * reached. 159 * 160 * @throws java.io.IOException Should an error occur on the wrapped stream. 161 */ 162 @Override 163 public int read (final byte[] b, final int off, final int len) throws java.io.IOException { 164 if (closed) { 165 throw new IOException("Attempted read from closed stream."); 166 } 167 168 if (pos >= contentLength) { 169 return -1; 170 } 171 172 int chunk = len; 173 if (pos + len > contentLength) { 174 chunk = (int) (contentLength - pos); 175 } 176 final int count = this.in.read(b, off, chunk); 177 if (count == -1 && pos < contentLength) { 178 throw new ConnectionClosedException( 179 "Premature end of Content-Length delimited message body (expected: " 180 + contentLength + "; received: " + pos); 181 } 182 if (count > 0) { 183 pos += count; 184 } 185 return count; 186 } 187 188 189 /** 190 * Read more bytes from the stream. 191 * @param b The byte array to put the new data in. 192 * @return The number of bytes read into the buffer. 193 * @throws IOException If an IO problem occurs 194 * @see java.io.InputStream#read(byte[]) 195 */ 196 @Override 197 public int read(final byte[] b) throws IOException { 198 return read(b, 0, b.length); 199 } 200 201 /** 202 * Skips and discards a number of bytes from the input stream. 203 * @param n The number of bytes to skip. 204 * @return The actual number of bytes skipped. ≤ 0 if no bytes 205 * are skipped. 206 * @throws IOException If an error occurs while skipping bytes. 207 * @see InputStream#skip(long) 208 */ 209 @Override 210 public long skip(final long n) throws IOException { 211 if (n <= 0) { 212 return 0; 213 } 214 final byte[] buffer = new byte[BUFFER_SIZE]; 215 // make sure we don't skip more bytes than are 216 // still available 217 long remaining = Math.min(n, this.contentLength - this.pos); 218 // skip and keep track of the bytes actually skipped 219 long count = 0; 220 while (remaining > 0) { 221 final int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining)); 222 if (l == -1) { 223 break; 224 } 225 count += l; 226 remaining -= l; 227 } 228 return count; 229 } 230}