001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.impl.nio.codecs; 029 030import java.io.IOException; 031import java.nio.ByteBuffer; 032import java.nio.channels.WritableByteChannel; 033 034import org.apache.http.impl.io.HttpTransportMetricsImpl; 035import org.apache.http.nio.ContentEncoder; 036import org.apache.http.nio.reactor.SessionOutputBuffer; 037import org.apache.http.util.Args; 038import org.apache.http.util.Asserts; 039 040/** 041 * Abstract {@link ContentEncoder} that serves as a base for all content 042 * encoder implementations. 043 * 044 * @since 4.0 045 */ 046public abstract class AbstractContentEncoder implements ContentEncoder { 047 048 protected final WritableByteChannel channel; 049 protected final SessionOutputBuffer buffer; 050 protected final HttpTransportMetricsImpl metrics; 051 052 /** 053 * TODO: make private 054 */ 055 protected boolean completed; 056 057 /** 058 * Creates an instance of this class. 059 * 060 * @param channel the destination channel. 061 * @param buffer the session output buffer that can be used to store 062 * session data for intermediate processing. 063 * @param metrics Transport metrics of the underlying HTTP transport. 064 */ 065 public AbstractContentEncoder( 066 final WritableByteChannel channel, 067 final SessionOutputBuffer buffer, 068 final HttpTransportMetricsImpl metrics) { 069 super(); 070 Args.notNull(channel, "Channel"); 071 Args.notNull(buffer, "Session input buffer"); 072 Args.notNull(metrics, "Transport metrics"); 073 this.buffer = buffer; 074 this.channel = channel; 075 this.metrics = metrics; 076 } 077 078 @Override 079 public boolean isCompleted() { 080 return this.completed; 081 } 082 083 @Override 084 public void complete() throws IOException { 085 this.completed = true; 086 } 087 088 protected void assertNotCompleted() { 089 Asserts.check(!this.completed, "Encoding process already completed"); 090 } 091 092 /** 093 * Flushes content of the session buffer to the channel and updates transport metrics. 094 * 095 * @return number of bytes written to the channel. 096 * 097 * @since 4.3 098 */ 099 protected int flushToChannel() throws IOException { 100 if (!this.buffer.hasData()) { 101 return 0; 102 } 103 final int bytesWritten = this.buffer.flush(this.channel); 104 if (bytesWritten > 0) { 105 this.metrics.incrementBytesTransferred(bytesWritten); 106 } 107 return bytesWritten; 108 } 109 110 /** 111 * Flushes content of the given buffer to the channel and updates transport metrics. 112 * 113 * @return number of bytes written to the channel. 114 * 115 * @since 4.3 116 */ 117 protected int writeToChannel(final ByteBuffer src) throws IOException { 118 if (!src.hasRemaining()) { 119 return 0; 120 } 121 final int bytesWritten = this.channel.write(src); 122 if (bytesWritten > 0) { 123 this.metrics.incrementBytesTransferred(bytesWritten); 124 } 125 return bytesWritten; 126 } 127 128 /** 129 * Transfers content of the source to the channel and updates transport metrics. 130 * 131 * @param src source. 132 * @param limit max number of bytes to transfer. 133 * @return number of bytes transferred. 134 * 135 * @since 4.3 136 */ 137 protected int writeToChannel(final ByteBuffer src, final int limit) throws IOException { 138 return doWriteChunk(src, limit, true); 139 } 140 141 /** 142 * Transfers content of the source to the buffer and updates transport metrics. 143 * 144 * @param src source. 145 * @param limit max number of bytes to transfer. 146 * @return number of bytes transferred. 147 * 148 * @since 4.3 149 */ 150 protected int writeToBuffer(final ByteBuffer src, final int limit) throws IOException { 151 return doWriteChunk(src, limit, false); 152 } 153 154 private int doWriteChunk( 155 final ByteBuffer src, final int chunk, final boolean direct) throws IOException { 156 final int bytesWritten; 157 if (src.remaining() > chunk) { 158 final int oldLimit = src.limit(); 159 final int newLimit = oldLimit - (src.remaining() - chunk); 160 src.limit(newLimit); 161 bytesWritten = doWriteChunk(src, direct); 162 src.limit(oldLimit); 163 } else { 164 bytesWritten = doWriteChunk(src, direct); 165 } 166 return bytesWritten; 167 } 168 169 private int doWriteChunk(final ByteBuffer src, final boolean direct) throws IOException { 170 if (direct) { 171 final int bytesWritten = this.channel.write(src); 172 if (bytesWritten > 0) { 173 this.metrics.incrementBytesTransferred(bytesWritten); 174 } 175 return bytesWritten; 176 } else { 177 final int chunk = src.remaining(); 178 this.buffer.write(src); 179 return chunk; 180 } 181 } 182 183}