001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.nio.protocol; 029 030import java.io.IOException; 031import java.net.SocketTimeoutException; 032import java.util.Queue; 033import java.util.concurrent.ConcurrentLinkedQueue; 034 035import org.apache.http.ConnectionClosedException; 036import org.apache.http.ExceptionLogger; 037import org.apache.http.HttpEntity; 038import org.apache.http.HttpEntityEnclosingRequest; 039import org.apache.http.HttpException; 040import org.apache.http.HttpRequest; 041import org.apache.http.HttpResponse; 042import org.apache.http.HttpStatus; 043import org.apache.http.HttpVersion; 044import org.apache.http.ProtocolException; 045import org.apache.http.ProtocolVersion; 046import org.apache.http.annotation.Contract; 047import org.apache.http.annotation.ThreadingBehavior; 048import org.apache.http.nio.ContentDecoder; 049import org.apache.http.nio.ContentEncoder; 050import org.apache.http.nio.NHttpClientConnection; 051import org.apache.http.nio.NHttpClientEventHandler; 052import org.apache.http.nio.NHttpConnection; 053import org.apache.http.protocol.HttpContext; 054import org.apache.http.util.Args; 055import org.apache.http.util.Asserts; 056 057/** 058 * {@code HttpAsyncRequestExecutor} is a fully asynchronous HTTP client side 059 * protocol handler based on the NIO (non-blocking) I/O model. 060 * {@code HttpAsyncRequestExecutor} translates individual events fired through 061 * the {@link NHttpClientEventHandler} interface into logically related HTTP 062 * message exchanges. 063 * <p> The caller is expected to pass an instance of 064 * {@link HttpAsyncClientExchangeHandler} to be used for the next series 065 * of HTTP message exchanges through the connection context using 066 * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered 067 * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method 068 * returns {@code true}. The {@link HttpAsyncRequester} utility class can 069 * be used to facilitate initiation of asynchronous HTTP request execution. 070 * <p> 071 * Individual {@code HttpAsyncClientExchangeHandler} are expected to make use of 072 * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol 073 * headers for all outgoing messages and apply common, cross-cutting message 074 * transformations to all incoming and outgoing messages. 075 * {@code HttpAsyncClientExchangeHandler}s can delegate implementation of 076 * application specific content generation and processing to 077 * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}. 078 * 079 * @see HttpAsyncClientExchangeHandler 080 * 081 * @since 4.2 082 */ 083@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) 084public class HttpAsyncRequestExecutor implements NHttpClientEventHandler { 085 086 public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000; 087 public static final String HTTP_HANDLER = "http.nio.exchange-handler"; 088 089 private final int waitForContinue; 090 private final ExceptionLogger exceptionLogger; 091 092 /** 093 * Creates new instance of {@code HttpAsyncRequestExecutor}. 094 * @param waitForContinue wait for continue time period. 095 * @param exceptionLogger Exception logger. If {@code null} 096 * {@link ExceptionLogger#NO_OP} will be used. Please note that the exception 097 * logger will be only used to log I/O exception thrown while closing 098 * {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}). 099 * 100 * @since 4.4 101 */ 102 public HttpAsyncRequestExecutor( 103 final int waitForContinue, 104 final ExceptionLogger exceptionLogger) { 105 super(); 106 this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time"); 107 this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP; 108 } 109 110 /** 111 * Creates new instance of HttpAsyncRequestExecutor. 112 * 113 * @since 4.3 114 */ 115 public HttpAsyncRequestExecutor(final int waitForContinue) { 116 this(waitForContinue, null); 117 } 118 119 public HttpAsyncRequestExecutor() { 120 this(DEFAULT_WAIT_FOR_CONTINUE, null); 121 } 122 123 @Override 124 public void connected( 125 final NHttpClientConnection conn, 126 final Object attachment) throws IOException, HttpException { 127 final State state = new State(); 128 final HttpContext context = conn.getContext(); 129 context.setAttribute(HTTP_EXCHANGE_STATE, state); 130 requestReady(conn); 131 } 132 133 @Override 134 public void closed(final NHttpClientConnection conn) { 135 final State state = getState(conn); 136 final HttpAsyncClientExchangeHandler handler = getHandler(conn); 137 if (state != null) { 138 if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) { 139 if (handler != null) { 140 handler.failed(new ConnectionClosedException("Connection closed unexpectedly")); 141 } 142 } 143 } 144 if (state == null || (handler != null && handler.isDone())) { 145 closeHandler(handler); 146 } 147 } 148 149 @Override 150 public void exception( 151 final NHttpClientConnection conn, final Exception cause) { 152 shutdownConnection(conn); 153 final HttpAsyncClientExchangeHandler handler = getHandler(conn); 154 if (handler != null) { 155 handler.failed(cause); 156 } else { 157 log(cause); 158 } 159 } 160 161 @Override 162 public void requestReady( 163 final NHttpClientConnection conn) throws IOException, HttpException { 164 final State state = getState(conn); 165 Asserts.notNull(state, "Connection state"); 166 Asserts.check(state.getRequestState() == MessageState.READY || 167 state.getRequestState() == MessageState.COMPLETED, 168 "Unexpected request state %s", state.getRequestState()); 169 170 if (state.getRequestState() == MessageState.COMPLETED) { 171 conn.suspendOutput(); 172 return; 173 } 174 final HttpContext context = conn.getContext(); 175 final HttpAsyncClientExchangeHandler handler; 176 synchronized (context) { 177 handler = getHandler(conn); 178 if (handler == null || handler.isDone()) { 179 conn.suspendOutput(); 180 return; 181 } 182 } 183 final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null; 184 185 final HttpRequest request = handler.generateRequest(); 186 if (request == null) { 187 conn.suspendOutput(); 188 return; 189 } 190 final ProtocolVersion version = request.getRequestLine().getProtocolVersion(); 191 if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) { 192 throw new ProtocolException(version + " cannot be used with request pipelining"); 193 } 194 state.setRequest(request); 195 if (pipelined) { 196 state.getRequestQueue().add(request); 197 } 198 if (request instanceof HttpEntityEnclosingRequest) { 199 final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue(); 200 if (expectContinue && pipelined) { 201 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining"); 202 } 203 conn.submitRequest(request); 204 if (expectContinue) { 205 final int timeout = conn.getSocketTimeout(); 206 state.setTimeout(timeout); 207 conn.setSocketTimeout(this.waitForContinue); 208 state.setRequestState(MessageState.ACK_EXPECTED); 209 } else { 210 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); 211 if (entity != null) { 212 state.setRequestState(MessageState.BODY_STREAM); 213 } else { 214 handler.requestCompleted(); 215 state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED); 216 } 217 } 218 } else { 219 conn.submitRequest(request); 220 handler.requestCompleted(); 221 state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED); 222 } 223 } 224 225 @Override 226 public void outputReady( 227 final NHttpClientConnection conn, 228 final ContentEncoder encoder) throws IOException, HttpException { 229 final State state = getState(conn); 230 Asserts.notNull(state, "Connection state"); 231 Asserts.check(state.getRequestState() == MessageState.BODY_STREAM || 232 state.getRequestState() == MessageState.ACK_EXPECTED, 233 "Unexpected request state %s", state.getRequestState()); 234 235 final HttpAsyncClientExchangeHandler handler = getHandler(conn); 236 Asserts.notNull(handler, "Client exchange handler"); 237 if (state.getRequestState() == MessageState.ACK_EXPECTED) { 238 conn.suspendOutput(); 239 return; 240 } 241 handler.produceContent(encoder, conn); 242 if (encoder.isCompleted()) { 243 handler.requestCompleted(); 244 final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null; 245 state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED); 246 } 247 } 248 249 @Override 250 public void responseReceived( 251 final NHttpClientConnection conn) throws HttpException, IOException { 252 final State state = getState(conn); 253 Asserts.notNull(state, "Connection state"); 254 Asserts.check(state.getResponseState() == MessageState.READY, 255 "Unexpected request state %s", state.getResponseState()); 256 257 final HttpAsyncClientExchangeHandler handler = getHandler(conn); 258 Asserts.notNull(handler, "Client exchange handler"); 259 260 final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null; 261 final HttpRequest request; 262 if (pipelined) { 263 request = state.getRequestQueue().poll(); 264 Asserts.notNull(request, "HTTP request"); 265 } else { 266 request = state.getRequest(); 267 if (request == null) { 268 throw new HttpException("Out of sequence response"); 269 } 270 } 271 272 final HttpResponse response = conn.getHttpResponse(); 273 274 final int statusCode = response.getStatusLine().getStatusCode(); 275 if (statusCode < HttpStatus.SC_OK) { 276 // 1xx intermediate response 277 if (statusCode != HttpStatus.SC_CONTINUE) { 278 throw new ProtocolException( 279 "Unexpected response: " + response.getStatusLine()); 280 } 281 if (state.getRequestState() == MessageState.ACK_EXPECTED) { 282 final int timeout = state.getTimeout(); 283 conn.setSocketTimeout(timeout); 284 conn.requestOutput(); 285 state.setRequestState(MessageState.BODY_STREAM); 286 } 287 return; 288 } 289 state.setResponse(response); 290 if (state.getRequestState() == MessageState.ACK_EXPECTED) { 291 final int timeout = state.getTimeout(); 292 conn.setSocketTimeout(timeout); 293 conn.resetOutput(); 294 state.setRequestState(MessageState.COMPLETED); 295 } else if (state.getRequestState() == MessageState.BODY_STREAM) { 296 // Early response 297 if (statusCode >= 400) { 298 conn.resetOutput(); 299 conn.suspendOutput(); 300 state.setRequestState(MessageState.COMPLETED); 301 state.invalidate(); 302 } 303 } 304 305 if (canResponseHaveBody(request, response)) { 306 handler.responseReceived(response); 307 state.setResponseState(MessageState.BODY_STREAM); 308 } else { 309 response.setEntity(null); 310 handler.responseReceived(response); 311 conn.resetInput(); 312 processResponse(conn, state, handler); 313 } 314 } 315 316 @Override 317 public void inputReady( 318 final NHttpClientConnection conn, 319 final ContentDecoder decoder) throws IOException, HttpException { 320 final State state = getState(conn); 321 Asserts.notNull(state, "Connection state"); 322 Asserts.check(state.getResponseState() == MessageState.BODY_STREAM, 323 "Unexpected request state %s", state.getResponseState()); 324 325 final HttpAsyncClientExchangeHandler handler = getHandler(conn); 326 Asserts.notNull(handler, "Client exchange handler"); 327 handler.consumeContent(decoder, conn); 328 if (decoder.isCompleted()) { 329 processResponse(conn, state, handler); 330 } 331 } 332 333 @Override 334 public void endOfInput(final NHttpClientConnection conn) throws IOException { 335 final State state = getState(conn); 336 final HttpContext context = conn.getContext(); 337 synchronized (context) { 338 if (state != null) { 339 if (state.getRequestState().compareTo(MessageState.READY) != 0) { 340 state.invalidate(); 341 } 342 final HttpAsyncClientExchangeHandler handler = getHandler(conn); 343 if (handler != null) { 344 if (state.isValid()) { 345 handler.inputTerminated(); 346 } else { 347 handler.failed(new ConnectionClosedException("Connection closed")); 348 } 349 } 350 } 351 // Closing connection in an orderly manner and 352 // waiting for output buffer to get flushed. 353 // Do not want to wait indefinitely, though, in case 354 // the opposite end is not reading 355 if (conn.getSocketTimeout() <= 0) { 356 conn.setSocketTimeout(1000); 357 } 358 conn.close(); 359 } 360 } 361 362 @Override 363 public void timeout( 364 final NHttpClientConnection conn) throws IOException { 365 final State state = getState(conn); 366 if (state != null) { 367 if (state.getRequestState() == MessageState.ACK_EXPECTED) { 368 final int timeout = state.getTimeout(); 369 conn.setSocketTimeout(timeout); 370 conn.requestOutput(); 371 state.setRequestState(MessageState.BODY_STREAM); 372 state.setTimeout(0); 373 return; 374 } 375 state.invalidate(); 376 final HttpAsyncClientExchangeHandler handler = getHandler(conn); 377 if (handler != null) { 378 handler.failed(new SocketTimeoutException( 379 String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn))); 380 handler.close(); 381 } 382 } 383 if (conn.getStatus() == NHttpConnection.ACTIVE) { 384 conn.close(); 385 if (conn.getStatus() == NHttpConnection.CLOSING) { 386 // Give the connection some grace time to 387 // close itself nicely 388 conn.setSocketTimeout(250); 389 } 390 } else { 391 conn.shutdown(); 392 } 393 } 394 395 /** 396 * This method can be used to log I/O exception thrown while closing 397 * {@link java.io.Closeable} objects (such as 398 * {@link org.apache.http.HttpConnection}}). 399 * 400 * @param ex I/O exception thrown by {@link java.io.Closeable#close()} 401 */ 402 protected void log(final Exception ex) { 403 this.exceptionLogger.log(ex); 404 } 405 406 private static State getState(final NHttpConnection conn) { 407 return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE); 408 } 409 410 private static HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) { 411 return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER); 412 } 413 414 private void shutdownConnection(final NHttpConnection conn) { 415 try { 416 conn.shutdown(); 417 } catch (final IOException ex) { 418 log(ex); 419 } 420 } 421 422 private void closeHandler(final HttpAsyncClientExchangeHandler handler) { 423 if (handler != null) { 424 try { 425 handler.close(); 426 } catch (final IOException ioex) { 427 log(ioex); 428 } 429 } 430 } 431 432 private void processResponse( 433 final NHttpClientConnection conn, 434 final State state, 435 final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException { 436 if (!state.isValid()) { 437 conn.close(); 438 } 439 handler.responseCompleted(); 440 441 final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null; 442 if (!pipelined) { 443 state.setRequestState(MessageState.READY); 444 state.setRequest(null); 445 } 446 state.setResponseState(MessageState.READY); 447 state.setResponse(null); 448 if (!handler.isDone() && conn.isOpen()) { 449 conn.requestOutput(); 450 } 451 } 452 453 private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) { 454 455 final String method = request.getRequestLine().getMethod(); 456 final int status = response.getStatusLine().getStatusCode(); 457 458 if (method.equalsIgnoreCase("HEAD")) { 459 return false; 460 } 461 if (method.equalsIgnoreCase("CONNECT") && status < 300) { 462 return false; 463 } 464 return status >= HttpStatus.SC_OK 465 && status != HttpStatus.SC_NO_CONTENT 466 && status != HttpStatus.SC_NOT_MODIFIED 467 && status != HttpStatus.SC_RESET_CONTENT; 468 } 469 470 static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state"; 471 472 static class State { 473 474 private final Queue<HttpRequest> requestQueue; 475 private volatile MessageState requestState; 476 private volatile MessageState responseState; 477 private volatile HttpRequest request; 478 private volatile HttpResponse response; 479 private volatile boolean valid; 480 private volatile int timeout; 481 482 State() { 483 super(); 484 this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>(); 485 this.valid = true; 486 this.requestState = MessageState.READY; 487 this.responseState = MessageState.READY; 488 } 489 490 public MessageState getRequestState() { 491 return this.requestState; 492 } 493 494 public void setRequestState(final MessageState state) { 495 this.requestState = state; 496 } 497 498 public MessageState getResponseState() { 499 return this.responseState; 500 } 501 502 public void setResponseState(final MessageState state) { 503 this.responseState = state; 504 } 505 506 public HttpRequest getRequest() { 507 return this.request; 508 } 509 510 public void setRequest(final HttpRequest request) { 511 this.request = request; 512 } 513 514 public HttpResponse getResponse() { 515 return this.response; 516 } 517 518 public void setResponse(final HttpResponse response) { 519 this.response = response; 520 } 521 522 public Queue<HttpRequest> getRequestQueue() { 523 return this.requestQueue; 524 } 525 526 public int getTimeout() { 527 return this.timeout; 528 } 529 530 public void setTimeout(final int timeout) { 531 this.timeout = timeout; 532 } 533 534 public boolean isValid() { 535 return this.valid; 536 } 537 538 public void invalidate() { 539 this.valid = false; 540 } 541 542 @Override 543 public String toString() { 544 final StringBuilder buf = new StringBuilder(); 545 buf.append("request state: "); 546 buf.append(this.requestState); 547 buf.append("; request: "); 548 if (this.request != null) { 549 buf.append(this.request.getRequestLine()); 550 } 551 buf.append("; response state: "); 552 buf.append(this.responseState); 553 buf.append("; response: "); 554 if (this.response != null) { 555 buf.append(this.response.getStatusLine()); 556 } 557 buf.append("; valid: "); 558 buf.append(this.valid); 559 buf.append(";"); 560 return buf.toString(); 561 } 562 563 } 564 565}