001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.nio.protocol; 029 030import java.io.IOException; 031import java.util.concurrent.Future; 032import java.util.concurrent.atomic.AtomicBoolean; 033 034import org.apache.http.ConnectionClosedException; 035import org.apache.http.ConnectionReuseStrategy; 036import org.apache.http.HttpException; 037import org.apache.http.HttpRequest; 038import org.apache.http.HttpResponse; 039import org.apache.http.concurrent.BasicFuture; 040import org.apache.http.concurrent.FutureCallback; 041import org.apache.http.impl.DefaultConnectionReuseStrategy; 042import org.apache.http.nio.ContentDecoder; 043import org.apache.http.nio.ContentEncoder; 044import org.apache.http.nio.IOControl; 045import org.apache.http.nio.NHttpClientConnection; 046import org.apache.http.protocol.HttpContext; 047import org.apache.http.protocol.HttpCoreContext; 048import org.apache.http.protocol.HttpProcessor; 049import org.apache.http.util.Args; 050 051/** 052 * Basic implementation of {@link HttpAsyncClientExchangeHandler} that executes 053 * a single HTTP request / response exchange. 054 * 055 * @param <T> the result type of request execution. 056 * @since 4.3 057 */ 058public class BasicAsyncClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler { 059 060 private final HttpAsyncRequestProducer requestProducer; 061 private final HttpAsyncResponseConsumer<T> responseConsumer; 062 private final BasicFuture<T> future; 063 private final HttpContext localContext; 064 private final NHttpClientConnection conn; 065 private final HttpProcessor httppocessor; 066 private final ConnectionReuseStrategy connReuseStrategy; 067 private final AtomicBoolean requestSent; 068 private final AtomicBoolean keepAlive; 069 private final AtomicBoolean closed; 070 071 /** 072 * Creates new instance of BasicAsyncRequestExecutionHandler. 073 * 074 * @param requestProducer the request producer. 075 * @param responseConsumer the response consumer. 076 * @param callback the future callback invoked when the operation is completed. 077 * @param localContext the local execution context. 078 * @param conn the actual connection. 079 * @param httppocessor the HTTP protocol processor. 080 * @param connReuseStrategy the connection re-use strategy. 081 */ 082 public BasicAsyncClientExchangeHandler( 083 final HttpAsyncRequestProducer requestProducer, 084 final HttpAsyncResponseConsumer<T> responseConsumer, 085 final FutureCallback<T> callback, 086 final HttpContext localContext, 087 final NHttpClientConnection conn, 088 final HttpProcessor httppocessor, 089 final ConnectionReuseStrategy connReuseStrategy) { 090 super(); 091 this.requestProducer = Args.notNull(requestProducer, "Request producer"); 092 this.responseConsumer = Args.notNull(responseConsumer, "Response consumer"); 093 this.future = new BasicFuture<T>(callback); 094 this.localContext = Args.notNull(localContext, "HTTP context"); 095 this.conn = Args.notNull(conn, "HTTP connection"); 096 this.httppocessor = Args.notNull(httppocessor, "HTTP processor"); 097 this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy : 098 DefaultConnectionReuseStrategy.INSTANCE; 099 this.requestSent = new AtomicBoolean(false); 100 this.keepAlive = new AtomicBoolean(false); 101 this.closed = new AtomicBoolean(false); 102 } 103 104 /** 105 * Creates new instance of BasicAsyncRequestExecutionHandler. 106 * 107 * @param requestProducer the request producer. 108 * @param responseConsumer the response consumer. 109 * @param localContext the local execution context. 110 * @param conn the actual connection. 111 * @param httppocessor the HTTP protocol processor. 112 */ 113 public BasicAsyncClientExchangeHandler( 114 final HttpAsyncRequestProducer requestProducer, 115 final HttpAsyncResponseConsumer<T> responseConsumer, 116 final HttpContext localContext, 117 final NHttpClientConnection conn, 118 final HttpProcessor httppocessor) { 119 this(requestProducer, responseConsumer, null, localContext, conn, httppocessor, null); 120 } 121 122 public Future<T> getFuture() { 123 return this.future; 124 } 125 126 private void releaseResources() { 127 try { 128 this.responseConsumer.close(); 129 } catch (final IOException ex) { 130 } 131 try { 132 this.requestProducer.close(); 133 } catch (final IOException ex) { 134 } 135 } 136 137 @Override 138 public void close() throws IOException { 139 if (this.closed.compareAndSet(false, true)) { 140 releaseResources(); 141 if (!this.future.isDone()) { 142 this.future.cancel(); 143 } 144 } 145 } 146 147 @Override 148 public HttpRequest generateRequest() throws IOException, HttpException { 149 if (isDone()) { 150 return null; 151 } 152 final HttpRequest request = this.requestProducer.generateRequest(); 153 this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request); 154 this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn); 155 this.httppocessor.process(request, this.localContext); 156 return request; 157 } 158 159 @Override 160 public void produceContent( 161 final ContentEncoder encoder, final IOControl ioctrl) throws IOException { 162 this.requestProducer.produceContent(encoder, ioctrl); 163 } 164 165 @Override 166 public void requestCompleted() { 167 this.requestProducer.requestCompleted(this.localContext); 168 this.requestSent.set(true); 169 } 170 171 @Override 172 public void responseReceived(final HttpResponse response) throws IOException, HttpException { 173 this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response); 174 this.httppocessor.process(response, this.localContext); 175 this.responseConsumer.responseReceived(response); 176 this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext)); 177 } 178 179 @Override 180 public void consumeContent( 181 final ContentDecoder decoder, final IOControl ioctrl) throws IOException { 182 this.responseConsumer.consumeContent(decoder, ioctrl); 183 } 184 185 @Override 186 public void responseCompleted() throws IOException { 187 try { 188 if (!this.keepAlive.get()) { 189 this.conn.close(); 190 } 191 this.responseConsumer.responseCompleted(this.localContext); 192 final T result = this.responseConsumer.getResult(); 193 final Exception ex = this.responseConsumer.getException(); 194 if (result != null) { 195 this.future.completed(result); 196 } else { 197 this.future.failed(ex); 198 } 199 if (this.closed.compareAndSet(false, true)) { 200 releaseResources(); 201 } 202 } catch (final RuntimeException ex) { 203 failed(ex); 204 throw ex; 205 } 206 } 207 208 @Override 209 public void inputTerminated() { 210 failed(new ConnectionClosedException("Connection closed")); 211 } 212 213 @Override 214 public void failed(final Exception ex) { 215 if (this.closed.compareAndSet(false, true)) { 216 try { 217 if (!this.requestSent.get()) { 218 this.requestProducer.failed(ex); 219 } 220 this.responseConsumer.failed(ex); 221 } finally { 222 try { 223 this.future.failed(ex); 224 } finally { 225 releaseResources(); 226 } 227 } 228 } 229 } 230 231 @Override 232 public boolean cancel() { 233 if (this.closed.compareAndSet(false, true)) { 234 try { 235 try { 236 return this.responseConsumer.cancel(); 237 } finally { 238 this.future.cancel(); 239 } 240 } finally { 241 releaseResources(); 242 } 243 } 244 return false; 245 } 246 247 @Override 248 public boolean isDone() { 249 return this.responseConsumer.isDone(); 250 } 251 252}