/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation. For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */

package org.apache.http.impl.nio.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;

import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.IOReactorStatus;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.HttpParams;
import org.apache.http.util.Asserts;

/**
 * Default implementation of {@link ConnectingIOReactor}. This class extends
 * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
 * hosts.
 *
 * @since 4.0
 */
@SuppressWarnings("deprecation")
public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
        implements ConnectingIOReactor {

    /** Session requests submitted through {@link #connect} and not yet picked up by the reactor. */
    private final Queue<SessionRequestImpl> requestQueue;

    /** Value of {@link System#currentTimeMillis()} at the last connect timeout scan. */
    private long lastTimeoutCheck;

    /**
     * Creates an instance of DefaultConnectingIOReactor with the given configuration.
     *
     * @param config I/O reactor configuration.
     * @param threadFactory the factory to create threads.
     *   Can be {@code null}.
     * @throws IOReactorException in case of a non-recoverable I/O error.
     *
     * @since 4.2
     */
    public DefaultConnectingIOReactor(
            final IOReactorConfig config,
            final ThreadFactory threadFactory) throws IOReactorException {
        super(config, threadFactory);
        this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
        this.lastTimeoutCheck = System.currentTimeMillis();
    }

    /**
     * Creates an instance of DefaultConnectingIOReactor with the given configuration.
     *
     * @param config I/O reactor configuration.
     *   Can be {@code null}.
     * @throws IOReactorException in case of a non-recoverable I/O error.
     *
     * @since 4.2
     */
    public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
        this(config, null);
    }

    /**
     * Creates an instance of DefaultConnectingIOReactor with default configuration.
     *
     * @throws IOReactorException in case of a non-recoverable I/O error.
     *
     * @since 4.2
     */
    public DefaultConnectingIOReactor() throws IOReactorException {
        this(null, null);
    }

    /**
     * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig, ThreadFactory)}
     */
    @Deprecated
    public DefaultConnectingIOReactor(
            final int workerCount,
            final ThreadFactory threadFactory,
            final HttpParams params) throws IOReactorException {
        this(convert(workerCount, params), threadFactory);
    }

    /**
     * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig)}
     */
    @Deprecated
    public DefaultConnectingIOReactor(
            final int workerCount,
            final HttpParams params) throws IOReactorException {
        this(convert(workerCount, params), null);
    }

    /**
     * Drains the queue of pending session requests and cancels each of them.
     *
     * @throws IOReactorException declared by the overridden method; not thrown by this
     *   implementation.
     */
    @Override
    protected void cancelRequests() throws IOReactorException {
        SessionRequestImpl request;
        while ((request = this.requestQueue.poll()) != null) {
            request.cancel();
        }
    }

    /**
     * Performs one processing cycle: starts the queued session requests, handles the
     * selected keys when {@code readyCount} is positive and, once at least
     * {@code selectTimeout} milliseconds have passed since the previous scan, checks
     * all registered keys for connect requests that have timed out.
     *
     * @param readyCount number of ready keys reported by the selector.
     * @throws IOReactorException in case a channel cannot be registered with the selector.
     */
    @Override
    protected void processEvents(final int readyCount) throws IOReactorException {
        processSessionRequests();

        if (readyCount > 0) {
            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
            for (final SelectionKey key : selectedKeys) {
                processEvent(key);
            }
            selectedKeys.clear();
        }

        final long currentTime = System.currentTimeMillis();
        if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
            this.lastTimeoutCheck = currentTime;
            final Set<SelectionKey> keys = this.selector.keys();
            processTimeouts(keys);
        }
    }

    /**
     * Handles a single selected key. For a connectable key the pending connection is
     * finished, the key is cancelled and detached from its request handle, and the
     * channel is either handed over via {@code addChannel} or, if the request has
     * already completed (for instance because finishing the connection failed), closed.
     * If the key turns out to be cancelled the associated session request is cancelled.
     *
     * @param key the selected key.
     */
    private void processEvent(final SelectionKey key) {
        try {

            if (key.isConnectable()) {

                final SocketChannel channel = (SocketChannel) key.channel();
                // Get request handle
                final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();

                // Finish connection process
                try {
                    channel.finishConnect();
                } catch (final IOException ex) {
                    sessionRequest.failed(ex);
                }
                // The key has served its purpose whatever the outcome
                key.cancel();
                key.attach(null);
                if (!sessionRequest.isCompleted()) {
                    addChannel(new ChannelEntry(channel, sessionRequest));
                } else {
                    try {
                        channel.close();
                    } catch (final IOException ignored) {
                        // Best effort: the request is already completed and the
                        // channel is being discarded, nothing more can be done.
                    }
                }
            }

        } catch (final CancelledKeyException ex) {
            final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
            key.attach(null);
            if (requestHandle != null) {
                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
                if (sessionRequest != null) {
                    sessionRequest.cancel();
                }
            }
        }
    }

    /**
     * Times out every pending connect request among the given keys whose request time
     * plus connect timeout lies before the current time. Keys whose attachment is not a
     * {@code SessionRequestHandle} are skipped, as are requests with a connect timeout
     * of zero or less.
     *
     * @param keys the keys to examine.
     */
    private void processTimeouts(final Set<SelectionKey> keys) {
        final long now = System.currentTimeMillis();
        for (final SelectionKey key : keys) {
            final Object attachment = key.attachment();

            if (attachment instanceof SessionRequestHandle) {
                final SessionRequestHandle handle = (SessionRequestHandle) attachment;
                final SessionRequestImpl sessionRequest = handle.getSessionRequest();
                final int timeout = sessionRequest.getConnectTimeout();
                if (timeout > 0 && handle.getRequestTime() + timeout < now) {
                    sessionRequest.timeout();
                }
            }

        }
    }

    /**
     * Queues a request to connect to the given remote address and wakes up the selector
     * so that the request gets picked up. The connect timeout of the request is taken
     * from the reactor configuration.
     * <p>
     * The call is rejected through {@link Asserts#check(boolean, String)} when the
     * status of the reactor is past {@link IOReactorStatus#ACTIVE}.
     *
     * @param remoteAddress the address to connect to.
     * @param localAddress the local address to bind to. Can be {@code null}, in which
     *   case no explicit bind is performed.
     * @param attachment arbitrary object passed on to the session request.
     * @param callback callback passed on to the session request.
     * @return the queued session request.
     */
    @Override
    public SessionRequest connect(
            final SocketAddress remoteAddress,
            final SocketAddress localAddress,
            final Object attachment,
            final SessionRequestCallback callback) {
        Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
            "I/O reactor has been shut down");
        final SessionRequestImpl sessionRequest = new SessionRequestImpl(
                remoteAddress, localAddress, attachment, callback);
        sessionRequest.setConnectTimeout(this.config.getConnectTimeout());

        this.requestQueue.add(sessionRequest);
        this.selector.wakeup();

        return sessionRequest;
    }

    /**
     * Rejects unresolved {@link InetSocketAddress}es. A {@code null} address and
     * addresses of any other type are accepted as they are.
     *
     * @param address the address to check. Can be {@code null}.
     * @throws UnknownHostException if the address is an unresolved
     *   {@link InetSocketAddress}; the message is its host name.
     */
    private void validateAddress(final SocketAddress address) throws UnknownHostException {
        if (address == null) {
            return;
        }
        if (address instanceof InetSocketAddress) {
            final InetSocketAddress endpoint = (InetSocketAddress) address;
            if (endpoint.isUnresolved()) {
                throw new UnknownHostException(endpoint.getHostName());
            }
        }
    }

    /**
     * Takes session requests off the queue and initiates a non-blocking connect for
     * each of them. Requests that are already completed are dropped. A channel that
     * connects immediately is handed over via {@code addChannel}; otherwise it is
     * registered with the selector for {@link SelectionKey#OP_CONNECT} with a
     * {@code SessionRequestHandle} attached.
     *
     * @throws IOReactorException if a channel cannot be registered with the selector.
     */
    private void processSessionRequests() throws IOReactorException {
        SessionRequestImpl request;
        while ((request = this.requestQueue.poll()) != null) {
            if (request.isCompleted()) {
                continue;
            }
            final SocketChannel socketChannel;
            try {
                socketChannel = SocketChannel.open();
            } catch (final IOException ex) {
                request.failed(ex);
                // NOTE(review): returning leaves the remaining queued requests for the
                // next invocation of processEvents - confirm this back-off is intended.
                return;
            }
            try {
                validateAddress(request.getLocalAddress());
                validateAddress(request.getRemoteAddress());

                socketChannel.configureBlocking(false);
                prepareSocket(socketChannel.socket());

                if (request.getLocalAddress() != null) {
                    final Socket sock = socketChannel.socket();
                    sock.setReuseAddress(this.config.isSoReuseAddress());
                    sock.bind(request.getLocalAddress());
                }
                final boolean connected = socketChannel.connect(request.getRemoteAddress());
                if (connected) {
                    final ChannelEntry entry = new ChannelEntry(socketChannel, request);
                    addChannel(entry);
                    continue;
                }
            } catch (final IOException ex) {
                closeChannel(socketChannel);
                request.failed(ex);
                // NOTE(review): same as above, the rest of the queue waits for the
                // next cycle.
                return;
            }

            final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
            try {
                final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
                        requestHandle);
                request.setKey(key);
            } catch (final IOException ex) {
                closeChannel(socketChannel);
                // The request has already been taken off the queue, so cancelRequests()
                // can no longer reach it; fail it here, like the other error paths do,
                // rather than leaving it pending.
                request.failed(ex);
                throw new IOReactorException("Failure registering channel " +
                        "with the selector", ex);
            }
        }
    }

}