001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.nio.protocol;
029
030import java.io.IOException;
031import java.net.SocketTimeoutException;
032import java.util.Queue;
033import java.util.concurrent.ConcurrentLinkedQueue;
034import java.util.concurrent.atomic.AtomicBoolean;
035
036import org.apache.http.ConnectionReuseStrategy;
037import org.apache.http.ExceptionLogger;
038import org.apache.http.HttpEntity;
039import org.apache.http.HttpEntityEnclosingRequest;
040import org.apache.http.HttpException;
041import org.apache.http.HttpRequest;
042import org.apache.http.HttpResponse;
043import org.apache.http.HttpResponseFactory;
044import org.apache.http.HttpStatus;
045import org.apache.http.HttpVersion;
046import org.apache.http.MethodNotSupportedException;
047import org.apache.http.ProtocolException;
048import org.apache.http.UnsupportedHttpVersionException;
049import org.apache.http.annotation.Contract;
050import org.apache.http.annotation.ThreadingBehavior;
051import org.apache.http.concurrent.Cancellable;
052import org.apache.http.entity.ContentType;
053import org.apache.http.impl.DefaultConnectionReuseStrategy;
054import org.apache.http.impl.DefaultHttpResponseFactory;
055import org.apache.http.nio.ContentDecoder;
056import org.apache.http.nio.ContentEncoder;
057import org.apache.http.nio.NHttpConnection;
058import org.apache.http.nio.NHttpServerConnection;
059import org.apache.http.nio.NHttpServerEventHandler;
060import org.apache.http.nio.entity.NStringEntity;
061import org.apache.http.nio.reactor.SessionBufferStatus;
062import org.apache.http.params.HttpParams;
063import org.apache.http.protocol.BasicHttpContext;
064import org.apache.http.protocol.HttpContext;
065import org.apache.http.protocol.HttpCoreContext;
066import org.apache.http.protocol.HttpProcessor;
067import org.apache.http.util.Args;
068import org.apache.http.util.Asserts;
069
070/**
071 * {@code HttpAsyncService} is a fully asynchronous HTTP server side protocol
072 * handler based on the non-blocking (NIO) I/O model.
073 * {@code HttpAsyncServerProtocolHandler} translates individual events fired
074 * through the {@link NHttpServerEventHandler} interface into logically related
075 * HTTP message exchanges.
076 * <p>
077 * Upon receiving an incoming request {@code HttpAsyncService} verifies
078 * the message for compliance with the server expectations using
079 * {@link HttpAsyncExpectationVerifier}, if provided, and then
080 * {@link HttpAsyncRequestHandlerMapper} is used to map the request
081 * to a particular {@link HttpAsyncRequestHandler} intended to handle
082 * the request with the given URI. The protocol handler uses the selected
083 * {@link HttpAsyncRequestHandler} instance to process the incoming request
084 * and to generate an outgoing response.
085 * <p>
086 * {@code HttpAsyncService} relies on {@link HttpProcessor} to generate
087 * mandatory protocol headers for all outgoing messages and apply common,
088 * cross-cutting message transformations to all incoming and outgoing messages,
089 * whereas individual {@link HttpAsyncRequestHandler}s are expected
090 * to implement application specific content generation and processing.
091 * <p>
092 * Individual {@link HttpAsyncRequestHandler}s do not have to submit a response
093 * immediately. They can defer transmission of an HTTP response back to
094 * the client without blocking the I/O thread by delegating the process of
095 * request handling to another service or a worker thread. HTTP response can
096 * be submitted as a later a later point of time once response content becomes
097 * available.
098 *
099 * @since 4.2
100 */
101@SuppressWarnings("deprecation")
102@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
103public class HttpAsyncService implements NHttpServerEventHandler {
104
105    static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
106
107    private final HttpProcessor httpProcessor;
108    private final ConnectionReuseStrategy connStrategy;
109    private final HttpResponseFactory responseFactory;
110    private final HttpAsyncRequestHandlerMapper handlerMapper;
111    private final HttpAsyncExpectationVerifier expectationVerifier;
112    private final ExceptionLogger exceptionLogger;
113
114    /**
115     * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
116     *
117     * @param httpProcessor HTTP protocol processor (required).
118     * @param connStrategy Connection re-use strategy (required).
119     * @param responseFactory HTTP response factory (required).
120     * @param handlerResolver Request handler resolver.
121     * @param expectationVerifier Request expectation verifier (optional).
122     * @param params HTTP parameters (required).
123     *
124     * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
125     *  ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
126     *    HttpAsyncExpectationVerifier)}
127     */
128    @Deprecated
129    public HttpAsyncService(
130            final HttpProcessor httpProcessor,
131            final ConnectionReuseStrategy connStrategy,
132            final HttpResponseFactory responseFactory,
133            final HttpAsyncRequestHandlerResolver handlerResolver,
134            final HttpAsyncExpectationVerifier expectationVerifier,
135            final HttpParams params) {
136        this(httpProcessor,
137             connStrategy,
138             responseFactory,
139             new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
140             expectationVerifier);
141    }
142
143    /**
144     * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
145     *
146     * @param httpProcessor HTTP protocol processor (required).
147     * @param connStrategy Connection re-use strategy (required).
148     * @param handlerResolver Request handler resolver.
149     * @param params HTTP parameters (required).
150     *
151     * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
152     *   ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
153     *   HttpAsyncExpectationVerifier)}
154     */
155    @Deprecated
156    public HttpAsyncService(
157            final HttpProcessor httpProcessor,
158            final ConnectionReuseStrategy connStrategy,
159            final HttpAsyncRequestHandlerResolver handlerResolver,
160            final HttpParams params) {
161        this(httpProcessor,
162             connStrategy,
163             DefaultHttpResponseFactory.INSTANCE,
164             new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
165             null);
166    }
167
168    /**
169     * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
170     *
171     * @param httpProcessor HTTP protocol processor.
172     * @param connStrategy Connection re-use strategy. If {@code null}
173     *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
174     * @param responseFactory HTTP response factory. If {@code null}
175     *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
176     * @param handlerMapper Request handler mapper.
177     * @param expectationVerifier Request expectation verifier. May be {@code null}.
178     *
179     * @since 4.3
180     */
181    public HttpAsyncService(
182            final HttpProcessor httpProcessor,
183            final ConnectionReuseStrategy connStrategy,
184            final HttpResponseFactory responseFactory,
185            final HttpAsyncRequestHandlerMapper handlerMapper,
186            final HttpAsyncExpectationVerifier expectationVerifier) {
187        this(httpProcessor, connStrategy, responseFactory, handlerMapper, expectationVerifier, null);
188    }
189
190    /**
191     * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
192     *
193     * @param httpProcessor HTTP protocol processor.
194     * @param connStrategy Connection re-use strategy. If {@code null}
195     *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
196     * @param responseFactory HTTP response factory. If {@code null}
197     *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
198     * @param handlerMapper Request handler mapper.
199     * @param expectationVerifier Request expectation verifier. May be {@code null}.
200     * @param exceptionLogger Exception logger. If {@code null}
201     *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
202     *   logger will be only used to log I/O exception thrown while closing
203     *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
204     *
205     * @since 4.4
206     */
207    public HttpAsyncService(
208            final HttpProcessor httpProcessor,
209            final ConnectionReuseStrategy connStrategy,
210            final HttpResponseFactory responseFactory,
211            final HttpAsyncRequestHandlerMapper handlerMapper,
212            final HttpAsyncExpectationVerifier expectationVerifier,
213            final ExceptionLogger exceptionLogger) {
214        super();
215        this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
216        this.connStrategy = connStrategy != null ? connStrategy :
217                DefaultConnectionReuseStrategy.INSTANCE;
218        this.responseFactory = responseFactory != null ? responseFactory :
219                DefaultHttpResponseFactory.INSTANCE;
220        this.handlerMapper = handlerMapper;
221        this.expectationVerifier = expectationVerifier;
222        this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
223    }
224
225    /**
226     * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
227     *
228     * @param httpProcessor HTTP protocol processor.
229     * @param handlerMapper Request handler mapper.
230     *
231     * @since 4.3
232     */
233    public HttpAsyncService(
234            final HttpProcessor httpProcessor,
235            final HttpAsyncRequestHandlerMapper handlerMapper) {
236        this(httpProcessor, null, null, handlerMapper, null);
237    }
238
239    /**
240     * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
241     *
242     * @param httpProcessor HTTP protocol processor.
243     * @param handlerMapper Request handler mapper.
244     * @param exceptionLogger Exception logger. If {@code null}
245     *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
246     *   logger will be only used to log I/O exception thrown while closing
247     *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
248     *
249     * @since 4.4
250     */
251    public HttpAsyncService(
252            final HttpProcessor httpProcessor,
253            final HttpAsyncRequestHandlerMapper handlerMapper,
254            final ExceptionLogger exceptionLogger) {
255        this(httpProcessor, null, null, handlerMapper, null, exceptionLogger);
256    }
257
258    @Override
259    public void connected(final NHttpServerConnection conn) {
260        final State state = new State();
261        conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
262    }
263
264    @Override
265    public void closed(final NHttpServerConnection conn) {
266        final State state = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE);
267        if (state != null) {
268            state.setTerminated();
269            closeHandlers(state);
270            final Cancellable cancellable = state.getCancellable();
271            if (cancellable != null) {
272                cancellable.cancel();
273            }
274        }
275    }
276
277    @Override
278    public void exception(
279            final NHttpServerConnection conn, final Exception cause) {
280        log(cause);
281        final State state = getState(conn);
282        if (state == null) {
283            shutdownConnection(conn);
284            return;
285        }
286        state.setTerminated();
287        closeHandlers(state, cause);
288        final Cancellable cancellable = state.getCancellable();
289        if (cancellable != null) {
290            cancellable.cancel();
291        }
292        final Queue<PipelineEntry> pipeline = state.getPipeline();
293        if (!pipeline.isEmpty()
294                || conn.isResponseSubmitted()
295                || state.getResponseState().compareTo(MessageState.INIT) > 0) {
296            // There is not much that we can do if a response
297            // has already been submitted or pipelining is being used.
298            shutdownConnection(conn);
299        } else {
300            try {
301                final Incoming incoming = state.getIncoming();
302                final HttpRequest request = incoming != null ? incoming.getRequest() : null;
303                final HttpContext context = incoming != null ? incoming.getContext() : new BasicHttpContext();
304                final HttpAsyncResponseProducer responseProducer = handleException(cause, context);
305                final HttpResponse response = responseProducer.generateResponse();
306                final Outgoing outgoing = new Outgoing(request, response, responseProducer, context);
307                state.setResponseState(MessageState.INIT);
308                state.setOutgoing(outgoing);
309                commitFinalResponse(conn, state);
310            } catch (final Exception ex) {
311                shutdownConnection(conn);
312                closeHandlers(state);
313                if (ex instanceof RuntimeException) {
314                    throw (RuntimeException) ex;
315                } else {
316                    log(ex);
317                }
318            }
319        }
320    }
321
322    @Override
323    public void requestReceived(
324            final NHttpServerConnection conn) throws IOException, HttpException {
325        final State state = getState(conn);
326        Asserts.notNull(state, "Connection state");
327        Asserts.check(state.getRequestState() == MessageState.READY,
328                "Unexpected request state %s", state.getRequestState());
329
330        final HttpRequest request = conn.getHttpRequest();
331        final HttpContext context = new BasicHttpContext();
332
333        context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
334        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
335        this.httpProcessor.process(request, context);
336
337        final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
338        final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
339        consumer.requestReceived(request);
340
341        final Incoming incoming = new Incoming(request, requestHandler, consumer, context);
342        state.setIncoming(incoming);
343
344        if (request instanceof HttpEntityEnclosingRequest) {
345
346            // If 100-continue is expected make sure
347            // there is no pending response data, no pipelined requests or buffered input
348            if (((HttpEntityEnclosingRequest) request).expectContinue()
349                        && state.getResponseState() == MessageState.READY
350                        && state.getPipeline().isEmpty()
351                        && !(conn instanceof SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) {
352
353                state.setRequestState(MessageState.ACK_EXPECTED);
354                final HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
355                        HttpStatus.SC_CONTINUE, context);
356                if (this.expectationVerifier != null) {
357                    conn.suspendInput();
358                    conn.suspendOutput();
359                    final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl(
360                            request, ack, state, conn, context);
361                    this.expectationVerifier.verify(httpAsyncExchange, context);
362                } else {
363                    conn.submitResponse(ack);
364                    state.setRequestState(MessageState.BODY_STREAM);
365                }
366            } else {
367                state.setRequestState(MessageState.BODY_STREAM);
368            }
369        } else {
370            // No request content is expected. Process request right away
371            completeRequest(incoming, conn, state);
372        }
373    }
374
375    @Override
376    public void inputReady(
377            final NHttpServerConnection conn,
378            final ContentDecoder decoder) throws IOException, HttpException {
379        final State state = getState(conn);
380        Asserts.notNull(state, "Connection state");
381        Asserts.check(state.getRequestState() == MessageState.BODY_STREAM,
382                "Unexpected request state %s", state.getRequestState());
383
384        final Incoming incoming = state.getIncoming();
385        Asserts.notNull(incoming, "Incoming request");
386        final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
387        consumer.consumeContent(decoder, conn);
388        if (decoder.isCompleted()) {
389            completeRequest(incoming, conn, state);
390        }
391    }
392
393    @Override
394    public void responseReady(
395            final NHttpServerConnection conn) throws IOException, HttpException {
396        final State state = getState(conn);
397        Asserts.notNull(state, "Connection state");
398        Asserts.check(state.getResponseState() == MessageState.READY ||
399                        state.getResponseState() == MessageState.INIT,
400                "Unexpected response state %s", state.getResponseState());
401
402        if (state.getRequestState() == MessageState.ACK_EXPECTED) {
403            final Outgoing outgoing;
404            synchronized (state) {
405                outgoing = state.getOutgoing();
406                if (outgoing == null) {
407                    conn.suspendOutput();
408                    return;
409                }
410            }
411            final HttpResponse response = outgoing.getResponse();
412            final int status = response.getStatusLine().getStatusCode();
413            if (status == 100) {
414                final HttpContext context = outgoing.getContext();
415                final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
416                try {
417                    // Make sure 100 response has no entity
418                    response.setEntity(null);
419                    conn.requestInput();
420                    state.setRequestState(MessageState.BODY_STREAM);
421                    state.setOutgoing(null);
422                    conn.submitResponse(response);
423                    responseProducer.responseCompleted(context);
424                } finally {
425                    responseProducer.close();
426                }
427            } else if (status >= 400) {
428                conn.resetInput();
429                state.setRequestState(MessageState.READY);
430                commitFinalResponse(conn, state);
431            } else {
432                throw new HttpException("Invalid response: " + response.getStatusLine());
433            }
434        } else {
435            if (state.getResponseState() == MessageState.READY) {
436                final Queue<PipelineEntry> pipeline = state.getPipeline();
437                final PipelineEntry pipelineEntry = pipeline.poll();
438                if (pipelineEntry == null) {
439                    conn.suspendOutput();
440                    return;
441                }
442                state.setResponseState(MessageState.INIT);
443                final Object result = pipelineEntry.getResult();
444                final HttpRequest request = pipelineEntry.getRequest();
445                final HttpContext context = pipelineEntry.getContext();
446                if (result != null) {
447                    final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
448                            HttpStatus.SC_OK, context);
449                    final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
450                            request, response, state, conn, context);
451                    final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler();
452                    conn.suspendOutput();
453                    try {
454                        handler.handle(result, httpExchange, context);
455                    } catch (final RuntimeException ex) {
456                        throw ex;
457                    } catch (final Exception ex) {
458                        pipeline.add(new PipelineEntry(
459                            request,
460                            null,
461                            ex,
462                            handler,
463                            context));
464                        state.setResponseState(MessageState.READY);
465                        responseReady(conn);
466                        return;
467                    }
468                } else {
469                    final Exception exception = pipelineEntry.getException();
470                    final HttpAsyncResponseProducer responseProducer = handleException(
471                            exception != null ? exception : new HttpException("Internal error processing request"),
472                            context);
473                    final HttpResponse error = responseProducer.generateResponse();
474                    state.setOutgoing(new Outgoing(request, error, responseProducer, context));
475                }
476            }
477            if (state.getResponseState() == MessageState.INIT) {
478                final Outgoing outgoing;
479                synchronized (state) {
480                    outgoing = state.getOutgoing();
481                    if (outgoing == null) {
482                        conn.suspendOutput();
483                        return;
484                    }
485                }
486                final HttpResponse response = outgoing.getResponse();
487                final int status = response.getStatusLine().getStatusCode();
488                if (status >= 200) {
489                    commitFinalResponse(conn, state);
490                } else {
491                    throw new HttpException("Invalid response: " + response.getStatusLine());
492                }
493            }
494        }
495    }
496
497    @Override
498    public void outputReady(
499            final NHttpServerConnection conn,
500            final ContentEncoder encoder) throws HttpException, IOException {
501        final State state = getState(conn);
502        Asserts.notNull(state, "Connection state");
503        Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
504                "Unexpected response state %s", state.getResponseState());
505
506        final Outgoing outgoing = state.getOutgoing();
507        Asserts.notNull(outgoing, "Outgoing response");
508        final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
509
510        responseProducer.produceContent(encoder, conn);
511
512        if (encoder.isCompleted()) {
513            completeResponse(outgoing, conn, state);
514        }
515    }
516
517    @Override
518    public void endOfInput(final NHttpServerConnection conn) throws IOException {
519        // Closing connection in an orderly manner and
520        // waiting for output buffer to get flushed.
521        // Do not want to wait indefinitely, though, in case
522        // the opposite end is not reading
523        if (conn.getSocketTimeout() <= 0) {
524            conn.setSocketTimeout(1000);
525        }
526        conn.close();
527    }
528
529    @Override
530    public void timeout(final NHttpServerConnection conn) throws IOException {
531        final State state = getState(conn);
532        if (state != null) {
533            exception(conn, new SocketTimeoutException(
534                    String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn)));
535        }
536        if (conn.getStatus() == NHttpConnection.ACTIVE) {
537            conn.close();
538            if (conn.getStatus() == NHttpConnection.CLOSING) {
539                // Give the connection some grace time to
540                // close itself nicely
541                conn.setSocketTimeout(250);
542            }
543        } else {
544            conn.shutdown();
545        }
546    }
547
548    private State getState(final NHttpConnection conn) {
549        return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
550    }
551
552    /**
553     * This method can be used to log I/O exception thrown while closing
554     * {@link java.io.Closeable} objects (such as
555     * {@link org.apache.http.HttpConnection}).
556     *
557     * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
558     */
559    protected void log(final Exception ex) {
560        this.exceptionLogger.log(ex);
561    }
562
563    private void shutdownConnection(final NHttpConnection conn) {
564        try {
565            conn.shutdown();
566        } catch (final IOException ex) {
567            log(ex);
568        }
569    }
570
571    private void closeHandlers(final State state, final Exception ex) {
572        final HttpAsyncRequestConsumer<Object> consumer =
573                state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
574        if (consumer != null) {
575            try {
576                consumer.failed(ex);
577            } finally {
578                try {
579                    consumer.close();
580                } catch (final IOException ioex) {
581                    log(ioex);
582                }
583            }
584        }
585        final HttpAsyncResponseProducer producer =
586                state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
587        if (producer != null) {
588            try {
589                producer.failed(ex);
590            } finally {
591                try {
592                    producer.close();
593                } catch (final IOException ioex) {
594                    log(ioex);
595                }
596            }
597        }
598    }
599
600    private void closeHandlers(final State state) {
601        final HttpAsyncRequestConsumer<Object> consumer =
602                state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
603        if (consumer != null) {
604            try {
605                consumer.close();
606            } catch (final IOException ioex) {
607                log(ioex);
608            }
609        }
610        final HttpAsyncResponseProducer producer =
611                state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
612        if (producer != null) {
613            try {
614                producer.close();
615            } catch (final IOException ioex) {
616                log(ioex);
617            }
618        }
619    }
620
621    protected HttpAsyncResponseProducer handleException(
622            final Exception ex, final HttpContext context) {
623        String message = ex.getMessage();
624        if (message == null) {
625            message = ex.toString();
626        }
627        final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
628                toStatusCode(ex, context), context);
629        return new ErrorResponseProducer(response,
630                new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
631    }
632
633    protected int toStatusCode(final Exception ex, final HttpContext context) {
634        final int code;
635        if (ex instanceof MethodNotSupportedException) {
636            code = HttpStatus.SC_NOT_IMPLEMENTED;
637        } else if (ex instanceof UnsupportedHttpVersionException) {
638            code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
639        } else if (ex instanceof ProtocolException) {
640            code = HttpStatus.SC_BAD_REQUEST;
641        } else if (ex instanceof SocketTimeoutException) {
642            code = HttpStatus.SC_GATEWAY_TIMEOUT;
643        } else {
644            code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
645        }
646        return code;
647    }
648
649    /**
650     * This method can be used to handle callback set up happened after
651     * response submission.
652     *
653     * @param cancellable Request cancellation callback.
654     * @param context Request context.
655     *
656     * @since 4.4
657     */
658    protected void handleAlreadySubmittedResponse(
659            final Cancellable cancellable, final HttpContext context) {
660        throw new IllegalStateException("Response already submitted");
661    }
662
663    /**
664     * This method can be used to handle double response submission.
665     *
666     * @param responseProducer Response producer for second response.
667     * @param context Request context.
668     *
669     * @since 4.4
670     */
671    protected void handleAlreadySubmittedResponse(
672            final HttpAsyncResponseProducer responseProducer,
673            final HttpContext context) {
674        throw new IllegalStateException("Response already submitted");
675    }
676
677    private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
678        if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
679            return false;
680        }
681        final int status = response.getStatusLine().getStatusCode();
682        return status >= HttpStatus.SC_OK
683            && status != HttpStatus.SC_NO_CONTENT
684            && status != HttpStatus.SC_NOT_MODIFIED
685            && status != HttpStatus.SC_RESET_CONTENT;
686    }
687
688    private void completeRequest(
689            final Incoming incoming,
690            final NHttpServerConnection conn,
691            final State state) throws IOException, HttpException {
692        state.setRequestState(MessageState.READY);
693        state.setIncoming(null);
694
695        final PipelineEntry pipelineEntry;
696        final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
697        try {
698            final HttpContext context = incoming.getContext();
699            consumer.requestCompleted(context);
700            pipelineEntry = new PipelineEntry(
701                    incoming.getRequest(),
702                    consumer.getResult(),
703                    consumer.getException(),
704                    incoming.getHandler(),
705                    context);
706        } finally {
707            consumer.close();
708        }
709        final Queue<PipelineEntry> pipeline = state.getPipeline();
710        pipeline.add(pipelineEntry);
711        if (state.getResponseState() == MessageState.READY) {
712            conn.requestOutput();
713        }
714    }
715
716    private void commitFinalResponse(
717            final NHttpServerConnection conn,
718            final State state) throws IOException, HttpException {
719        final Outgoing outgoing = state.getOutgoing();
720        Asserts.notNull(outgoing, "Outgoing response");
721        final HttpRequest request = outgoing.getRequest();
722        final HttpResponse response = outgoing.getResponse();
723        final HttpContext context = outgoing.getContext();
724
725        context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
726        this.httpProcessor.process(response, context);
727
728        HttpEntity entity = response.getEntity();
729        if (entity != null && !canResponseHaveBody(request, response)) {
730            response.setEntity(null);
731            entity = null;
732        }
733
734        conn.submitResponse(response);
735
736        if (entity == null) {
737            completeResponse(outgoing, conn, state);
738        } else {
739            state.setResponseState(MessageState.BODY_STREAM);
740        }
741    }
742
743    private void completeResponse(
744            final Outgoing outgoing,
745            final NHttpServerConnection conn,
746            final State state) throws IOException, HttpException {
747        final HttpContext context = outgoing.getContext();
748        final HttpResponse response = outgoing.getResponse();
749        final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
750        try {
751            responseProducer.responseCompleted(context);
752            state.setOutgoing(null);
753            state.setCancellable(null);
754            state.setResponseState(MessageState.READY);
755        } finally {
756            responseProducer.close();
757        }
758        if (!this.connStrategy.keepAlive(response, context)) {
759            conn.close();
760        } else {
761            conn.requestInput();
762        }
763    }
764
765    @SuppressWarnings("unchecked")
766    private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
767        HttpAsyncRequestHandler<Object> handler = null;
768        if (this.handlerMapper != null) {
769            handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
770        }
771        if (handler == null) {
772            handler = NullRequestHandler.INSTANCE;
773        }
774        return handler;
775    }
776
777    static class Incoming {
778
779        private final HttpRequest request;
780        private final HttpAsyncRequestHandler<Object> handler;
781        private final HttpAsyncRequestConsumer<Object> consumer;
782        private final HttpContext context;
783
784        Incoming(
785                final HttpRequest request,
786                final HttpAsyncRequestHandler<Object> handler,
787                final HttpAsyncRequestConsumer<Object> consumer,
788                final HttpContext context) {
789            this.request = request;
790            this.handler = handler;
791            this.consumer = consumer;
792            this.context = context;
793        }
794
795        public HttpRequest getRequest() {
796            return this.request;
797        }
798
799        public HttpAsyncRequestHandler<Object> getHandler() {
800            return this.handler;
801        }
802
803        public HttpAsyncRequestConsumer<Object> getConsumer() {
804            return this.consumer;
805        }
806
807        public HttpContext getContext() {
808            return this.context;
809        }
810    }
811
812    static class Outgoing {
813
814        private final HttpRequest request;
815        private final HttpResponse response;
816        private final HttpAsyncResponseProducer producer;
817        private final HttpContext context;
818
819        Outgoing(
820                final HttpRequest request,
821                final HttpResponse response,
822                final HttpAsyncResponseProducer producer,
823                final HttpContext context) {
824            this.request = request;
825            this.response = response;
826            this.producer = producer;
827            this.context = context;
828        }
829
830        public HttpRequest getRequest() {
831            return this.request;
832        }
833
834        public HttpResponse getResponse() {
835            return this.response;
836        }
837
838        public HttpAsyncResponseProducer getProducer() {
839            return this.producer;
840        }
841
842        public HttpContext getContext() {
843            return this.context;
844        }
845    }
846
847    static class PipelineEntry {
848
849        private final HttpRequest request;
850        private final Object result;
851        private final Exception exception;
852        private final HttpAsyncRequestHandler<Object> handler;
853        private final HttpContext context;
854
855        PipelineEntry(
856                final HttpRequest request,
857                final Object result,
858                final Exception exception,
859                final HttpAsyncRequestHandler<Object> handler,
860                final HttpContext context) {
861            this.request = request;
862            this.result = result;
863            this.exception = exception;
864            this.handler = handler;
865            this.context = context;
866        }
867
868        public HttpRequest getRequest() {
869            return this.request;
870        }
871
872        public Object getResult() {
873            return this.result;
874        }
875
876        public Exception getException() {
877            return this.exception;
878        }
879
880        public HttpAsyncRequestHandler<Object> getHandler() {
881            return this.handler;
882        }
883
884        public HttpContext getContext() {
885            return this.context;
886        }
887
888    }
889
890    static class State {
891
892        private final Queue<PipelineEntry> pipeline;
893        private volatile boolean terminated;
894        private volatile MessageState requestState;
895        private volatile MessageState responseState;
896        private volatile Incoming incoming;
897        private volatile Outgoing outgoing;
898        private volatile Cancellable cancellable;
899
900        State() {
901            super();
902            this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>();
903            this.requestState = MessageState.READY;
904            this.responseState = MessageState.READY;
905        }
906
907        public boolean isTerminated() {
908            return this.terminated;
909        }
910
911        public void setTerminated() {
912            this.terminated = true;
913        }
914
915        public MessageState getRequestState() {
916            return this.requestState;
917        }
918
919        public void setRequestState(final MessageState state) {
920            this.requestState = state;
921        }
922
923        public MessageState getResponseState() {
924            return this.responseState;
925        }
926
927        public void setResponseState(final MessageState state) {
928            this.responseState = state;
929        }
930
931        public Incoming getIncoming() {
932            return this.incoming;
933        }
934
935        public void setIncoming(final Incoming incoming) {
936            this.incoming = incoming;
937        }
938
939        public Outgoing getOutgoing() {
940            return this.outgoing;
941        }
942
943        public void setOutgoing(final Outgoing outgoing) {
944            this.outgoing = outgoing;
945        }
946
947        public Cancellable getCancellable() {
948            return this.cancellable;
949        }
950
951        public void setCancellable(final Cancellable cancellable) {
952            this.cancellable = cancellable;
953        }
954
955        public Queue<PipelineEntry> getPipeline() {
956            return this.pipeline;
957        }
958
959        @Override
960        public String toString() {
961            final StringBuilder buf = new StringBuilder();
962            buf.append("[incoming ");
963            buf.append(this.requestState);
964            if (this.incoming != null) {
965                buf.append(" ");
966                buf.append(this.incoming.getRequest().getRequestLine());
967            }
968            buf.append("; outgoing ");
969            buf.append(this.responseState);
970            if (this.outgoing != null) {
971                buf.append(" ");
972                buf.append(this.outgoing.getResponse().getStatusLine());
973            }
974            buf.append("]");
975            return buf.toString();
976        }
977
978    }
979
980    class HttpAsyncExchangeImpl implements HttpAsyncExchange {
981
982        private final AtomicBoolean completed = new AtomicBoolean();
983        private final HttpRequest request;
984        private final HttpResponse response;
985        private final State state;
986        private final NHttpServerConnection conn;
987        private final HttpContext context;
988
989        public HttpAsyncExchangeImpl(
990                final HttpRequest request,
991                final HttpResponse response,
992                final State state,
993                final NHttpServerConnection conn,
994                final HttpContext context) {
995            super();
996            this.request = request;
997            this.response = response;
998            this.state = state;
999            this.conn = conn;
1000            this.context = context;
1001        }
1002
1003        @Override
1004        public HttpRequest getRequest() {
1005            return this.request;
1006        }
1007
1008        @Override
1009        public HttpResponse getResponse() {
1010            return this.response;
1011        }
1012
1013        @Override
1014        public void setCallback(final Cancellable cancellable) {
1015            if (this.completed.get()) {
1016                handleAlreadySubmittedResponse(cancellable, context);
1017            } else if (this.state.isTerminated() && cancellable != null) {
1018                cancellable.cancel();
1019            } else {
1020                this.state.setCancellable(cancellable);
1021            }
1022        }
1023
1024        @Override
1025        public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
1026            Args.notNull(responseProducer, "Response producer");
1027            if (this.completed.getAndSet(true)) {
1028                handleAlreadySubmittedResponse(responseProducer, context);
1029            } else if (!this.state.isTerminated()) {
1030                final HttpResponse response = responseProducer.generateResponse();
1031                final Outgoing outgoing = new Outgoing(
1032                        this.request, response, responseProducer, this.context);
1033
1034                synchronized (this.state) {
1035                    this.state.setOutgoing(outgoing);
1036                    this.state.setCancellable(null);
1037                    this.conn.requestOutput();
1038                }
1039
1040            } else {
1041                try {
1042                    responseProducer.close();
1043                } catch (final IOException ex) {
1044                    log(ex);
1045                }
1046            }
1047        }
1048
1049        @Override
1050        public void submitResponse() {
1051            submitResponse(new BasicAsyncResponseProducer(this.response));
1052        }
1053
1054        @Override
1055        public boolean isCompleted() {
1056            return this.completed.get();
1057        }
1058
1059        @Override
1060        public void setTimeout(final int timeout) {
1061            this.conn.setSocketTimeout(timeout);
1062        }
1063
1064        @Override
1065        public int getTimeout() {
1066            return this.conn.getSocketTimeout();
1067        }
1068
1069    }
1070
1071    /**
1072     * Adaptor class to transition from HttpAsyncRequestHandlerResolver to HttpAsyncRequestHandlerMapper.
1073     */
1074    @Deprecated
1075    private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
1076
1077        private final HttpAsyncRequestHandlerResolver resolver;
1078
1079        public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
1080            this.resolver = resolver;
1081        }
1082
1083        @Override
1084        public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
1085            return resolver.lookup(request.getRequestLine().getUri());
1086        }
1087
1088    }
1089
1090    /**
1091     * Gets the HttpResponseFactory for this service.
1092     *
1093     * @since 4.4.8
1094     */
1095    protected HttpResponseFactory getResponseFactory() {
1096      return responseFactory;
1097    }
1098
1099}