001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.nio.protocol;
029
030import java.io.IOException;
031import java.net.SocketTimeoutException;
032import java.util.Queue;
033import java.util.concurrent.ConcurrentLinkedQueue;
034
035import org.apache.http.ConnectionClosedException;
036import org.apache.http.ExceptionLogger;
037import org.apache.http.HttpEntity;
038import org.apache.http.HttpEntityEnclosingRequest;
039import org.apache.http.HttpException;
040import org.apache.http.HttpRequest;
041import org.apache.http.HttpResponse;
042import org.apache.http.HttpStatus;
043import org.apache.http.HttpVersion;
044import org.apache.http.ProtocolException;
045import org.apache.http.ProtocolVersion;
046import org.apache.http.annotation.Contract;
047import org.apache.http.annotation.ThreadingBehavior;
048import org.apache.http.nio.ContentDecoder;
049import org.apache.http.nio.ContentEncoder;
050import org.apache.http.nio.NHttpClientConnection;
051import org.apache.http.nio.NHttpClientEventHandler;
052import org.apache.http.nio.NHttpConnection;
053import org.apache.http.protocol.HttpContext;
054import org.apache.http.util.Args;
055import org.apache.http.util.Asserts;
056
057/**
058 * {@code HttpAsyncRequestExecutor} is a fully asynchronous HTTP client side
059 * protocol handler based on the NIO (non-blocking) I/O model.
060 * {@code HttpAsyncRequestExecutor} translates individual events fired through
061 * the {@link NHttpClientEventHandler} interface into logically related HTTP
062 * message exchanges.
063 * <p> The caller is expected to pass an instance of
064 * {@link HttpAsyncClientExchangeHandler} to be used for the next series
065 * of HTTP message exchanges through the connection context using
066 * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
067 * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
068 * returns {@code true}. The {@link HttpAsyncRequester} utility class can
069 * be used to facilitate initiation of asynchronous HTTP request execution.
070 * <p>
071 * Individual {@code HttpAsyncClientExchangeHandler} are expected to make use of
072 * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
073 * headers for all outgoing messages and apply common, cross-cutting message
074 * transformations to all incoming and outgoing messages.
075 * {@code HttpAsyncClientExchangeHandler}s can delegate implementation of
076 * application specific content generation and processing to
077 * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
078 *
079 * @see HttpAsyncClientExchangeHandler
080 *
081 * @since 4.2
082 */
083@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
084public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
085
086    public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
087    public static final String HTTP_HANDLER = "http.nio.exchange-handler";
088
089    private final int waitForContinue;
090    private final ExceptionLogger exceptionLogger;
091
092    /**
093     * Creates new instance of {@code HttpAsyncRequestExecutor}.
094     * @param waitForContinue wait for continue time period.
095     * @param exceptionLogger Exception logger. If {@code null}
096     *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
097     *   logger will be only used to log I/O exception thrown while closing
098     *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
099     *
100     * @since 4.4
101     */
102    public HttpAsyncRequestExecutor(
103            final int waitForContinue,
104            final ExceptionLogger exceptionLogger) {
105        super();
106        this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
107        this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
108    }
109
110    /**
111     * Creates new instance of HttpAsyncRequestExecutor.
112     *
113     * @since 4.3
114     */
115    public HttpAsyncRequestExecutor(final int waitForContinue) {
116        this(waitForContinue, null);
117    }
118
119    public HttpAsyncRequestExecutor() {
120        this(DEFAULT_WAIT_FOR_CONTINUE, null);
121    }
122
123    @Override
124    public void connected(
125            final NHttpClientConnection conn,
126            final Object attachment) throws IOException, HttpException {
127        final State state = new State();
128        final HttpContext context = conn.getContext();
129        context.setAttribute(HTTP_EXCHANGE_STATE, state);
130        requestReady(conn);
131    }
132
133    @Override
134    public void closed(final NHttpClientConnection conn) {
135        final State state = getState(conn);
136        final HttpAsyncClientExchangeHandler handler = getHandler(conn);
137        if (state != null) {
138            if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
139                if (handler != null) {
140                    handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
141                }
142            }
143        }
144        if (state == null || (handler != null && handler.isDone())) {
145            closeHandler(handler);
146        }
147    }
148
149    @Override
150    public void exception(
151            final NHttpClientConnection conn, final Exception cause) {
152        shutdownConnection(conn);
153        final HttpAsyncClientExchangeHandler handler = getHandler(conn);
154        if (handler != null) {
155            handler.failed(cause);
156        } else {
157            log(cause);
158        }
159    }
160
161    @Override
162    public void requestReady(
163            final NHttpClientConnection conn) throws IOException, HttpException {
164        final State state = getState(conn);
165        Asserts.notNull(state, "Connection state");
166        Asserts.check(state.getRequestState() == MessageState.READY ||
167                        state.getRequestState() == MessageState.COMPLETED,
168                "Unexpected request state %s", state.getRequestState());
169
170        if (state.getRequestState() == MessageState.COMPLETED) {
171            conn.suspendOutput();
172            return;
173        }
174        final HttpContext context = conn.getContext();
175        final HttpAsyncClientExchangeHandler handler;
176        synchronized (context) {
177            handler = getHandler(conn);
178            if (handler == null || handler.isDone()) {
179                conn.suspendOutput();
180                return;
181            }
182        }
183        final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
184
185        final HttpRequest request = handler.generateRequest();
186        if (request == null) {
187            conn.suspendOutput();
188            return;
189        }
190        final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
191        if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
192            throw new ProtocolException(version + " cannot be used with request pipelining");
193        }
194        state.setRequest(request);
195        if (pipelined) {
196            state.getRequestQueue().add(request);
197        }
198        if (request instanceof HttpEntityEnclosingRequest) {
199            final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
200            if (expectContinue && pipelined) {
201                throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
202            }
203            conn.submitRequest(request);
204            if (expectContinue) {
205                final int timeout = conn.getSocketTimeout();
206                state.setTimeout(timeout);
207                conn.setSocketTimeout(this.waitForContinue);
208                state.setRequestState(MessageState.ACK_EXPECTED);
209            } else {
210                final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
211                if (entity != null) {
212                    state.setRequestState(MessageState.BODY_STREAM);
213                } else {
214                    handler.requestCompleted();
215                    state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
216                }
217            }
218        } else {
219            conn.submitRequest(request);
220            handler.requestCompleted();
221            state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
222        }
223    }
224
225    @Override
226    public void outputReady(
227            final NHttpClientConnection conn,
228            final ContentEncoder encoder) throws IOException, HttpException {
229        final State state = getState(conn);
230        Asserts.notNull(state, "Connection state");
231        Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
232                        state.getRequestState() == MessageState.ACK_EXPECTED,
233                "Unexpected request state %s", state.getRequestState());
234
235        final HttpAsyncClientExchangeHandler handler = getHandler(conn);
236        Asserts.notNull(handler, "Client exchange handler");
237        if (state.getRequestState() == MessageState.ACK_EXPECTED) {
238            conn.suspendOutput();
239            return;
240        }
241        handler.produceContent(encoder, conn);
242        if (encoder.isCompleted()) {
243            handler.requestCompleted();
244            final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
245            state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
246        }
247    }
248
249    @Override
250    public void responseReceived(
251            final NHttpClientConnection conn) throws HttpException, IOException {
252        final State state = getState(conn);
253        Asserts.notNull(state, "Connection state");
254        Asserts.check(state.getResponseState() == MessageState.READY,
255                "Unexpected request state %s", state.getResponseState());
256
257        final HttpAsyncClientExchangeHandler handler = getHandler(conn);
258        Asserts.notNull(handler, "Client exchange handler");
259
260        final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
261        final HttpRequest request;
262        if (pipelined) {
263            request = state.getRequestQueue().poll();
264            Asserts.notNull(request, "HTTP request");
265        } else {
266            request = state.getRequest();
267            if (request == null) {
268                throw new HttpException("Out of sequence response");
269            }
270        }
271
272        final HttpResponse response = conn.getHttpResponse();
273
274        final int statusCode = response.getStatusLine().getStatusCode();
275        if (statusCode < HttpStatus.SC_OK) {
276            // 1xx intermediate response
277            if (statusCode != HttpStatus.SC_CONTINUE) {
278                throw new ProtocolException(
279                        "Unexpected response: " + response.getStatusLine());
280            }
281            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
282                final int timeout = state.getTimeout();
283                conn.setSocketTimeout(timeout);
284                conn.requestOutput();
285                state.setRequestState(MessageState.BODY_STREAM);
286            }
287            return;
288        }
289        state.setResponse(response);
290        if (state.getRequestState() == MessageState.ACK_EXPECTED) {
291            final int timeout = state.getTimeout();
292            conn.setSocketTimeout(timeout);
293            conn.resetOutput();
294            state.setRequestState(MessageState.COMPLETED);
295        } else if (state.getRequestState() == MessageState.BODY_STREAM) {
296            // Early response
297            if (statusCode >= 400) {
298                conn.resetOutput();
299                conn.suspendOutput();
300                state.setRequestState(MessageState.COMPLETED);
301                state.invalidate();
302            }
303        }
304
305        if (canResponseHaveBody(request, response)) {
306            handler.responseReceived(response);
307            state.setResponseState(MessageState.BODY_STREAM);
308        } else {
309            response.setEntity(null);
310            handler.responseReceived(response);
311            conn.resetInput();
312            processResponse(conn, state, handler);
313        }
314    }
315
316    @Override
317    public void inputReady(
318            final NHttpClientConnection conn,
319            final ContentDecoder decoder) throws IOException, HttpException {
320        final State state = getState(conn);
321        Asserts.notNull(state, "Connection state");
322        Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
323                "Unexpected request state %s", state.getResponseState());
324
325        final HttpAsyncClientExchangeHandler handler = getHandler(conn);
326        Asserts.notNull(handler, "Client exchange handler");
327        handler.consumeContent(decoder, conn);
328        if (decoder.isCompleted()) {
329            processResponse(conn, state, handler);
330        }
331    }
332
333    @Override
334    public void endOfInput(final NHttpClientConnection conn) throws IOException {
335        final State state = getState(conn);
336        final HttpContext context = conn.getContext();
337        synchronized (context) {
338            if (state != null) {
339                if (state.getRequestState().compareTo(MessageState.READY) != 0) {
340                    state.invalidate();
341                }
342                final HttpAsyncClientExchangeHandler handler = getHandler(conn);
343                if (handler != null) {
344                    if (state.isValid()) {
345                        handler.inputTerminated();
346                    } else {
347                        handler.failed(new ConnectionClosedException("Connection closed"));
348                    }
349                }
350            }
351            // Closing connection in an orderly manner and
352            // waiting for output buffer to get flushed.
353            // Do not want to wait indefinitely, though, in case
354            // the opposite end is not reading
355            if (conn.getSocketTimeout() <= 0) {
356                conn.setSocketTimeout(1000);
357            }
358            conn.close();
359        }
360    }
361
362    @Override
363    public void timeout(
364            final NHttpClientConnection conn) throws IOException {
365        final State state = getState(conn);
366        if (state != null) {
367            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
368                final int timeout = state.getTimeout();
369                conn.setSocketTimeout(timeout);
370                conn.requestOutput();
371                state.setRequestState(MessageState.BODY_STREAM);
372                state.setTimeout(0);
373                return;
374            }
375            state.invalidate();
376            final HttpAsyncClientExchangeHandler handler = getHandler(conn);
377            if (handler != null) {
378                handler.failed(new SocketTimeoutException(
379                        String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn)));
380                handler.close();
381            }
382        }
383        if (conn.getStatus() == NHttpConnection.ACTIVE) {
384            conn.close();
385            if (conn.getStatus() == NHttpConnection.CLOSING) {
386                // Give the connection some grace time to
387                // close itself nicely
388                conn.setSocketTimeout(250);
389            }
390        } else {
391            conn.shutdown();
392        }
393    }
394
395    /**
396     * This method can be used to log I/O exception thrown while closing
397     * {@link java.io.Closeable} objects (such as
398     * {@link org.apache.http.HttpConnection}}).
399     *
400     * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
401     */
402    protected void log(final Exception ex) {
403        this.exceptionLogger.log(ex);
404    }
405
406    private static State getState(final NHttpConnection conn) {
407        return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
408    }
409
410    private static HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
411        return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
412    }
413
414    private void shutdownConnection(final NHttpConnection conn) {
415        try {
416            conn.shutdown();
417        } catch (final IOException ex) {
418            log(ex);
419        }
420    }
421
422    private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
423        if (handler != null) {
424            try {
425                handler.close();
426            } catch (final IOException ioex) {
427                log(ioex);
428            }
429        }
430    }
431
432    private void processResponse(
433            final NHttpClientConnection conn,
434            final State state,
435            final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
436        if (!state.isValid()) {
437            conn.close();
438        }
439        handler.responseCompleted();
440
441        final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
442        if (!pipelined) {
443            state.setRequestState(MessageState.READY);
444            state.setRequest(null);
445        }
446        state.setResponseState(MessageState.READY);
447        state.setResponse(null);
448        if (!handler.isDone() && conn.isOpen()) {
449            conn.requestOutput();
450        }
451    }
452
453    private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
454
455        final String method = request.getRequestLine().getMethod();
456        final int status = response.getStatusLine().getStatusCode();
457
458        if (method.equalsIgnoreCase("HEAD")) {
459            return false;
460        }
461        if (method.equalsIgnoreCase("CONNECT") && status < 300) {
462            return false;
463        }
464        return status >= HttpStatus.SC_OK
465            && status != HttpStatus.SC_NO_CONTENT
466            && status != HttpStatus.SC_NOT_MODIFIED
467            && status != HttpStatus.SC_RESET_CONTENT;
468    }
469
470    static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
471
472    static class State {
473
474        private final Queue<HttpRequest> requestQueue;
475        private volatile MessageState requestState;
476        private volatile MessageState responseState;
477        private volatile HttpRequest request;
478        private volatile HttpResponse response;
479        private volatile boolean valid;
480        private volatile int timeout;
481
482        State() {
483            super();
484            this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
485            this.valid = true;
486            this.requestState = MessageState.READY;
487            this.responseState = MessageState.READY;
488        }
489
490        public MessageState getRequestState() {
491            return this.requestState;
492        }
493
494        public void setRequestState(final MessageState state) {
495            this.requestState = state;
496        }
497
498        public MessageState getResponseState() {
499            return this.responseState;
500        }
501
502        public void setResponseState(final MessageState state) {
503            this.responseState = state;
504        }
505
506        public HttpRequest getRequest() {
507            return this.request;
508        }
509
510        public void setRequest(final HttpRequest request) {
511            this.request = request;
512        }
513
514        public HttpResponse getResponse() {
515            return this.response;
516        }
517
518        public void setResponse(final HttpResponse response) {
519            this.response = response;
520        }
521
522        public Queue<HttpRequest> getRequestQueue() {
523            return this.requestQueue;
524        }
525
526        public int getTimeout() {
527            return this.timeout;
528        }
529
530        public void setTimeout(final int timeout) {
531            this.timeout = timeout;
532        }
533
534        public boolean isValid() {
535            return this.valid;
536        }
537
538        public void invalidate() {
539            this.valid = false;
540        }
541
542        @Override
543        public String toString() {
544            final StringBuilder buf = new StringBuilder();
545            buf.append("request state: ");
546            buf.append(this.requestState);
547            buf.append("; request: ");
548            if (this.request != null) {
549                buf.append(this.request.getRequestLine());
550            }
551            buf.append("; response state: ");
552            buf.append(this.responseState);
553            buf.append("; response: ");
554            if (this.response != null) {
555                buf.append(this.response.getStatusLine());
556            }
557            buf.append("; valid: ");
558            buf.append(this.valid);
559            buf.append(";");
560            return buf.toString();
561        }
562
563    }
564
565}