001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027package org.apache.http.nio.protocol; 028 029import java.io.Closeable; 030import java.io.IOException; 031import java.util.List; 032import java.util.concurrent.Future; 033 034import org.apache.http.ConnectionClosedException; 035import org.apache.http.ConnectionReuseStrategy; 036import org.apache.http.ExceptionLogger; 037import org.apache.http.HttpHost; 038import org.apache.http.annotation.ThreadingBehavior; 039import org.apache.http.annotation.Contract; 040import org.apache.http.concurrent.BasicFuture; 041import org.apache.http.concurrent.FutureCallback; 042import org.apache.http.impl.DefaultConnectionReuseStrategy; 043import org.apache.http.nio.NHttpClientConnection; 044import org.apache.http.params.HttpParams; 045import org.apache.http.pool.ConnPool; 046import org.apache.http.pool.PoolEntry; 047import org.apache.http.protocol.BasicHttpContext; 048import org.apache.http.protocol.HttpContext; 049import org.apache.http.protocol.HttpProcessor; 050import org.apache.http.util.Args; 051 052/** 053 * {@code HttpAsyncRequester} is a utility class that can be used 054 * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution 055 * of asynchronous HTTP requests. 056 * 057 * @see HttpAsyncRequestExecutor 058 * 059 * @since 4.2 060 */ 061@SuppressWarnings("deprecation") 062@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) 063public class HttpAsyncRequester { 064 065 private final HttpProcessor httpprocessor; 066 private final ConnectionReuseStrategy connReuseStrategy; 067 private final ExceptionLogger exceptionLogger; 068 069 /** 070 * @deprecated (4.3) use {@link HttpAsyncRequester#HttpAsyncRequester(HttpProcessor, 071 * ConnectionReuseStrategy)} 072 */ 073 @Deprecated 074 public HttpAsyncRequester( 075 final HttpProcessor httpprocessor, 076 final ConnectionReuseStrategy reuseStrategy, 077 final HttpParams params) { 078 this(httpprocessor, reuseStrategy); 079 } 080 081 /** 082 * Creates new instance of {@code HttpAsyncRequester}. 083 * @param httpprocessor HTTP protocol processor. 084 * @param connReuseStrategy Connection re-use strategy. If {@code null} 085 * {@link DefaultConnectionReuseStrategy#INSTANCE} will be used. 086 * @param exceptionLogger Exception logger. If {@code null} 087 * {@link ExceptionLogger#NO_OP} will be used. Please note that the exception 088 * logger will be only used to log I/O exception thrown while closing 089 * {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}). 090 * 091 * @since 4.4 092 */ 093 public HttpAsyncRequester( 094 final HttpProcessor httpprocessor, 095 final ConnectionReuseStrategy connReuseStrategy, 096 final ExceptionLogger exceptionLogger) { 097 super(); 098 this.httpprocessor = Args.notNull(httpprocessor, "HTTP processor"); 099 this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy : 100 DefaultConnectionReuseStrategy.INSTANCE; 101 this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP; 102 } 103 104 /** 105 * Creates new instance of HttpAsyncRequester. 106 * 107 * @since 4.3 108 */ 109 public HttpAsyncRequester( 110 final HttpProcessor httpprocessor, 111 final ConnectionReuseStrategy connReuseStrategy) { 112 this(httpprocessor, connReuseStrategy, (ExceptionLogger) null); 113 } 114 115 /** 116 * Creates new instance of HttpAsyncRequester. 117 * 118 * @since 4.3 119 */ 120 public HttpAsyncRequester(final HttpProcessor httpprocessor) { 121 this(httpprocessor, null); 122 } 123 124 /** 125 * Initiates asynchronous HTTP request execution. 126 * 127 * @param <T> the result type of request execution. 128 * @param requestProducer request producer. 129 * @param responseConsumer response consumer. 130 * @param conn underlying HTTP connection. 131 * @param context HTTP context 132 * @param callback future callback. 133 * @return future representing pending completion of the operation. 134 */ 135 public <T> Future<T> execute( 136 final HttpAsyncRequestProducer requestProducer, 137 final HttpAsyncResponseConsumer<T> responseConsumer, 138 final NHttpClientConnection conn, 139 final HttpContext context, 140 final FutureCallback<T> callback) { 141 Args.notNull(requestProducer, "HTTP request producer"); 142 Args.notNull(responseConsumer, "HTTP response consumer"); 143 Args.notNull(conn, "HTTP connection"); 144 Args.notNull(context, "HTTP context"); 145 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>( 146 requestProducer, responseConsumer, callback, context, conn, 147 this.httpprocessor, this.connReuseStrategy); 148 initExecution(handler, conn); 149 return handler.getFuture(); 150 } 151 152 private void initExecution( 153 final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) { 154 155 final HttpContext context = conn.getContext(); 156 synchronized (context) { 157 context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler); 158 if (!conn.isOpen()) { 159 handler.failed(new ConnectionClosedException("Connection closed")); 160 } else { 161 conn.requestOutput(); 162 } 163 } 164 if (handler.isDone()) { 165 try { 166 handler.close(); 167 } catch (final IOException ex) { 168 log(ex); 169 } 170 } 171 } 172 173 /** 174 * Initiates asynchronous HTTP request execution. 175 * 176 * @param <T> the result type of request execution. 177 * @param requestProducer request producer. 178 * @param responseConsumer response consumer. 179 * @param conn underlying HTTP connection. 180 * @param context HTTP context 181 * @return future representing pending completion of the operation. 182 */ 183 public <T> Future<T> execute( 184 final HttpAsyncRequestProducer requestProducer, 185 final HttpAsyncResponseConsumer<T> responseConsumer, 186 final NHttpClientConnection conn, 187 final HttpContext context) { 188 return execute(requestProducer, responseConsumer, conn, context, null); 189 } 190 191 /** 192 * Initiates asynchronous HTTP request execution. 193 * 194 * @param <T> the result type of request execution. 195 * @param requestProducer request producer. 196 * @param responseConsumer response consumer. 197 * @param conn underlying HTTP connection. 198 * @return future representing pending completion of the operation. 199 */ 200 public <T> Future<T> execute( 201 final HttpAsyncRequestProducer requestProducer, 202 final HttpAsyncResponseConsumer<T> responseConsumer, 203 final NHttpClientConnection conn) { 204 return execute(requestProducer, responseConsumer, conn, new BasicHttpContext()); 205 } 206 207 /** 208 * Initiates asynchronous HTTP request execution. 209 * 210 * @param <T> the result type of request execution. 211 * @param <E> the connection pool entry type. 212 * @param requestProducer request producer. 213 * @param responseConsumer response consumer. 214 * @param connPool pool of persistent reusable connections. 215 * @param context HTTP context 216 * @param callback future callback. 217 * @return future representing pending completion of the operation. 218 */ 219 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute( 220 final HttpAsyncRequestProducer requestProducer, 221 final HttpAsyncResponseConsumer<T> responseConsumer, 222 final ConnPool<HttpHost, E> connPool, 223 final HttpContext context, 224 final FutureCallback<T> callback) { 225 Args.notNull(requestProducer, "HTTP request producer"); 226 Args.notNull(responseConsumer, "HTTP response consumer"); 227 Args.notNull(connPool, "HTTP connection pool"); 228 Args.notNull(context, "HTTP context"); 229 final BasicFuture<T> future = new BasicFuture<T>(callback); 230 final HttpHost target = requestProducer.getTarget(); 231 connPool.lease(target, null, new ConnRequestCallback<T, E>( 232 future, requestProducer, responseConsumer, connPool, context)); 233 return future; 234 } 235 236 /** 237 * Initiates asynchronous pipelined HTTP request execution. 238 * 239 * @param <T> the result type of request execution. 240 * @param <E> the connection pool entry type. 241 * @param target target host. 242 * @param requestProducers list of request producers. 243 * @param responseConsumers list of response consumers. 244 * @param connPool pool of persistent reusable connections. 245 * @param context HTTP context 246 * @param callback future callback. 247 * @return future representing pending completion of the operation. 248 * 249 * @since 4.4 250 */ 251 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined( 252 final HttpHost target, 253 final List<? extends HttpAsyncRequestProducer> requestProducers, 254 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers, 255 final ConnPool<HttpHost, E> connPool, 256 final HttpContext context, 257 final FutureCallback<List<T>> callback) { 258 Args.notNull(target, "HTTP target"); 259 Args.notEmpty(requestProducers, "Request producer list"); 260 Args.notEmpty(responseConsumers, "Response consumer list"); 261 Args.notNull(connPool, "HTTP connection pool"); 262 Args.notNull(context, "HTTP context"); 263 final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback); 264 connPool.lease(target, null, new ConnPipelinedRequestCallback<T, E>( 265 future, requestProducers, responseConsumers, connPool, context)); 266 return future; 267 } 268 269 /** 270 * Initiates asynchronous HTTP request execution. This method automatically releases 271 * the given pool entry once request execution is completed (successfully or unsuccessfully). 272 * 273 * @param <T> the result type of request execution. 274 * @param <E> the connection pool entry type. 275 * @param requestProducer request producer. 276 * @param responseConsumer response consumer. 277 * @param poolEntry leased pool entry. It will be automatically released 278 * back to the pool when execution is completed. 279 * @param connPool pool of persistent reusable connections. 280 * @param context HTTP context 281 * @param callback future callback. 282 * @return future representing pending completion of the operation. 283 * 284 * @since 4.3 285 */ 286 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute( 287 final HttpAsyncRequestProducer requestProducer, 288 final HttpAsyncResponseConsumer<T> responseConsumer, 289 final E poolEntry, 290 final ConnPool<HttpHost, E> connPool, 291 final HttpContext context, 292 final FutureCallback<T> callback) { 293 Args.notNull(requestProducer, "HTTP request producer"); 294 Args.notNull(responseConsumer, "HTTP response consumer"); 295 Args.notNull(connPool, "HTTP connection pool"); 296 Args.notNull(poolEntry, "Pool entry"); 297 Args.notNull(context, "HTTP context"); 298 final BasicFuture<T> future = new BasicFuture<T>(callback); 299 final NHttpClientConnection conn = poolEntry.getConnection(); 300 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>( 301 requestProducer, responseConsumer, 302 new RequestExecutionCallback<T, E>(future, poolEntry, connPool), 303 context, conn, 304 this.httpprocessor, this.connReuseStrategy); 305 initExecution(handler, conn); 306 return future; 307 } 308 309 /** 310 * Initiates asynchronous pipelined HTTP request execution. This method automatically releases 311 * the given pool entry once request execution is completed (successfully or unsuccessfully). 312 * 313 * @param <T> the result type of request execution. 314 * @param <E> the connection pool entry type. 315 * @param requestProducers list of request producers. 316 * @param responseConsumers list of response consumers. 317 * @param poolEntry leased pool entry. It will be automatically released 318 * back to the pool when execution is completed. 319 * @param connPool pool of persistent reusable connections. 320 * @param context HTTP context 321 * @param callback future callback. 322 * @return future representing pending completion of the operation. 323 * 324 * @since 4.4 325 */ 326 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined( 327 final List<HttpAsyncRequestProducer> requestProducers, 328 final List<HttpAsyncResponseConsumer<T>> responseConsumers, 329 final E poolEntry, 330 final ConnPool<HttpHost, E> connPool, 331 final HttpContext context, 332 final FutureCallback<List<T>> callback) { 333 Args.notEmpty(requestProducers, "Request producer list"); 334 Args.notEmpty(responseConsumers, "Response consumer list"); 335 Args.notNull(connPool, "HTTP connection pool"); 336 Args.notNull(poolEntry, "Pool entry"); 337 Args.notNull(context, "HTTP context"); 338 final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback); 339 final NHttpClientConnection conn = poolEntry.getConnection(); 340 final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>( 341 requestProducers, responseConsumers, 342 new RequestExecutionCallback<List<T>, E>(future, poolEntry, connPool), 343 context, conn, 344 this.httpprocessor, this.connReuseStrategy); 345 initExecution(handler, conn); 346 return future; 347 } 348 349 /** 350 * Initiates asynchronous HTTP request execution. 351 * 352 * @param <T> the result type of request execution. 353 * @param <E> the connection pool entry type. 354 * @param requestProducer request producer. 355 * @param responseConsumer response consumer. 356 * @param connPool pool of persistent reusable connections. 357 * @param context HTTP context 358 * @return future representing pending completion of the operation. 359 */ 360 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute( 361 final HttpAsyncRequestProducer requestProducer, 362 final HttpAsyncResponseConsumer<T> responseConsumer, 363 final ConnPool<HttpHost, E> connPool, 364 final HttpContext context) { 365 return execute(requestProducer, responseConsumer, connPool, context, null); 366 } 367 368 /** 369 * Initiates asynchronous HTTP request execution. 370 * 371 * @param <T> the result type of request execution. 372 * @param <E> the connection pool entry type. 373 * @param requestProducer request producer. 374 * @param responseConsumer response consumer. 375 * @param connPool pool of persistent reusable connections. 376 * @return future representing pending completion of the operation. 377 */ 378 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute( 379 final HttpAsyncRequestProducer requestProducer, 380 final HttpAsyncResponseConsumer<T> responseConsumer, 381 final ConnPool<HttpHost, E> connPool) { 382 return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext()); 383 } 384 385 class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> { 386 387 private final BasicFuture<T> requestFuture; 388 private final HttpAsyncRequestProducer requestProducer; 389 private final HttpAsyncResponseConsumer<T> responseConsumer; 390 private final ConnPool<HttpHost, E> connPool; 391 private final HttpContext context; 392 393 ConnRequestCallback( 394 final BasicFuture<T> requestFuture, 395 final HttpAsyncRequestProducer requestProducer, 396 final HttpAsyncResponseConsumer<T> responseConsumer, 397 final ConnPool<HttpHost, E> connPool, 398 final HttpContext context) { 399 super(); 400 this.requestFuture = requestFuture; 401 this.requestProducer = requestProducer; 402 this.responseConsumer = responseConsumer; 403 this.connPool = connPool; 404 this.context = context; 405 } 406 407 @Override 408 public void completed(final E result) { 409 if (this.requestFuture.isDone()) { 410 this.connPool.release(result, true); 411 return; 412 } 413 final NHttpClientConnection conn = result.getConnection(); 414 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>( 415 this.requestProducer, this.responseConsumer, 416 new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool), 417 this.context, conn, httpprocessor, connReuseStrategy); 418 initExecution(handler, conn); 419 } 420 421 @Override 422 public void failed(final Exception ex) { 423 try { 424 try { 425 this.responseConsumer.failed(ex); 426 } finally { 427 releaseResources(); 428 } 429 } finally { 430 this.requestFuture.failed(ex); 431 } 432 } 433 434 @Override 435 public void cancelled() { 436 try { 437 try { 438 this.responseConsumer.cancel(); 439 } finally { 440 releaseResources(); 441 } 442 } finally { 443 this.requestFuture.cancel(true); 444 } 445 } 446 447 public void releaseResources() { 448 close(requestProducer); 449 close(responseConsumer); 450 } 451 452 } 453 454 class ConnPipelinedRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> { 455 456 private final BasicFuture<List<T>> requestFuture; 457 private final List<? extends HttpAsyncRequestProducer> requestProducers; 458 private final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers; 459 private final ConnPool<HttpHost, E> connPool; 460 private final HttpContext context; 461 462 ConnPipelinedRequestCallback( 463 final BasicFuture<List<T>> requestFuture, 464 final List<? extends HttpAsyncRequestProducer> requestProducers, 465 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers, 466 final ConnPool<HttpHost, E> connPool, 467 final HttpContext context) { 468 super(); 469 this.requestFuture = requestFuture; 470 this.requestProducers = requestProducers; 471 this.responseConsumers = responseConsumers; 472 this.connPool = connPool; 473 this.context = context; 474 } 475 476 @Override 477 public void completed(final E result) { 478 if (this.requestFuture.isDone()) { 479 this.connPool.release(result, true); 480 return; 481 } 482 final NHttpClientConnection conn = result.getConnection(); 483 final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>( 484 this.requestProducers, this.responseConsumers, 485 new RequestExecutionCallback<List<T>, E>(this.requestFuture, result, this.connPool), 486 this.context, conn, httpprocessor, connReuseStrategy); 487 initExecution(handler, conn); 488 } 489 490 @Override 491 public void failed(final Exception ex) { 492 try { 493 try { 494 for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) { 495 responseConsumer.failed(ex); 496 } 497 } finally { 498 releaseResources(); 499 } 500 } finally { 501 this.requestFuture.failed(ex); 502 } 503 } 504 505 @Override 506 public void cancelled() { 507 try { 508 try { 509 for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) { 510 responseConsumer.cancel(); 511 } 512 } finally { 513 releaseResources(); 514 } 515 } finally { 516 this.requestFuture.cancel(true); 517 } 518 } 519 520 public void releaseResources() { 521 for (final HttpAsyncRequestProducer requestProducer: this.requestProducers) { 522 close(requestProducer); 523 } 524 for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) { 525 close(responseConsumer); 526 } 527 } 528 529 } 530 531 class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> 532 implements FutureCallback<T> { 533 534 private final BasicFuture<T> future; 535 private final E poolEntry; 536 private final ConnPool<HttpHost, E> connPool; 537 538 RequestExecutionCallback( 539 final BasicFuture<T> future, 540 final E poolEntry, 541 final ConnPool<HttpHost, E> connPool) { 542 super(); 543 this.future = future; 544 this.poolEntry = poolEntry; 545 this.connPool = connPool; 546 } 547 548 @Override 549 public void completed(final T result) { 550 try { 551 this.connPool.release(this.poolEntry, true); 552 } finally { 553 this.future.completed(result); 554 } 555 } 556 557 @Override 558 public void failed(final Exception ex) { 559 try { 560 this.connPool.release(this.poolEntry, false); 561 } finally { 562 this.future.failed(ex); 563 } 564 } 565 566 @Override 567 public void cancelled() { 568 try { 569 this.connPool.release(this.poolEntry, false); 570 } finally { 571 this.future.cancel(true); 572 } 573 } 574 575 } 576 577 /** 578 * This method can be used to log I/O exception thrown while closing 579 * {@link java.io.Closeable} objects (such as 580 * {@link org.apache.http.HttpConnection}}). 581 * 582 * @param ex I/O exception thrown by {@link java.io.Closeable#close()} 583 */ 584 protected void log(final Exception ex) { 585 this.exceptionLogger.log(ex); 586 } 587 588 private void close(final Closeable closeable) { 589 try { 590 closeable.close(); 591 } catch (final IOException ex) { 592 log(ex); 593 } 594 } 595 596}