001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027package org.apache.http.nio.protocol;
028
029import java.io.Closeable;
030import java.io.IOException;
031import java.util.List;
032import java.util.concurrent.Future;
033
034import org.apache.http.ConnectionClosedException;
035import org.apache.http.ConnectionReuseStrategy;
036import org.apache.http.ExceptionLogger;
037import org.apache.http.HttpHost;
038import org.apache.http.annotation.ThreadingBehavior;
039import org.apache.http.annotation.Contract;
040import org.apache.http.concurrent.BasicFuture;
041import org.apache.http.concurrent.FutureCallback;
042import org.apache.http.impl.DefaultConnectionReuseStrategy;
043import org.apache.http.nio.NHttpClientConnection;
044import org.apache.http.params.HttpParams;
045import org.apache.http.pool.ConnPool;
046import org.apache.http.pool.PoolEntry;
047import org.apache.http.protocol.BasicHttpContext;
048import org.apache.http.protocol.HttpContext;
049import org.apache.http.protocol.HttpProcessor;
050import org.apache.http.util.Args;
051
052/**
053 * {@code HttpAsyncRequester} is a utility class that can be used
054 * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution
055 * of asynchronous HTTP requests.
056 *
057 * @see HttpAsyncRequestExecutor
058 *
059 * @since 4.2
060 */
061@SuppressWarnings("deprecation")
062@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
063public class HttpAsyncRequester {
064
065    private final HttpProcessor httpprocessor;
066    private final ConnectionReuseStrategy connReuseStrategy;
067    private final ExceptionLogger exceptionLogger;
068
069    /**
070     * @deprecated (4.3) use {@link HttpAsyncRequester#HttpAsyncRequester(HttpProcessor,
071     *   ConnectionReuseStrategy)}
072     */
073    @Deprecated
074    public HttpAsyncRequester(
075            final HttpProcessor httpprocessor,
076            final ConnectionReuseStrategy reuseStrategy,
077            final HttpParams params) {
078        this(httpprocessor, reuseStrategy);
079    }
080
081    /**
082     * Creates new instance of {@code HttpAsyncRequester}.
083     * @param httpprocessor HTTP protocol processor.
084     * @param connReuseStrategy Connection re-use strategy. If {@code null}
085     *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
086     * @param exceptionLogger Exception logger. If {@code null}
087     *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
088     *   logger will be only used to log I/O exception thrown while closing
089     *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
090     *
091     * @since 4.4
092     */
093    public HttpAsyncRequester(
094            final HttpProcessor httpprocessor,
095            final ConnectionReuseStrategy connReuseStrategy,
096            final ExceptionLogger exceptionLogger) {
097        super();
098        this.httpprocessor = Args.notNull(httpprocessor, "HTTP processor");
099        this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
100                DefaultConnectionReuseStrategy.INSTANCE;
101        this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
102    }
103
104    /**
105     * Creates new instance of HttpAsyncRequester.
106     *
107     * @since 4.3
108     */
109    public HttpAsyncRequester(
110            final HttpProcessor httpprocessor,
111            final ConnectionReuseStrategy connReuseStrategy) {
112        this(httpprocessor, connReuseStrategy, (ExceptionLogger) null);
113    }
114
115    /**
116     * Creates new instance of HttpAsyncRequester.
117     *
118     * @since 4.3
119     */
120    public HttpAsyncRequester(final HttpProcessor httpprocessor) {
121        this(httpprocessor, null);
122    }
123
124    /**
125     * Initiates asynchronous HTTP request execution.
126     *
127     * @param <T> the result type of request execution.
128     * @param requestProducer request producer.
129     * @param responseConsumer response consumer.
130     * @param conn underlying HTTP connection.
131     * @param context HTTP context
132     * @param callback future callback.
133     * @return future representing pending completion of the operation.
134     */
135    public <T> Future<T> execute(
136            final HttpAsyncRequestProducer requestProducer,
137            final HttpAsyncResponseConsumer<T> responseConsumer,
138            final NHttpClientConnection conn,
139            final HttpContext context,
140            final FutureCallback<T> callback) {
141        Args.notNull(requestProducer, "HTTP request producer");
142        Args.notNull(responseConsumer, "HTTP response consumer");
143        Args.notNull(conn, "HTTP connection");
144        Args.notNull(context, "HTTP context");
145        final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
146                requestProducer, responseConsumer, callback, context, conn,
147                this.httpprocessor, this.connReuseStrategy);
148        initExecution(handler, conn);
149        return handler.getFuture();
150    }
151
152    private void initExecution(
153            final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
154
155        final HttpContext context = conn.getContext();
156        synchronized (context) {
157            context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
158            if (!conn.isOpen()) {
159                handler.failed(new ConnectionClosedException("Connection closed"));
160            } else {
161                conn.requestOutput();
162            }
163        }
164        if (handler.isDone()) {
165            try {
166                handler.close();
167            } catch (final IOException ex) {
168                log(ex);
169            }
170        }
171    }
172
173    /**
174     * Initiates asynchronous HTTP request execution.
175     *
176     * @param <T> the result type of request execution.
177     * @param requestProducer request producer.
178     * @param responseConsumer response consumer.
179     * @param conn underlying HTTP connection.
180     * @param context HTTP context
181     * @return future representing pending completion of the operation.
182     */
183    public <T> Future<T> execute(
184            final HttpAsyncRequestProducer requestProducer,
185            final HttpAsyncResponseConsumer<T> responseConsumer,
186            final NHttpClientConnection conn,
187            final HttpContext context) {
188        return execute(requestProducer, responseConsumer, conn, context, null);
189    }
190
191    /**
192     * Initiates asynchronous HTTP request execution.
193     *
194     * @param <T> the result type of request execution.
195     * @param requestProducer request producer.
196     * @param responseConsumer response consumer.
197     * @param conn underlying HTTP connection.
198     * @return future representing pending completion of the operation.
199     */
200    public <T> Future<T> execute(
201            final HttpAsyncRequestProducer requestProducer,
202            final HttpAsyncResponseConsumer<T> responseConsumer,
203            final NHttpClientConnection conn) {
204        return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
205    }
206
207    /**
208     * Initiates asynchronous HTTP request execution.
209     *
210     * @param <T> the result type of request execution.
211     * @param <E> the connection pool entry type.
212     * @param requestProducer request producer.
213     * @param responseConsumer response consumer.
214     * @param connPool pool of persistent reusable connections.
215     * @param context HTTP context
216     * @param callback future callback.
217     * @return future representing pending completion of the operation.
218     */
219    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
220            final HttpAsyncRequestProducer requestProducer,
221            final HttpAsyncResponseConsumer<T> responseConsumer,
222            final ConnPool<HttpHost, E> connPool,
223            final HttpContext context,
224            final FutureCallback<T> callback) {
225        Args.notNull(requestProducer, "HTTP request producer");
226        Args.notNull(responseConsumer, "HTTP response consumer");
227        Args.notNull(connPool, "HTTP connection pool");
228        Args.notNull(context, "HTTP context");
229        final BasicFuture<T> future = new BasicFuture<T>(callback);
230        final HttpHost target = requestProducer.getTarget();
231        connPool.lease(target, null, new ConnRequestCallback<T, E>(
232                future, requestProducer, responseConsumer, connPool, context));
233        return future;
234    }
235
236    /**
237     * Initiates asynchronous pipelined HTTP request execution.
238     *
239     * @param <T> the result type of request execution.
240     * @param <E> the connection pool entry type.
241     * @param target target host.
242     * @param requestProducers list of request producers.
243     * @param responseConsumers list of response consumers.
244     * @param connPool pool of persistent reusable connections.
245     * @param context HTTP context
246     * @param callback future callback.
247     * @return future representing pending completion of the operation.
248     *
249     * @since 4.4
250     */
251    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
252            final HttpHost target,
253            final List<? extends HttpAsyncRequestProducer> requestProducers,
254            final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
255            final ConnPool<HttpHost, E> connPool,
256            final HttpContext context,
257            final FutureCallback<List<T>> callback) {
258        Args.notNull(target, "HTTP target");
259        Args.notEmpty(requestProducers, "Request producer list");
260        Args.notEmpty(responseConsumers, "Response consumer list");
261        Args.notNull(connPool, "HTTP connection pool");
262        Args.notNull(context, "HTTP context");
263        final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
264        connPool.lease(target, null, new ConnPipelinedRequestCallback<T, E>(
265                future, requestProducers, responseConsumers, connPool, context));
266        return future;
267    }
268
269    /**
270     * Initiates asynchronous HTTP request execution. This method automatically releases
271     * the given pool entry once request execution is completed (successfully or unsuccessfully).
272     *
273     * @param <T> the result type of request execution.
274     * @param <E> the connection pool entry type.
275     * @param requestProducer request producer.
276     * @param responseConsumer response consumer.
277     * @param poolEntry leased pool entry. It will be automatically released
278     *   back to the pool when execution is completed.
279     * @param connPool pool of persistent reusable connections.
280     * @param context HTTP context
281     * @param callback future callback.
282     * @return future representing pending completion of the operation.
283     *
284     * @since 4.3
285     */
286    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
287            final HttpAsyncRequestProducer requestProducer,
288            final HttpAsyncResponseConsumer<T> responseConsumer,
289            final E poolEntry,
290            final ConnPool<HttpHost, E> connPool,
291            final HttpContext context,
292            final FutureCallback<T> callback) {
293        Args.notNull(requestProducer, "HTTP request producer");
294        Args.notNull(responseConsumer, "HTTP response consumer");
295        Args.notNull(connPool, "HTTP connection pool");
296        Args.notNull(poolEntry, "Pool entry");
297        Args.notNull(context, "HTTP context");
298        final BasicFuture<T> future = new BasicFuture<T>(callback);
299        final NHttpClientConnection conn = poolEntry.getConnection();
300        final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
301                requestProducer, responseConsumer,
302                new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
303                context, conn,
304                this.httpprocessor, this.connReuseStrategy);
305        initExecution(handler, conn);
306        return future;
307    }
308
309    /**
310     * Initiates asynchronous pipelined HTTP request execution. This method automatically releases
311     * the given pool entry once request execution is completed (successfully or unsuccessfully).
312     *
313     * @param <T> the result type of request execution.
314     * @param <E> the connection pool entry type.
315     * @param requestProducers list of request producers.
316     * @param responseConsumers list of response consumers.
317     * @param poolEntry leased pool entry. It will be automatically released
318     *   back to the pool when execution is completed.
319     * @param connPool pool of persistent reusable connections.
320     * @param context HTTP context
321     * @param callback future callback.
322     * @return future representing pending completion of the operation.
323     *
324     * @since 4.4
325     */
326    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
327            final List<HttpAsyncRequestProducer> requestProducers,
328            final List<HttpAsyncResponseConsumer<T>> responseConsumers,
329            final E poolEntry,
330            final ConnPool<HttpHost, E> connPool,
331            final HttpContext context,
332            final FutureCallback<List<T>> callback) {
333        Args.notEmpty(requestProducers, "Request producer list");
334        Args.notEmpty(responseConsumers, "Response consumer list");
335        Args.notNull(connPool, "HTTP connection pool");
336        Args.notNull(poolEntry, "Pool entry");
337        Args.notNull(context, "HTTP context");
338        final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
339        final NHttpClientConnection conn = poolEntry.getConnection();
340        final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
341                requestProducers, responseConsumers,
342                new RequestExecutionCallback<List<T>, E>(future, poolEntry, connPool),
343                context, conn,
344                this.httpprocessor, this.connReuseStrategy);
345        initExecution(handler, conn);
346        return future;
347    }
348
349    /**
350     * Initiates asynchronous HTTP request execution.
351     *
352     * @param <T> the result type of request execution.
353     * @param <E> the connection pool entry type.
354     * @param requestProducer request producer.
355     * @param responseConsumer response consumer.
356     * @param connPool pool of persistent reusable connections.
357     * @param context HTTP context
358     * @return future representing pending completion of the operation.
359     */
360    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
361            final HttpAsyncRequestProducer requestProducer,
362            final HttpAsyncResponseConsumer<T> responseConsumer,
363            final ConnPool<HttpHost, E> connPool,
364            final HttpContext context) {
365        return execute(requestProducer, responseConsumer, connPool, context, null);
366    }
367
368    /**
369     * Initiates asynchronous HTTP request execution.
370     *
371     * @param <T> the result type of request execution.
372     * @param <E> the connection pool entry type.
373     * @param requestProducer request producer.
374     * @param responseConsumer response consumer.
375     * @param connPool pool of persistent reusable connections.
376     * @return future representing pending completion of the operation.
377     */
378    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
379            final HttpAsyncRequestProducer requestProducer,
380            final HttpAsyncResponseConsumer<T> responseConsumer,
381            final ConnPool<HttpHost, E> connPool) {
382        return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
383    }
384
385    class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
386
387        private final BasicFuture<T> requestFuture;
388        private final HttpAsyncRequestProducer requestProducer;
389        private final HttpAsyncResponseConsumer<T> responseConsumer;
390        private final ConnPool<HttpHost, E> connPool;
391        private final HttpContext context;
392
393        ConnRequestCallback(
394                final BasicFuture<T> requestFuture,
395                final HttpAsyncRequestProducer requestProducer,
396                final HttpAsyncResponseConsumer<T> responseConsumer,
397                final ConnPool<HttpHost, E> connPool,
398                final HttpContext context) {
399            super();
400            this.requestFuture = requestFuture;
401            this.requestProducer = requestProducer;
402            this.responseConsumer = responseConsumer;
403            this.connPool = connPool;
404            this.context = context;
405        }
406
407        @Override
408        public void completed(final E result) {
409            if (this.requestFuture.isDone()) {
410                this.connPool.release(result, true);
411                return;
412            }
413            final NHttpClientConnection conn = result.getConnection();
414            final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
415                    this.requestProducer, this.responseConsumer,
416                    new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
417                    this.context, conn, httpprocessor, connReuseStrategy);
418            initExecution(handler, conn);
419        }
420
421        @Override
422        public void failed(final Exception ex) {
423            try {
424                try {
425                    this.responseConsumer.failed(ex);
426                } finally {
427                    releaseResources();
428                }
429            } finally {
430                this.requestFuture.failed(ex);
431            }
432        }
433
434        @Override
435        public void cancelled() {
436            try {
437                try {
438                    this.responseConsumer.cancel();
439                } finally {
440                    releaseResources();
441                }
442            } finally {
443                this.requestFuture.cancel(true);
444            }
445        }
446
447        public void releaseResources() {
448            close(requestProducer);
449            close(responseConsumer);
450        }
451
452    }
453
454    class ConnPipelinedRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
455
456        private final BasicFuture<List<T>> requestFuture;
457        private final List<? extends HttpAsyncRequestProducer> requestProducers;
458        private final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers;
459        private final ConnPool<HttpHost, E> connPool;
460        private final HttpContext context;
461
462        ConnPipelinedRequestCallback(
463                final BasicFuture<List<T>> requestFuture,
464                final List<? extends HttpAsyncRequestProducer> requestProducers,
465                final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
466                final ConnPool<HttpHost, E> connPool,
467                final HttpContext context) {
468            super();
469            this.requestFuture = requestFuture;
470            this.requestProducers = requestProducers;
471            this.responseConsumers = responseConsumers;
472            this.connPool = connPool;
473            this.context = context;
474        }
475
476        @Override
477        public void completed(final E result) {
478            if (this.requestFuture.isDone()) {
479                this.connPool.release(result, true);
480                return;
481            }
482            final NHttpClientConnection conn = result.getConnection();
483            final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
484                    this.requestProducers, this.responseConsumers,
485                    new RequestExecutionCallback<List<T>, E>(this.requestFuture, result, this.connPool),
486                    this.context, conn, httpprocessor, connReuseStrategy);
487            initExecution(handler, conn);
488        }
489
490        @Override
491        public void failed(final Exception ex) {
492            try {
493                try {
494                    for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
495                        responseConsumer.failed(ex);
496                    }
497                } finally {
498                    releaseResources();
499                }
500            } finally {
501                this.requestFuture.failed(ex);
502            }
503        }
504
505        @Override
506        public void cancelled() {
507            try {
508                try {
509                    for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
510                        responseConsumer.cancel();
511                    }
512                } finally {
513                    releaseResources();
514                }
515            } finally {
516                this.requestFuture.cancel(true);
517            }
518        }
519
520        public void releaseResources() {
521            for (final HttpAsyncRequestProducer requestProducer: this.requestProducers) {
522                close(requestProducer);
523            }
524            for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
525                close(responseConsumer);
526            }
527        }
528
529    }
530
531    class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
532                                               implements FutureCallback<T> {
533
534        private final BasicFuture<T> future;
535        private final E poolEntry;
536        private final ConnPool<HttpHost, E> connPool;
537
538        RequestExecutionCallback(
539                final BasicFuture<T> future,
540                final E poolEntry,
541                final ConnPool<HttpHost, E> connPool) {
542            super();
543            this.future = future;
544            this.poolEntry = poolEntry;
545            this.connPool = connPool;
546        }
547
548        @Override
549        public void completed(final T result) {
550            try {
551                this.connPool.release(this.poolEntry, true);
552            } finally {
553                this.future.completed(result);
554            }
555        }
556
557        @Override
558        public void failed(final Exception ex) {
559            try {
560                this.connPool.release(this.poolEntry, false);
561            } finally {
562                this.future.failed(ex);
563            }
564        }
565
566        @Override
567        public void cancelled() {
568            try {
569                this.connPool.release(this.poolEntry, false);
570            } finally {
571                this.future.cancel(true);
572            }
573        }
574
575    }
576
577    /**
578     * This method can be used to log I/O exception thrown while closing
579     * {@link java.io.Closeable} objects (such as
580     * {@link org.apache.http.HttpConnection}}).
581     *
582     * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
583     */
584    protected void log(final Exception ex) {
585        this.exceptionLogger.log(ex);
586    }
587
588    private void close(final Closeable closeable) {
589        try {
590            closeable.close();
591        } catch (final IOException ex) {
592            log(ex);
593        }
594    }
595
596}