001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.reactor;
029
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.nio.channels.CancelledKeyException;
033import java.nio.channels.ClosedChannelException;
034import java.nio.channels.ClosedSelectorException;
035import java.nio.channels.SelectionKey;
036import java.nio.channels.Selector;
037import java.nio.channels.SocketChannel;
038import java.util.Collections;
039import java.util.HashSet;
040import java.util.Queue;
041import java.util.Set;
042import java.util.concurrent.ConcurrentLinkedQueue;
043
044import org.apache.http.nio.reactor.IOReactor;
045import org.apache.http.nio.reactor.IOReactorException;
046import org.apache.http.nio.reactor.IOReactorStatus;
047import org.apache.http.nio.reactor.IOSession;
048import org.apache.http.util.Args;
049import org.apache.http.util.Asserts;
050
051/**
052 * Generic implementation of {@link IOReactor} that can used as a subclass
053 * for more specialized I/O reactors. It is based on a single {@link Selector}
054 * instance.
055 *
056 * @since 4.0
057 */
058public abstract class AbstractIOReactor implements IOReactor {
059
060    private volatile IOReactorStatus status;
061
062    private final Object statusMutex;
063    private final long selectTimeout;
064    private final boolean interestOpsQueueing;
065    private final Selector selector;
066    private final Set<IOSession> sessions;
067    private final Queue<InterestOpEntry> interestOpsQueue;
068    private final Queue<IOSession> closedSessions;
069    private final Queue<ChannelEntry> newChannels;
070
071    /**
072     * Creates new AbstractIOReactor instance.
073     *
074     * @param selectTimeout the select timeout.
075     * @throws IOReactorException in case if a non-recoverable I/O error.
076     */
077    public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
078        this(selectTimeout, false);
079    }
080
081    /**
082     * Creates new AbstractIOReactor instance.
083     *
084     * @param selectTimeout the select timeout.
085     * @param interestOpsQueueing Ops queueing flag.
086     *
087     * @throws IOReactorException in case if a non-recoverable I/O error.
088     *
089     * @since 4.1
090     */
091    public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
092        super();
093        Args.positive(selectTimeout, "Select timeout");
094        this.selectTimeout = selectTimeout;
095        this.interestOpsQueueing = interestOpsQueueing;
096        this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
097        this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
098        this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
099        this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
100        try {
101            this.selector = Selector.open();
102        } catch (final IOException ex) {
103            throw new IOReactorException("Failure opening selector", ex);
104        }
105        this.statusMutex = new Object();
106        this.status = IOReactorStatus.INACTIVE;
107    }
108
109    /**
110     * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
111     * <p>
112     * Super-classes can implement this method to react to the event.
113     *
114     * @param key the selection key.
115     */
116    protected abstract void acceptable(SelectionKey key);
117
118    /**
119     * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
120     * <p>
121     * Super-classes can implement this method to react to the event.
122     *
123     * @param key the selection key.
124     */
125    protected abstract void connectable(SelectionKey key);
126
127    /**
128     * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
129     * <p>
130     * Super-classes can implement this method to react to the event.
131     *
132     * @param key the selection key.
133     */
134    protected abstract void readable(SelectionKey key);
135
136    /**
137     * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
138     * <p>
139     * Super-classes can implement this method to react to the event.
140     *
141     * @param key the selection key.
142     */
143    protected abstract void writable(SelectionKey key);
144
145    /**
146     * Triggered to validate keys currently registered with the selector. This
147     * method is called after each I/O select loop.
148     * <p>
149     * Super-classes can implement this method to run validity checks on
150     * active sessions and include additional processing that needs to be
151     * executed after each I/O select loop.
152     *
153     * @param keys all selection keys registered with the selector.
154     */
155    protected abstract void validate(Set<SelectionKey> keys);
156
157    /**
158     * Triggered when new session has been created.
159     * <p>
160     * Super-classes can implement this method to react to the event.
161     *
162     * @param key the selection key.
163     * @param session new I/O session.
164     */
165    protected void sessionCreated(final SelectionKey key, final IOSession session) {
166    }
167
168    /**
169     * Triggered when a session has been closed.
170     * <p>
171     * Super-classes can implement this method to react to the event.
172     *
173     * @param session closed I/O session.
174     */
175    protected void sessionClosed(final IOSession session) {
176    }
177
178    /**
179     * Triggered when a session has timed out.
180     * <p>
181     * Super-classes can implement this method to react to the event.
182     *
183     * @param session timed out I/O session.
184     */
185    protected void sessionTimedOut(final IOSession session) {
186    }
187
188    /**
189     * Obtains {@link IOSession} instance associated with the given selection
190     * key.
191     *
192     * @param key the selection key.
193     * @return I/O session.
194     */
195    protected IOSession getSession(final SelectionKey key) {
196        return (IOSession) key.attachment();
197    }
198
199    @Override
200    public IOReactorStatus getStatus() {
201        return this.status;
202    }
203
204    /**
205     * Returns {@code true} if interest Ops queueing is enabled, {@code false} otherwise.
206     *
207     * @since 4.1
208     */
209    public boolean getInterestOpsQueueing() {
210        return this.interestOpsQueueing;
211    }
212
213    /**
214     * Adds new channel entry. The channel will be asynchronously registered
215     * with the selector.
216     *
217     * @param channelEntry the channel entry.
218     */
219    public void addChannel(final ChannelEntry channelEntry) {
220        Args.notNull(channelEntry, "Channel entry");
221        this.newChannels.add(channelEntry);
222        this.selector.wakeup();
223    }
224
225    /**
226     * Activates the I/O reactor. The I/O reactor will start reacting to
227     * I/O events and triggering notification methods.
228     * <p>
229     * This method will enter the infinite I/O select loop on
230     * the {@link Selector} instance associated with this I/O reactor.
231     * <p>
232     * The method will remain blocked unto the I/O reactor is shut down or the
233     * execution thread is interrupted.
234     *
235     * @see #acceptable(SelectionKey)
236     * @see #connectable(SelectionKey)
237     * @see #readable(SelectionKey)
238     * @see #writable(SelectionKey)
239     * @see #timeoutCheck(SelectionKey, long)
240     * @see #validate(Set)
241     * @see #sessionCreated(SelectionKey, IOSession)
242     * @see #sessionClosed(IOSession)
243     *
244     * @throws InterruptedIOException if the dispatch thread is interrupted.
245     * @throws IOReactorException in case if a non-recoverable I/O error.
246     */
247    protected void execute() throws InterruptedIOException, IOReactorException {
248        this.status = IOReactorStatus.ACTIVE;
249
250        try {
251            for (;;) {
252
253                final int readyCount;
254                try {
255                    readyCount = this.selector.select(this.selectTimeout);
256                } catch (final InterruptedIOException ex) {
257                    throw ex;
258                } catch (final IOException ex) {
259                    throw new IOReactorException("Unexpected selector failure", ex);
260                }
261
262                if (this.status == IOReactorStatus.SHUT_DOWN) {
263                    // Hard shut down. Exit select loop immediately
264                    break;
265                }
266
267                if (this.status == IOReactorStatus.SHUTTING_DOWN) {
268                    // Graceful shutdown in process
269                    // Try to close things out nicely
270                    closeSessions();
271                    closeNewChannels();
272                }
273
274                // Process selected I/O events
275                if (readyCount > 0) {
276                    processEvents(this.selector.selectedKeys());
277                }
278
279                // Validate active channels
280                validate(this.selector.keys());
281
282                // Process closed sessions
283                processClosedSessions();
284
285                // If active process new channels
286                if (this.status == IOReactorStatus.ACTIVE) {
287                    processNewChannels();
288                }
289
290                // Exit select loop if graceful shutdown has been completed
291                if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
292                        && this.sessions.isEmpty()) {
293                    break;
294                }
295
296                if (this.interestOpsQueueing) {
297                    // process all pending interestOps() operations
298                    processPendingInterestOps();
299                }
300
301            }
302
303        } catch (final ClosedSelectorException ignore) {
304        } finally {
305            hardShutdown();
306            synchronized (this.statusMutex) {
307                this.statusMutex.notifyAll();
308            }
309        }
310    }
311
312    private void processEvents(final Set<SelectionKey> selectedKeys) {
313        for (final SelectionKey key : selectedKeys) {
314
315            processEvent(key);
316
317        }
318        selectedKeys.clear();
319    }
320
321    /**
322     * Processes new event on the given selection key.
323     *
324     * @param key the selection key that triggered an event.
325     */
326    protected void processEvent(final SelectionKey key) {
327        final IOSessionImpl session = (IOSessionImpl) key.attachment();
328        try {
329            if (key.isAcceptable()) {
330                acceptable(key);
331            }
332            if (key.isConnectable()) {
333                connectable(key);
334            }
335            if (key.isReadable()) {
336                session.resetLastRead();
337                readable(key);
338            }
339            if (key.isWritable()) {
340                session.resetLastWrite();
341                writable(key);
342            }
343        } catch (final CancelledKeyException ex) {
344            queueClosedSession(session);
345            key.attach(null);
346        }
347    }
348
349    /**
350     * Queues the given I/O session to be processed asynchronously as closed.
351     *
352     * @param session the closed I/O session.
353     */
354    protected void queueClosedSession(final IOSession session) {
355        if (session != null) {
356            this.closedSessions.add(session);
357        }
358    }
359
360    private void processNewChannels() throws IOReactorException {
361        ChannelEntry entry;
362        while ((entry = this.newChannels.poll()) != null) {
363
364            final SocketChannel channel;
365            final SelectionKey key;
366            try {
367                channel = entry.getChannel();
368                channel.configureBlocking(false);
369                key = channel.register(this.selector, SelectionKey.OP_READ);
370            } catch (final ClosedChannelException ex) {
371                final SessionRequestImpl sessionRequest = entry.getSessionRequest();
372                if (sessionRequest != null) {
373                    sessionRequest.failed(ex);
374                }
375                return;
376
377            } catch (final IOException ex) {
378                throw new IOReactorException("Failure registering channel " +
379                        "with the selector", ex);
380            }
381
382            final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
383
384                @Override
385                public void sessionClosed(final IOSession session) {
386                    queueClosedSession(session);
387                }
388
389            };
390
391            InterestOpsCallback interestOpsCallback = null;
392            if (this.interestOpsQueueing) {
393                interestOpsCallback = new InterestOpsCallback() {
394
395                    @Override
396                    public void addInterestOps(final InterestOpEntry entry) {
397                        queueInterestOps(entry);
398                    }
399
400                };
401            }
402
403            final IOSession session;
404            try {
405                session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
406                int timeout = 0;
407                try {
408                    timeout = channel.socket().getSoTimeout();
409                } catch (final IOException ex) {
410                    // Very unlikely to happen and is not fatal
411                    // as the protocol layer is expected to overwrite
412                    // this value anyways
413                }
414
415                session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
416                session.setSocketTimeout(timeout);
417            } catch (final CancelledKeyException ex) {
418                continue;
419            }
420            try {
421                this.sessions.add(session);
422                final SessionRequestImpl sessionRequest = entry.getSessionRequest();
423                if (sessionRequest != null) {
424                    sessionRequest.completed(session);
425                }
426                key.attach(session);
427                sessionCreated(key, session);
428            } catch (final CancelledKeyException ex) {
429                queueClosedSession(session);
430                key.attach(null);
431            }
432        }
433    }
434
435    private void processClosedSessions() {
436        IOSession session;
437        while ((session = this.closedSessions.poll()) != null) {
438            if (this.sessions.remove(session)) {
439                try {
440                    sessionClosed(session);
441                } catch (final CancelledKeyException ex) {
442                    // ignore and move on
443                }
444            }
445        }
446    }
447
448    private void processPendingInterestOps() {
449        // validity check
450        if (!this.interestOpsQueueing) {
451            return;
452        }
453        InterestOpEntry entry;
454        while ((entry = this.interestOpsQueue.poll()) != null) {
455            // obtain the operation's details
456            final SelectionKey key = entry.getSelectionKey();
457            final int eventMask = entry.getEventMask();
458            if (key.isValid()) {
459                key.interestOps(eventMask);
460            }
461        }
462    }
463
464    private boolean queueInterestOps(final InterestOpEntry entry) {
465        // validity checks
466        Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled");
467        if (entry == null) {
468            return false;
469        }
470
471        // add this operation to the interestOps() queue
472        this.interestOpsQueue.add(entry);
473
474        return true;
475    }
476
477    /**
478     * Triggered to verify whether the I/O session associated with the
479     * given selection key has not timed out.
480     * <p>
481     * Super-classes can implement this method to react to the event.
482     *
483     * @param key the selection key.
484     * @param now current time as long value.
485     */
486    protected void timeoutCheck(final SelectionKey key, final long now) {
487        final IOSessionImpl session = (IOSessionImpl) key.attachment();
488        if (session != null) {
489            final int timeout = session.getSocketTimeout();
490            if (timeout > 0) {
491                if (session.getLastAccessTime() + timeout < now) {
492                    sessionTimedOut(session);
493                }
494            }
495        }
496    }
497
498    /**
499     * Closes out all I/O sessions maintained by this I/O reactor.
500     */
501    protected void closeSessions() {
502        synchronized (this.sessions) {
503            for (final IOSession session : this.sessions) {
504                session.close();
505            }
506        }
507    }
508
509    /**
510     * Closes out all new channels pending registration with the selector of
511     * this I/O reactor.
512     * @throws IOReactorException - not thrown currently
513     */
514    protected void closeNewChannels() throws IOReactorException {
515        ChannelEntry entry;
516        while ((entry = this.newChannels.poll()) != null) {
517            final SessionRequestImpl sessionRequest = entry.getSessionRequest();
518            if (sessionRequest != null) {
519                sessionRequest.cancel();
520            }
521            final SocketChannel channel = entry.getChannel();
522            try {
523                channel.close();
524            } catch (final IOException ignore) {
525            }
526        }
527    }
528
529    /**
530     * Closes out all active channels registered with the selector of
531     * this I/O reactor.
532     * @throws IOReactorException - not thrown currently
533     */
534    protected void closeActiveChannels() throws IOReactorException {
535        try {
536            final Set<SelectionKey> keys = this.selector.keys();
537            for (final SelectionKey key : keys) {
538                final IOSession session = getSession(key);
539                if (session != null) {
540                    session.close();
541                }
542            }
543            this.selector.close();
544        } catch (final IOException ignore) {
545        }
546    }
547
548    /**
549     * Attempts graceful shutdown of this I/O reactor.
550     */
551    public void gracefulShutdown() {
552        synchronized (this.statusMutex) {
553            if (this.status != IOReactorStatus.ACTIVE) {
554                // Already shutting down
555                return;
556            }
557            this.status = IOReactorStatus.SHUTTING_DOWN;
558        }
559        this.selector.wakeup();
560    }
561
562    /**
563     * Attempts force-shutdown of this I/O reactor.
564     */
565    public void hardShutdown() throws IOReactorException {
566        synchronized (this.statusMutex) {
567            if (this.status == IOReactorStatus.SHUT_DOWN) {
568                // Already shut down
569                return;
570            }
571            this.status = IOReactorStatus.SHUT_DOWN;
572        }
573
574        closeNewChannels();
575        closeActiveChannels();
576        processClosedSessions();
577    }
578
579    /**
580     * Blocks for the given period of time in milliseconds awaiting
581     * the completion of the reactor shutdown.
582     *
583     * @param timeout the maximum wait time.
584     * @throws InterruptedException if interrupted.
585     */
586    public void awaitShutdown(final long timeout) throws InterruptedException {
587        synchronized (this.statusMutex) {
588            final long deadline = System.currentTimeMillis() + timeout;
589            long remaining = timeout;
590            while (this.status != IOReactorStatus.SHUT_DOWN) {
591                this.statusMutex.wait(remaining);
592                if (timeout > 0) {
593                    remaining = deadline - System.currentTimeMillis();
594                    if (remaining <= 0) {
595                        break;
596                    }
597                }
598            }
599        }
600    }
601
602    @Override
603    public void shutdown(final long gracePeriod) throws IOReactorException {
604        if (this.status != IOReactorStatus.INACTIVE) {
605            gracefulShutdown();
606            try {
607                awaitShutdown(gracePeriod);
608            } catch (final InterruptedException ignore) {
609            }
610        }
611        if (this.status != IOReactorStatus.SHUT_DOWN) {
612            hardShutdown();
613        }
614    }
615
616    @Override
617    public void shutdown() throws IOReactorException {
618        shutdown(1000);
619    }
620
621}