001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.nio.protocol;
029
030import java.io.Closeable;
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Queue;
035import java.util.concurrent.ConcurrentLinkedQueue;
036import java.util.concurrent.Future;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.atomic.AtomicReference;
039
040import org.apache.http.ConnectionClosedException;
041import org.apache.http.ConnectionReuseStrategy;
042import org.apache.http.HttpException;
043import org.apache.http.HttpRequest;
044import org.apache.http.HttpResponse;
045import org.apache.http.concurrent.BasicFuture;
046import org.apache.http.concurrent.FutureCallback;
047import org.apache.http.impl.DefaultConnectionReuseStrategy;
048import org.apache.http.nio.ContentDecoder;
049import org.apache.http.nio.ContentEncoder;
050import org.apache.http.nio.IOControl;
051import org.apache.http.nio.NHttpClientConnection;
052import org.apache.http.protocol.HttpContext;
053import org.apache.http.protocol.HttpCoreContext;
054import org.apache.http.protocol.HttpProcessor;
055import org.apache.http.util.Args;
056import org.apache.http.util.Asserts;
057
058/**
059 * Pipelining implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}
060 * that executes a series of pipelined HTTP requests.
061 *
062 * @param <T> the result type of request execution.
063 * @since 4.4
064 */
065@Pipelined()
066public class PipeliningClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
067
068    private final Queue<HttpAsyncRequestProducer> requestProducerQueue;
069    private final Queue<HttpAsyncResponseConsumer<T>> responseConsumerQueue;
070    private final Queue<HttpRequest> requestQueue;
071    private final Queue<T> resultQueue;
072    private final BasicFuture<List<T>> future;
073    private final HttpContext localContext;
074    private final NHttpClientConnection conn;
075    private final HttpProcessor httppocessor;
076    private final ConnectionReuseStrategy connReuseStrategy;
077
078    private final AtomicReference<HttpAsyncRequestProducer> requestProducerRef;
079    private final AtomicReference<HttpAsyncResponseConsumer<T>> responseConsumerRef;
080    private final AtomicBoolean keepAlive;
081    private final AtomicBoolean closed;
082
083    /**
084     * Creates new instance of {@code PipeliningClientExchangeHandler}.
085     *
086     * @param requestProducers the request producers.
087     * @param responseConsumers the response consumers.
088     * @param callback the future callback invoked when the operation is completed.
089     * @param localContext the local execution context.
090     * @param conn the actual connection.
091     * @param httppocessor the HTTP protocol processor.
092     * @param connReuseStrategy the connection re-use strategy.
093     */
094    public PipeliningClientExchangeHandler(
095            final List<? extends HttpAsyncRequestProducer> requestProducers,
096            final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
097            final FutureCallback<List<T>> callback,
098            final HttpContext localContext,
099            final NHttpClientConnection conn,
100            final HttpProcessor httppocessor,
101            final ConnectionReuseStrategy connReuseStrategy) {
102        super();
103        Args.notEmpty(requestProducers, "Request producer list");
104        Args.notEmpty(responseConsumers, "Response consumer list");
105        Args.check(requestProducers.size() == responseConsumers.size(),
106                "Number of request producers does not match that of response consumers");
107        this.requestProducerQueue = new ConcurrentLinkedQueue<HttpAsyncRequestProducer>(requestProducers);
108        this.responseConsumerQueue = new ConcurrentLinkedQueue<HttpAsyncResponseConsumer<T>>(responseConsumers);
109        this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
110        this.resultQueue = new ConcurrentLinkedQueue<T>();
111        this.future = new BasicFuture<List<T>>(callback);
112        this.localContext = Args.notNull(localContext, "HTTP context");
113        this.conn = Args.notNull(conn, "HTTP connection");
114        this.httppocessor = Args.notNull(httppocessor, "HTTP processor");
115        this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
116            DefaultConnectionReuseStrategy.INSTANCE;
117        this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
118        this.requestProducerRef = new AtomicReference<HttpAsyncRequestProducer>(null);
119        this.responseConsumerRef = new AtomicReference<HttpAsyncResponseConsumer<T>>(null);
120        this.keepAlive = new AtomicBoolean(false);
121        this.closed = new AtomicBoolean(false);
122    }
123
124    /**
125     * Creates new instance of {@code PipeliningClientExchangeHandler}.
126     *
127     * @param requestProducers the request producers.
128     * @param responseConsumers the response consumers.
129     * @param localContext the local execution context.
130     * @param conn the actual connection.
131     * @param httppocessor the HTTP protocol processor.
132     */
133    public PipeliningClientExchangeHandler(
134            final List<? extends HttpAsyncRequestProducer> requestProducers,
135            final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
136            final HttpContext localContext,
137            final NHttpClientConnection conn,
138            final HttpProcessor httppocessor) {
139        this(requestProducers, responseConsumers, null, localContext, conn, httppocessor, null);
140    }
141
142    public Future<List<T>> getFuture() {
143        return this.future;
144    }
145
146    private static void closeQuietly(final Closeable closeable) {
147        if (closeable != null) {
148            try {
149                closeable.close();
150            } catch (final IOException ex) {
151            }
152        }
153    }
154
155    private void releaseResources() {
156        closeQuietly(this.requestProducerRef.getAndSet(null));
157        closeQuietly(this.responseConsumerRef.getAndSet(null));
158        while (!this.requestProducerQueue.isEmpty()) {
159            closeQuietly(this.requestProducerQueue.remove());
160        }
161        while (!this.responseConsumerQueue.isEmpty()) {
162            closeQuietly(this.responseConsumerQueue.remove());
163        }
164        this.requestQueue.clear();
165        this.resultQueue.clear();
166    }
167
168    @Override
169    public void close() throws IOException {
170        if (this.closed.compareAndSet(false, true)) {
171            releaseResources();
172            if (!this.future.isDone()) {
173                this.future.cancel();
174            }
175        }
176    }
177
178    @Override
179    public HttpRequest generateRequest() throws IOException, HttpException {
180        Asserts.check(this.requestProducerRef.get() == null, "Inconsistent state: request producer is not null");
181        final HttpAsyncRequestProducer requestProducer = this.requestProducerQueue.poll();
182        if (requestProducer == null) {
183            return null;
184        }
185        this.requestProducerRef.set(requestProducer);
186        final HttpRequest request = requestProducer.generateRequest();
187        this.httppocessor.process(request, this.localContext);
188        this.requestQueue.add(request);
189        return request;
190    }
191
192    @Override
193    public void produceContent(
194            final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
195        final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
196        Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
197        requestProducer.produceContent(encoder, ioctrl);
198    }
199
200    @Override
201    public void requestCompleted() {
202        final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.getAndSet(null);
203        Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
204        requestProducer.requestCompleted(this.localContext);
205    }
206
207    @Override
208    public void responseReceived(final HttpResponse response) throws IOException, HttpException {
209        Asserts.check(this.responseConsumerRef.get() == null, "Inconsistent state: response consumer is not null");
210
211        final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerQueue.poll();
212        Asserts.check(responseConsumer != null, "Inconsistent state: response consumer queue is empty");
213        this.responseConsumerRef.set(responseConsumer);
214
215        final HttpRequest request = this.requestQueue.poll();
216        Asserts.check(request != null, "Inconsistent state: request queue is empty");
217
218        this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
219        this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
220        this.httppocessor.process(response, this.localContext);
221
222        responseConsumer.responseReceived(response);
223        this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
224    }
225
226    @Override
227    public void consumeContent(
228            final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
229        final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
230        Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
231        responseConsumer.consumeContent(decoder, ioctrl);
232    }
233
234    @Override
235    public void responseCompleted() throws IOException {
236        final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.getAndSet(null);
237        Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
238        try {
239            if (!this.keepAlive.get()) {
240                this.conn.close();
241            }
242            responseConsumer.responseCompleted(this.localContext);
243            final T result = responseConsumer.getResult();
244            final Exception ex = responseConsumer.getException();
245            if (result != null) {
246                this.resultQueue.add(result);
247            } else {
248                this.future.failed(ex);
249                this.conn.shutdown();
250            }
251            if (!conn.isOpen()) {
252                if (this.closed.compareAndSet(false, true)) {
253                    releaseResources();
254                }
255            }
256            if (!this.future.isDone() && this.responseConsumerQueue.isEmpty()) {
257                this.future.completed(new ArrayList<T>(this.resultQueue));
258                this.resultQueue.clear();
259            }
260        } catch (final RuntimeException ex) {
261            failed(ex);
262            throw ex;
263        }
264    }
265
266    @Override
267    public void inputTerminated() {
268        failed(new ConnectionClosedException("Connection closed"));
269    }
270
271    @Override
272    public void failed(final Exception ex) {
273        if (this.closed.compareAndSet(false, true)) {
274            try {
275                final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
276                if (requestProducer != null) {
277                    requestProducer.failed(ex);
278                }
279                final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
280                if (responseConsumer != null) {
281                    responseConsumer.failed(ex);
282                }
283            } finally {
284                try {
285                    this.future.failed(ex);
286                } finally {
287                    releaseResources();
288                }
289            }
290        }
291    }
292
293    @Override
294    public boolean cancel() {
295        if (this.closed.compareAndSet(false, true)) {
296            try {
297                try {
298                    final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
299                    return responseConsumer != null && responseConsumer.cancel();
300                } finally {
301                    this.future.cancel();
302                }
303            } finally {
304                releaseResources();
305            }
306        }
307        return false;
308    }
309
310    @Override
311    public boolean isDone() {
312        return this.future.isDone();
313    }
314
315}