001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027package org.apache.http.nio.pool; 028 029import java.io.IOException; 030import java.net.SocketAddress; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.LinkedList; 035import java.util.ListIterator; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ConcurrentLinkedQueue; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.Future; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.TimeoutException; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.locks.Lock; 045import java.util.concurrent.locks.ReentrantLock; 046 047import org.apache.http.annotation.Contract; 048import org.apache.http.annotation.ThreadingBehavior; 049import org.apache.http.concurrent.BasicFuture; 050import org.apache.http.concurrent.FutureCallback; 051import org.apache.http.nio.reactor.ConnectingIOReactor; 052import org.apache.http.nio.reactor.IOReactorStatus; 053import org.apache.http.nio.reactor.IOSession; 054import org.apache.http.nio.reactor.SessionRequest; 055import org.apache.http.nio.reactor.SessionRequestCallback; 056import org.apache.http.pool.ConnPool; 057import org.apache.http.pool.ConnPoolControl; 058import org.apache.http.pool.PoolEntry; 059import org.apache.http.pool.PoolEntryCallback; 060import org.apache.http.pool.PoolStats; 061import org.apache.http.util.Args; 062import org.apache.http.util.Asserts; 063import org.apache.http.util.LangUtils; 064 065/** 066 * Abstract non-blocking connection pool. 067 * 068 * @param <T> route 069 * @param <C> connection object 070 * @param <E> pool entry 071 * 072 * @since 4.2 073 */ 074@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) 075public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>> 076 implements ConnPool<T, E>, ConnPoolControl<T> { 077 078 private final ConnectingIOReactor ioreactor; 079 private final NIOConnFactory<T, C> connFactory; 080 private final SocketAddressResolver<T> addressResolver; 081 private final SessionRequestCallback sessionRequestCallback; 082 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool; 083 private final LinkedList<LeaseRequest<T, C, E>> leasingRequests; 084 private final Set<SessionRequest> pending; 085 private final Set<E> leased; 086 private final LinkedList<E> available; 087 private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests; 088 private final Map<T, Integer> maxPerRoute; 089 private final Lock lock; 090 private final AtomicBoolean isShutDown; 091 092 private volatile int defaultMaxPerRoute; 093 private volatile int maxTotal; 094 095 /** 096 * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor, 097 * NIOConnFactory, SocketAddressResolver, int, int)} 098 */ 099 @Deprecated 100 public AbstractNIOConnPool( 101 final ConnectingIOReactor ioreactor, 102 final NIOConnFactory<T, C> connFactory, 103 final int defaultMaxPerRoute, 104 final int maxTotal) { 105 super(); 106 Args.notNull(ioreactor, "I/O reactor"); 107 Args.notNull(connFactory, "Connection factory"); 108 Args.positive(defaultMaxPerRoute, "Max per route value"); 109 Args.positive(maxTotal, "Max total value"); 110 this.ioreactor = ioreactor; 111 this.connFactory = connFactory; 112 this.addressResolver = new SocketAddressResolver<T>() { 113 114 @Override 115 public SocketAddress resolveLocalAddress(final T route) throws IOException { 116 return AbstractNIOConnPool.this.resolveLocalAddress(route); 117 } 118 119 @Override 120 public SocketAddress resolveRemoteAddress(final T route) throws IOException { 121 return AbstractNIOConnPool.this.resolveRemoteAddress(route); 122 } 123 124 }; 125 this.sessionRequestCallback = new InternalSessionRequestCallback(); 126 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>(); 127 this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>(); 128 this.pending = new HashSet<SessionRequest>(); 129 this.leased = new HashSet<E>(); 130 this.available = new LinkedList<E>(); 131 this.maxPerRoute = new HashMap<T, Integer>(); 132 this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>(); 133 this.lock = new ReentrantLock(); 134 this.isShutDown = new AtomicBoolean(false); 135 this.defaultMaxPerRoute = defaultMaxPerRoute; 136 this.maxTotal = maxTotal; 137 } 138 139 /** 140 * @since 4.3 141 */ 142 public AbstractNIOConnPool( 143 final ConnectingIOReactor ioreactor, 144 final NIOConnFactory<T, C> connFactory, 145 final SocketAddressResolver<T> addressResolver, 146 final int defaultMaxPerRoute, 147 final int maxTotal) { 148 super(); 149 Args.notNull(ioreactor, "I/O reactor"); 150 Args.notNull(connFactory, "Connection factory"); 151 Args.notNull(addressResolver, "Address resolver"); 152 Args.positive(defaultMaxPerRoute, "Max per route value"); 153 Args.positive(maxTotal, "Max total value"); 154 this.ioreactor = ioreactor; 155 this.connFactory = connFactory; 156 this.addressResolver = addressResolver; 157 this.sessionRequestCallback = new InternalSessionRequestCallback(); 158 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>(); 159 this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>(); 160 this.pending = new HashSet<SessionRequest>(); 161 this.leased = new HashSet<E>(); 162 this.available = new LinkedList<E>(); 163 this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>(); 164 this.maxPerRoute = new HashMap<T, Integer>(); 165 this.lock = new ReentrantLock(); 166 this.isShutDown = new AtomicBoolean(false); 167 this.defaultMaxPerRoute = defaultMaxPerRoute; 168 this.maxTotal = maxTotal; 169 } 170 171 /** 172 * @deprecated (4.3) use {@link SocketAddressResolver} 173 */ 174 @Deprecated 175 protected SocketAddress resolveRemoteAddress(final T route) { 176 return null; 177 } 178 179 /** 180 * @deprecated (4.3) use {@link SocketAddressResolver} 181 */ 182 @Deprecated 183 protected SocketAddress resolveLocalAddress(final T route) { 184 return null; 185 } 186 187 protected abstract E createEntry(T route, C conn); 188 189 /** 190 * @since 4.3 191 */ 192 protected void onLease(final E entry) { 193 } 194 195 /** 196 * @since 4.3 197 */ 198 protected void onRelease(final E entry) { 199 } 200 201 /** 202 * @since 4.4 203 */ 204 protected void onReuse(final E entry) { 205 } 206 207 public boolean isShutdown() { 208 return this.isShutDown.get(); 209 } 210 211 public void shutdown(final long waitMs) throws IOException { 212 if (this.isShutDown.compareAndSet(false, true)) { 213 fireCallbacks(); 214 this.lock.lock(); 215 try { 216 for (final SessionRequest sessionRequest: this.pending) { 217 sessionRequest.cancel(); 218 } 219 for (final E entry: this.available) { 220 entry.close(); 221 } 222 for (final E entry: this.leased) { 223 entry.close(); 224 } 225 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) { 226 pool.shutdown(); 227 } 228 this.routeToPool.clear(); 229 this.leased.clear(); 230 this.pending.clear(); 231 this.available.clear(); 232 this.leasingRequests.clear(); 233 this.ioreactor.shutdown(waitMs); 234 } finally { 235 this.lock.unlock(); 236 } 237 } 238 } 239 240 private RouteSpecificPool<T, C, E> getPool(final T route) { 241 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route); 242 if (pool == null) { 243 pool = new RouteSpecificPool<T, C, E>(route) { 244 245 @Override 246 protected E createEntry(final T route, final C conn) { 247 return AbstractNIOConnPool.this.createEntry(route, conn); 248 } 249 250 }; 251 this.routeToPool.put(route, pool); 252 } 253 return pool; 254 } 255 256 public Future<E> lease( 257 final T route, final Object state, 258 final long connectTimeout, final TimeUnit tunit, 259 final FutureCallback<E> callback) { 260 return this.lease(route, state, connectTimeout, connectTimeout, tunit, callback); 261 } 262 263 /** 264 * @since 4.3 265 */ 266 public Future<E> lease( 267 final T route, final Object state, 268 final long connectTimeout, final long leaseTimeout, final TimeUnit tunit, 269 final FutureCallback<E> callback) { 270 Args.notNull(route, "Route"); 271 Args.notNull(tunit, "Time unit"); 272 Asserts.check(!this.isShutDown.get(), "Connection pool shut down"); 273 final BasicFuture<E> future = new BasicFuture<E>(callback); 274 final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state, 275 connectTimeout >= 0 ? tunit.toMillis(connectTimeout) : -1, 276 leaseTimeout > 0 ? tunit.toMillis(leaseTimeout) : 0, 277 future); 278 this.lock.lock(); 279 try { 280 final boolean completed = processPendingRequest(leaseRequest); 281 if (!leaseRequest.isDone() && !completed) { 282 this.leasingRequests.add(leaseRequest); 283 } 284 if (leaseRequest.isDone()) { 285 this.completedRequests.add(leaseRequest); 286 } 287 } finally { 288 this.lock.unlock(); 289 } 290 fireCallbacks(); 291 return new Future<E>() { 292 293 @Override 294 public E get() throws InterruptedException, ExecutionException { 295 return future.get(); 296 } 297 298 @Override 299 public E get( 300 final long timeout, 301 final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 302 return future.get(timeout, unit); 303 } 304 305 @Override 306 public boolean cancel(final boolean mayInterruptIfRunning) { 307 try { 308 leaseRequest.cancel(); 309 } finally { 310 return future.cancel(mayInterruptIfRunning); 311 } 312 } 313 314 @Override 315 public boolean isCancelled() { 316 return future.isCancelled(); 317 } 318 319 @Override 320 public boolean isDone() { 321 return future.isDone(); 322 } 323 324 }; 325 } 326 327 @Override 328 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) { 329 return lease(route, state, -1, TimeUnit.MICROSECONDS, callback); 330 } 331 332 public Future<E> lease(final T route, final Object state) { 333 return lease(route, state, -1, TimeUnit.MICROSECONDS, null); 334 } 335 336 @Override 337 public void release(final E entry, final boolean reusable) { 338 if (entry == null) { 339 return; 340 } 341 if (this.isShutDown.get()) { 342 return; 343 } 344 this.lock.lock(); 345 try { 346 if (this.leased.remove(entry)) { 347 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute()); 348 pool.free(entry, reusable); 349 if (reusable) { 350 this.available.addFirst(entry); 351 onRelease(entry); 352 } else { 353 entry.close(); 354 } 355 processNextPendingRequest(); 356 } 357 } finally { 358 this.lock.unlock(); 359 } 360 fireCallbacks(); 361 } 362 363 private void processPendingRequests() { 364 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator(); 365 while (it.hasNext()) { 366 final LeaseRequest<T, C, E> request = it.next(); 367 final BasicFuture<E> future = request.getFuture(); 368 if (future.isCancelled()) { 369 it.remove(); 370 continue; 371 } 372 final boolean completed = processPendingRequest(request); 373 if (request.isDone() || completed) { 374 it.remove(); 375 } 376 if (request.isDone()) { 377 this.completedRequests.add(request); 378 } 379 } 380 } 381 382 private void processNextPendingRequest() { 383 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator(); 384 while (it.hasNext()) { 385 final LeaseRequest<T, C, E> request = it.next(); 386 final BasicFuture<E> future = request.getFuture(); 387 if (future.isCancelled()) { 388 it.remove(); 389 continue; 390 } 391 final boolean completed = processPendingRequest(request); 392 if (request.isDone() || completed) { 393 it.remove(); 394 } 395 if (request.isDone()) { 396 this.completedRequests.add(request); 397 } 398 if (completed) { 399 return; 400 } 401 } 402 } 403 404 private boolean processPendingRequest(final LeaseRequest<T, C, E> request) { 405 final T route = request.getRoute(); 406 final Object state = request.getState(); 407 final long deadline = request.getDeadline(); 408 409 final long now = System.currentTimeMillis(); 410 if (now > deadline) { 411 request.failed(new TimeoutException()); 412 return false; 413 } 414 415 final RouteSpecificPool<T, C, E> pool = getPool(route); 416 E entry; 417 for (;;) { 418 entry = pool.getFree(state); 419 if (entry == null) { 420 break; 421 } 422 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) { 423 entry.close(); 424 this.available.remove(entry); 425 pool.free(entry, false); 426 } else { 427 break; 428 } 429 } 430 if (entry != null) { 431 this.available.remove(entry); 432 this.leased.add(entry); 433 request.completed(entry); 434 onReuse(entry); 435 onLease(entry); 436 return true; 437 } 438 439 // New connection is needed 440 final int maxPerRoute = getMax(route); 441 // Shrink the pool prior to allocating a new connection 442 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); 443 if (excess > 0) { 444 for (int i = 0; i < excess; i++) { 445 final E lastUsed = pool.getLastUsed(); 446 if (lastUsed == null) { 447 break; 448 } 449 lastUsed.close(); 450 this.available.remove(lastUsed); 451 pool.remove(lastUsed); 452 } 453 } 454 455 if (pool.getAllocatedCount() < maxPerRoute) { 456 final int totalUsed = this.pending.size() + this.leased.size(); 457 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); 458 if (freeCapacity == 0) { 459 return false; 460 } 461 final int totalAvailable = this.available.size(); 462 if (totalAvailable > freeCapacity - 1) { 463 if (!this.available.isEmpty()) { 464 final E lastUsed = this.available.removeLast(); 465 lastUsed.close(); 466 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute()); 467 otherpool.remove(lastUsed); 468 } 469 } 470 471 final SocketAddress localAddress; 472 final SocketAddress remoteAddress; 473 try { 474 remoteAddress = this.addressResolver.resolveRemoteAddress(route); 475 localAddress = this.addressResolver.resolveLocalAddress(route); 476 } catch (final IOException ex) { 477 request.failed(ex); 478 return false; 479 } 480 481 final SessionRequest sessionRequest = this.ioreactor.connect( 482 remoteAddress, localAddress, route, this.sessionRequestCallback); 483 request.attachSessionRequest(sessionRequest); 484 final long connectTimeout = request.getConnectTimeout(); 485 if (connectTimeout >= 0) { 486 sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE); 487 } 488 this.pending.add(sessionRequest); 489 pool.addPending(sessionRequest, request.getFuture()); 490 return true; 491 } else { 492 return false; 493 } 494 } 495 496 private void fireCallbacks() { 497 LeaseRequest<T, C, E> request; 498 while ((request = this.completedRequests.poll()) != null) { 499 final BasicFuture<E> future = request.getFuture(); 500 final Exception ex = request.getException(); 501 final E result = request.getResult(); 502 boolean successfullyCompleted = false; 503 if (ex != null) { 504 future.failed(ex); 505 } else if (result != null) { 506 if (future.completed(result)) { 507 successfullyCompleted = true; 508 } 509 } else { 510 future.cancel(); 511 } 512 if (!successfullyCompleted) { 513 release(result, true); 514 } 515 } 516 } 517 518 public void validatePendingRequests() { 519 this.lock.lock(); 520 try { 521 final long now = System.currentTimeMillis(); 522 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator(); 523 while (it.hasNext()) { 524 final LeaseRequest<T, C, E> request = it.next(); 525 final BasicFuture<E> future = request.getFuture(); 526 if (future.isCancelled() && !request.isDone()) { 527 it.remove(); 528 } else { 529 final long deadline = request.getDeadline(); 530 if (now > deadline) { 531 request.failed(new TimeoutException()); 532 } 533 if (request.isDone()) { 534 it.remove(); 535 this.completedRequests.add(request); 536 } 537 } 538 } 539 } finally { 540 this.lock.unlock(); 541 } 542 fireCallbacks(); 543 } 544 545 protected void requestCompleted(final SessionRequest request) { 546 if (this.isShutDown.get()) { 547 return; 548 } 549 @SuppressWarnings("unchecked") 550 final 551 T route = (T) request.getAttachment(); 552 this.lock.lock(); 553 try { 554 this.pending.remove(request); 555 final RouteSpecificPool<T, C, E> pool = getPool(route); 556 final IOSession session = request.getSession(); 557 try { 558 final C conn = this.connFactory.create(route, session); 559 final E entry = pool.createEntry(request, conn); 560 if (pool.completed(request, entry)) { 561 this.leased.add(entry); 562 onLease(entry); 563 } else { 564 this.available.add(entry); 565 if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) { 566 processNextPendingRequest(); 567 } 568 } 569 } catch (final IOException ex) { 570 pool.failed(request, ex); 571 } 572 } finally { 573 this.lock.unlock(); 574 } 575 fireCallbacks(); 576 } 577 578 protected void requestCancelled(final SessionRequest request) { 579 if (this.isShutDown.get()) { 580 return; 581 } 582 @SuppressWarnings("unchecked") 583 final 584 T route = (T) request.getAttachment(); 585 this.lock.lock(); 586 try { 587 this.pending.remove(request); 588 final RouteSpecificPool<T, C, E> pool = getPool(route); 589 pool.cancelled(request); 590 if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) { 591 processNextPendingRequest(); 592 } 593 } finally { 594 this.lock.unlock(); 595 } 596 fireCallbacks(); 597 } 598 599 protected void requestFailed(final SessionRequest request) { 600 if (this.isShutDown.get()) { 601 return; 602 } 603 @SuppressWarnings("unchecked") 604 final 605 T route = (T) request.getAttachment(); 606 this.lock.lock(); 607 try { 608 this.pending.remove(request); 609 final RouteSpecificPool<T, C, E> pool = getPool(route); 610 pool.failed(request, request.getException()); 611 processNextPendingRequest(); 612 } finally { 613 this.lock.unlock(); 614 } 615 fireCallbacks(); 616 } 617 618 protected void requestTimeout(final SessionRequest request) { 619 if (this.isShutDown.get()) { 620 return; 621 } 622 @SuppressWarnings("unchecked") 623 final 624 T route = (T) request.getAttachment(); 625 this.lock.lock(); 626 try { 627 this.pending.remove(request); 628 final RouteSpecificPool<T, C, E> pool = getPool(route); 629 pool.timeout(request); 630 processNextPendingRequest(); 631 } finally { 632 this.lock.unlock(); 633 } 634 fireCallbacks(); 635 } 636 637 private int getMax(final T route) { 638 final Integer v = this.maxPerRoute.get(route); 639 if (v != null) { 640 return v.intValue(); 641 } else { 642 return this.defaultMaxPerRoute; 643 } 644 } 645 646 @Override 647 public void setMaxTotal(final int max) { 648 Args.positive(max, "Max value"); 649 this.lock.lock(); 650 try { 651 this.maxTotal = max; 652 } finally { 653 this.lock.unlock(); 654 } 655 } 656 657 @Override 658 public int getMaxTotal() { 659 this.lock.lock(); 660 try { 661 return this.maxTotal; 662 } finally { 663 this.lock.unlock(); 664 } 665 } 666 667 @Override 668 public void setDefaultMaxPerRoute(final int max) { 669 Args.positive(max, "Max value"); 670 this.lock.lock(); 671 try { 672 this.defaultMaxPerRoute = max; 673 } finally { 674 this.lock.unlock(); 675 } 676 } 677 678 @Override 679 public int getDefaultMaxPerRoute() { 680 this.lock.lock(); 681 try { 682 return this.defaultMaxPerRoute; 683 } finally { 684 this.lock.unlock(); 685 } 686 } 687 688 @Override 689 public void setMaxPerRoute(final T route, final int max) { 690 Args.notNull(route, "Route"); 691 Args.positive(max, "Max value"); 692 this.lock.lock(); 693 try { 694 this.maxPerRoute.put(route, Integer.valueOf(max)); 695 } finally { 696 this.lock.unlock(); 697 } 698 } 699 700 @Override 701 public int getMaxPerRoute(final T route) { 702 Args.notNull(route, "Route"); 703 this.lock.lock(); 704 try { 705 return getMax(route); 706 } finally { 707 this.lock.unlock(); 708 } 709 } 710 711 @Override 712 public PoolStats getTotalStats() { 713 this.lock.lock(); 714 try { 715 return new PoolStats( 716 this.leased.size(), 717 this.pending.size(), 718 this.available.size(), 719 this.maxTotal); 720 } finally { 721 this.lock.unlock(); 722 } 723 } 724 725 @Override 726 public PoolStats getStats(final T route) { 727 Args.notNull(route, "Route"); 728 this.lock.lock(); 729 try { 730 final RouteSpecificPool<T, C, E> pool = getPool(route); 731 int pendingCount = 0; 732 for (final LeaseRequest<T, C, E> request: leasingRequests) { 733 if (LangUtils.equals(route, request.getRoute())) { 734 pendingCount++; 735 } 736 } 737 return new PoolStats( 738 pool.getLeasedCount(), 739 pendingCount + pool.getPendingCount(), 740 pool.getAvailableCount(), 741 getMax(route)); 742 } finally { 743 this.lock.unlock(); 744 } 745 } 746 747 /** 748 * Returns snapshot of all knows routes 749 * 750 * @since 4.4 751 */ 752 public Set<T> getRoutes() { 753 this.lock.lock(); 754 try { 755 return new HashSet<T>(routeToPool.keySet()); 756 } finally { 757 this.lock.unlock(); 758 } 759 } 760 761 /** 762 * Enumerates all available connections. 763 * 764 * @since 4.3 765 */ 766 protected void enumAvailable(final PoolEntryCallback<T, C> callback) { 767 this.lock.lock(); 768 try { 769 final Iterator<E> it = this.available.iterator(); 770 while (it.hasNext()) { 771 final E entry = it.next(); 772 callback.process(entry); 773 if (entry.isClosed()) { 774 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute()); 775 pool.remove(entry); 776 it.remove(); 777 } 778 } 779 processPendingRequests(); 780 purgePoolMap(); 781 } finally { 782 this.lock.unlock(); 783 } 784 } 785 786 /** 787 * Enumerates all leased connections. 788 * 789 * @since 4.3 790 */ 791 protected void enumLeased(final PoolEntryCallback<T, C> callback) { 792 this.lock.lock(); 793 try { 794 final Iterator<E> it = this.leased.iterator(); 795 while (it.hasNext()) { 796 final E entry = it.next(); 797 callback.process(entry); 798 } 799 processPendingRequests(); 800 } finally { 801 this.lock.unlock(); 802 } 803 } 804 805 /** 806 * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)} 807 * or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead. 808 * 809 * @deprecated (4.3.2) 810 */ 811 @Deprecated 812 protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) { 813 while (it.hasNext()) { 814 final E entry = it.next(); 815 callback.process(entry); 816 } 817 processPendingRequests(); 818 } 819 820 private void purgePoolMap() { 821 final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator(); 822 while (it.hasNext()) { 823 final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next(); 824 final RouteSpecificPool<T, C, E> pool = entry.getValue(); 825 if (pool.getAllocatedCount() == 0) { 826 it.remove(); 827 } 828 } 829 } 830 831 public void closeIdle(final long idletime, final TimeUnit tunit) { 832 Args.notNull(tunit, "Time unit"); 833 long time = tunit.toMillis(idletime); 834 if (time < 0) { 835 time = 0; 836 } 837 final long deadline = System.currentTimeMillis() - time; 838 enumAvailable(new PoolEntryCallback<T, C>() { 839 840 @Override 841 public void process(final PoolEntry<T, C> entry) { 842 if (entry.getUpdated() <= deadline) { 843 entry.close(); 844 } 845 } 846 847 }); 848 } 849 850 public void closeExpired() { 851 final long now = System.currentTimeMillis(); 852 enumAvailable(new PoolEntryCallback<T, C>() { 853 854 @Override 855 public void process(final PoolEntry<T, C> entry) { 856 if (entry.isExpired(now)) { 857 entry.close(); 858 } 859 } 860 861 }); 862 } 863 864 @Override 865 public String toString() { 866 final StringBuilder buffer = new StringBuilder(); 867 buffer.append("[leased: "); 868 buffer.append(this.leased); 869 buffer.append("][available: "); 870 buffer.append(this.available); 871 buffer.append("][pending: "); 872 buffer.append(this.pending); 873 buffer.append("]"); 874 return buffer.toString(); 875 } 876 877 class InternalSessionRequestCallback implements SessionRequestCallback { 878 879 @Override 880 public void completed(final SessionRequest request) { 881 requestCompleted(request); 882 } 883 884 @Override 885 public void cancelled(final SessionRequest request) { 886 requestCancelled(request); 887 } 888 889 @Override 890 public void failed(final SessionRequest request) { 891 requestFailed(request); 892 } 893 894 @Override 895 public void timeout(final SessionRequest request) { 896 requestTimeout(request); 897 } 898 899 } 900 901}