001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.impl.nio.reactor; 029 030import java.io.IOException; 031import java.io.InterruptedIOException; 032import java.net.Socket; 033import java.nio.channels.Channel; 034import java.nio.channels.ClosedChannelException; 035import java.nio.channels.ClosedSelectorException; 036import java.nio.channels.SelectableChannel; 037import java.nio.channels.SelectionKey; 038import java.nio.channels.Selector; 039import java.util.ArrayList; 040import java.util.Date; 041import java.util.List; 042import java.util.concurrent.ThreadFactory; 043import java.util.concurrent.atomic.AtomicLong; 044 045import org.apache.http.nio.params.NIOReactorPNames; 046import org.apache.http.nio.reactor.IOEventDispatch; 047import org.apache.http.nio.reactor.IOReactor; 048import org.apache.http.nio.reactor.IOReactorException; 049import org.apache.http.nio.reactor.IOReactorExceptionHandler; 050import org.apache.http.nio.reactor.IOReactorStatus; 051import org.apache.http.params.BasicHttpParams; 052import org.apache.http.params.CoreConnectionPNames; 053import org.apache.http.params.HttpParams; 054import org.apache.http.util.Args; 055import org.apache.http.util.Asserts; 056 057/** 058 * Generic implementation of {@link IOReactor} that can run multiple 059 * {@link BaseIOReactor} instance in separate worker threads and distribute 060 * newly created I/O session equally across those I/O reactors for a more 061 * optimal resource utilization and a better I/O performance. Usually it is 062 * recommended to have one worker I/O reactor per physical CPU core. 063 * <p> 064 * <strong>Important note about exception handling</strong> 065 * <p> 066 * Protocol specific exceptions as well as those I/O exceptions thrown in the 067 * course of interaction with the session's channel are to be expected are to be 068 * dealt with by specific protocol handlers. These exceptions may result in 069 * termination of an individual session but should not affect the I/O reactor 070 * and all other active sessions. There are situations, however, when the I/O 071 * reactor itself encounters an internal problem such as an I/O exception in 072 * the underlying NIO classes or an unhandled runtime exception. Those types of 073 * exceptions are usually fatal and will cause the I/O reactor to shut down 074 * automatically. 075 * <p> 076 * There is a possibility to override this behavior and prevent I/O reactors 077 * from shutting down automatically in case of a runtime exception or an I/O 078 * exception in internal classes. This can be accomplished by providing a custom 079 * implementation of the {@link IOReactorExceptionHandler} interface. 080 * <p> 081 * If an I/O reactor is unable to automatically recover from an I/O or a runtime 082 * exception it will enter the shutdown mode. First off, it cancel all pending 083 * new session requests. Then it will attempt to close all active I/O sessions 084 * gracefully giving them some time to flush pending output data and terminate 085 * cleanly. Lastly, it will forcibly shut down those I/O sessions that still 086 * remain active after the grace period. This is a fairly complex process, where 087 * many things can fail at the same time and many different exceptions can be 088 * thrown in the course of the shutdown process. The I/O reactor will record all 089 * exceptions thrown during the shutdown process, including the original one 090 * that actually caused the shutdown in the first place, in an audit log. One 091 * can obtain the audit log using {@link #getAuditLog()}, examine exceptions 092 * thrown by the I/O reactor prior and in the course of the reactor shutdown 093 * and decide whether it is safe to restart the I/O reactor. 094 * 095 * @since 4.0 096 */ 097@SuppressWarnings("deprecation") 098public abstract class AbstractMultiworkerIOReactor implements IOReactor { 099 100 protected volatile IOReactorStatus status; 101 102 /** 103 * @deprecated (4.2) 104 */ 105 @Deprecated 106 protected final HttpParams params; 107 protected final IOReactorConfig config; 108 protected final Selector selector; 109 protected final long selectTimeout; 110 protected final boolean interestOpsQueueing; 111 112 private final int workerCount; 113 private final ThreadFactory threadFactory; 114 private final BaseIOReactor[] dispatchers; 115 private final Worker[] workers; 116 private final Thread[] threads; 117 private final Object statusLock; 118 119 //TODO: make final 120 protected IOReactorExceptionHandler exceptionHandler; 121 protected List<ExceptionEvent> auditLog; 122 123 private int currentWorker = 0; 124 125 /** 126 * Creates an instance of AbstractMultiworkerIOReactor with the given configuration. 127 * 128 * @param config I/O reactor configuration. 129 * @param threadFactory the factory to create threads. 130 * Can be {@code null}. 131 * @throws IOReactorException in case if a non-recoverable I/O error. 132 * 133 * @since 4.2 134 */ 135 public AbstractMultiworkerIOReactor( 136 final IOReactorConfig config, 137 final ThreadFactory threadFactory) throws IOReactorException { 138 super(); 139 this.config = config != null ? config : IOReactorConfig.DEFAULT; 140 this.params = new BasicHttpParams(); 141 try { 142 this.selector = Selector.open(); 143 } catch (final IOException ex) { 144 throw new IOReactorException("Failure opening selector", ex); 145 } 146 this.selectTimeout = this.config.getSelectInterval(); 147 this.interestOpsQueueing = this.config.isInterestOpQueued(); 148 this.statusLock = new Object(); 149 if (threadFactory != null) { 150 this.threadFactory = threadFactory; 151 } else { 152 this.threadFactory = new DefaultThreadFactory(); 153 } 154 this.auditLog = new ArrayList<ExceptionEvent>(); 155 this.workerCount = this.config.getIoThreadCount(); 156 this.dispatchers = new BaseIOReactor[workerCount]; 157 this.workers = new Worker[workerCount]; 158 this.threads = new Thread[workerCount]; 159 this.status = IOReactorStatus.INACTIVE; 160 } 161 162 /** 163 * Creates an instance of AbstractMultiworkerIOReactor with default configuration. 164 * 165 * @throws IOReactorException in case if a non-recoverable I/O error. 166 * 167 * @since 4.2 168 */ 169 public AbstractMultiworkerIOReactor() throws IOReactorException { 170 this(null, null); 171 } 172 173 @Deprecated 174 static IOReactorConfig convert(final int workerCount, final HttpParams params) { 175 Args.notNull(params, "HTTP parameters"); 176 return IOReactorConfig.custom() 177 .setSelectInterval(params.getLongParameter(NIOReactorPNames.SELECT_INTERVAL, 1000)) 178 .setShutdownGracePeriod(params.getLongParameter(NIOReactorPNames.GRACE_PERIOD, 500)) 179 .setInterestOpQueued(params.getBooleanParameter(NIOReactorPNames.SELECT_INTERVAL, false)) 180 .setIoThreadCount(workerCount) 181 .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0)) 182 .setConnectTimeout(params.getIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 0)) 183 .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0)) 184 .setSoReuseAddress(params.getBooleanParameter(CoreConnectionPNames.SO_REUSEADDR, false)) 185 .setSoKeepAlive(params.getBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE, false)) 186 .setSoLinger(params.getIntParameter(CoreConnectionPNames.SO_LINGER, -1)) 187 .setTcpNoDelay(params.getBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)) 188 .build(); 189 } 190 191 /** 192 * Creates an instance of AbstractMultiworkerIOReactor. 193 * 194 * @param workerCount number of worker I/O reactors. 195 * @param threadFactory the factory to create threads. 196 * Can be {@code null}. 197 * @param params HTTP parameters. 198 * @throws IOReactorException in case if a non-recoverable I/O error. 199 * 200 * @deprecated (4.2) use {@link AbstractMultiworkerIOReactor#AbstractMultiworkerIOReactor(IOReactorConfig, ThreadFactory)} 201 */ 202 @Deprecated 203 public AbstractMultiworkerIOReactor( 204 final int workerCount, 205 final ThreadFactory threadFactory, 206 final HttpParams params) throws IOReactorException { 207 this(convert(workerCount, params), threadFactory); 208 } 209 210 @Override 211 public IOReactorStatus getStatus() { 212 return this.status; 213 } 214 215 /** 216 * Returns the audit log containing exceptions thrown by the I/O reactor 217 * prior and in the course of the reactor shutdown. 218 * 219 * @return audit log. 220 */ 221 public List<ExceptionEvent> getAuditLog() { 222 synchronized (this.auditLog) { 223 return new ArrayList<ExceptionEvent>(this.auditLog); 224 } 225 } 226 227 /** 228 * Adds the given {@link Throwable} object with the given time stamp 229 * to the audit log. 230 * 231 * @param ex the exception thrown by the I/O reactor. 232 * @param timestamp the time stamp of the exception. Can be 233 * {@code null} in which case the current date / time will be used. 234 */ 235 protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) { 236 if (ex == null) { 237 return; 238 } 239 synchronized (this.auditLog) { 240 this.auditLog.add(new ExceptionEvent(ex, timestamp != null ? timestamp : new Date())); 241 } 242 } 243 244 /** 245 * Adds the given {@link Throwable} object to the audit log. 246 * 247 * @param ex the exception thrown by the I/O reactor. 248 */ 249 protected void addExceptionEvent(final Throwable ex) { 250 addExceptionEvent(ex, null); 251 } 252 253 /** 254 * Sets exception handler for this I/O reactor. 255 * 256 * @param exceptionHandler the exception handler. 257 */ 258 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) { 259 this.exceptionHandler = exceptionHandler; 260 } 261 262 /** 263 * Triggered to process I/O events registered by the main {@link Selector}. 264 * <p> 265 * Super-classes can implement this method to react to the event. 266 * 267 * @param count event count. 268 * @throws IOReactorException in case if a non-recoverable I/O error. 269 */ 270 protected abstract void processEvents(int count) throws IOReactorException; 271 272 /** 273 * Triggered to cancel pending session requests. 274 * <p> 275 * Super-classes can implement this method to react to the event. 276 * 277 * @throws IOReactorException in case if a non-recoverable I/O error. 278 */ 279 protected abstract void cancelRequests() throws IOReactorException; 280 281 /** 282 * Activates the main I/O reactor as well as all worker I/O reactors. 283 * The I/O main reactor will start reacting to I/O events and triggering 284 * notification methods. The worker I/O reactor in their turn will start 285 * reacting to I/O events and dispatch I/O event notifications to the given 286 * {@link IOEventDispatch} interface. 287 * <p> 288 * This method will enter the infinite I/O select loop on 289 * the {@link Selector} instance associated with this I/O reactor and used 290 * to manage creation of new I/O channels. Once a new I/O channel has been 291 * created the processing of I/O events on that channel will be delegated 292 * to one of the worker I/O reactors. 293 * <p> 294 * The method will remain blocked unto the I/O reactor is shut down or the 295 * execution thread is interrupted. 296 * 297 * @see #processEvents(int) 298 * @see #cancelRequests() 299 * 300 * @throws InterruptedIOException if the dispatch thread is interrupted. 301 * @throws IOReactorException in case if a non-recoverable I/O error. 302 */ 303 @Override 304 public void execute( 305 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException { 306 Args.notNull(eventDispatch, "Event dispatcher"); 307 synchronized (this.statusLock) { 308 if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) { 309 this.status = IOReactorStatus.SHUT_DOWN; 310 this.statusLock.notifyAll(); 311 return; 312 } 313 Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0, 314 "Illegal state %s", this.status); 315 this.status = IOReactorStatus.ACTIVE; 316 // Start I/O dispatchers 317 for (int i = 0; i < this.dispatchers.length; i++) { 318 final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing); 319 dispatcher.setExceptionHandler(exceptionHandler); 320 this.dispatchers[i] = dispatcher; 321 } 322 for (int i = 0; i < this.workerCount; i++) { 323 final BaseIOReactor dispatcher = this.dispatchers[i]; 324 this.workers[i] = new Worker(dispatcher, eventDispatch); 325 this.threads[i] = this.threadFactory.newThread(this.workers[i]); 326 } 327 } 328 try { 329 330 for (int i = 0; i < this.workerCount; i++) { 331 if (this.status != IOReactorStatus.ACTIVE) { 332 return; 333 } 334 this.threads[i].start(); 335 } 336 337 for (;;) { 338 final int readyCount; 339 try { 340 readyCount = this.selector.select(this.selectTimeout); 341 } catch (final InterruptedIOException ex) { 342 throw ex; 343 } catch (final IOException ex) { 344 throw new IOReactorException("Unexpected selector failure", ex); 345 } 346 347 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) { 348 processEvents(readyCount); 349 } 350 351 // Verify I/O dispatchers 352 for (int i = 0; i < this.workerCount; i++) { 353 final Worker worker = this.workers[i]; 354 final Throwable ex = worker.getThrowable(); 355 if (ex != null) { 356 throw new IOReactorException( 357 "I/O dispatch worker terminated abnormally", ex); 358 } 359 } 360 361 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) { 362 break; 363 } 364 } 365 366 } catch (final ClosedSelectorException ex) { 367 addExceptionEvent(ex); 368 } catch (final IOReactorException ex) { 369 if (ex.getCause() != null) { 370 addExceptionEvent(ex.getCause()); 371 } 372 throw ex; 373 } finally { 374 doShutdown(); 375 synchronized (this.statusLock) { 376 this.status = IOReactorStatus.SHUT_DOWN; 377 this.statusLock.notifyAll(); 378 } 379 } 380 } 381 382 /** 383 * Activates the shutdown sequence for this reactor. This method will cancel 384 * all pending session requests, close out all active I/O channels, 385 * make an attempt to terminate all worker I/O reactors gracefully, 386 * and finally force-terminate those I/O reactors that failed to 387 * terminate after the specified grace period. 388 * 389 * @throws InterruptedIOException if the shutdown sequence has been 390 * interrupted. 391 */ 392 protected void doShutdown() throws InterruptedIOException { 393 synchronized (this.statusLock) { 394 if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) { 395 return; 396 } 397 this.status = IOReactorStatus.SHUTTING_DOWN; 398 } 399 try { 400 cancelRequests(); 401 } catch (final IOReactorException ex) { 402 if (ex.getCause() != null) { 403 addExceptionEvent(ex.getCause()); 404 } 405 } 406 this.selector.wakeup(); 407 408 // Close out all channels 409 if (this.selector.isOpen()) { 410 for (final SelectionKey key : this.selector.keys()) { 411 try { 412 final Channel channel = key.channel(); 413 if (channel != null) { 414 channel.close(); 415 } 416 } catch (final IOException ex) { 417 addExceptionEvent(ex); 418 } 419 } 420 // Stop dispatching I/O events 421 try { 422 this.selector.close(); 423 } catch (final IOException ex) { 424 addExceptionEvent(ex); 425 } 426 } 427 428 // Attempt to shut down I/O dispatchers gracefully 429 for (int i = 0; i < this.workerCount; i++) { 430 final BaseIOReactor dispatcher = this.dispatchers[i]; 431 dispatcher.gracefulShutdown(); 432 } 433 434 final long gracePeriod = this.config.getShutdownGracePeriod(); 435 436 try { 437 // Force shut down I/O dispatchers if they fail to terminate 438 // in time 439 for (int i = 0; i < this.workerCount; i++) { 440 final BaseIOReactor dispatcher = this.dispatchers[i]; 441 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) { 442 dispatcher.awaitShutdown(gracePeriod); 443 } 444 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) { 445 try { 446 dispatcher.hardShutdown(); 447 } catch (final IOReactorException ex) { 448 if (ex.getCause() != null) { 449 addExceptionEvent(ex.getCause()); 450 } 451 } 452 } 453 } 454 // Join worker threads 455 for (int i = 0; i < this.workerCount; i++) { 456 final Thread t = this.threads[i]; 457 if (t != null) { 458 t.join(gracePeriod); 459 } 460 } 461 } catch (final InterruptedException ex) { 462 throw new InterruptedIOException(ex.getMessage()); 463 } 464 } 465 466 /** 467 * Assigns the given channel entry to one of the worker I/O reactors. 468 * 469 * @param entry the channel entry. 470 */ 471 protected void addChannel(final ChannelEntry entry) { 472 // Distribute new channels among the workers 473 final int i = Math.abs(this.currentWorker++ % this.workerCount); 474 this.dispatchers[i].addChannel(entry); 475 } 476 477 /** 478 * Registers the given channel with the main {@link Selector}. 479 * 480 * @param channel the channel. 481 * @param ops interest ops. 482 * @return selection key. 483 * @throws ClosedChannelException if the channel has been already closed. 484 */ 485 protected SelectionKey registerChannel( 486 final SelectableChannel channel, final int ops) throws ClosedChannelException { 487 return channel.register(this.selector, ops); 488 } 489 490 /** 491 * Prepares the given {@link Socket} by resetting some of its properties. 492 * 493 * @param socket the socket 494 * @throws IOException in case of an I/O error. 495 */ 496 protected void prepareSocket(final Socket socket) throws IOException { 497 socket.setTcpNoDelay(this.config.isTcpNoDelay()); 498 socket.setKeepAlive(this.config.isSoKeepalive()); 499 if (this.config.getSoTimeout() > 0) { 500 socket.setSoTimeout(this.config.getSoTimeout()); 501 } 502 if (this.config.getSndBufSize() > 0) { 503 socket.setSendBufferSize(this.config.getSndBufSize()); 504 } 505 if (this.config.getRcvBufSize() > 0) { 506 socket.setReceiveBufferSize(this.config.getRcvBufSize()); 507 } 508 final int linger = this.config.getSoLinger(); 509 if (linger >= 0) { 510 socket.setSoLinger(true, linger); 511 } 512 } 513 514 /** 515 * Blocks for the given period of time in milliseconds awaiting 516 * the completion of the reactor shutdown. If the value of 517 * {@code timeout} is set to {@code 0} this method blocks 518 * indefinitely. 519 * 520 * @param timeout the maximum wait time. 521 * @throws InterruptedException if interrupted. 522 */ 523 protected void awaitShutdown(final long timeout) throws InterruptedException { 524 synchronized (this.statusLock) { 525 final long deadline = System.currentTimeMillis() + timeout; 526 long remaining = timeout; 527 while (this.status != IOReactorStatus.SHUT_DOWN) { 528 this.statusLock.wait(remaining); 529 if (timeout > 0) { 530 remaining = deadline - System.currentTimeMillis(); 531 if (remaining <= 0) { 532 break; 533 } 534 } 535 } 536 } 537 } 538 539 @Override 540 public void shutdown() throws IOException { 541 shutdown(2000); 542 } 543 544 @Override 545 public void shutdown(final long waitMs) throws IOException { 546 synchronized (this.statusLock) { 547 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) { 548 return; 549 } 550 if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) { 551 this.status = IOReactorStatus.SHUT_DOWN; 552 cancelRequests(); 553 this.selector.close(); 554 return; 555 } 556 this.status = IOReactorStatus.SHUTDOWN_REQUEST; 557 } 558 this.selector.wakeup(); 559 try { 560 awaitShutdown(waitMs); 561 } catch (final InterruptedException ignore) { 562 } 563 } 564 565 static void closeChannel(final Channel channel) { 566 try { 567 channel.close(); 568 } catch (final IOException ignore) { 569 } 570 } 571 572 static class Worker implements Runnable { 573 574 final BaseIOReactor dispatcher; 575 final IOEventDispatch eventDispatch; 576 577 private volatile Throwable exception; 578 579 public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) { 580 super(); 581 this.dispatcher = dispatcher; 582 this.eventDispatch = eventDispatch; 583 } 584 585 @Override 586 public void run() { 587 try { 588 this.dispatcher.execute(this.eventDispatch); 589 } catch (final Error ex) { 590 this.exception = ex; 591 throw ex; 592 } catch (final Exception ex) { 593 this.exception = ex; 594 } 595 } 596 597 public Throwable getThrowable() { 598 return this.exception; 599 } 600 601 } 602 603 static class DefaultThreadFactory implements ThreadFactory { 604 605 private final static AtomicLong COUNT = new AtomicLong(1); 606 607 @Override 608 public Thread newThread(final Runnable r) { 609 return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement()); 610 } 611 612 } 613 614}