001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.codecs;
029
030import java.io.IOException;
031import java.nio.ByteBuffer;
032import java.nio.channels.WritableByteChannel;
033
034import org.apache.http.impl.io.HttpTransportMetricsImpl;
035import org.apache.http.nio.ContentEncoder;
036import org.apache.http.nio.reactor.SessionOutputBuffer;
037import org.apache.http.util.Args;
038import org.apache.http.util.Asserts;
039
040/**
041 * Abstract {@link ContentEncoder} that serves as a base for all content
042 * encoder implementations.
043 *
044 * @since 4.0
045 */
046public abstract class AbstractContentEncoder implements ContentEncoder {
047
048    protected final WritableByteChannel channel;
049    protected final SessionOutputBuffer buffer;
050    protected final HttpTransportMetricsImpl metrics;
051
052    /**
053     * TODO: make private
054     */
055    protected boolean completed;
056
057    /**
058     * Creates an instance of this class.
059     *
060     * @param channel the destination channel.
061     * @param buffer the session output buffer that can be used to store
062     *    session data for intermediate processing.
063     * @param metrics Transport metrics of the underlying HTTP transport.
064     */
065    public AbstractContentEncoder(
066            final WritableByteChannel channel,
067            final SessionOutputBuffer buffer,
068            final HttpTransportMetricsImpl metrics) {
069        super();
070        Args.notNull(channel, "Channel");
071        Args.notNull(buffer, "Session input buffer");
072        Args.notNull(metrics, "Transport metrics");
073        this.buffer = buffer;
074        this.channel = channel;
075        this.metrics = metrics;
076    }
077
078    @Override
079    public boolean isCompleted() {
080        return this.completed;
081    }
082
083    @Override
084    public void complete() throws IOException {
085        this.completed = true;
086    }
087
088    protected void assertNotCompleted() {
089        Asserts.check(!this.completed, "Encoding process already completed");
090    }
091
092    /**
093     * Flushes content of the session buffer to the channel and updates transport metrics.
094     *
095     * @return number of bytes written to the channel.
096     *
097     * @since 4.3
098     */
099    protected int flushToChannel() throws IOException {
100        if (!this.buffer.hasData()) {
101            return 0;
102        }
103        final int bytesWritten = this.buffer.flush(this.channel);
104        if (bytesWritten > 0) {
105            this.metrics.incrementBytesTransferred(bytesWritten);
106        }
107        return bytesWritten;
108    }
109
110    /**
111     * Flushes content of the given buffer to the channel and updates transport metrics.
112     *
113     * @return number of bytes written to the channel.
114     *
115     * @since 4.3
116     */
117    protected int writeToChannel(final ByteBuffer src) throws IOException {
118        if (!src.hasRemaining()) {
119            return 0;
120        }
121        final int bytesWritten = this.channel.write(src);
122        if (bytesWritten > 0) {
123            this.metrics.incrementBytesTransferred(bytesWritten);
124        }
125        return bytesWritten;
126    }
127
128    /**
129     * Transfers content of the source to the channel and updates transport metrics.
130     *
131     * @param src source.
132     * @param limit max number of bytes to transfer.
133     * @return number of bytes transferred.
134     *
135     * @since 4.3
136     */
137    protected int writeToChannel(final ByteBuffer src, final int limit) throws IOException {
138        return doWriteChunk(src, limit, true);
139    }
140
141    /**
142     * Transfers content of the source to the buffer and updates transport metrics.
143     *
144     * @param src source.
145     * @param limit max number of bytes to transfer.
146     * @return number of bytes transferred.
147     *
148     * @since 4.3
149     */
150    protected int writeToBuffer(final ByteBuffer src, final int limit) throws IOException {
151        return doWriteChunk(src, limit, false);
152    }
153
154    private int doWriteChunk(
155        final ByteBuffer src, final int chunk, final boolean direct) throws IOException {
156        final int bytesWritten;
157        if (src.remaining() > chunk) {
158            final int oldLimit = src.limit();
159            final int newLimit = oldLimit - (src.remaining() - chunk);
160            src.limit(newLimit);
161            bytesWritten = doWriteChunk(src, direct);
162            src.limit(oldLimit);
163        } else {
164            bytesWritten = doWriteChunk(src, direct);
165        }
166        return bytesWritten;
167    }
168
169    private int doWriteChunk(final ByteBuffer src, final boolean direct) throws IOException {
170        if (direct) {
171            final int bytesWritten = this.channel.write(src);
172            if (bytesWritten > 0) {
173                this.metrics.incrementBytesTransferred(bytesWritten);
174            }
175            return bytesWritten;
176        } else {
177            final int chunk = src.remaining();
178            this.buffer.write(src);
179            return chunk;
180        }
181    }
182
183}