001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.reactor;
029
030import java.io.InterruptedIOException;
031import java.nio.channels.CancelledKeyException;
032import java.nio.channels.SelectionKey;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.Set;
036
037import org.apache.http.nio.reactor.EventMask;
038import org.apache.http.nio.reactor.IOEventDispatch;
039import org.apache.http.nio.reactor.IOReactorException;
040import org.apache.http.nio.reactor.IOReactorExceptionHandler;
041import org.apache.http.nio.reactor.IOSession;
042import org.apache.http.util.Args;
043
044/**
045 * Default implementation of {@link AbstractIOReactor} that serves as a base
046 * for more advanced {@link org.apache.http.nio.reactor.IOReactor}
047 * implementations. This class adds support for the I/O event dispatching
048 * using {@link IOEventDispatch}, management of buffering sessions, and
049 * session timeout handling.
050 *
051 * @since 4.0
052 */
053public class BaseIOReactor extends AbstractIOReactor {
054
055    private final long timeoutCheckInterval;
056    private final Set<IOSession> bufferingSessions;
057
058    private long lastTimeoutCheck;
059
060    private IOReactorExceptionHandler exceptionHandler = null;
061    private IOEventDispatch eventDispatch = null;
062
063    /**
064     * Creates new BaseIOReactor instance.
065     *
066     * @param selectTimeout the select timeout.
067     * @throws IOReactorException in case if a non-recoverable I/O error.
068     */
069    public BaseIOReactor(final long selectTimeout) throws IOReactorException {
070        this(selectTimeout, false);
071    }
072
073    /**
074     * Creates new BaseIOReactor instance.
075     *
076     * @param selectTimeout the select timeout.
077     * @param interestOpsQueueing Ops queueing flag.
078     *
079     * @throws IOReactorException in case if a non-recoverable I/O error.
080     *
081     * @since 4.1
082     */
083    public BaseIOReactor(
084            final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
085        super(selectTimeout, interestOpsQueueing);
086        this.bufferingSessions = new HashSet<IOSession>();
087        this.timeoutCheckInterval = selectTimeout;
088        this.lastTimeoutCheck = System.currentTimeMillis();
089    }
090
091    /**
092     * Activates the I/O reactor. The I/O reactor will start reacting to I/O
093     * events and dispatch I/O event notifications to the given
094     * {@link IOEventDispatch}.
095     *
096     * @throws InterruptedIOException if the dispatch thread is interrupted.
097     * @throws IOReactorException in case if a non-recoverable I/O error.
098     */
099    @Override
100    public void execute(
101            final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
102        Args.notNull(eventDispatch, "Event dispatcher");
103        this.eventDispatch = eventDispatch;
104        execute();
105    }
106
107    /**
108     * Sets exception handler for this I/O reactor.
109     *
110     * @param exceptionHandler the exception handler.
111     */
112    public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
113        this.exceptionHandler = exceptionHandler;
114    }
115
116    /**
117     * Handles the given {@link RuntimeException}. This method delegates
118     * handling of the exception to the {@link IOReactorExceptionHandler},
119     * if available.
120     *
121     * @param ex the runtime exception.
122     */
123    protected void handleRuntimeException(final RuntimeException ex) {
124        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
125            throw ex;
126        }
127    }
128
129    /**
130     * This I/O reactor implementation does not react to the
131     * {@link SelectionKey#OP_ACCEPT} event.
132     * <p>
133     * Super-classes can override this method to react to the event.
134     */
135    @Override
136    protected void acceptable(final SelectionKey key) {
137    }
138
139    /**
140     * This I/O reactor implementation does not react to the
141     * {@link SelectionKey#OP_CONNECT} event.
142     * <p>
143     * Super-classes can override this method to react to the event.
144     */
145    @Override
146    protected void connectable(final SelectionKey key) {
147    }
148
149    /**
150     * Processes {@link SelectionKey#OP_READ} event on the given selection key.
151     * This method dispatches the event notification to the
152     * {@link IOEventDispatch#inputReady(IOSession)} method.
153     */
154    @Override
155    protected void readable(final SelectionKey key) {
156        final IOSession session = getSession(key);
157        try {
158            // Try to gently feed more data to the event dispatcher
159            // if the session input buffer has not been fully exhausted
160            // (the choice of 5 iterations is purely arbitrary)
161            for (int i = 0; i < 5; i++) {
162                this.eventDispatch.inputReady(session);
163                if (!session.hasBufferedInput()
164                        || (session.getEventMask() & SelectionKey.OP_READ) == 0) {
165                    break;
166                }
167            }
168            if (session.hasBufferedInput()) {
169                this.bufferingSessions.add(session);
170            }
171        } catch (final CancelledKeyException ex) {
172            queueClosedSession(session);
173            key.attach(null);
174        } catch (final RuntimeException ex) {
175            handleRuntimeException(ex);
176        }
177    }
178
179    /**
180     * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
181     * This method dispatches the event notification to the
182     * {@link IOEventDispatch#outputReady(IOSession)} method.
183     */
184    @Override
185    protected void writable(final SelectionKey key) {
186        final IOSession session = getSession(key);
187        try {
188            this.eventDispatch.outputReady(session);
189        } catch (final CancelledKeyException ex) {
190            queueClosedSession(session);
191            key.attach(null);
192        } catch (final RuntimeException ex) {
193            handleRuntimeException(ex);
194        }
195    }
196
197    /**
198     * Verifies whether any of the sessions associated with the given selection
199     * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
200     * method.
201     * <p>
202     * This method will also invoke the
203     * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
204     * that have buffered input data.
205     */
206    @Override
207    protected void validate(final Set<SelectionKey> keys) {
208        final long currentTime = System.currentTimeMillis();
209        if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
210            this.lastTimeoutCheck = currentTime;
211            if (keys != null) {
212                for (final SelectionKey key : keys) {
213                    timeoutCheck(key, currentTime);
214                }
215            }
216        }
217        if (!this.bufferingSessions.isEmpty()) {
218            for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
219                final IOSession session = it.next();
220                if (!session.hasBufferedInput()) {
221                    it.remove();
222                    continue;
223                }
224                try {
225                    if ((session.getEventMask() & EventMask.READ) > 0) {
226                        this.eventDispatch.inputReady(session);
227                        if (!session.hasBufferedInput()) {
228                            it.remove();
229                        }
230                    }
231                } catch (final CancelledKeyException ex) {
232                    it.remove();
233                    queueClosedSession(session);
234                } catch (final RuntimeException ex) {
235                    handleRuntimeException(ex);
236                }
237            }
238        }
239    }
240
241    /**
242     * Processes newly created I/O session. This method dispatches the event
243     * notification to the {@link IOEventDispatch#connected(IOSession)} method.
244     */
245    @Override
246    protected void sessionCreated(final SelectionKey key, final IOSession session) {
247        try {
248            this.eventDispatch.connected(session);
249        } catch (final CancelledKeyException ex) {
250            queueClosedSession(session);
251        } catch (final RuntimeException ex) {
252            handleRuntimeException(ex);
253        }
254    }
255
256    /**
257     * Processes timed out I/O session. This method dispatches the event
258     * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
259     */
260    @Override
261    protected void sessionTimedOut(final IOSession session) {
262        try {
263            this.eventDispatch.timeout(session);
264        } catch (final CancelledKeyException ex) {
265            queueClosedSession(session);
266        } catch (final RuntimeException ex) {
267            handleRuntimeException(ex);
268        }
269    }
270
271    /**
272     * Processes closed I/O session. This method dispatches the event
273     * notification to the {@link IOEventDispatch#disconnected(IOSession)}
274     * method.
275     */
276    @Override
277    protected void sessionClosed(final IOSession session) {
278        try {
279            this.eventDispatch.disconnected(session);
280        } catch (final CancelledKeyException ex) {
281            // ignore
282        } catch (final RuntimeException ex) {
283            handleRuntimeException(ex);
284        }
285    }
286
287}