001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.nio.protocol; 029 030import java.io.IOException; 031import java.net.SocketTimeoutException; 032import java.util.Queue; 033import java.util.concurrent.ConcurrentLinkedQueue; 034import java.util.concurrent.atomic.AtomicBoolean; 035 036import org.apache.http.ConnectionReuseStrategy; 037import org.apache.http.ExceptionLogger; 038import org.apache.http.HttpEntity; 039import org.apache.http.HttpEntityEnclosingRequest; 040import org.apache.http.HttpException; 041import org.apache.http.HttpRequest; 042import org.apache.http.HttpResponse; 043import org.apache.http.HttpResponseFactory; 044import org.apache.http.HttpStatus; 045import org.apache.http.HttpVersion; 046import org.apache.http.MethodNotSupportedException; 047import org.apache.http.ProtocolException; 048import org.apache.http.UnsupportedHttpVersionException; 049import org.apache.http.annotation.Contract; 050import org.apache.http.annotation.ThreadingBehavior; 051import org.apache.http.concurrent.Cancellable; 052import org.apache.http.entity.ContentType; 053import org.apache.http.impl.DefaultConnectionReuseStrategy; 054import org.apache.http.impl.DefaultHttpResponseFactory; 055import org.apache.http.nio.ContentDecoder; 056import org.apache.http.nio.ContentEncoder; 057import org.apache.http.nio.NHttpConnection; 058import org.apache.http.nio.NHttpServerConnection; 059import org.apache.http.nio.NHttpServerEventHandler; 060import org.apache.http.nio.entity.NStringEntity; 061import org.apache.http.nio.reactor.SessionBufferStatus; 062import org.apache.http.params.HttpParams; 063import org.apache.http.protocol.BasicHttpContext; 064import org.apache.http.protocol.HttpContext; 065import org.apache.http.protocol.HttpCoreContext; 066import org.apache.http.protocol.HttpProcessor; 067import org.apache.http.util.Args; 068import org.apache.http.util.Asserts; 069 070/** 071 * {@code HttpAsyncService} is a fully asynchronous HTTP server side protocol 072 * handler based on the non-blocking (NIO) I/O model. 073 * {@code HttpAsyncServerProtocolHandler} translates individual events fired 074 * through the {@link NHttpServerEventHandler} interface into logically related 075 * HTTP message exchanges. 076 * <p> 077 * Upon receiving an incoming request {@code HttpAsyncService} verifies 078 * the message for compliance with the server expectations using 079 * {@link HttpAsyncExpectationVerifier}, if provided, and then 080 * {@link HttpAsyncRequestHandlerMapper} is used to map the request 081 * to a particular {@link HttpAsyncRequestHandler} intended to handle 082 * the request with the given URI. The protocol handler uses the selected 083 * {@link HttpAsyncRequestHandler} instance to process the incoming request 084 * and to generate an outgoing response. 085 * <p> 086 * {@code HttpAsyncService} relies on {@link HttpProcessor} to generate 087 * mandatory protocol headers for all outgoing messages and apply common, 088 * cross-cutting message transformations to all incoming and outgoing messages, 089 * whereas individual {@link HttpAsyncRequestHandler}s are expected 090 * to implement application specific content generation and processing. 091 * <p> 092 * Individual {@link HttpAsyncRequestHandler}s do not have to submit a response 093 * immediately. They can defer transmission of an HTTP response back to 094 * the client without blocking the I/O thread by delegating the process of 095 * request handling to another service or a worker thread. HTTP response can 096 * be submitted as a later a later point of time once response content becomes 097 * available. 098 * 099 * @since 4.2 100 */ 101@SuppressWarnings("deprecation") 102@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) 103public class HttpAsyncService implements NHttpServerEventHandler { 104 105 static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state"; 106 107 private final HttpProcessor httpProcessor; 108 private final ConnectionReuseStrategy connStrategy; 109 private final HttpResponseFactory responseFactory; 110 private final HttpAsyncRequestHandlerMapper handlerMapper; 111 private final HttpAsyncExpectationVerifier expectationVerifier; 112 private final ExceptionLogger exceptionLogger; 113 114 /** 115 * Creates new instance of {@code HttpAsyncServerProtocolHandler}. 116 * 117 * @param httpProcessor HTTP protocol processor (required). 118 * @param connStrategy Connection re-use strategy (required). 119 * @param responseFactory HTTP response factory (required). 120 * @param handlerResolver Request handler resolver. 121 * @param expectationVerifier Request expectation verifier (optional). 122 * @param params HTTP parameters (required). 123 * 124 * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor, 125 * ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper, 126 * HttpAsyncExpectationVerifier)} 127 */ 128 @Deprecated 129 public HttpAsyncService( 130 final HttpProcessor httpProcessor, 131 final ConnectionReuseStrategy connStrategy, 132 final HttpResponseFactory responseFactory, 133 final HttpAsyncRequestHandlerResolver handlerResolver, 134 final HttpAsyncExpectationVerifier expectationVerifier, 135 final HttpParams params) { 136 this(httpProcessor, 137 connStrategy, 138 responseFactory, 139 new HttpAsyncRequestHandlerResolverAdapter(handlerResolver), 140 expectationVerifier); 141 } 142 143 /** 144 * Creates new instance of {@code HttpAsyncServerProtocolHandler}. 145 * 146 * @param httpProcessor HTTP protocol processor (required). 147 * @param connStrategy Connection re-use strategy (required). 148 * @param handlerResolver Request handler resolver. 149 * @param params HTTP parameters (required). 150 * 151 * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor, 152 * ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper, 153 * HttpAsyncExpectationVerifier)} 154 */ 155 @Deprecated 156 public HttpAsyncService( 157 final HttpProcessor httpProcessor, 158 final ConnectionReuseStrategy connStrategy, 159 final HttpAsyncRequestHandlerResolver handlerResolver, 160 final HttpParams params) { 161 this(httpProcessor, 162 connStrategy, 163 DefaultHttpResponseFactory.INSTANCE, 164 new HttpAsyncRequestHandlerResolverAdapter(handlerResolver), 165 null); 166 } 167 168 /** 169 * Creates new instance of {@code HttpAsyncServerProtocolHandler}. 170 * 171 * @param httpProcessor HTTP protocol processor. 172 * @param connStrategy Connection re-use strategy. If {@code null} 173 * {@link DefaultConnectionReuseStrategy#INSTANCE} will be used. 174 * @param responseFactory HTTP response factory. If {@code null} 175 * {@link DefaultHttpResponseFactory#INSTANCE} will be used. 176 * @param handlerMapper Request handler mapper. 177 * @param expectationVerifier Request expectation verifier. May be {@code null}. 178 * 179 * @since 4.3 180 */ 181 public HttpAsyncService( 182 final HttpProcessor httpProcessor, 183 final ConnectionReuseStrategy connStrategy, 184 final HttpResponseFactory responseFactory, 185 final HttpAsyncRequestHandlerMapper handlerMapper, 186 final HttpAsyncExpectationVerifier expectationVerifier) { 187 this(httpProcessor, connStrategy, responseFactory, handlerMapper, expectationVerifier, null); 188 } 189 190 /** 191 * Creates new instance of {@code HttpAsyncServerProtocolHandler}. 192 * 193 * @param httpProcessor HTTP protocol processor. 194 * @param connStrategy Connection re-use strategy. If {@code null} 195 * {@link DefaultConnectionReuseStrategy#INSTANCE} will be used. 196 * @param responseFactory HTTP response factory. If {@code null} 197 * {@link DefaultHttpResponseFactory#INSTANCE} will be used. 198 * @param handlerMapper Request handler mapper. 199 * @param expectationVerifier Request expectation verifier. May be {@code null}. 200 * @param exceptionLogger Exception logger. If {@code null} 201 * {@link ExceptionLogger#NO_OP} will be used. Please note that the exception 202 * logger will be only used to log I/O exception thrown while closing 203 * {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}). 204 * 205 * @since 4.4 206 */ 207 public HttpAsyncService( 208 final HttpProcessor httpProcessor, 209 final ConnectionReuseStrategy connStrategy, 210 final HttpResponseFactory responseFactory, 211 final HttpAsyncRequestHandlerMapper handlerMapper, 212 final HttpAsyncExpectationVerifier expectationVerifier, 213 final ExceptionLogger exceptionLogger) { 214 super(); 215 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); 216 this.connStrategy = connStrategy != null ? connStrategy : 217 DefaultConnectionReuseStrategy.INSTANCE; 218 this.responseFactory = responseFactory != null ? responseFactory : 219 DefaultHttpResponseFactory.INSTANCE; 220 this.handlerMapper = handlerMapper; 221 this.expectationVerifier = expectationVerifier; 222 this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP; 223 } 224 225 /** 226 * Creates new instance of {@code HttpAsyncServerProtocolHandler}. 227 * 228 * @param httpProcessor HTTP protocol processor. 229 * @param handlerMapper Request handler mapper. 230 * 231 * @since 4.3 232 */ 233 public HttpAsyncService( 234 final HttpProcessor httpProcessor, 235 final HttpAsyncRequestHandlerMapper handlerMapper) { 236 this(httpProcessor, null, null, handlerMapper, null); 237 } 238 239 /** 240 * Creates new instance of {@code HttpAsyncServerProtocolHandler}. 241 * 242 * @param httpProcessor HTTP protocol processor. 243 * @param handlerMapper Request handler mapper. 244 * @param exceptionLogger Exception logger. If {@code null} 245 * {@link ExceptionLogger#NO_OP} will be used. Please note that the exception 246 * logger will be only used to log I/O exception thrown while closing 247 * {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}). 248 * 249 * @since 4.4 250 */ 251 public HttpAsyncService( 252 final HttpProcessor httpProcessor, 253 final HttpAsyncRequestHandlerMapper handlerMapper, 254 final ExceptionLogger exceptionLogger) { 255 this(httpProcessor, null, null, handlerMapper, null, exceptionLogger); 256 } 257 258 @Override 259 public void connected(final NHttpServerConnection conn) { 260 final State state = new State(); 261 conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state); 262 } 263 264 @Override 265 public void closed(final NHttpServerConnection conn) { 266 final State state = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE); 267 if (state != null) { 268 state.setTerminated(); 269 closeHandlers(state); 270 final Cancellable cancellable = state.getCancellable(); 271 if (cancellable != null) { 272 cancellable.cancel(); 273 } 274 } 275 } 276 277 @Override 278 public void exception( 279 final NHttpServerConnection conn, final Exception cause) { 280 log(cause); 281 final State state = getState(conn); 282 if (state == null) { 283 shutdownConnection(conn); 284 return; 285 } 286 state.setTerminated(); 287 closeHandlers(state, cause); 288 final Cancellable cancellable = state.getCancellable(); 289 if (cancellable != null) { 290 cancellable.cancel(); 291 } 292 final Queue<PipelineEntry> pipeline = state.getPipeline(); 293 if (!pipeline.isEmpty() 294 || conn.isResponseSubmitted() 295 || state.getResponseState().compareTo(MessageState.INIT) > 0) { 296 // There is not much that we can do if a response 297 // has already been submitted or pipelining is being used. 298 shutdownConnection(conn); 299 } else { 300 try { 301 final Incoming incoming = state.getIncoming(); 302 final HttpRequest request = incoming != null ? incoming.getRequest() : null; 303 final HttpContext context = incoming != null ? incoming.getContext() : new BasicHttpContext(); 304 final HttpAsyncResponseProducer responseProducer = handleException(cause, context); 305 final HttpResponse response = responseProducer.generateResponse(); 306 final Outgoing outgoing = new Outgoing(request, response, responseProducer, context); 307 state.setResponseState(MessageState.INIT); 308 state.setOutgoing(outgoing); 309 commitFinalResponse(conn, state); 310 } catch (final Exception ex) { 311 shutdownConnection(conn); 312 closeHandlers(state); 313 if (ex instanceof RuntimeException) { 314 throw (RuntimeException) ex; 315 } else { 316 log(ex); 317 } 318 } 319 } 320 } 321 322 @Override 323 public void requestReceived( 324 final NHttpServerConnection conn) throws IOException, HttpException { 325 final State state = getState(conn); 326 Asserts.notNull(state, "Connection state"); 327 Asserts.check(state.getRequestState() == MessageState.READY, 328 "Unexpected request state %s", state.getRequestState()); 329 330 final HttpRequest request = conn.getHttpRequest(); 331 final HttpContext context = new BasicHttpContext(); 332 333 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request); 334 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn); 335 this.httpProcessor.process(request, context); 336 337 final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request); 338 final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context); 339 consumer.requestReceived(request); 340 341 final Incoming incoming = new Incoming(request, requestHandler, consumer, context); 342 state.setIncoming(incoming); 343 344 if (request instanceof HttpEntityEnclosingRequest) { 345 346 // If 100-continue is expected make sure 347 // there is no pending response data, no pipelined requests or buffered input 348 if (((HttpEntityEnclosingRequest) request).expectContinue() 349 && state.getResponseState() == MessageState.READY 350 && state.getPipeline().isEmpty() 351 && !(conn instanceof SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) { 352 353 state.setRequestState(MessageState.ACK_EXPECTED); 354 final HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1, 355 HttpStatus.SC_CONTINUE, context); 356 if (this.expectationVerifier != null) { 357 conn.suspendInput(); 358 conn.suspendOutput(); 359 final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl( 360 request, ack, state, conn, context); 361 this.expectationVerifier.verify(httpAsyncExchange, context); 362 } else { 363 conn.submitResponse(ack); 364 state.setRequestState(MessageState.BODY_STREAM); 365 } 366 } else { 367 state.setRequestState(MessageState.BODY_STREAM); 368 } 369 } else { 370 // No request content is expected. Process request right away 371 completeRequest(incoming, conn, state); 372 } 373 } 374 375 @Override 376 public void inputReady( 377 final NHttpServerConnection conn, 378 final ContentDecoder decoder) throws IOException, HttpException { 379 final State state = getState(conn); 380 Asserts.notNull(state, "Connection state"); 381 Asserts.check(state.getRequestState() == MessageState.BODY_STREAM, 382 "Unexpected request state %s", state.getRequestState()); 383 384 final Incoming incoming = state.getIncoming(); 385 Asserts.notNull(incoming, "Incoming request"); 386 final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer(); 387 consumer.consumeContent(decoder, conn); 388 if (decoder.isCompleted()) { 389 completeRequest(incoming, conn, state); 390 } 391 } 392 393 @Override 394 public void responseReady( 395 final NHttpServerConnection conn) throws IOException, HttpException { 396 final State state = getState(conn); 397 Asserts.notNull(state, "Connection state"); 398 Asserts.check(state.getResponseState() == MessageState.READY || 399 state.getResponseState() == MessageState.INIT, 400 "Unexpected response state %s", state.getResponseState()); 401 402 if (state.getRequestState() == MessageState.ACK_EXPECTED) { 403 final Outgoing outgoing; 404 synchronized (state) { 405 outgoing = state.getOutgoing(); 406 if (outgoing == null) { 407 conn.suspendOutput(); 408 return; 409 } 410 } 411 final HttpResponse response = outgoing.getResponse(); 412 final int status = response.getStatusLine().getStatusCode(); 413 if (status == 100) { 414 final HttpContext context = outgoing.getContext(); 415 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer(); 416 try { 417 // Make sure 100 response has no entity 418 response.setEntity(null); 419 conn.requestInput(); 420 state.setRequestState(MessageState.BODY_STREAM); 421 state.setOutgoing(null); 422 conn.submitResponse(response); 423 responseProducer.responseCompleted(context); 424 } finally { 425 responseProducer.close(); 426 } 427 } else if (status >= 400) { 428 conn.resetInput(); 429 state.setRequestState(MessageState.READY); 430 commitFinalResponse(conn, state); 431 } else { 432 throw new HttpException("Invalid response: " + response.getStatusLine()); 433 } 434 } else { 435 if (state.getResponseState() == MessageState.READY) { 436 final Queue<PipelineEntry> pipeline = state.getPipeline(); 437 final PipelineEntry pipelineEntry = pipeline.poll(); 438 if (pipelineEntry == null) { 439 conn.suspendOutput(); 440 return; 441 } 442 state.setResponseState(MessageState.INIT); 443 final Object result = pipelineEntry.getResult(); 444 final HttpRequest request = pipelineEntry.getRequest(); 445 final HttpContext context = pipelineEntry.getContext(); 446 if (result != null) { 447 final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1, 448 HttpStatus.SC_OK, context); 449 final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl( 450 request, response, state, conn, context); 451 final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler(); 452 conn.suspendOutput(); 453 try { 454 handler.handle(result, httpExchange, context); 455 } catch (final RuntimeException ex) { 456 throw ex; 457 } catch (final Exception ex) { 458 pipeline.add(new PipelineEntry( 459 request, 460 null, 461 ex, 462 handler, 463 context)); 464 state.setResponseState(MessageState.READY); 465 responseReady(conn); 466 return; 467 } 468 } else { 469 final Exception exception = pipelineEntry.getException(); 470 final HttpAsyncResponseProducer responseProducer = handleException( 471 exception != null ? exception : new HttpException("Internal error processing request"), 472 context); 473 final HttpResponse error = responseProducer.generateResponse(); 474 state.setOutgoing(new Outgoing(request, error, responseProducer, context)); 475 } 476 } 477 if (state.getResponseState() == MessageState.INIT) { 478 final Outgoing outgoing; 479 synchronized (state) { 480 outgoing = state.getOutgoing(); 481 if (outgoing == null) { 482 conn.suspendOutput(); 483 return; 484 } 485 } 486 final HttpResponse response = outgoing.getResponse(); 487 final int status = response.getStatusLine().getStatusCode(); 488 if (status >= 200) { 489 commitFinalResponse(conn, state); 490 } else { 491 throw new HttpException("Invalid response: " + response.getStatusLine()); 492 } 493 } 494 } 495 } 496 497 @Override 498 public void outputReady( 499 final NHttpServerConnection conn, 500 final ContentEncoder encoder) throws HttpException, IOException { 501 final State state = getState(conn); 502 Asserts.notNull(state, "Connection state"); 503 Asserts.check(state.getResponseState() == MessageState.BODY_STREAM, 504 "Unexpected response state %s", state.getResponseState()); 505 506 final Outgoing outgoing = state.getOutgoing(); 507 Asserts.notNull(outgoing, "Outgoing response"); 508 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer(); 509 510 responseProducer.produceContent(encoder, conn); 511 512 if (encoder.isCompleted()) { 513 completeResponse(outgoing, conn, state); 514 } 515 } 516 517 @Override 518 public void endOfInput(final NHttpServerConnection conn) throws IOException { 519 // Closing connection in an orderly manner and 520 // waiting for output buffer to get flushed. 521 // Do not want to wait indefinitely, though, in case 522 // the opposite end is not reading 523 if (conn.getSocketTimeout() <= 0) { 524 conn.setSocketTimeout(1000); 525 } 526 conn.close(); 527 } 528 529 @Override 530 public void timeout(final NHttpServerConnection conn) throws IOException { 531 final State state = getState(conn); 532 if (state != null) { 533 exception(conn, new SocketTimeoutException( 534 String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn))); 535 } 536 if (conn.getStatus() == NHttpConnection.ACTIVE) { 537 conn.close(); 538 if (conn.getStatus() == NHttpConnection.CLOSING) { 539 // Give the connection some grace time to 540 // close itself nicely 541 conn.setSocketTimeout(250); 542 } 543 } else { 544 conn.shutdown(); 545 } 546 } 547 548 private State getState(final NHttpConnection conn) { 549 return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE); 550 } 551 552 /** 553 * This method can be used to log I/O exception thrown while closing 554 * {@link java.io.Closeable} objects (such as 555 * {@link org.apache.http.HttpConnection}). 556 * 557 * @param ex I/O exception thrown by {@link java.io.Closeable#close()} 558 */ 559 protected void log(final Exception ex) { 560 this.exceptionLogger.log(ex); 561 } 562 563 private void shutdownConnection(final NHttpConnection conn) { 564 try { 565 conn.shutdown(); 566 } catch (final IOException ex) { 567 log(ex); 568 } 569 } 570 571 private void closeHandlers(final State state, final Exception ex) { 572 final HttpAsyncRequestConsumer<Object> consumer = 573 state.getIncoming() != null ? state.getIncoming().getConsumer() : null; 574 if (consumer != null) { 575 try { 576 consumer.failed(ex); 577 } finally { 578 try { 579 consumer.close(); 580 } catch (final IOException ioex) { 581 log(ioex); 582 } 583 } 584 } 585 final HttpAsyncResponseProducer producer = 586 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null; 587 if (producer != null) { 588 try { 589 producer.failed(ex); 590 } finally { 591 try { 592 producer.close(); 593 } catch (final IOException ioex) { 594 log(ioex); 595 } 596 } 597 } 598 } 599 600 private void closeHandlers(final State state) { 601 final HttpAsyncRequestConsumer<Object> consumer = 602 state.getIncoming() != null ? state.getIncoming().getConsumer() : null; 603 if (consumer != null) { 604 try { 605 consumer.close(); 606 } catch (final IOException ioex) { 607 log(ioex); 608 } 609 } 610 final HttpAsyncResponseProducer producer = 611 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null; 612 if (producer != null) { 613 try { 614 producer.close(); 615 } catch (final IOException ioex) { 616 log(ioex); 617 } 618 } 619 } 620 621 protected HttpAsyncResponseProducer handleException( 622 final Exception ex, final HttpContext context) { 623 String message = ex.getMessage(); 624 if (message == null) { 625 message = ex.toString(); 626 } 627 final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1, 628 toStatusCode(ex, context), context); 629 return new ErrorResponseProducer(response, 630 new NStringEntity(message, ContentType.DEFAULT_TEXT), false); 631 } 632 633 protected int toStatusCode(final Exception ex, final HttpContext context) { 634 final int code; 635 if (ex instanceof MethodNotSupportedException) { 636 code = HttpStatus.SC_NOT_IMPLEMENTED; 637 } else if (ex instanceof UnsupportedHttpVersionException) { 638 code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED; 639 } else if (ex instanceof ProtocolException) { 640 code = HttpStatus.SC_BAD_REQUEST; 641 } else if (ex instanceof SocketTimeoutException) { 642 code = HttpStatus.SC_GATEWAY_TIMEOUT; 643 } else { 644 code = HttpStatus.SC_INTERNAL_SERVER_ERROR; 645 } 646 return code; 647 } 648 649 /** 650 * This method can be used to handle callback set up happened after 651 * response submission. 652 * 653 * @param cancellable Request cancellation callback. 654 * @param context Request context. 655 * 656 * @since 4.4 657 */ 658 protected void handleAlreadySubmittedResponse( 659 final Cancellable cancellable, final HttpContext context) { 660 throw new IllegalStateException("Response already submitted"); 661 } 662 663 /** 664 * This method can be used to handle double response submission. 665 * 666 * @param responseProducer Response producer for second response. 667 * @param context Request context. 668 * 669 * @since 4.4 670 */ 671 protected void handleAlreadySubmittedResponse( 672 final HttpAsyncResponseProducer responseProducer, 673 final HttpContext context) { 674 throw new IllegalStateException("Response already submitted"); 675 } 676 677 private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) { 678 if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) { 679 return false; 680 } 681 final int status = response.getStatusLine().getStatusCode(); 682 return status >= HttpStatus.SC_OK 683 && status != HttpStatus.SC_NO_CONTENT 684 && status != HttpStatus.SC_NOT_MODIFIED 685 && status != HttpStatus.SC_RESET_CONTENT; 686 } 687 688 private void completeRequest( 689 final Incoming incoming, 690 final NHttpServerConnection conn, 691 final State state) throws IOException, HttpException { 692 state.setRequestState(MessageState.READY); 693 state.setIncoming(null); 694 695 final PipelineEntry pipelineEntry; 696 final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer(); 697 try { 698 final HttpContext context = incoming.getContext(); 699 consumer.requestCompleted(context); 700 pipelineEntry = new PipelineEntry( 701 incoming.getRequest(), 702 consumer.getResult(), 703 consumer.getException(), 704 incoming.getHandler(), 705 context); 706 } finally { 707 consumer.close(); 708 } 709 final Queue<PipelineEntry> pipeline = state.getPipeline(); 710 pipeline.add(pipelineEntry); 711 if (state.getResponseState() == MessageState.READY) { 712 conn.requestOutput(); 713 } 714 } 715 716 private void commitFinalResponse( 717 final NHttpServerConnection conn, 718 final State state) throws IOException, HttpException { 719 final Outgoing outgoing = state.getOutgoing(); 720 Asserts.notNull(outgoing, "Outgoing response"); 721 final HttpRequest request = outgoing.getRequest(); 722 final HttpResponse response = outgoing.getResponse(); 723 final HttpContext context = outgoing.getContext(); 724 725 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response); 726 this.httpProcessor.process(response, context); 727 728 HttpEntity entity = response.getEntity(); 729 if (entity != null && !canResponseHaveBody(request, response)) { 730 response.setEntity(null); 731 entity = null; 732 } 733 734 conn.submitResponse(response); 735 736 if (entity == null) { 737 completeResponse(outgoing, conn, state); 738 } else { 739 state.setResponseState(MessageState.BODY_STREAM); 740 } 741 } 742 743 private void completeResponse( 744 final Outgoing outgoing, 745 final NHttpServerConnection conn, 746 final State state) throws IOException, HttpException { 747 final HttpContext context = outgoing.getContext(); 748 final HttpResponse response = outgoing.getResponse(); 749 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer(); 750 try { 751 responseProducer.responseCompleted(context); 752 state.setOutgoing(null); 753 state.setCancellable(null); 754 state.setResponseState(MessageState.READY); 755 } finally { 756 responseProducer.close(); 757 } 758 if (!this.connStrategy.keepAlive(response, context)) { 759 conn.close(); 760 } else { 761 conn.requestInput(); 762 } 763 } 764 765 @SuppressWarnings("unchecked") 766 private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) { 767 HttpAsyncRequestHandler<Object> handler = null; 768 if (this.handlerMapper != null) { 769 handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request); 770 } 771 if (handler == null) { 772 handler = NullRequestHandler.INSTANCE; 773 } 774 return handler; 775 } 776 777 static class Incoming { 778 779 private final HttpRequest request; 780 private final HttpAsyncRequestHandler<Object> handler; 781 private final HttpAsyncRequestConsumer<Object> consumer; 782 private final HttpContext context; 783 784 Incoming( 785 final HttpRequest request, 786 final HttpAsyncRequestHandler<Object> handler, 787 final HttpAsyncRequestConsumer<Object> consumer, 788 final HttpContext context) { 789 this.request = request; 790 this.handler = handler; 791 this.consumer = consumer; 792 this.context = context; 793 } 794 795 public HttpRequest getRequest() { 796 return this.request; 797 } 798 799 public HttpAsyncRequestHandler<Object> getHandler() { 800 return this.handler; 801 } 802 803 public HttpAsyncRequestConsumer<Object> getConsumer() { 804 return this.consumer; 805 } 806 807 public HttpContext getContext() { 808 return this.context; 809 } 810 } 811 812 static class Outgoing { 813 814 private final HttpRequest request; 815 private final HttpResponse response; 816 private final HttpAsyncResponseProducer producer; 817 private final HttpContext context; 818 819 Outgoing( 820 final HttpRequest request, 821 final HttpResponse response, 822 final HttpAsyncResponseProducer producer, 823 final HttpContext context) { 824 this.request = request; 825 this.response = response; 826 this.producer = producer; 827 this.context = context; 828 } 829 830 public HttpRequest getRequest() { 831 return this.request; 832 } 833 834 public HttpResponse getResponse() { 835 return this.response; 836 } 837 838 public HttpAsyncResponseProducer getProducer() { 839 return this.producer; 840 } 841 842 public HttpContext getContext() { 843 return this.context; 844 } 845 } 846 847 static class PipelineEntry { 848 849 private final HttpRequest request; 850 private final Object result; 851 private final Exception exception; 852 private final HttpAsyncRequestHandler<Object> handler; 853 private final HttpContext context; 854 855 PipelineEntry( 856 final HttpRequest request, 857 final Object result, 858 final Exception exception, 859 final HttpAsyncRequestHandler<Object> handler, 860 final HttpContext context) { 861 this.request = request; 862 this.result = result; 863 this.exception = exception; 864 this.handler = handler; 865 this.context = context; 866 } 867 868 public HttpRequest getRequest() { 869 return this.request; 870 } 871 872 public Object getResult() { 873 return this.result; 874 } 875 876 public Exception getException() { 877 return this.exception; 878 } 879 880 public HttpAsyncRequestHandler<Object> getHandler() { 881 return this.handler; 882 } 883 884 public HttpContext getContext() { 885 return this.context; 886 } 887 888 } 889 890 static class State { 891 892 private final Queue<PipelineEntry> pipeline; 893 private volatile boolean terminated; 894 private volatile MessageState requestState; 895 private volatile MessageState responseState; 896 private volatile Incoming incoming; 897 private volatile Outgoing outgoing; 898 private volatile Cancellable cancellable; 899 900 State() { 901 super(); 902 this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>(); 903 this.requestState = MessageState.READY; 904 this.responseState = MessageState.READY; 905 } 906 907 public boolean isTerminated() { 908 return this.terminated; 909 } 910 911 public void setTerminated() { 912 this.terminated = true; 913 } 914 915 public MessageState getRequestState() { 916 return this.requestState; 917 } 918 919 public void setRequestState(final MessageState state) { 920 this.requestState = state; 921 } 922 923 public MessageState getResponseState() { 924 return this.responseState; 925 } 926 927 public void setResponseState(final MessageState state) { 928 this.responseState = state; 929 } 930 931 public Incoming getIncoming() { 932 return this.incoming; 933 } 934 935 public void setIncoming(final Incoming incoming) { 936 this.incoming = incoming; 937 } 938 939 public Outgoing getOutgoing() { 940 return this.outgoing; 941 } 942 943 public void setOutgoing(final Outgoing outgoing) { 944 this.outgoing = outgoing; 945 } 946 947 public Cancellable getCancellable() { 948 return this.cancellable; 949 } 950 951 public void setCancellable(final Cancellable cancellable) { 952 this.cancellable = cancellable; 953 } 954 955 public Queue<PipelineEntry> getPipeline() { 956 return this.pipeline; 957 } 958 959 @Override 960 public String toString() { 961 final StringBuilder buf = new StringBuilder(); 962 buf.append("[incoming "); 963 buf.append(this.requestState); 964 if (this.incoming != null) { 965 buf.append(" "); 966 buf.append(this.incoming.getRequest().getRequestLine()); 967 } 968 buf.append("; outgoing "); 969 buf.append(this.responseState); 970 if (this.outgoing != null) { 971 buf.append(" "); 972 buf.append(this.outgoing.getResponse().getStatusLine()); 973 } 974 buf.append("]"); 975 return buf.toString(); 976 } 977 978 } 979 980 class HttpAsyncExchangeImpl implements HttpAsyncExchange { 981 982 private final AtomicBoolean completed = new AtomicBoolean(); 983 private final HttpRequest request; 984 private final HttpResponse response; 985 private final State state; 986 private final NHttpServerConnection conn; 987 private final HttpContext context; 988 989 public HttpAsyncExchangeImpl( 990 final HttpRequest request, 991 final HttpResponse response, 992 final State state, 993 final NHttpServerConnection conn, 994 final HttpContext context) { 995 super(); 996 this.request = request; 997 this.response = response; 998 this.state = state; 999 this.conn = conn; 1000 this.context = context; 1001 } 1002 1003 @Override 1004 public HttpRequest getRequest() { 1005 return this.request; 1006 } 1007 1008 @Override 1009 public HttpResponse getResponse() { 1010 return this.response; 1011 } 1012 1013 @Override 1014 public void setCallback(final Cancellable cancellable) { 1015 if (this.completed.get()) { 1016 handleAlreadySubmittedResponse(cancellable, context); 1017 } else if (this.state.isTerminated() && cancellable != null) { 1018 cancellable.cancel(); 1019 } else { 1020 this.state.setCancellable(cancellable); 1021 } 1022 } 1023 1024 @Override 1025 public void submitResponse(final HttpAsyncResponseProducer responseProducer) { 1026 Args.notNull(responseProducer, "Response producer"); 1027 if (this.completed.getAndSet(true)) { 1028 handleAlreadySubmittedResponse(responseProducer, context); 1029 } else if (!this.state.isTerminated()) { 1030 final HttpResponse response = responseProducer.generateResponse(); 1031 final Outgoing outgoing = new Outgoing( 1032 this.request, response, responseProducer, this.context); 1033 1034 synchronized (this.state) { 1035 this.state.setOutgoing(outgoing); 1036 this.state.setCancellable(null); 1037 this.conn.requestOutput(); 1038 } 1039 1040 } else { 1041 try { 1042 responseProducer.close(); 1043 } catch (final IOException ex) { 1044 log(ex); 1045 } 1046 } 1047 } 1048 1049 @Override 1050 public void submitResponse() { 1051 submitResponse(new BasicAsyncResponseProducer(this.response)); 1052 } 1053 1054 @Override 1055 public boolean isCompleted() { 1056 return this.completed.get(); 1057 } 1058 1059 @Override 1060 public void setTimeout(final int timeout) { 1061 this.conn.setSocketTimeout(timeout); 1062 } 1063 1064 @Override 1065 public int getTimeout() { 1066 return this.conn.getSocketTimeout(); 1067 } 1068 1069 } 1070 1071 /** 1072 * Adaptor class to transition from HttpAsyncRequestHandlerResolver to HttpAsyncRequestHandlerMapper. 1073 */ 1074 @Deprecated 1075 private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper { 1076 1077 private final HttpAsyncRequestHandlerResolver resolver; 1078 1079 public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) { 1080 this.resolver = resolver; 1081 } 1082 1083 @Override 1084 public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) { 1085 return resolver.lookup(request.getRequestLine().getUri()); 1086 } 1087 1088 } 1089 1090 /** 1091 * Gets the HttpResponseFactory for this service. 1092 * 1093 * @since 4.4.8 1094 */ 1095 protected HttpResponseFactory getResponseFactory() { 1096 return responseFactory; 1097 } 1098 1099}