001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.reactor;
029
030import java.io.IOException;
031import java.net.InetSocketAddress;
032import java.net.Socket;
033import java.net.SocketAddress;
034import java.net.UnknownHostException;
035import java.nio.channels.CancelledKeyException;
036import java.nio.channels.SelectionKey;
037import java.nio.channels.SocketChannel;
038import java.util.Queue;
039import java.util.Set;
040import java.util.concurrent.ConcurrentLinkedQueue;
041import java.util.concurrent.ThreadFactory;
042
043import org.apache.http.nio.reactor.ConnectingIOReactor;
044import org.apache.http.nio.reactor.IOReactorException;
045import org.apache.http.nio.reactor.IOReactorStatus;
046import org.apache.http.nio.reactor.SessionRequest;
047import org.apache.http.nio.reactor.SessionRequestCallback;
048import org.apache.http.params.HttpParams;
049import org.apache.http.util.Asserts;
050
051/**
052 * Default implementation of {@link ConnectingIOReactor}. This class extends
053 * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
054 * hosts.
055 *
056 * @since 4.0
057 */
058@SuppressWarnings("deprecation")
059public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
060        implements ConnectingIOReactor {
061
062    private final Queue<SessionRequestImpl> requestQueue;
063
064    private long lastTimeoutCheck;
065
066    /**
067     * Creates an instance of DefaultConnectingIOReactor with the given configuration.
068     *
069     * @param config I/O reactor configuration.
070     * @param threadFactory the factory to create threads.
071     *   Can be {@code null}.
072     * @throws IOReactorException in case if a non-recoverable I/O error.
073     *
074     * @since 4.2
075     */
076    public DefaultConnectingIOReactor(
077            final IOReactorConfig config,
078            final ThreadFactory threadFactory) throws IOReactorException {
079        super(config, threadFactory);
080        this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
081        this.lastTimeoutCheck = System.currentTimeMillis();
082    }
083
084    /**
085     * Creates an instance of DefaultConnectingIOReactor with the given configuration.
086     *
087     * @param config I/O reactor configuration.
088     *   Can be {@code null}.
089     * @throws IOReactorException in case if a non-recoverable I/O error.
090     *
091     * @since 4.2
092     */
093    public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
094        this(config, null);
095    }
096
097    /**
098     * Creates an instance of DefaultConnectingIOReactor with default configuration.
099     *
100     * @throws IOReactorException in case if a non-recoverable I/O error.
101     *
102     * @since 4.2
103     */
104    public DefaultConnectingIOReactor() throws IOReactorException {
105        this(null, null);
106    }
107
108    /**
109     * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig, ThreadFactory)}
110     */
111    @Deprecated
112    public DefaultConnectingIOReactor(
113            final int workerCount,
114            final ThreadFactory threadFactory,
115            final HttpParams params) throws IOReactorException {
116        this(convert(workerCount, params), threadFactory);
117    }
118
119    /**
120     * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig)}
121     */
122    @Deprecated
123    public DefaultConnectingIOReactor(
124            final int workerCount,
125            final HttpParams params) throws IOReactorException {
126        this(convert(workerCount, params), null);
127    }
128
129    @Override
130    protected void cancelRequests() throws IOReactorException {
131        SessionRequestImpl request;
132        while ((request = this.requestQueue.poll()) != null) {
133            request.cancel();
134        }
135    }
136
137    @Override
138    protected void processEvents(final int readyCount) throws IOReactorException {
139        processSessionRequests();
140
141        if (readyCount > 0) {
142            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
143            for (final SelectionKey key : selectedKeys) {
144
145                processEvent(key);
146
147            }
148            selectedKeys.clear();
149        }
150
151        final long currentTime = System.currentTimeMillis();
152        if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
153            this.lastTimeoutCheck = currentTime;
154            final Set<SelectionKey> keys = this.selector.keys();
155            processTimeouts(keys);
156        }
157    }
158
159    private void processEvent(final SelectionKey key) {
160        try {
161
162            if (key.isConnectable()) {
163
164                final SocketChannel channel = (SocketChannel) key.channel();
165                // Get request handle
166                final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
167                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
168
169                // Finish connection process
170                try {
171                    channel.finishConnect();
172                } catch (final IOException ex) {
173                    sessionRequest.failed(ex);
174                }
175                key.cancel();
176                key.attach(null);
177                if (!sessionRequest.isCompleted()) {
178                    addChannel(new ChannelEntry(channel, sessionRequest));
179                } else {
180                    try {
181                        channel.close();
182                    } catch (final IOException ignore) {
183                    }
184                }
185            }
186
187        } catch (final CancelledKeyException ex) {
188            final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
189            key.attach(null);
190            if (requestHandle != null) {
191                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
192                if (sessionRequest != null) {
193                    sessionRequest.cancel();
194                }
195            }
196        }
197    }
198
199    private void processTimeouts(final Set<SelectionKey> keys) {
200        final long now = System.currentTimeMillis();
201        for (final SelectionKey key : keys) {
202            final Object attachment = key.attachment();
203
204            if (attachment instanceof SessionRequestHandle) {
205                final SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
206                final SessionRequestImpl sessionRequest = handle.getSessionRequest();
207                final int timeout = sessionRequest.getConnectTimeout();
208                if (timeout > 0) {
209                    if (handle.getRequestTime() + timeout < now) {
210                        sessionRequest.timeout();
211                    }
212                }
213            }
214
215        }
216    }
217
218    @Override
219    public SessionRequest connect(
220            final SocketAddress remoteAddress,
221            final SocketAddress localAddress,
222            final Object attachment,
223            final SessionRequestCallback callback) {
224        Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
225            "I/O reactor has been shut down");
226        final SessionRequestImpl sessionRequest = new SessionRequestImpl(
227                remoteAddress, localAddress, attachment, callback);
228        sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
229
230        this.requestQueue.add(sessionRequest);
231        this.selector.wakeup();
232
233        return sessionRequest;
234    }
235
236    private void validateAddress(final SocketAddress address) throws UnknownHostException {
237        if (address == null) {
238            return;
239        }
240        if (address instanceof InetSocketAddress) {
241            final InetSocketAddress endpoint = (InetSocketAddress) address;
242            if (endpoint.isUnresolved()) {
243                throw new UnknownHostException(endpoint.getHostName());
244            }
245        }
246    }
247
248    private void processSessionRequests() throws IOReactorException {
249        SessionRequestImpl request;
250        while ((request = this.requestQueue.poll()) != null) {
251            if (request.isCompleted()) {
252                continue;
253            }
254            final SocketChannel socketChannel;
255            try {
256                socketChannel = SocketChannel.open();
257            } catch (final IOException ex) {
258                request.failed(ex);
259                return;
260            }
261            try {
262                validateAddress(request.getLocalAddress());
263                validateAddress(request.getRemoteAddress());
264
265                socketChannel.configureBlocking(false);
266                prepareSocket(socketChannel.socket());
267
268                if (request.getLocalAddress() != null) {
269                    final Socket sock = socketChannel.socket();
270                    sock.setReuseAddress(this.config.isSoReuseAddress());
271                    sock.bind(request.getLocalAddress());
272                }
273                final boolean connected = socketChannel.connect(request.getRemoteAddress());
274                if (connected) {
275                    final ChannelEntry entry = new ChannelEntry(socketChannel, request);
276                    addChannel(entry);
277                    continue;
278                }
279            } catch (final IOException ex) {
280                closeChannel(socketChannel);
281                request.failed(ex);
282                return;
283            }
284
285            final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
286            try {
287                final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
288                        requestHandle);
289                request.setKey(key);
290            } catch (final IOException ex) {
291                closeChannel(socketChannel);
292                throw new IOReactorException("Failure registering channel " +
293                        "with the selector", ex);
294            }
295        }
296    }
297
298}