/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */

package org.apache.http.impl.nio.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.http.annotation.ThreadingBehavior;
import org.apache.http.annotation.Contract;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionBufferStatus;
import org.apache.http.nio.reactor.SocketAccessor;
import org.apache.http.util.Args;

/**
 * Default implementation of {@link IOSession}.
 * <p>
 * The session wraps a {@link SelectionKey} and the {@link ByteChannel}
 * registered with it. Changes to the interest set are either applied to the
 * key directly or, when an {@link InterestOpsCallback} was supplied, recorded
 * locally and handed to that callback as an {@link InterestOpEntry}.
 *
 * @since 4.0
 */
@Contract(threading = ThreadingBehavior.SAFE)
public class IOSessionImpl implements IOSession, SocketAccessor {

    private final SelectionKey key;
    private final ByteChannel channel;
    private final Map<String, Object> attributes;
    private final InterestOpsCallback interestOpsCallback;
    private final SessionClosedCallback sessionClosedCallback;

    private volatile int status;
    // Only consulted when interestOpsCallback is non-null; otherwise the key
    // itself is the source of truth for the interest set.
    private volatile int currentEventMask;
    private volatile SessionBufferStatus bufferStatus;
    private volatile int socketTimeout;

    private final long startedTime;

    private volatile long lastReadTime;
    private volatile long lastWriteTime;
    private volatile long lastAccessTime;

    /**
     * Creates new instance of IOSessionImpl.
     *
     * @param key the selection key. Must not be {@code null}; its channel is
     *   cast to {@link ByteChannel}.
     * @param interestOpsCallback interestOps callback. May be {@code null},
     *   in which case interest ops are applied to the key directly.
     * @param sessionClosedCallback session closed callback. May be
     *   {@code null}, in which case no notification is made on close.
     *
     * @since 4.1
     */
    public IOSessionImpl(
            final SelectionKey key,
            final InterestOpsCallback interestOpsCallback,
            final SessionClosedCallback sessionClosedCallback) {
        super();
        Args.notNull(key, "Selection key");
        this.key = key;
        this.channel = (ByteChannel) this.key.channel();
        this.interestOpsCallback = interestOpsCallback;
        this.sessionClosedCallback = sessionClosedCallback;
        this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
        this.currentEventMask = key.interestOps();
        this.socketTimeout = 0;
        this.status = ACTIVE;
        // All timestamps start from the same instant.
        final long now = System.currentTimeMillis();
        this.startedTime = now;
        this.lastReadTime = now;
        this.lastWriteTime = now;
        this.lastAccessTime = now;
    }

    /**
     * Creates new instance of IOSessionImpl without an interestOps callback.
     *
     * @param key the selection key.
     * @param sessionClosedCallback session closed callback.
     */
    public IOSessionImpl(
            final SelectionKey key,
            final SessionClosedCallback sessionClosedCallback) {
        this(key, null, sessionClosedCallback);
    }

    /**
     * Returns the channel obtained from the selection key at construction.
     */
    @Override
    public ByteChannel channel() {
        return this.channel;
    }

    /**
     * Returns the local socket address of the underlying channel, or
     * {@code null} if the channel is not a {@link SocketChannel}.
     */
    @Override
    public SocketAddress getLocalAddress() {
        if (this.channel instanceof SocketChannel) {
            return ((SocketChannel) this.channel).socket().getLocalSocketAddress();
        } else {
            return null;
        }
    }

    /**
     * Returns the remote socket address of the underlying channel, or
     * {@code null} if the channel is not a {@link SocketChannel}.
     */
    @Override
    public SocketAddress getRemoteAddress() {
        if (this.channel instanceof SocketChannel) {
            return ((SocketChannel) this.channel).socket().getRemoteSocketAddress();
        } else {
            return null;
        }
    }

    /**
     * Returns the current interest set: the locally tracked mask when an
     * interestOps callback is in use, otherwise the key's own interest ops.
     */
    @Override
    public int getEventMask() {
        return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
    }

    /**
     * Replaces the interest set with {@code ops} and wakes up the selector.
     * Does nothing once the session is closed.
     *
     * @param ops the new interest set.
     */
    @Override
    public synchronized void setEventMask(final int ops) {
        if (this.status == CLOSED) {
            return;
        }
        if (this.interestOpsCallback != null) {
            // update the current event mask
            this.currentEventMask = ops;

            // local variable
            final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);

            // add this operation to the interestOps() queue
            this.interestOpsCallback.addInterestOps(entry);
        } else {
            this.key.interestOps(ops);
        }
        this.key.selector().wakeup();
    }

    /**
     * Adds the bits of {@code op} to the interest set and wakes up the
     * selector. Does nothing once the session is closed.
     *
     * @param op the interest bit(s) to set.
     */
    @Override
    public synchronized void setEvent(final int op) {
        if (this.status == CLOSED) {
            return;
        }
        if (this.interestOpsCallback != null) {
            // update the current event mask
            this.currentEventMask |= op;

            // local variable
            final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);

            // add this operation to the interestOps() queue
            this.interestOpsCallback.addInterestOps(entry);
        } else {
            final int ops = this.key.interestOps();
            this.key.interestOps(ops | op);
        }
        this.key.selector().wakeup();
    }

    /**
     * Removes the bits of {@code op} from the interest set and wakes up the
     * selector. Does nothing once the session is closed.
     *
     * @param op the interest bit(s) to clear.
     */
    @Override
    public synchronized void clearEvent(final int op) {
        if (this.status == CLOSED) {
            return;
        }
        if (this.interestOpsCallback != null) {
            // update the current event mask
            this.currentEventMask &= ~op;

            // local variable
            final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);

            // add this operation to the interestOps() queue
            this.interestOpsCallback.addInterestOps(entry);
        } else {
            final int ops = this.key.interestOps();
            this.key.interestOps(ops & ~op);
        }
        this.key.selector().wakeup();
    }

    /**
     * Returns the socket timeout value last set on this session
     * ({@code 0} initially).
     */
    @Override
    public int getSocketTimeout() {
        return this.socketTimeout;
    }

    /**
     * Stores the socket timeout and resets the last access time to the
     * current time.
     *
     * @param timeout the new timeout value.
     */
    @Override
    public void setSocketTimeout(final int timeout) {
        this.socketTimeout = timeout;
        this.lastAccessTime = System.currentTimeMillis();
    }

    /**
     * Closes the session. The first call marks the session {@code CLOSED},
     * cancels the selection key, closes the channel (ignoring any
     * {@link IOException}), notifies the session closed callback if one was
     * supplied and wakes up the selector if it is still open. Subsequent
     * calls return immediately.
     */
    @Override
    public void close() {
        // The status transition is guarded by this session's monitor so that
        // only one caller proceeds to the actual teardown.
        synchronized (this) {
            if (this.status == CLOSED) {
                return;
            }
            this.status = CLOSED;
        }
        synchronized (this.key) {
            this.key.cancel();
            try {
                this.key.channel().close();
            } catch (final IOException ignored) {
                // Munching exceptions is not nice
                // but in this case it is justified
            }
            if (this.sessionClosedCallback != null) {
                this.sessionClosedCallback.sessionClosed(this);
            }
            if (this.key.selector().isOpen()) {
                this.key.selector().wakeup();
            }
        }
    }

    /**
     * Returns the current status value of this session.
     */
    @Override
    public int getStatus() {
        return this.status;
    }

    /**
     * Returns {@code true} if the session status is {@code CLOSED}.
     */
    @Override
    public boolean isClosed() {
        return this.status == CLOSED;
    }

    /**
     * Shuts the session down by delegating to {@link #close()}.
     */
    @Override
    public void shutdown() {
        // For this type of session, a close() does exactly
        // what we need and nothing more.
        close();
    }

    /**
     * Returns {@code true} if a buffer status has been set and it reports
     * buffered input.
     */
    @Override
    public boolean hasBufferedInput() {
        // Read the volatile field once so the null check and the call
        // operate on the same reference.
        final SessionBufferStatus buffStatus = this.bufferStatus;
        return buffStatus != null && buffStatus.hasBufferedInput();
    }

    /**
     * Returns {@code true} if a buffer status has been set and it reports
     * buffered output.
     */
    @Override
    public boolean hasBufferedOutput() {
        // Read the volatile field once so the null check and the call
        // operate on the same reference.
        final SessionBufferStatus buffStatus = this.bufferStatus;
        return buffStatus != null && buffStatus.hasBufferedOutput();
    }

    /**
     * Sets the object consulted by {@link #hasBufferedInput()} and
     * {@link #hasBufferedOutput()}.
     *
     * @param bufferStatus the buffer status; may be {@code null}.
     */
    @Override
    public void setBufferStatus(final SessionBufferStatus bufferStatus) {
        this.bufferStatus = bufferStatus;
    }

    /**
     * Returns the attribute stored under {@code name}, or {@code null} if
     * there is none.
     */
    @Override
    public Object getAttribute(final String name) {
        return this.attributes.get(name);
    }

    /**
     * Removes the attribute stored under {@code name} and returns its
     * previous value, or {@code null} if there was none.
     */
    @Override
    public Object removeAttribute(final String name) {
        return this.attributes.remove(name);
    }

    /**
     * Stores {@code obj} under {@code name}, replacing any previous value.
     */
    @Override
    public void setAttribute(final String name, final Object obj) {
        this.attributes.put(name, obj);
    }

    /**
     * Returns the time this session was created, as reported by
     * {@link System#currentTimeMillis()}.
     */
    public long getStartedTime() {
        return this.startedTime;
    }

    /**
     * Returns the time recorded by the most recent {@code resetLastRead()}
     * call, or the creation time if there has been none.
     */
    public long getLastReadTime() {
        return this.lastReadTime;
    }

    /**
     * Returns the time recorded by the most recent {@code resetLastWrite()}
     * call, or the creation time if there has been none.
     */
    public long getLastWriteTime() {
        return this.lastWriteTime;
    }

    /**
     * Returns the time of the most recent read reset, write reset or
     * socket timeout change, or the creation time if there has been none.
     */
    public long getLastAccessTime() {
        return this.lastAccessTime;
    }

    /** Sets the last read time and the last access time to the current time. */
    void resetLastRead() {
        final long now = System.currentTimeMillis();
        this.lastReadTime = now;
        this.lastAccessTime = now;
    }

    /** Sets the last write time and the last access time to the current time. */
    void resetLastWrite() {
        final long now = System.currentTimeMillis();
        this.lastWriteTime = now;
        this.lastAccessTime = now;
    }

    /**
     * Appends one letter per operation bit set in {@code ops}, in the fixed
     * order {@code r} (read), {@code w} (write), {@code a} (accept),
     * {@code c} (connect).
     */
    private static void formatOps(final StringBuilder buffer, final int ops) {
        if ((ops & SelectionKey.OP_READ) != 0) {
            buffer.append('r');
        }
        if ((ops & SelectionKey.OP_WRITE) != 0) {
            buffer.append('w');
        }
        if ((ops & SelectionKey.OP_ACCEPT) != 0) {
            buffer.append('a');
        }
        if ((ops & SelectionKey.OP_CONNECT) != 0) {
            buffer.append('c');
        }
    }

    /**
     * Appends {@code host:port} for an {@link InetSocketAddress}, or the
     * address's own string form for any other kind of address.
     */
    private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
        if (socketAddress instanceof InetSocketAddress) {
            final InetSocketAddress addr = (InetSocketAddress) socketAddress;
            // An unresolved address has no InetAddress; fall back to the host
            // name it was created with instead of printing "null".
            buffer.append(addr.getAddress() != null
                    ? addr.getAddress().getHostAddress()
                    : addr.getHostName())
                .append(':')
                .append(addr.getPort());
        } else {
            buffer.append(socketAddress);
        }
    }

    /**
     * Returns a diagnostic string of the form
     * {@code local<->remote[STATUS][interestOps:readyOps]}. The address part
     * is omitted unless both addresses are available, and the ops part is
     * left empty when the selection key is no longer valid.
     */
    @Override
    public String toString() {
        final StringBuilder buffer = new StringBuilder();
        synchronized (this.key) {
            final SocketAddress remoteAddress = getRemoteAddress();
            final SocketAddress localAddress = getLocalAddress();
            if (remoteAddress != null && localAddress != null) {
                formatAddress(buffer, localAddress);
                buffer.append("<->");
                formatAddress(buffer, remoteAddress);
            }
            buffer.append('[');
            switch (this.status) {
            case ACTIVE:
                buffer.append("ACTIVE");
                break;
            case CLOSING:
                buffer.append("CLOSING");
                break;
            case CLOSED:
                buffer.append("CLOSED");
                break;
            }
            buffer.append("][");
            if (this.key.isValid()) {
                formatOps(buffer, this.interestOpsCallback != null ?
                        this.currentEventMask : this.key.interestOps());
                buffer.append(':');
                formatOps(buffer, this.key.readyOps());
            }
        }
        buffer.append(']');
        return buffer.toString();
    }

    /**
     * Returns the socket of the underlying channel, or {@code null} if the
     * channel is not a {@link SocketChannel}.
     */
    @Override
    public Socket getSocket() {
        if (this.channel instanceof SocketChannel) {
            return ((SocketChannel) this.channel).socket();
        } else {
            return null;
        }
    }

}