001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.reactor;
029
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.net.Socket;
033import java.nio.channels.Channel;
034import java.nio.channels.ClosedChannelException;
035import java.nio.channels.ClosedSelectorException;
036import java.nio.channels.SelectableChannel;
037import java.nio.channels.SelectionKey;
038import java.nio.channels.Selector;
039import java.util.ArrayList;
040import java.util.Date;
041import java.util.List;
042import java.util.concurrent.ThreadFactory;
043import java.util.concurrent.atomic.AtomicLong;
044
045import org.apache.http.nio.params.NIOReactorPNames;
046import org.apache.http.nio.reactor.IOEventDispatch;
047import org.apache.http.nio.reactor.IOReactor;
048import org.apache.http.nio.reactor.IOReactorException;
049import org.apache.http.nio.reactor.IOReactorExceptionHandler;
050import org.apache.http.nio.reactor.IOReactorStatus;
051import org.apache.http.params.BasicHttpParams;
052import org.apache.http.params.CoreConnectionPNames;
053import org.apache.http.params.HttpParams;
054import org.apache.http.util.Args;
055import org.apache.http.util.Asserts;
056
057/**
058 * Generic implementation of {@link IOReactor} that can run multiple
 * {@link BaseIOReactor} instances in separate worker threads and distribute
 * newly created I/O sessions equally across those I/O reactors for a more
061 * optimal resource utilization and a better I/O performance. Usually it is
062 * recommended to have one worker I/O reactor per physical CPU core.
063 * <p>
064 * <strong>Important note about exception handling</strong>
065 * <p>
066 * Protocol specific exceptions as well as those I/O exceptions thrown in the
 * course of interaction with the session's channel are to be expected and are to be
068 * dealt with by specific protocol handlers. These exceptions may result in
069 * termination of an individual session but should not affect the I/O reactor
070 * and all other active sessions. There are situations, however, when the I/O
071 * reactor itself encounters an internal problem such as an I/O exception in
072 * the underlying NIO classes or an unhandled runtime exception. Those types of
073 * exceptions are usually fatal and will cause the I/O reactor to shut down
074 * automatically.
075 * <p>
076 * There is a possibility to override this behavior and prevent I/O reactors
077 * from shutting down automatically in case of a runtime exception or an I/O
078 * exception in internal classes. This can be accomplished by providing a custom
079 * implementation of the {@link IOReactorExceptionHandler} interface.
080 * <p>
081 * If an I/O reactor is unable to automatically recover from an I/O or a runtime
 * exception it will enter the shutdown mode. First off, it cancels all pending
083 * new session requests. Then it will attempt to close all active I/O sessions
084 * gracefully giving them some time to flush pending output data and terminate
085 * cleanly. Lastly, it will forcibly shut down those I/O sessions that still
086 * remain active after the grace period. This is a fairly complex process, where
087 * many things can fail at the same time and many different exceptions can be
088 * thrown in the course of the shutdown process. The I/O reactor will record all
089 * exceptions thrown during the shutdown process, including the original one
090 * that actually caused the shutdown in the first place, in an audit log. One
091 * can obtain the audit log using {@link #getAuditLog()}, examine exceptions
 * thrown by the I/O reactor prior to and in the course of the reactor shutdown
093 * and decide whether it is safe to restart the I/O reactor.
094 *
095 * @since 4.0
096 */
097@SuppressWarnings("deprecation")
098public abstract class AbstractMultiworkerIOReactor implements IOReactor {
099
100    protected volatile IOReactorStatus status;
101
102    /**
103     * @deprecated (4.2)
104     */
105    @Deprecated
106    protected final HttpParams params;
107    protected final IOReactorConfig config;
108    protected final Selector selector;
109    protected final long selectTimeout;
110    protected final boolean interestOpsQueueing;
111
112    private final int workerCount;
113    private final ThreadFactory threadFactory;
114    private final BaseIOReactor[] dispatchers;
115    private final Worker[] workers;
116    private final Thread[] threads;
117    private final Object statusLock;
118
119    //TODO: make final
120    protected IOReactorExceptionHandler exceptionHandler;
121    protected List<ExceptionEvent> auditLog;
122
123    private int currentWorker = 0;
124
125    /**
126     * Creates an instance of AbstractMultiworkerIOReactor with the given configuration.
127     *
128     * @param config I/O reactor configuration.
129     * @param threadFactory the factory to create threads.
130     *   Can be {@code null}.
131     * @throws IOReactorException in case if a non-recoverable I/O error.
132     *
133     * @since 4.2
134     */
135    public AbstractMultiworkerIOReactor(
136            final IOReactorConfig config,
137            final ThreadFactory threadFactory) throws IOReactorException {
138        super();
139        this.config = config != null ? config : IOReactorConfig.DEFAULT;
140        this.params = new BasicHttpParams();
141        try {
142            this.selector = Selector.open();
143        } catch (final IOException ex) {
144            throw new IOReactorException("Failure opening selector", ex);
145        }
146        this.selectTimeout = this.config.getSelectInterval();
147        this.interestOpsQueueing = this.config.isInterestOpQueued();
148        this.statusLock = new Object();
149        if (threadFactory != null) {
150            this.threadFactory = threadFactory;
151        } else {
152            this.threadFactory = new DefaultThreadFactory();
153        }
154        this.auditLog = new ArrayList<ExceptionEvent>();
155        this.workerCount = this.config.getIoThreadCount();
156        this.dispatchers = new BaseIOReactor[workerCount];
157        this.workers = new Worker[workerCount];
158        this.threads = new Thread[workerCount];
159        this.status = IOReactorStatus.INACTIVE;
160    }
161
162    /**
163     * Creates an instance of AbstractMultiworkerIOReactor with default configuration.
164     *
165     * @throws IOReactorException in case if a non-recoverable I/O error.
166     *
167     * @since 4.2
168     */
169    public AbstractMultiworkerIOReactor() throws IOReactorException {
170        this(null, null);
171    }
172
173    @Deprecated
174    static IOReactorConfig convert(final int workerCount, final HttpParams params) {
175        Args.notNull(params, "HTTP parameters");
176        return IOReactorConfig.custom()
177            .setSelectInterval(params.getLongParameter(NIOReactorPNames.SELECT_INTERVAL, 1000))
178            .setShutdownGracePeriod(params.getLongParameter(NIOReactorPNames.GRACE_PERIOD, 500))
179            .setInterestOpQueued(params.getBooleanParameter(NIOReactorPNames.SELECT_INTERVAL, false))
180            .setIoThreadCount(workerCount)
181            .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
182            .setConnectTimeout(params.getIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 0))
183            .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
184            .setSoReuseAddress(params.getBooleanParameter(CoreConnectionPNames.SO_REUSEADDR, false))
185            .setSoKeepAlive(params.getBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE, false))
186            .setSoLinger(params.getIntParameter(CoreConnectionPNames.SO_LINGER, -1))
187            .setTcpNoDelay(params.getBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true))
188            .build();
189    }
190
191    /**
192     * Creates an instance of AbstractMultiworkerIOReactor.
193     *
194     * @param workerCount number of worker I/O reactors.
195     * @param threadFactory the factory to create threads.
196     *   Can be {@code null}.
197     * @param params HTTP parameters.
198     * @throws IOReactorException in case if a non-recoverable I/O error.
199     *
200     * @deprecated (4.2) use {@link AbstractMultiworkerIOReactor#AbstractMultiworkerIOReactor(IOReactorConfig, ThreadFactory)}
201     */
202    @Deprecated
203    public AbstractMultiworkerIOReactor(
204            final int workerCount,
205            final ThreadFactory threadFactory,
206            final HttpParams params) throws IOReactorException {
207        this(convert(workerCount, params), threadFactory);
208    }
209
210    @Override
211    public IOReactorStatus getStatus() {
212        return this.status;
213    }
214
215    /**
216     * Returns the audit log containing exceptions thrown by the I/O reactor
217     * prior and in the course of the reactor shutdown.
218     *
219     * @return audit log.
220     */
221    public List<ExceptionEvent> getAuditLog() {
222        synchronized (this.auditLog) {
223            return new ArrayList<ExceptionEvent>(this.auditLog);
224        }
225    }
226
227    /**
228     * Adds the given {@link Throwable} object with the given time stamp
229     * to the audit log.
230     *
231     * @param ex the exception thrown by the I/O reactor.
232     * @param timestamp the time stamp of the exception. Can be
233     * {@code null} in which case the current date / time will be used.
234     */
235    protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) {
236        if (ex == null) {
237            return;
238        }
239        synchronized (this.auditLog) {
240            this.auditLog.add(new ExceptionEvent(ex, timestamp != null ? timestamp : new Date()));
241        }
242    }
243
244    /**
245     * Adds the given {@link Throwable} object to the audit log.
246     *
247     * @param ex the exception thrown by the I/O reactor.
248     */
249    protected void addExceptionEvent(final Throwable ex) {
250        addExceptionEvent(ex, null);
251    }
252
253    /**
254     * Sets exception handler for this I/O reactor.
255     *
256     * @param exceptionHandler the exception handler.
257     */
258    public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
259        this.exceptionHandler = exceptionHandler;
260    }
261
    /**
     * Triggered to process I/O events registered by the main {@link Selector}.
     * <p>
     * Invoked from the select loop in {@link #execute(IOEventDispatch)} after
     * each select operation, but only while the reactor status is
     * {@code ACTIVE}. Sub-classes implement this method to react to the event.
     *
     * @param count event count, as returned by the select operation.
     * @throws IOReactorException in case of a non-recoverable I/O error.
     */
    protected abstract void processEvents(int count) throws IOReactorException;
271
    /**
     * Triggered to cancel pending session requests.
     * <p>
     * Invoked at the start of the shutdown sequence in {@link #doShutdown()}
     * and by {@link #shutdown(long)} when the reactor has never been started.
     * Sub-classes implement this method to react to the event.
     *
     * @throws IOReactorException in case of a non-recoverable I/O error.
     */
    protected abstract void cancelRequests() throws IOReactorException;
280
281    /**
282     * Activates the main I/O reactor as well as all worker I/O reactors.
283     * The I/O main reactor will start reacting to I/O events and triggering
284     * notification methods. The worker I/O reactor in their turn will start
285     * reacting to I/O events and dispatch I/O event notifications to the given
286     * {@link IOEventDispatch} interface.
287     * <p>
288     * This method will enter the infinite I/O select loop on
289     * the {@link Selector} instance associated with this I/O reactor and used
290     * to manage creation of new I/O channels. Once a new I/O channel has been
291     * created the processing of I/O events on that channel will be delegated
292     * to one of the worker I/O reactors.
293     * <p>
294     * The method will remain blocked unto the I/O reactor is shut down or the
295     * execution thread is interrupted.
296     *
297     * @see #processEvents(int)
298     * @see #cancelRequests()
299     *
300     * @throws InterruptedIOException if the dispatch thread is interrupted.
301     * @throws IOReactorException in case if a non-recoverable I/O error.
302     */
303    @Override
304    public void execute(
305            final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
306        Args.notNull(eventDispatch, "Event dispatcher");
307        synchronized (this.statusLock) {
308            if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
309                this.status = IOReactorStatus.SHUT_DOWN;
310                this.statusLock.notifyAll();
311                return;
312            }
313            Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
314                    "Illegal state %s", this.status);
315            this.status = IOReactorStatus.ACTIVE;
316            // Start I/O dispatchers
317            for (int i = 0; i < this.dispatchers.length; i++) {
318                final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
319                dispatcher.setExceptionHandler(exceptionHandler);
320                this.dispatchers[i] = dispatcher;
321            }
322            for (int i = 0; i < this.workerCount; i++) {
323                final BaseIOReactor dispatcher = this.dispatchers[i];
324                this.workers[i] = new Worker(dispatcher, eventDispatch);
325                this.threads[i] = this.threadFactory.newThread(this.workers[i]);
326            }
327        }
328        try {
329
330            for (int i = 0; i < this.workerCount; i++) {
331                if (this.status != IOReactorStatus.ACTIVE) {
332                    return;
333                }
334                this.threads[i].start();
335            }
336
337            for (;;) {
338                final int readyCount;
339                try {
340                    readyCount = this.selector.select(this.selectTimeout);
341                } catch (final InterruptedIOException ex) {
342                    throw ex;
343                } catch (final IOException ex) {
344                    throw new IOReactorException("Unexpected selector failure", ex);
345                }
346
347                if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
348                    processEvents(readyCount);
349                }
350
351                // Verify I/O dispatchers
352                for (int i = 0; i < this.workerCount; i++) {
353                    final Worker worker = this.workers[i];
354                    final Throwable ex = worker.getThrowable();
355                    if (ex != null) {
356                        throw new IOReactorException(
357                                "I/O dispatch worker terminated abnormally", ex);
358                    }
359                }
360
361                if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
362                    break;
363                }
364            }
365
366        } catch (final ClosedSelectorException ex) {
367            addExceptionEvent(ex);
368        } catch (final IOReactorException ex) {
369            if (ex.getCause() != null) {
370                addExceptionEvent(ex.getCause());
371            }
372            throw ex;
373        } finally {
374            doShutdown();
375            synchronized (this.statusLock) {
376                this.status = IOReactorStatus.SHUT_DOWN;
377                this.statusLock.notifyAll();
378            }
379        }
380    }
381
382    /**
383     * Activates the shutdown sequence for this reactor. This method will cancel
384     * all pending session requests, close out all active I/O channels,
385     * make an attempt to terminate all worker I/O reactors gracefully,
386     * and finally force-terminate those I/O reactors that failed to
387     * terminate after the specified grace period.
388     *
389     * @throws InterruptedIOException if the shutdown sequence has been
390     *   interrupted.
391     */
392    protected void doShutdown() throws InterruptedIOException {
393        synchronized (this.statusLock) {
394            if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
395                return;
396            }
397            this.status = IOReactorStatus.SHUTTING_DOWN;
398        }
399        try {
400            cancelRequests();
401        } catch (final IOReactorException ex) {
402            if (ex.getCause() != null) {
403                addExceptionEvent(ex.getCause());
404            }
405        }
406        this.selector.wakeup();
407
408        // Close out all channels
409        if (this.selector.isOpen()) {
410            for (final SelectionKey key : this.selector.keys()) {
411                try {
412                    final Channel channel = key.channel();
413                    if (channel != null) {
414                        channel.close();
415                    }
416                } catch (final IOException ex) {
417                    addExceptionEvent(ex);
418                }
419            }
420            // Stop dispatching I/O events
421            try {
422                this.selector.close();
423            } catch (final IOException ex) {
424                addExceptionEvent(ex);
425            }
426        }
427
428        // Attempt to shut down I/O dispatchers gracefully
429        for (int i = 0; i < this.workerCount; i++) {
430            final BaseIOReactor dispatcher = this.dispatchers[i];
431            dispatcher.gracefulShutdown();
432        }
433
434        final long gracePeriod = this.config.getShutdownGracePeriod();
435
436        try {
437            // Force shut down I/O dispatchers if they fail to terminate
438            // in time
439            for (int i = 0; i < this.workerCount; i++) {
440                final BaseIOReactor dispatcher = this.dispatchers[i];
441                if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
442                    dispatcher.awaitShutdown(gracePeriod);
443                }
444                if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
445                    try {
446                        dispatcher.hardShutdown();
447                    } catch (final IOReactorException ex) {
448                        if (ex.getCause() != null) {
449                            addExceptionEvent(ex.getCause());
450                        }
451                    }
452                }
453            }
454            // Join worker threads
455            for (int i = 0; i < this.workerCount; i++) {
456                final Thread t = this.threads[i];
457                if (t != null) {
458                    t.join(gracePeriod);
459                }
460            }
461        } catch (final InterruptedException ex) {
462            throw new InterruptedIOException(ex.getMessage());
463        }
464    }
465
466    /**
467     * Assigns the given channel entry to one of the worker I/O reactors.
468     *
469     * @param entry the channel entry.
470     */
471    protected void addChannel(final ChannelEntry entry) {
472        // Distribute new channels among the workers
473        final int i = Math.abs(this.currentWorker++ % this.workerCount);
474        this.dispatchers[i].addChannel(entry);
475    }
476
477    /**
478     * Registers the given channel with the main {@link Selector}.
479     *
480     * @param channel the channel.
481     * @param ops interest ops.
482     * @return  selection key.
483     * @throws ClosedChannelException if the channel has been already closed.
484     */
485    protected SelectionKey registerChannel(
486            final SelectableChannel channel, final int ops) throws ClosedChannelException {
487        return channel.register(this.selector, ops);
488    }
489
490    /**
491     * Prepares the given {@link Socket} by resetting some of its properties.
492     *
493     * @param socket the socket
494     * @throws IOException in case of an I/O error.
495     */
496    protected void prepareSocket(final Socket socket) throws IOException {
497        socket.setTcpNoDelay(this.config.isTcpNoDelay());
498        socket.setKeepAlive(this.config.isSoKeepalive());
499        if (this.config.getSoTimeout() > 0) {
500            socket.setSoTimeout(this.config.getSoTimeout());
501        }
502        if (this.config.getSndBufSize() > 0) {
503            socket.setSendBufferSize(this.config.getSndBufSize());
504        }
505        if (this.config.getRcvBufSize() > 0) {
506            socket.setReceiveBufferSize(this.config.getRcvBufSize());
507        }
508        final int linger = this.config.getSoLinger();
509        if (linger >= 0) {
510            socket.setSoLinger(true, linger);
511        }
512    }
513
514    /**
515     * Blocks for the given period of time in milliseconds awaiting
516     * the completion of the reactor shutdown. If the value of
517     * {@code timeout} is set to {@code 0} this method blocks
518     * indefinitely.
519     *
520     * @param timeout the maximum wait time.
521     * @throws InterruptedException if interrupted.
522     */
523    protected void awaitShutdown(final long timeout) throws InterruptedException {
524        synchronized (this.statusLock) {
525            final long deadline = System.currentTimeMillis() + timeout;
526            long remaining = timeout;
527            while (this.status != IOReactorStatus.SHUT_DOWN) {
528                this.statusLock.wait(remaining);
529                if (timeout > 0) {
530                    remaining = deadline - System.currentTimeMillis();
531                    if (remaining <= 0) {
532                        break;
533                    }
534                }
535            }
536        }
537    }
538
539    @Override
540    public void shutdown() throws IOException {
541        shutdown(2000);
542    }
543
544    @Override
545    public void shutdown(final long waitMs) throws IOException {
546        synchronized (this.statusLock) {
547            if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
548                return;
549            }
550            if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
551                this.status = IOReactorStatus.SHUT_DOWN;
552                cancelRequests();
553                this.selector.close();
554                return;
555            }
556            this.status = IOReactorStatus.SHUTDOWN_REQUEST;
557        }
558        this.selector.wakeup();
559        try {
560            awaitShutdown(waitMs);
561        } catch (final InterruptedException ignore) {
562        }
563    }
564
565    static void closeChannel(final Channel channel) {
566        try {
567            channel.close();
568        } catch (final IOException ignore) {
569        }
570    }
571
572    static class Worker implements Runnable {
573
574        final BaseIOReactor dispatcher;
575        final IOEventDispatch eventDispatch;
576
577        private volatile Throwable exception;
578
579        public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
580            super();
581            this.dispatcher = dispatcher;
582            this.eventDispatch = eventDispatch;
583        }
584
585        @Override
586        public void run() {
587            try {
588                this.dispatcher.execute(this.eventDispatch);
589            } catch (final Error ex) {
590                this.exception = ex;
591                throw ex;
592            } catch (final Exception ex) {
593                this.exception = ex;
594            }
595        }
596
597        public Throwable getThrowable() {
598            return this.exception;
599        }
600
601    }
602
603    static class DefaultThreadFactory implements ThreadFactory {
604
605        private final static AtomicLong COUNT = new AtomicLong(1);
606
607        @Override
608        public Thread newThread(final Runnable r) {
609            return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
610        }
611
612    }
613
614}