001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.nio.protocol;
029
030import java.io.IOException;
031import java.util.concurrent.Future;
032import java.util.concurrent.atomic.AtomicBoolean;
033
034import org.apache.http.ConnectionClosedException;
035import org.apache.http.ConnectionReuseStrategy;
036import org.apache.http.HttpException;
037import org.apache.http.HttpRequest;
038import org.apache.http.HttpResponse;
039import org.apache.http.concurrent.BasicFuture;
040import org.apache.http.concurrent.FutureCallback;
041import org.apache.http.impl.DefaultConnectionReuseStrategy;
042import org.apache.http.nio.ContentDecoder;
043import org.apache.http.nio.ContentEncoder;
044import org.apache.http.nio.IOControl;
045import org.apache.http.nio.NHttpClientConnection;
046import org.apache.http.protocol.HttpContext;
047import org.apache.http.protocol.HttpCoreContext;
048import org.apache.http.protocol.HttpProcessor;
049import org.apache.http.util.Args;
050
051/**
052 * Basic implementation of {@link HttpAsyncClientExchangeHandler} that executes
053 * a single HTTP request / response exchange.
054 *
055 * @param <T> the result type of request execution.
056 * @since 4.3
057 */
058public class BasicAsyncClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
059
060    private final HttpAsyncRequestProducer requestProducer;
061    private final HttpAsyncResponseConsumer<T> responseConsumer;
062    private final BasicFuture<T> future;
063    private final HttpContext localContext;
064    private final NHttpClientConnection conn;
065    private final HttpProcessor httppocessor;
066    private final ConnectionReuseStrategy connReuseStrategy;
067    private final AtomicBoolean requestSent;
068    private final AtomicBoolean keepAlive;
069    private final AtomicBoolean closed;
070
071    /**
072     * Creates new instance of BasicAsyncRequestExecutionHandler.
073     *
074     * @param requestProducer the request producer.
075     * @param responseConsumer the response consumer.
076     * @param callback the future callback invoked when the operation is completed.
077     * @param localContext the local execution context.
078     * @param conn the actual connection.
079     * @param httppocessor the HTTP protocol processor.
080     * @param connReuseStrategy the connection re-use strategy.
081     */
082    public BasicAsyncClientExchangeHandler(
083            final HttpAsyncRequestProducer requestProducer,
084            final HttpAsyncResponseConsumer<T> responseConsumer,
085            final FutureCallback<T> callback,
086            final HttpContext localContext,
087            final NHttpClientConnection conn,
088            final HttpProcessor httppocessor,
089            final ConnectionReuseStrategy connReuseStrategy) {
090        super();
091        this.requestProducer = Args.notNull(requestProducer, "Request producer");
092        this.responseConsumer = Args.notNull(responseConsumer, "Response consumer");
093        this.future = new BasicFuture<T>(callback);
094        this.localContext = Args.notNull(localContext, "HTTP context");
095        this.conn = Args.notNull(conn, "HTTP connection");
096        this.httppocessor = Args.notNull(httppocessor, "HTTP processor");
097        this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
098            DefaultConnectionReuseStrategy.INSTANCE;
099        this.requestSent = new AtomicBoolean(false);
100        this.keepAlive = new AtomicBoolean(false);
101        this.closed = new AtomicBoolean(false);
102    }
103
104    /**
105     * Creates new instance of BasicAsyncRequestExecutionHandler.
106     *
107     * @param requestProducer the request producer.
108     * @param responseConsumer the response consumer.
109     * @param localContext the local execution context.
110     * @param conn the actual connection.
111     * @param httppocessor the HTTP protocol processor.
112     */
113    public BasicAsyncClientExchangeHandler(
114            final HttpAsyncRequestProducer requestProducer,
115            final HttpAsyncResponseConsumer<T> responseConsumer,
116            final HttpContext localContext,
117            final NHttpClientConnection conn,
118            final HttpProcessor httppocessor) {
119        this(requestProducer, responseConsumer, null, localContext, conn, httppocessor, null);
120    }
121
122    public Future<T> getFuture() {
123        return this.future;
124    }
125
126    private void releaseResources() {
127        try {
128            this.responseConsumer.close();
129        } catch (final IOException ex) {
130        }
131        try {
132            this.requestProducer.close();
133        } catch (final IOException ex) {
134        }
135    }
136
137    @Override
138    public void close() throws IOException {
139        if (this.closed.compareAndSet(false, true)) {
140            releaseResources();
141            if (!this.future.isDone()) {
142                this.future.cancel();
143            }
144        }
145    }
146
147    @Override
148    public HttpRequest generateRequest() throws IOException, HttpException {
149        if (isDone()) {
150            return null;
151        }
152        final HttpRequest request = this.requestProducer.generateRequest();
153        this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
154        this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
155        this.httppocessor.process(request, this.localContext);
156        return request;
157    }
158
159    @Override
160    public void produceContent(
161            final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
162        this.requestProducer.produceContent(encoder, ioctrl);
163    }
164
165    @Override
166    public void requestCompleted() {
167        this.requestProducer.requestCompleted(this.localContext);
168        this.requestSent.set(true);
169    }
170
171    @Override
172    public void responseReceived(final HttpResponse response) throws IOException, HttpException {
173        this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
174        this.httppocessor.process(response, this.localContext);
175        this.responseConsumer.responseReceived(response);
176        this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
177    }
178
179    @Override
180    public void consumeContent(
181            final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
182        this.responseConsumer.consumeContent(decoder, ioctrl);
183    }
184
185    @Override
186    public void responseCompleted() throws IOException {
187        try {
188            if (!this.keepAlive.get()) {
189                this.conn.close();
190            }
191            this.responseConsumer.responseCompleted(this.localContext);
192            final T result = this.responseConsumer.getResult();
193            final Exception ex = this.responseConsumer.getException();
194            if (result != null) {
195                this.future.completed(result);
196            } else {
197                this.future.failed(ex);
198            }
199            if (this.closed.compareAndSet(false, true)) {
200                releaseResources();
201            }
202        } catch (final RuntimeException ex) {
203            failed(ex);
204            throw ex;
205        }
206    }
207
208    @Override
209    public void inputTerminated() {
210        failed(new ConnectionClosedException("Connection closed"));
211    }
212
213    @Override
214    public void failed(final Exception ex) {
215        if (this.closed.compareAndSet(false, true)) {
216            try {
217                if (!this.requestSent.get()) {
218                    this.requestProducer.failed(ex);
219                }
220                this.responseConsumer.failed(ex);
221            } finally {
222                try {
223                    this.future.failed(ex);
224                } finally {
225                    releaseResources();
226                }
227            }
228        }
229    }
230
231    @Override
232    public boolean cancel() {
233        if (this.closed.compareAndSet(false, true)) {
234            try {
235                try {
236                    return this.responseConsumer.cancel();
237                } finally {
238                    this.future.cancel();
239                }
240            } finally {
241                releaseResources();
242            }
243        }
244        return false;
245    }
246
247    @Override
248    public boolean isDone() {
249        return this.responseConsumer.isDone();
250    }
251
252}