001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.reactor;
029
030import java.io.IOException;
031import java.net.ServerSocket;
032import java.net.SocketAddress;
033import java.nio.channels.CancelledKeyException;
034import java.nio.channels.SelectionKey;
035import java.nio.channels.ServerSocketChannel;
036import java.nio.channels.SocketChannel;
037import java.util.Collections;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.Queue;
041import java.util.Set;
042import java.util.concurrent.ConcurrentLinkedQueue;
043import java.util.concurrent.ThreadFactory;
044
045import org.apache.http.nio.reactor.IOReactorException;
046import org.apache.http.nio.reactor.IOReactorStatus;
047import org.apache.http.nio.reactor.ListenerEndpoint;
048import org.apache.http.nio.reactor.ListeningIOReactor;
049import org.apache.http.params.HttpParams;
050import org.apache.http.util.Asserts;
051
052/**
053 * Default implementation of {@link ListeningIOReactor}. This class extends
054 * {@link AbstractMultiworkerIOReactor} with capability to listen for incoming
055 * connections.
056 *
057 * @since 4.0
058 */
059@SuppressWarnings("deprecation")
060public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
061        implements ListeningIOReactor {
062
063    private final Queue<ListenerEndpointImpl> requestQueue;
064    private final Set<ListenerEndpointImpl> endpoints;
065    private final Set<SocketAddress> pausedEndpoints;
066
067    private volatile boolean paused;
068
069    /**
070     * Creates an instance of DefaultListeningIOReactor with the given configuration.
071     *
072     * @param config I/O reactor configuration.
073     * @param threadFactory the factory to create threads.
074     *   Can be {@code null}.
075     * @throws IOReactorException in case if a non-recoverable I/O error.
076     *
077     * @since 4.2
078     */
079    public DefaultListeningIOReactor(
080            final IOReactorConfig config,
081            final ThreadFactory threadFactory) throws IOReactorException {
082        super(config, threadFactory);
083        this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
084        this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
085        this.pausedEndpoints = new HashSet<SocketAddress>();
086    }
087
088    /**
089     * Creates an instance of DefaultListeningIOReactor with the given configuration.
090     *
091     * @param config I/O reactor configuration.
092     *   Can be {@code null}.
093     * @throws IOReactorException in case if a non-recoverable I/O error.
094     *
095     * @since 4.2
096     */
097    public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
098        this(config, null);
099    }
100
101    /**
102     * Creates an instance of DefaultListeningIOReactor with default configuration.
103     *
104     * @throws IOReactorException in case if a non-recoverable I/O error.
105     *
106     * @since 4.2
107     */
108    public DefaultListeningIOReactor() throws IOReactorException {
109        this(null, null);
110    }
111
112    /**
113     * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig, ThreadFactory)}
114     */
115    @Deprecated
116    public DefaultListeningIOReactor(
117            final int workerCount,
118            final ThreadFactory threadFactory,
119            final HttpParams params) throws IOReactorException {
120        this(convert(workerCount, params), threadFactory);
121    }
122
123    /**
124     * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig)}
125     */
126    @Deprecated
127    public DefaultListeningIOReactor(
128            final int workerCount,
129            final HttpParams params) throws IOReactorException {
130        this(convert(workerCount, params), null);
131    }
132
133    @Override
134    protected void cancelRequests() throws IOReactorException {
135        ListenerEndpointImpl request;
136        while ((request = this.requestQueue.poll()) != null) {
137            request.cancel();
138        }
139    }
140
141    @Override
142    protected void processEvents(final int readyCount) throws IOReactorException {
143        if (!this.paused) {
144            processSessionRequests();
145        }
146
147        if (readyCount > 0) {
148            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
149            for (final SelectionKey key : selectedKeys) {
150
151                processEvent(key);
152
153            }
154            selectedKeys.clear();
155        }
156    }
157
158    private void processEvent(final SelectionKey key)
159            throws IOReactorException {
160        try {
161
162            if (key.isAcceptable()) {
163
164                final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
165                for (;;) {
166                    SocketChannel socketChannel = null;
167                    try {
168                        socketChannel = serverChannel.accept();
169                    } catch (final IOException ex) {
170                        if (this.exceptionHandler == null ||
171                                !this.exceptionHandler.handle(ex)) {
172                            throw new IOReactorException(
173                                    "Failure accepting connection", ex);
174                        }
175                    }
176                    if (socketChannel == null) {
177                        break;
178                    }
179                    try {
180                        prepareSocket(socketChannel.socket());
181                    } catch (final IOException ex) {
182                        if (this.exceptionHandler == null ||
183                                !this.exceptionHandler.handle(ex)) {
184                            throw new IOReactorException(
185                                    "Failure initalizing socket", ex);
186                        }
187                    }
188                    final ChannelEntry entry = new ChannelEntry(socketChannel);
189                    addChannel(entry);
190                }
191            }
192
193        } catch (final CancelledKeyException ex) {
194            final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
195            this.endpoints.remove(endpoint);
196            key.attach(null);
197        }
198    }
199
200    private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
201        return new ListenerEndpointImpl(
202                address,
203                new ListenerEndpointClosedCallback() {
204
205                    @Override
206                    public void endpointClosed(final ListenerEndpoint endpoint) {
207                        endpoints.remove(endpoint);
208                    }
209
210                });
211    }
212
213    @Override
214    public ListenerEndpoint listen(final SocketAddress address) {
215        Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
216                "I/O reactor has been shut down");
217        final ListenerEndpointImpl request = createEndpoint(address);
218        this.requestQueue.add(request);
219        this.selector.wakeup();
220        return request;
221    }
222
223    private void processSessionRequests() throws IOReactorException {
224        ListenerEndpointImpl request;
225        while ((request = this.requestQueue.poll()) != null) {
226            final SocketAddress address = request.getAddress();
227            final ServerSocketChannel serverChannel;
228            try {
229                serverChannel = ServerSocketChannel.open();
230            } catch (final IOException ex) {
231                throw new IOReactorException("Failure opening server socket", ex);
232            }
233            try {
234                final ServerSocket socket = serverChannel.socket();
235                socket.setReuseAddress(this.config.isSoReuseAddress());
236                if (this.config.getSoTimeout() > 0) {
237                    socket.setSoTimeout(this.config.getSoTimeout());
238                }
239                if (this.config.getRcvBufSize() > 0) {
240                    socket.setReceiveBufferSize(this.config.getRcvBufSize());
241                }
242                serverChannel.configureBlocking(false);
243                socket.bind(address, this.config.getBacklogSize());
244            } catch (final IOException ex) {
245                closeChannel(serverChannel);
246                request.failed(ex);
247                if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
248                    throw new IOReactorException("Failure binding socket to address "
249                            + address, ex);
250                } else {
251                    return;
252                }
253            }
254            try {
255                final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
256                key.attach(request);
257                request.setKey(key);
258            } catch (final IOException ex) {
259                closeChannel(serverChannel);
260                throw new IOReactorException("Failure registering channel " +
261                        "with the selector", ex);
262            }
263
264            this.endpoints.add(request);
265            request.completed(serverChannel.socket().getLocalSocketAddress());
266        }
267    }
268
269    @Override
270    public Set<ListenerEndpoint> getEndpoints() {
271        final Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
272        synchronized (this.endpoints) {
273            final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
274            while (it.hasNext()) {
275                final ListenerEndpoint endpoint = it.next();
276                if (!endpoint.isClosed()) {
277                    set.add(endpoint);
278                } else {
279                    it.remove();
280                }
281            }
282        }
283        return set;
284    }
285
286    @Override
287    public void pause() throws IOException {
288        if (this.paused) {
289            return;
290        }
291        this.paused = true;
292        synchronized (this.endpoints) {
293            for (final ListenerEndpointImpl endpoint : this.endpoints) {
294                if (!endpoint.isClosed()) {
295                    endpoint.close();
296                    this.pausedEndpoints.add(endpoint.getAddress());
297                }
298            }
299            this.endpoints.clear();
300        }
301    }
302
303    @Override
304    public void resume() throws IOException {
305        if (!this.paused) {
306            return;
307        }
308        this.paused = false;
309        for (final SocketAddress address: this.pausedEndpoints) {
310            final ListenerEndpointImpl request = createEndpoint(address);
311            this.requestQueue.add(request);
312        }
313        this.pausedEndpoints.clear();
314        this.selector.wakeup();
315    }
316
317}