001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027
028package org.apache.http.impl.nio.reactor;
029
030import java.io.IOException;
031import java.net.InetSocketAddress;
032import java.net.Socket;
033import java.net.SocketAddress;
034import java.nio.channels.ByteChannel;
035import java.nio.channels.SelectionKey;
036import java.nio.channels.SocketChannel;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.Map;
040
041import org.apache.http.annotation.ThreadingBehavior;
042import org.apache.http.annotation.Contract;
043import org.apache.http.nio.reactor.IOSession;
044import org.apache.http.nio.reactor.SessionBufferStatus;
045import org.apache.http.nio.reactor.SocketAccessor;
046import org.apache.http.util.Args;
047
048/**
049 * Default implementation of {@link IOSession}.
050 *
051 * @since 4.0
052 */
053@Contract(threading = ThreadingBehavior.SAFE)
054public class IOSessionImpl implements IOSession, SocketAccessor {
055
056    private final SelectionKey key;
057    private final ByteChannel channel;
058    private final Map<String, Object> attributes;
059    private final InterestOpsCallback interestOpsCallback;
060    private final SessionClosedCallback sessionClosedCallback;
061
062    private volatile int status;
063    private volatile int currentEventMask;
064    private volatile SessionBufferStatus bufferStatus;
065    private volatile int socketTimeout;
066
067    private final long startedTime;
068
069    private volatile long lastReadTime;
070    private volatile long lastWriteTime;
071    private volatile long lastAccessTime;
072
073    /**
074     * Creates new instance of IOSessionImpl.
075     *
076     * @param key the selection key.
077     * @param interestOpsCallback interestOps callback.
078     * @param sessionClosedCallback session closed callback.
079     *
080     * @since 4.1
081     */
082    public IOSessionImpl(
083            final SelectionKey key,
084            final InterestOpsCallback interestOpsCallback,
085            final SessionClosedCallback sessionClosedCallback) {
086        super();
087        Args.notNull(key, "Selection key");
088        this.key = key;
089        this.channel = (ByteChannel) this.key.channel();
090        this.interestOpsCallback = interestOpsCallback;
091        this.sessionClosedCallback = sessionClosedCallback;
092        this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
093        this.currentEventMask = key.interestOps();
094        this.socketTimeout = 0;
095        this.status = ACTIVE;
096        final long now = System.currentTimeMillis();
097        this.startedTime = now;
098        this.lastReadTime = now;
099        this.lastWriteTime = now;
100        this.lastAccessTime = now;
101    }
102
103    /**
104     * Creates new instance of IOSessionImpl.
105     *
106     * @param key the selection key.
107     * @param sessionClosedCallback session closed callback.
108     */
109    public IOSessionImpl(
110            final SelectionKey key,
111            final SessionClosedCallback sessionClosedCallback) {
112        this(key, null, sessionClosedCallback);
113    }
114
115    @Override
116    public ByteChannel channel() {
117        return this.channel;
118    }
119
120    @Override
121    public SocketAddress getLocalAddress() {
122        if (this.channel instanceof SocketChannel) {
123            return ((SocketChannel)this.channel).socket().getLocalSocketAddress();
124        } else {
125            return null;
126        }
127    }
128
129    @Override
130    public SocketAddress getRemoteAddress() {
131        if (this.channel instanceof SocketChannel) {
132            return ((SocketChannel)this.channel).socket().getRemoteSocketAddress();
133        } else {
134            return null;
135        }
136    }
137
138    @Override
139    public int getEventMask() {
140        return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
141    }
142
143    @Override
144    public synchronized void setEventMask(final int ops) {
145        if (this.status == CLOSED) {
146            return;
147        }
148        if (this.interestOpsCallback != null) {
149            // update the current event mask
150            this.currentEventMask = ops;
151
152            // local variable
153            final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
154
155            // add this operation to the interestOps() queue
156            this.interestOpsCallback.addInterestOps(entry);
157        } else {
158            this.key.interestOps(ops);
159        }
160        this.key.selector().wakeup();
161    }
162
163    @Override
164    public synchronized void setEvent(final int op) {
165        if (this.status == CLOSED) {
166            return;
167        }
168        if (this.interestOpsCallback != null) {
169            // update the current event mask
170            this.currentEventMask |= op;
171
172            // local variable
173            final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
174
175            // add this operation to the interestOps() queue
176            this.interestOpsCallback.addInterestOps(entry);
177        } else {
178            final int ops = this.key.interestOps();
179            this.key.interestOps(ops | op);
180        }
181        this.key.selector().wakeup();
182    }
183
184    @Override
185    public synchronized void clearEvent(final int op) {
186        if (this.status == CLOSED) {
187            return;
188        }
189        if (this.interestOpsCallback != null) {
190            // update the current event mask
191            this.currentEventMask &= ~op;
192
193            // local variable
194            final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
195
196            // add this operation to the interestOps() queue
197            this.interestOpsCallback.addInterestOps(entry);
198        } else {
199            final int ops = this.key.interestOps();
200            this.key.interestOps(ops & ~op);
201        }
202        this.key.selector().wakeup();
203    }
204
205    @Override
206    public int getSocketTimeout() {
207        return this.socketTimeout;
208    }
209
210    @Override
211    public void setSocketTimeout(final int timeout) {
212        this.socketTimeout = timeout;
213        this.lastAccessTime = System.currentTimeMillis();
214    }
215
216    @Override
217    public void close() {
218        synchronized (this) {
219            if (this.status == CLOSED) {
220                return;
221            }
222            this.status = CLOSED;
223        }
224        synchronized (this.key) {
225            this.key.cancel();
226            try {
227                this.key.channel().close();
228            } catch (final IOException ex) {
229                // Munching exceptions is not nice
230                // but in this case it is justified
231            }
232            if (this.sessionClosedCallback != null) {
233                this.sessionClosedCallback.sessionClosed(this);
234            }
235            if (this.key.selector().isOpen()) {
236                this.key.selector().wakeup();
237            }
238        }
239    }
240
241    @Override
242    public int getStatus() {
243        return this.status;
244    }
245
246    @Override
247    public boolean isClosed() {
248        return this.status == CLOSED;
249    }
250
251    @Override
252    public void shutdown() {
253        // For this type of session, a close() does exactly
254        // what we need and nothing more.
255        close();
256    }
257
258    @Override
259    public boolean hasBufferedInput() {
260        final SessionBufferStatus buffStatus = this.bufferStatus;
261        return buffStatus != null && buffStatus.hasBufferedInput();
262    }
263
264    @Override
265    public boolean hasBufferedOutput() {
266        final SessionBufferStatus buffStatus = this.bufferStatus;
267        return buffStatus != null && buffStatus.hasBufferedOutput();
268    }
269
270    @Override
271    public void setBufferStatus(final SessionBufferStatus bufferStatus) {
272        this.bufferStatus = bufferStatus;
273    }
274
275    @Override
276    public Object getAttribute(final String name) {
277        return this.attributes.get(name);
278    }
279
280    @Override
281    public Object removeAttribute(final String name) {
282        return this.attributes.remove(name);
283    }
284
285    @Override
286    public void setAttribute(final String name, final Object obj) {
287        this.attributes.put(name, obj);
288    }
289
290    public long getStartedTime() {
291        return this.startedTime;
292    }
293
294    public long getLastReadTime() {
295        return this.lastReadTime;
296    }
297
298    public long getLastWriteTime() {
299        return this.lastWriteTime;
300    }
301
302    public long getLastAccessTime() {
303        return this.lastAccessTime;
304    }
305
306    void resetLastRead() {
307        final long now = System.currentTimeMillis();
308        this.lastReadTime = now;
309        this.lastAccessTime = now;
310    }
311
312    void resetLastWrite() {
313        final long now = System.currentTimeMillis();
314        this.lastWriteTime = now;
315        this.lastAccessTime = now;
316    }
317
318    private static void formatOps(final StringBuilder buffer, final int ops) {
319        if ((ops & SelectionKey.OP_READ) > 0) {
320            buffer.append('r');
321        }
322        if ((ops & SelectionKey.OP_WRITE) > 0) {
323            buffer.append('w');
324        }
325        if ((ops & SelectionKey.OP_ACCEPT) > 0) {
326            buffer.append('a');
327        }
328        if ((ops & SelectionKey.OP_CONNECT) > 0) {
329            buffer.append('c');
330        }
331    }
332
333    private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
334        if (socketAddress instanceof InetSocketAddress) {
335            final InetSocketAddress addr = ((InetSocketAddress) socketAddress);
336            buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
337                addr.getAddress())
338            .append(':')
339            .append(addr.getPort());
340        } else {
341            buffer.append(socketAddress);
342        }
343    }
344
345    @Override
346    public String toString() {
347        final StringBuilder buffer = new StringBuilder();
348        synchronized (this.key) {
349            final SocketAddress remoteAddress = getRemoteAddress();
350            final SocketAddress localAddress = getLocalAddress();
351            if (remoteAddress != null && localAddress != null) {
352                formatAddress(buffer, localAddress);
353                buffer.append("<->");
354                formatAddress(buffer, remoteAddress);
355            }
356            buffer.append('[');
357            switch (this.status) {
358            case ACTIVE:
359                buffer.append("ACTIVE");
360                break;
361            case CLOSING:
362                buffer.append("CLOSING");
363                break;
364            case CLOSED:
365                buffer.append("CLOSED");
366                break;
367            }
368            buffer.append("][");
369            if (this.key.isValid()) {
370                formatOps(buffer, this.interestOpsCallback != null ?
371                        this.currentEventMask : this.key.interestOps());
372                buffer.append(':');
373                formatOps(buffer, this.key.readyOps());
374            }
375        }
376        buffer.append(']');
377        return new String(buffer);
378    }
379
380    @Override
381    public Socket getSocket() {
382        if (this.channel instanceof SocketChannel) {
383            return ((SocketChannel) this.channel).socket();
384        } else {
385            return null;
386        }
387    }
388
389}