001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.impl.nio.reactor; 029 030import java.io.InterruptedIOException; 031import java.nio.channels.CancelledKeyException; 032import java.nio.channels.SelectionKey; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.Set; 036 037import org.apache.http.nio.reactor.EventMask; 038import org.apache.http.nio.reactor.IOEventDispatch; 039import org.apache.http.nio.reactor.IOReactorException; 040import org.apache.http.nio.reactor.IOReactorExceptionHandler; 041import org.apache.http.nio.reactor.IOSession; 042import org.apache.http.util.Args; 043 044/** 045 * Default implementation of {@link AbstractIOReactor} that serves as a base 046 * for more advanced {@link org.apache.http.nio.reactor.IOReactor} 047 * implementations. This class adds support for the I/O event dispatching 048 * using {@link IOEventDispatch}, management of buffering sessions, and 049 * session timeout handling. 050 * 051 * @since 4.0 052 */ 053public class BaseIOReactor extends AbstractIOReactor { 054 055 private final long timeoutCheckInterval; 056 private final Set<IOSession> bufferingSessions; 057 058 private long lastTimeoutCheck; 059 060 private IOReactorExceptionHandler exceptionHandler = null; 061 private IOEventDispatch eventDispatch = null; 062 063 /** 064 * Creates new BaseIOReactor instance. 065 * 066 * @param selectTimeout the select timeout. 067 * @throws IOReactorException in case if a non-recoverable I/O error. 068 */ 069 public BaseIOReactor(final long selectTimeout) throws IOReactorException { 070 this(selectTimeout, false); 071 } 072 073 /** 074 * Creates new BaseIOReactor instance. 075 * 076 * @param selectTimeout the select timeout. 077 * @param interestOpsQueueing Ops queueing flag. 078 * 079 * @throws IOReactorException in case if a non-recoverable I/O error. 080 * 081 * @since 4.1 082 */ 083 public BaseIOReactor( 084 final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException { 085 super(selectTimeout, interestOpsQueueing); 086 this.bufferingSessions = new HashSet<IOSession>(); 087 this.timeoutCheckInterval = selectTimeout; 088 this.lastTimeoutCheck = System.currentTimeMillis(); 089 } 090 091 /** 092 * Activates the I/O reactor. The I/O reactor will start reacting to I/O 093 * events and dispatch I/O event notifications to the given 094 * {@link IOEventDispatch}. 095 * 096 * @throws InterruptedIOException if the dispatch thread is interrupted. 097 * @throws IOReactorException in case if a non-recoverable I/O error. 098 */ 099 @Override 100 public void execute( 101 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException { 102 Args.notNull(eventDispatch, "Event dispatcher"); 103 this.eventDispatch = eventDispatch; 104 execute(); 105 } 106 107 /** 108 * Sets exception handler for this I/O reactor. 109 * 110 * @param exceptionHandler the exception handler. 111 */ 112 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) { 113 this.exceptionHandler = exceptionHandler; 114 } 115 116 /** 117 * Handles the given {@link RuntimeException}. This method delegates 118 * handling of the exception to the {@link IOReactorExceptionHandler}, 119 * if available. 120 * 121 * @param ex the runtime exception. 122 */ 123 protected void handleRuntimeException(final RuntimeException ex) { 124 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) { 125 throw ex; 126 } 127 } 128 129 /** 130 * This I/O reactor implementation does not react to the 131 * {@link SelectionKey#OP_ACCEPT} event. 132 * <p> 133 * Super-classes can override this method to react to the event. 134 */ 135 @Override 136 protected void acceptable(final SelectionKey key) { 137 } 138 139 /** 140 * This I/O reactor implementation does not react to the 141 * {@link SelectionKey#OP_CONNECT} event. 142 * <p> 143 * Super-classes can override this method to react to the event. 144 */ 145 @Override 146 protected void connectable(final SelectionKey key) { 147 } 148 149 /** 150 * Processes {@link SelectionKey#OP_READ} event on the given selection key. 151 * This method dispatches the event notification to the 152 * {@link IOEventDispatch#inputReady(IOSession)} method. 153 */ 154 @Override 155 protected void readable(final SelectionKey key) { 156 final IOSession session = getSession(key); 157 try { 158 // Try to gently feed more data to the event dispatcher 159 // if the session input buffer has not been fully exhausted 160 // (the choice of 5 iterations is purely arbitrary) 161 for (int i = 0; i < 5; i++) { 162 this.eventDispatch.inputReady(session); 163 if (!session.hasBufferedInput() 164 || (session.getEventMask() & SelectionKey.OP_READ) == 0) { 165 break; 166 } 167 } 168 if (session.hasBufferedInput()) { 169 this.bufferingSessions.add(session); 170 } 171 } catch (final CancelledKeyException ex) { 172 queueClosedSession(session); 173 key.attach(null); 174 } catch (final RuntimeException ex) { 175 handleRuntimeException(ex); 176 } 177 } 178 179 /** 180 * Processes {@link SelectionKey#OP_WRITE} event on the given selection key. 181 * This method dispatches the event notification to the 182 * {@link IOEventDispatch#outputReady(IOSession)} method. 183 */ 184 @Override 185 protected void writable(final SelectionKey key) { 186 final IOSession session = getSession(key); 187 try { 188 this.eventDispatch.outputReady(session); 189 } catch (final CancelledKeyException ex) { 190 queueClosedSession(session); 191 key.attach(null); 192 } catch (final RuntimeException ex) { 193 handleRuntimeException(ex); 194 } 195 } 196 197 /** 198 * Verifies whether any of the sessions associated with the given selection 199 * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)} 200 * method. 201 * <p> 202 * This method will also invoke the 203 * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions 204 * that have buffered input data. 205 */ 206 @Override 207 protected void validate(final Set<SelectionKey> keys) { 208 final long currentTime = System.currentTimeMillis(); 209 if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) { 210 this.lastTimeoutCheck = currentTime; 211 if (keys != null) { 212 for (final SelectionKey key : keys) { 213 timeoutCheck(key, currentTime); 214 } 215 } 216 } 217 if (!this.bufferingSessions.isEmpty()) { 218 for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) { 219 final IOSession session = it.next(); 220 if (!session.hasBufferedInput()) { 221 it.remove(); 222 continue; 223 } 224 try { 225 if ((session.getEventMask() & EventMask.READ) > 0) { 226 this.eventDispatch.inputReady(session); 227 if (!session.hasBufferedInput()) { 228 it.remove(); 229 } 230 } 231 } catch (final CancelledKeyException ex) { 232 it.remove(); 233 queueClosedSession(session); 234 } catch (final RuntimeException ex) { 235 handleRuntimeException(ex); 236 } 237 } 238 } 239 } 240 241 /** 242 * Processes newly created I/O session. This method dispatches the event 243 * notification to the {@link IOEventDispatch#connected(IOSession)} method. 244 */ 245 @Override 246 protected void sessionCreated(final SelectionKey key, final IOSession session) { 247 try { 248 this.eventDispatch.connected(session); 249 } catch (final CancelledKeyException ex) { 250 queueClosedSession(session); 251 } catch (final RuntimeException ex) { 252 handleRuntimeException(ex); 253 } 254 } 255 256 /** 257 * Processes timed out I/O session. This method dispatches the event 258 * notification to the {@link IOEventDispatch#timeout(IOSession)} method. 259 */ 260 @Override 261 protected void sessionTimedOut(final IOSession session) { 262 try { 263 this.eventDispatch.timeout(session); 264 } catch (final CancelledKeyException ex) { 265 queueClosedSession(session); 266 } catch (final RuntimeException ex) { 267 handleRuntimeException(ex); 268 } 269 } 270 271 /** 272 * Processes closed I/O session. This method dispatches the event 273 * notification to the {@link IOEventDispatch#disconnected(IOSession)} 274 * method. 275 */ 276 @Override 277 protected void sessionClosed(final IOSession session) { 278 try { 279 this.eventDispatch.disconnected(session); 280 } catch (final CancelledKeyException ex) { 281 // ignore 282 } catch (final RuntimeException ex) { 283 handleRuntimeException(ex); 284 } 285 } 286 287}