001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.codecs;
029
030import java.io.IOException;
031import java.nio.ByteBuffer;
032import java.nio.channels.FileChannel;
033import java.nio.channels.WritableByteChannel;
034
035import org.apache.http.impl.io.HttpTransportMetricsImpl;
036import org.apache.http.nio.FileContentEncoder;
037import org.apache.http.nio.reactor.SessionOutputBuffer;
038import org.apache.http.util.Args;
039
040/**
041 * Content encoder that cuts off after a defined number of bytes. This class
042 * is used to send content of HTTP messages where the end of the content entity
043 * is determined by the value of the {@code Content-Length header}.
044 * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
045 * long.
046 * <p>
047 * This decoder is optimized to transfer data directly from
048 * a {@link FileChannel} to the underlying I/O session's channel whenever
049 * possible avoiding intermediate buffering in the session buffer.
050 *
051 * @since 4.0
052 */
053public class LengthDelimitedEncoder extends AbstractContentEncoder
054        implements FileContentEncoder {
055
056    private final long contentLength;
057    private final int fragHint;
058
059    private long remaining;
060
061    /**
062     * @since 4.3
063     *
064     * @param channel underlying channel.
065     * @param buffer  session buffer.
066     * @param metrics transport metrics.
067     * @param contentLength content length.
068     * @param fragementSizeHint fragment size hint defining an minimal size of a fragment
069     *   that should be written out directly to the channel bypassing the session buffer.
070     *   Value {@code 0} disables fragment buffering.
071     */
072    public LengthDelimitedEncoder(
073            final WritableByteChannel channel,
074            final SessionOutputBuffer buffer,
075            final HttpTransportMetricsImpl metrics,
076            final long contentLength,
077            final int fragementSizeHint) {
078        super(channel, buffer, metrics);
079        Args.notNegative(contentLength, "Content length");
080        this.contentLength = contentLength;
081        this.fragHint = fragementSizeHint > 0 ? fragementSizeHint : 0;
082        this.remaining = contentLength;
083    }
084
085    public LengthDelimitedEncoder(
086            final WritableByteChannel channel,
087            final SessionOutputBuffer buffer,
088            final HttpTransportMetricsImpl metrics,
089            final long contentLength) {
090        this(channel, buffer, metrics, contentLength, 0);
091    }
092
093    private int nextChunk(final ByteBuffer src) {
094        return (int) Math.min(Math.min(this.remaining, Integer.MAX_VALUE), src.remaining());
095    }
096
097    @Override
098    public int write(final ByteBuffer src) throws IOException {
099        if (src == null) {
100            return 0;
101        }
102        assertNotCompleted();
103
104        int total = 0;
105        while (src.hasRemaining() && this.remaining > 0) {
106            if (this.buffer.hasData() || this.fragHint > 0) {
107                final int chunk = nextChunk(src);
108                if (chunk <= this.fragHint) {
109                    final int capacity = this.fragHint - this.buffer.length();
110                    if (capacity > 0) {
111                        final int limit = Math.min(capacity, chunk);
112                        final int bytesWritten = writeToBuffer(src, limit);
113                        this.remaining -= bytesWritten;
114                        total += bytesWritten;
115                    }
116                }
117            }
118            if (this.buffer.hasData()) {
119                final int chunk = nextChunk(src);
120                if (this.buffer.length() >= this.fragHint || chunk > 0) {
121                    final int bytesWritten = flushToChannel();
122                    if (bytesWritten == 0) {
123                        break;
124                    }
125                }
126            }
127            if (!this.buffer.hasData()) {
128                final int chunk = nextChunk(src);
129                if (chunk > this.fragHint) {
130                    final int bytesWritten = writeToChannel(src, chunk);
131                    this.remaining -= bytesWritten;
132                    total += bytesWritten;
133                    if (bytesWritten == 0) {
134                        break;
135                    }
136                }
137            }
138        }
139        if (this.remaining <= 0) {
140            super.complete();
141        }
142        return total;
143    }
144
145    @Override
146    public long transfer(
147            final FileChannel src,
148            final long position,
149            final long count) throws IOException {
150
151        if (src == null) {
152            return 0;
153        }
154        assertNotCompleted();
155
156        flushToChannel();
157        if (this.buffer.hasData()) {
158            return 0;
159        }
160
161        final long chunk = Math.min(this.remaining, count);
162        final long bytesWritten = src.transferTo(position, chunk, this.channel);
163        if (bytesWritten > 0) {
164            this.metrics.incrementBytesTransferred(bytesWritten);
165        }
166        this.remaining -= bytesWritten;
167        if (this.remaining <= 0) {
168            super.complete();
169        }
170        return bytesWritten;
171    }
172
173    @Override
174    public String toString() {
175        final StringBuilder sb = new StringBuilder();
176        sb.append("[content length: ");
177        sb.append(this.contentLength);
178        sb.append("; pos: ");
179        sb.append(this.contentLength - this.remaining);
180        sb.append("; completed: ");
181        sb.append(isCompleted());
182        sb.append("]");
183        return sb.toString();
184    }
185
186}