001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027package org.apache.http.pool; 028 029import java.io.IOException; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.LinkedList; 035import java.util.Map; 036import java.util.Set; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.Future; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.TimeoutException; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicReference; 043import java.util.concurrent.locks.Condition; 044import java.util.concurrent.locks.Lock; 045import java.util.concurrent.locks.ReentrantLock; 046 047import org.apache.http.annotation.Contract; 048import org.apache.http.annotation.ThreadingBehavior; 049import org.apache.http.concurrent.FutureCallback; 050import org.apache.http.util.Args; 051import org.apache.http.util.Asserts; 052 053/** 054 * Abstract synchronous (blocking) pool of connections. 055 * <p> 056 * Please note that this class does not maintain its own pool of execution {@link Thread}s. 057 * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)} 058 * method on the {@link Future} object returned by the 059 * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation 060 * to complete. 061 * 062 * @param <T> the route type that represents the opposite endpoint of a pooled 063 * connection. 064 * @param <C> the connection type. 065 * @param <E> the type of the pool entry containing a pooled connection. 066 * @since 4.2 067 */ 068@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) 069public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>> 070 implements ConnPool<T, E>, ConnPoolControl<T> { 071 072 private final Lock lock; 073 private final Condition condition; 074 private final ConnFactory<T, C> connFactory; 075 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool; 076 private final Set<E> leased; 077 private final LinkedList<E> available; 078 private final LinkedList<Future<E>> pending; 079 private final Map<T, Integer> maxPerRoute; 080 081 private volatile boolean isShutDown; 082 private volatile int defaultMaxPerRoute; 083 private volatile int maxTotal; 084 private volatile int validateAfterInactivity; 085 086 public AbstractConnPool( 087 final ConnFactory<T, C> connFactory, 088 final int defaultMaxPerRoute, 089 final int maxTotal) { 090 super(); 091 this.connFactory = Args.notNull(connFactory, "Connection factory"); 092 this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value"); 093 this.maxTotal = Args.positive(maxTotal, "Max total value"); 094 this.lock = new ReentrantLock(); 095 this.condition = this.lock.newCondition(); 096 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>(); 097 this.leased = new HashSet<E>(); 098 this.available = new LinkedList<E>(); 099 this.pending = new LinkedList<Future<E>>(); 100 this.maxPerRoute = new HashMap<T, Integer>(); 101 } 102 103 /** 104 * Creates a new entry for the given connection with the given route. 105 */ 106 protected abstract E createEntry(T route, C conn); 107 108 /** 109 * @since 4.3 110 */ 111 protected void onLease(final E entry) { 112 } 113 114 /** 115 * @since 4.3 116 */ 117 protected void onRelease(final E entry) { 118 } 119 120 /** 121 * @since 4.4 122 */ 123 protected void onReuse(final E entry) { 124 } 125 126 /** 127 * @since 4.4 128 */ 129 protected boolean validate(final E entry) { 130 return true; 131 } 132 133 public boolean isShutdown() { 134 return this.isShutDown; 135 } 136 137 /** 138 * Shuts down the pool. 139 */ 140 public void shutdown() throws IOException { 141 if (this.isShutDown) { 142 return ; 143 } 144 this.isShutDown = true; 145 this.lock.lock(); 146 try { 147 for (final E entry: this.available) { 148 entry.close(); 149 } 150 for (final E entry: this.leased) { 151 entry.close(); 152 } 153 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) { 154 pool.shutdown(); 155 } 156 this.routeToPool.clear(); 157 this.leased.clear(); 158 this.available.clear(); 159 } finally { 160 this.lock.unlock(); 161 } 162 } 163 164 private RouteSpecificPool<T, C, E> getPool(final T route) { 165 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route); 166 if (pool == null) { 167 pool = new RouteSpecificPool<T, C, E>(route) { 168 169 @Override 170 protected E createEntry(final C conn) { 171 return AbstractConnPool.this.createEntry(route, conn); 172 } 173 174 }; 175 this.routeToPool.put(route, pool); 176 } 177 return pool; 178 } 179 180 /** 181 * {@inheritDoc} 182 * <p> 183 * Please note that this class does not maintain its own pool of execution 184 * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()} 185 * or {@link Future#get(long, TimeUnit)} method on the {@link Future} 186 * returned by this method in order for the lease operation to complete. 187 */ 188 @Override 189 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) { 190 Args.notNull(route, "Route"); 191 Asserts.check(!this.isShutDown, "Connection pool shut down"); 192 193 return new Future<E>() { 194 195 private final AtomicBoolean cancelled = new AtomicBoolean(false); 196 private final AtomicBoolean done = new AtomicBoolean(false); 197 private final AtomicReference<E> entryRef = new AtomicReference<E>(null); 198 199 @Override 200 public boolean cancel(final boolean mayInterruptIfRunning) { 201 if (cancelled.compareAndSet(false, true)) { 202 done.set(true); 203 lock.lock(); 204 try { 205 condition.signalAll(); 206 } finally { 207 lock.unlock(); 208 } 209 if (callback != null) { 210 callback.cancelled(); 211 } 212 return true; 213 } else { 214 return false; 215 } 216 } 217 218 @Override 219 public boolean isCancelled() { 220 return cancelled.get(); 221 } 222 223 @Override 224 public boolean isDone() { 225 return done.get(); 226 } 227 228 @Override 229 public E get() throws InterruptedException, ExecutionException { 230 try { 231 return get(0L, TimeUnit.MILLISECONDS); 232 } catch (final TimeoutException ex) { 233 throw new ExecutionException(ex); 234 } 235 } 236 237 @Override 238 public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException { 239 final E entry = entryRef.get(); 240 if (entry != null) { 241 return entry; 242 } 243 synchronized (this) { 244 try { 245 for (;;) { 246 final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this); 247 if (validateAfterInactivity > 0) { 248 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) { 249 if (!validate(leasedEntry)) { 250 leasedEntry.close(); 251 release(leasedEntry, false); 252 continue; 253 } 254 } 255 } 256 entryRef.set(leasedEntry); 257 done.set(true); 258 onLease(leasedEntry); 259 if (callback != null) { 260 callback.completed(leasedEntry); 261 } 262 return leasedEntry; 263 } 264 } catch (final IOException ex) { 265 done.set(true); 266 if (callback != null) { 267 callback.failed(ex); 268 } 269 throw new ExecutionException(ex); 270 } 271 } 272 } 273 274 }; 275 } 276 277 /** 278 * Attempts to lease a connection for the given route and with the given 279 * state from the pool. 280 * <p> 281 * Please note that this class does not maintain its own pool of execution 282 * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()} 283 * or {@link Future#get(long, TimeUnit)} method on the {@link Future} 284 * returned by this method in order for the lease operation to complete. 285 * 286 * @param route route of the connection. 287 * @param state arbitrary object that represents a particular state 288 * (usually a security principal or a unique token identifying 289 * the user whose credentials have been used while establishing the connection). 290 * May be {@code null}. 291 * @return future for a leased pool entry. 292 */ 293 public Future<E> lease(final T route, final Object state) { 294 return lease(route, state, null); 295 } 296 297 private E getPoolEntryBlocking( 298 final T route, final Object state, 299 final long timeout, final TimeUnit tunit, 300 final Future<E> future) throws IOException, InterruptedException, TimeoutException { 301 302 Date deadline = null; 303 if (timeout > 0) { 304 deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout)); 305 } 306 this.lock.lock(); 307 try { 308 final RouteSpecificPool<T, C, E> pool = getPool(route); 309 E entry; 310 for (;;) { 311 Asserts.check(!this.isShutDown, "Connection pool shut down"); 312 for (;;) { 313 entry = pool.getFree(state); 314 if (entry == null) { 315 break; 316 } 317 if (entry.isExpired(System.currentTimeMillis())) { 318 entry.close(); 319 } 320 if (entry.isClosed()) { 321 this.available.remove(entry); 322 pool.free(entry, false); 323 } else { 324 break; 325 } 326 } 327 if (entry != null) { 328 this.available.remove(entry); 329 this.leased.add(entry); 330 onReuse(entry); 331 return entry; 332 } 333 334 // New connection is needed 335 final int maxPerRoute = getMax(route); 336 // Shrink the pool prior to allocating a new connection 337 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); 338 if (excess > 0) { 339 for (int i = 0; i < excess; i++) { 340 final E lastUsed = pool.getLastUsed(); 341 if (lastUsed == null) { 342 break; 343 } 344 lastUsed.close(); 345 this.available.remove(lastUsed); 346 pool.remove(lastUsed); 347 } 348 } 349 350 if (pool.getAllocatedCount() < maxPerRoute) { 351 final int totalUsed = this.leased.size(); 352 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); 353 if (freeCapacity > 0) { 354 final int totalAvailable = this.available.size(); 355 if (totalAvailable > freeCapacity - 1) { 356 if (!this.available.isEmpty()) { 357 final E lastUsed = this.available.removeLast(); 358 lastUsed.close(); 359 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute()); 360 otherpool.remove(lastUsed); 361 } 362 } 363 final C conn = this.connFactory.create(route); 364 entry = pool.add(conn); 365 this.leased.add(entry); 366 return entry; 367 } 368 } 369 370 boolean success = false; 371 try { 372 if (future.isCancelled()) { 373 throw new InterruptedException("Operation interrupted"); 374 } 375 pool.queue(future); 376 this.pending.add(future); 377 if (deadline != null) { 378 success = this.condition.awaitUntil(deadline); 379 } else { 380 this.condition.await(); 381 success = true; 382 } 383 if (future.isCancelled()) { 384 throw new InterruptedException("Operation interrupted"); 385 } 386 } finally { 387 // In case of 'success', we were woken up by the 388 // connection pool and should now have a connection 389 // waiting for us, or else we're shutting down. 390 // Just continue in the loop, both cases are checked. 391 pool.unqueue(future); 392 this.pending.remove(future); 393 } 394 // check for spurious wakeup vs. timeout 395 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) { 396 break; 397 } 398 } 399 throw new TimeoutException("Timeout waiting for connection"); 400 } finally { 401 this.lock.unlock(); 402 } 403 } 404 405 @Override 406 public void release(final E entry, final boolean reusable) { 407 this.lock.lock(); 408 try { 409 if (this.leased.remove(entry)) { 410 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute()); 411 pool.free(entry, reusable); 412 if (reusable && !this.isShutDown) { 413 this.available.addFirst(entry); 414 } else { 415 entry.close(); 416 } 417 onRelease(entry); 418 Future<E> future = pool.nextPending(); 419 if (future != null) { 420 this.pending.remove(future); 421 } else { 422 future = this.pending.poll(); 423 } 424 if (future != null) { 425 this.condition.signalAll(); 426 } 427 } 428 } finally { 429 this.lock.unlock(); 430 } 431 } 432 433 private int getMax(final T route) { 434 final Integer v = this.maxPerRoute.get(route); 435 if (v != null) { 436 return v.intValue(); 437 } else { 438 return this.defaultMaxPerRoute; 439 } 440 } 441 442 @Override 443 public void setMaxTotal(final int max) { 444 Args.positive(max, "Max value"); 445 this.lock.lock(); 446 try { 447 this.maxTotal = max; 448 } finally { 449 this.lock.unlock(); 450 } 451 } 452 453 @Override 454 public int getMaxTotal() { 455 this.lock.lock(); 456 try { 457 return this.maxTotal; 458 } finally { 459 this.lock.unlock(); 460 } 461 } 462 463 @Override 464 public void setDefaultMaxPerRoute(final int max) { 465 Args.positive(max, "Max per route value"); 466 this.lock.lock(); 467 try { 468 this.defaultMaxPerRoute = max; 469 } finally { 470 this.lock.unlock(); 471 } 472 } 473 474 @Override 475 public int getDefaultMaxPerRoute() { 476 this.lock.lock(); 477 try { 478 return this.defaultMaxPerRoute; 479 } finally { 480 this.lock.unlock(); 481 } 482 } 483 484 @Override 485 public void setMaxPerRoute(final T route, final int max) { 486 Args.notNull(route, "Route"); 487 Args.positive(max, "Max per route value"); 488 this.lock.lock(); 489 try { 490 this.maxPerRoute.put(route, Integer.valueOf(max)); 491 } finally { 492 this.lock.unlock(); 493 } 494 } 495 496 @Override 497 public int getMaxPerRoute(final T route) { 498 Args.notNull(route, "Route"); 499 this.lock.lock(); 500 try { 501 return getMax(route); 502 } finally { 503 this.lock.unlock(); 504 } 505 } 506 507 @Override 508 public PoolStats getTotalStats() { 509 this.lock.lock(); 510 try { 511 return new PoolStats( 512 this.leased.size(), 513 this.pending.size(), 514 this.available.size(), 515 this.maxTotal); 516 } finally { 517 this.lock.unlock(); 518 } 519 } 520 521 @Override 522 public PoolStats getStats(final T route) { 523 Args.notNull(route, "Route"); 524 this.lock.lock(); 525 try { 526 final RouteSpecificPool<T, C, E> pool = getPool(route); 527 return new PoolStats( 528 pool.getLeasedCount(), 529 pool.getPendingCount(), 530 pool.getAvailableCount(), 531 getMax(route)); 532 } finally { 533 this.lock.unlock(); 534 } 535 } 536 537 /** 538 * Returns snapshot of all knows routes 539 * @return the set of routes 540 * 541 * @since 4.4 542 */ 543 public Set<T> getRoutes() { 544 this.lock.lock(); 545 try { 546 return new HashSet<T>(routeToPool.keySet()); 547 } finally { 548 this.lock.unlock(); 549 } 550 } 551 552 /** 553 * Enumerates all available connections. 554 * 555 * @since 4.3 556 */ 557 protected void enumAvailable(final PoolEntryCallback<T, C> callback) { 558 this.lock.lock(); 559 try { 560 final Iterator<E> it = this.available.iterator(); 561 while (it.hasNext()) { 562 final E entry = it.next(); 563 callback.process(entry); 564 if (entry.isClosed()) { 565 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute()); 566 pool.remove(entry); 567 it.remove(); 568 } 569 } 570 purgePoolMap(); 571 } finally { 572 this.lock.unlock(); 573 } 574 } 575 576 /** 577 * Enumerates all leased connections. 578 * 579 * @since 4.3 580 */ 581 protected void enumLeased(final PoolEntryCallback<T, C> callback) { 582 this.lock.lock(); 583 try { 584 final Iterator<E> it = this.leased.iterator(); 585 while (it.hasNext()) { 586 final E entry = it.next(); 587 callback.process(entry); 588 } 589 } finally { 590 this.lock.unlock(); 591 } 592 } 593 594 private void purgePoolMap() { 595 final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator(); 596 while (it.hasNext()) { 597 final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next(); 598 final RouteSpecificPool<T, C, E> pool = entry.getValue(); 599 if (pool.getPendingCount() + pool.getAllocatedCount() == 0) { 600 it.remove(); 601 } 602 } 603 } 604 605 /** 606 * Closes connections that have been idle longer than the given period 607 * of time and evicts them from the pool. 608 * 609 * @param idletime maximum idle time. 610 * @param tunit time unit. 611 */ 612 public void closeIdle(final long idletime, final TimeUnit tunit) { 613 Args.notNull(tunit, "Time unit"); 614 long time = tunit.toMillis(idletime); 615 if (time < 0) { 616 time = 0; 617 } 618 final long deadline = System.currentTimeMillis() - time; 619 enumAvailable(new PoolEntryCallback<T, C>() { 620 621 @Override 622 public void process(final PoolEntry<T, C> entry) { 623 if (entry.getUpdated() <= deadline) { 624 entry.close(); 625 } 626 } 627 628 }); 629 } 630 631 /** 632 * Closes expired connections and evicts them from the pool. 633 */ 634 public void closeExpired() { 635 final long now = System.currentTimeMillis(); 636 enumAvailable(new PoolEntryCallback<T, C>() { 637 638 @Override 639 public void process(final PoolEntry<T, C> entry) { 640 if (entry.isExpired(now)) { 641 entry.close(); 642 } 643 } 644 645 }); 646 } 647 648 /** 649 * @return the number of milliseconds 650 * @since 4.4 651 */ 652 public int getValidateAfterInactivity() { 653 return this.validateAfterInactivity; 654 } 655 656 /** 657 * @param ms the number of milliseconds 658 * @since 4.4 659 */ 660 public void setValidateAfterInactivity(final int ms) { 661 this.validateAfterInactivity = ms; 662 } 663 664 @Override 665 public String toString() { 666 final StringBuilder buffer = new StringBuilder(); 667 buffer.append("[leased: "); 668 buffer.append(this.leased); 669 buffer.append("][available: "); 670 buffer.append(this.available); 671 buffer.append("][pending: "); 672 buffer.append(this.pending); 673 buffer.append("]"); 674 return buffer.toString(); 675 } 676 677}