001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.impl.nio.codecs; 029 030import java.io.IOException; 031import java.nio.ByteBuffer; 032import java.nio.channels.FileChannel; 033import java.nio.channels.WritableByteChannel; 034 035import org.apache.http.impl.io.HttpTransportMetricsImpl; 036import org.apache.http.nio.FileContentEncoder; 037import org.apache.http.nio.reactor.SessionOutputBuffer; 038import org.apache.http.util.Args; 039 040/** 041 * Content encoder that cuts off after a defined number of bytes. This class 042 * is used to send content of HTTP messages where the end of the content entity 043 * is determined by the value of the {@code Content-Length header}. 044 * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE} 045 * long. 046 * <p> 047 * This decoder is optimized to transfer data directly from 048 * a {@link FileChannel} to the underlying I/O session's channel whenever 049 * possible avoiding intermediate buffering in the session buffer. 050 * 051 * @since 4.0 052 */ 053public class LengthDelimitedEncoder extends AbstractContentEncoder 054 implements FileContentEncoder { 055 056 private final long contentLength; 057 private final int fragHint; 058 059 private long remaining; 060 061 /** 062 * @since 4.3 063 * 064 * @param channel underlying channel. 065 * @param buffer session buffer. 066 * @param metrics transport metrics. 067 * @param contentLength content length. 068 * @param fragementSizeHint fragment size hint defining an minimal size of a fragment 069 * that should be written out directly to the channel bypassing the session buffer. 070 * Value {@code 0} disables fragment buffering. 071 */ 072 public LengthDelimitedEncoder( 073 final WritableByteChannel channel, 074 final SessionOutputBuffer buffer, 075 final HttpTransportMetricsImpl metrics, 076 final long contentLength, 077 final int fragementSizeHint) { 078 super(channel, buffer, metrics); 079 Args.notNegative(contentLength, "Content length"); 080 this.contentLength = contentLength; 081 this.fragHint = fragementSizeHint > 0 ? fragementSizeHint : 0; 082 this.remaining = contentLength; 083 } 084 085 public LengthDelimitedEncoder( 086 final WritableByteChannel channel, 087 final SessionOutputBuffer buffer, 088 final HttpTransportMetricsImpl metrics, 089 final long contentLength) { 090 this(channel, buffer, metrics, contentLength, 0); 091 } 092 093 private int nextChunk(final ByteBuffer src) { 094 return (int) Math.min(Math.min(this.remaining, Integer.MAX_VALUE), src.remaining()); 095 } 096 097 @Override 098 public int write(final ByteBuffer src) throws IOException { 099 if (src == null) { 100 return 0; 101 } 102 assertNotCompleted(); 103 104 int total = 0; 105 while (src.hasRemaining() && this.remaining > 0) { 106 if (this.buffer.hasData() || this.fragHint > 0) { 107 final int chunk = nextChunk(src); 108 if (chunk <= this.fragHint) { 109 final int capacity = this.fragHint - this.buffer.length(); 110 if (capacity > 0) { 111 final int limit = Math.min(capacity, chunk); 112 final int bytesWritten = writeToBuffer(src, limit); 113 this.remaining -= bytesWritten; 114 total += bytesWritten; 115 } 116 } 117 } 118 if (this.buffer.hasData()) { 119 final int chunk = nextChunk(src); 120 if (this.buffer.length() >= this.fragHint || chunk > 0) { 121 final int bytesWritten = flushToChannel(); 122 if (bytesWritten == 0) { 123 break; 124 } 125 } 126 } 127 if (!this.buffer.hasData()) { 128 final int chunk = nextChunk(src); 129 if (chunk > this.fragHint) { 130 final int bytesWritten = writeToChannel(src, chunk); 131 this.remaining -= bytesWritten; 132 total += bytesWritten; 133 if (bytesWritten == 0) { 134 break; 135 } 136 } 137 } 138 } 139 if (this.remaining <= 0) { 140 super.complete(); 141 } 142 return total; 143 } 144 145 @Override 146 public long transfer( 147 final FileChannel src, 148 final long position, 149 final long count) throws IOException { 150 151 if (src == null) { 152 return 0; 153 } 154 assertNotCompleted(); 155 156 flushToChannel(); 157 if (this.buffer.hasData()) { 158 return 0; 159 } 160 161 final long chunk = Math.min(this.remaining, count); 162 final long bytesWritten = src.transferTo(position, chunk, this.channel); 163 if (bytesWritten > 0) { 164 this.metrics.incrementBytesTransferred(bytesWritten); 165 } 166 this.remaining -= bytesWritten; 167 if (this.remaining <= 0) { 168 super.complete(); 169 } 170 return bytesWritten; 171 } 172 173 @Override 174 public String toString() { 175 final StringBuilder sb = new StringBuilder(); 176 sb.append("[content length: "); 177 sb.append(this.contentLength); 178 sb.append("; pos: "); 179 sb.append(this.contentLength - this.remaining); 180 sb.append("; completed: "); 181 sb.append(isCompleted()); 182 sb.append("]"); 183 return sb.toString(); 184 } 185 186}