001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027package org.apache.http.nio.pool;
028
029import java.io.IOException;
030import java.net.SocketAddress;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.LinkedList;
035import java.util.ListIterator;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.Future;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.TimeoutException;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.locks.Lock;
045import java.util.concurrent.locks.ReentrantLock;
046
047import org.apache.http.annotation.Contract;
048import org.apache.http.annotation.ThreadingBehavior;
049import org.apache.http.concurrent.BasicFuture;
050import org.apache.http.concurrent.FutureCallback;
051import org.apache.http.nio.reactor.ConnectingIOReactor;
052import org.apache.http.nio.reactor.IOReactorStatus;
053import org.apache.http.nio.reactor.IOSession;
054import org.apache.http.nio.reactor.SessionRequest;
055import org.apache.http.nio.reactor.SessionRequestCallback;
056import org.apache.http.pool.ConnPool;
057import org.apache.http.pool.ConnPoolControl;
058import org.apache.http.pool.PoolEntry;
059import org.apache.http.pool.PoolEntryCallback;
060import org.apache.http.pool.PoolStats;
061import org.apache.http.util.Args;
062import org.apache.http.util.Asserts;
063import org.apache.http.util.LangUtils;
064
065/**
066 * Abstract non-blocking connection pool.
067 *
068 * @param <T> route
069 * @param <C> connection object
070 * @param <E> pool entry
071 *
072 * @since 4.2
073 */
074@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
075public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
076                                                  implements ConnPool<T, E>, ConnPoolControl<T> {
077
078    private final ConnectingIOReactor ioreactor;
079    private final NIOConnFactory<T, C> connFactory;
080    private final SocketAddressResolver<T> addressResolver;
081    private final SessionRequestCallback sessionRequestCallback;
082    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
083    private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
084    private final Set<SessionRequest> pending;
085    private final Set<E> leased;
086    private final LinkedList<E> available;
087    private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
088    private final Map<T, Integer> maxPerRoute;
089    private final Lock lock;
090    private final AtomicBoolean isShutDown;
091
092    private volatile int defaultMaxPerRoute;
093    private volatile int maxTotal;
094
095    /**
096     * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
097     *   NIOConnFactory, SocketAddressResolver, int, int)}
098     */
099    @Deprecated
100    public AbstractNIOConnPool(
101            final ConnectingIOReactor ioreactor,
102            final NIOConnFactory<T, C> connFactory,
103            final int defaultMaxPerRoute,
104            final int maxTotal) {
105        super();
106        Args.notNull(ioreactor, "I/O reactor");
107        Args.notNull(connFactory, "Connection factory");
108        Args.positive(defaultMaxPerRoute, "Max per route value");
109        Args.positive(maxTotal, "Max total value");
110        this.ioreactor = ioreactor;
111        this.connFactory = connFactory;
112        this.addressResolver = new SocketAddressResolver<T>() {
113
114            @Override
115            public SocketAddress resolveLocalAddress(final T route) throws IOException {
116                return AbstractNIOConnPool.this.resolveLocalAddress(route);
117            }
118
119            @Override
120            public SocketAddress resolveRemoteAddress(final T route) throws IOException {
121                return AbstractNIOConnPool.this.resolveRemoteAddress(route);
122            }
123
124        };
125        this.sessionRequestCallback = new InternalSessionRequestCallback();
126        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
127        this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
128        this.pending = new HashSet<SessionRequest>();
129        this.leased = new HashSet<E>();
130        this.available = new LinkedList<E>();
131        this.maxPerRoute = new HashMap<T, Integer>();
132        this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
133        this.lock = new ReentrantLock();
134        this.isShutDown = new AtomicBoolean(false);
135        this.defaultMaxPerRoute = defaultMaxPerRoute;
136        this.maxTotal = maxTotal;
137    }
138
139    /**
140     * @since 4.3
141     */
142    public AbstractNIOConnPool(
143            final ConnectingIOReactor ioreactor,
144            final NIOConnFactory<T, C> connFactory,
145            final SocketAddressResolver<T> addressResolver,
146            final int defaultMaxPerRoute,
147            final int maxTotal) {
148        super();
149        Args.notNull(ioreactor, "I/O reactor");
150        Args.notNull(connFactory, "Connection factory");
151        Args.notNull(addressResolver, "Address resolver");
152        Args.positive(defaultMaxPerRoute, "Max per route value");
153        Args.positive(maxTotal, "Max total value");
154        this.ioreactor = ioreactor;
155        this.connFactory = connFactory;
156        this.addressResolver = addressResolver;
157        this.sessionRequestCallback = new InternalSessionRequestCallback();
158        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
159        this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
160        this.pending = new HashSet<SessionRequest>();
161        this.leased = new HashSet<E>();
162        this.available = new LinkedList<E>();
163        this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
164        this.maxPerRoute = new HashMap<T, Integer>();
165        this.lock = new ReentrantLock();
166        this.isShutDown = new AtomicBoolean(false);
167        this.defaultMaxPerRoute = defaultMaxPerRoute;
168        this.maxTotal = maxTotal;
169    }
170
171    /**
172     * @deprecated (4.3) use {@link SocketAddressResolver}
173     */
174    @Deprecated
175    protected SocketAddress resolveRemoteAddress(final T route) {
176        return null;
177    }
178
179    /**
180     * @deprecated (4.3) use {@link SocketAddressResolver}
181     */
182    @Deprecated
183    protected SocketAddress resolveLocalAddress(final T route) {
184        return null;
185    }
186
187    protected abstract E createEntry(T route, C conn);
188
189    /**
190     * @since 4.3
191     */
192    protected void onLease(final E entry) {
193    }
194
195    /**
196     * @since 4.3
197     */
198    protected void onRelease(final E entry) {
199    }
200
201    /**
202     * @since 4.4
203     */
204    protected void onReuse(final E entry) {
205    }
206
207    public boolean isShutdown() {
208        return this.isShutDown.get();
209    }
210
211    public void shutdown(final long waitMs) throws IOException {
212        if (this.isShutDown.compareAndSet(false, true)) {
213            fireCallbacks();
214            this.lock.lock();
215            try {
216                for (final SessionRequest sessionRequest: this.pending) {
217                    sessionRequest.cancel();
218                }
219                for (final E entry: this.available) {
220                    entry.close();
221                }
222                for (final E entry: this.leased) {
223                    entry.close();
224                }
225                for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
226                    pool.shutdown();
227                }
228                this.routeToPool.clear();
229                this.leased.clear();
230                this.pending.clear();
231                this.available.clear();
232                this.leasingRequests.clear();
233                this.ioreactor.shutdown(waitMs);
234            } finally {
235                this.lock.unlock();
236            }
237        }
238    }
239
240    private RouteSpecificPool<T, C, E> getPool(final T route) {
241        RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
242        if (pool == null) {
243            pool = new RouteSpecificPool<T, C, E>(route) {
244
245                @Override
246                protected E createEntry(final T route, final C conn) {
247                    return AbstractNIOConnPool.this.createEntry(route, conn);
248                }
249
250            };
251            this.routeToPool.put(route, pool);
252        }
253        return pool;
254    }
255
256    public Future<E> lease(
257            final T route, final Object state,
258            final long connectTimeout, final TimeUnit tunit,
259            final FutureCallback<E> callback) {
260        return this.lease(route, state, connectTimeout, connectTimeout, tunit, callback);
261    }
262
263    /**
264     * @since 4.3
265     */
266    public Future<E> lease(
267            final T route, final Object state,
268            final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
269            final FutureCallback<E> callback) {
270        Args.notNull(route, "Route");
271        Args.notNull(tunit, "Time unit");
272        Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
273        final BasicFuture<E> future = new BasicFuture<E>(callback);
274        final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state,
275                connectTimeout >= 0 ? tunit.toMillis(connectTimeout) : -1,
276                leaseTimeout > 0 ? tunit.toMillis(leaseTimeout) : 0,
277                future);
278        this.lock.lock();
279        try {
280            final boolean completed = processPendingRequest(leaseRequest);
281            if (!leaseRequest.isDone() && !completed) {
282                this.leasingRequests.add(leaseRequest);
283            }
284            if (leaseRequest.isDone()) {
285                this.completedRequests.add(leaseRequest);
286            }
287        } finally {
288            this.lock.unlock();
289        }
290        fireCallbacks();
291        return new Future<E>() {
292
293            @Override
294            public E get() throws InterruptedException, ExecutionException {
295                return future.get();
296            }
297
298            @Override
299            public E get(
300                    final long timeout,
301                    final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
302                return future.get(timeout, unit);
303            }
304
305            @Override
306            public boolean cancel(final boolean mayInterruptIfRunning) {
307                try {
308                    leaseRequest.cancel();
309                } finally {
310                    return future.cancel(mayInterruptIfRunning);
311                }
312            }
313
314            @Override
315            public boolean isCancelled() {
316                return future.isCancelled();
317            }
318
319            @Override
320            public boolean isDone() {
321                return future.isDone();
322            }
323
324        };
325    }
326
327    @Override
328    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
329        return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
330    }
331
332    public Future<E> lease(final T route, final Object state) {
333        return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
334    }
335
336    @Override
337    public void release(final E entry, final boolean reusable) {
338        if (entry == null) {
339            return;
340        }
341        if (this.isShutDown.get()) {
342            return;
343        }
344        this.lock.lock();
345        try {
346            if (this.leased.remove(entry)) {
347                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
348                pool.free(entry, reusable);
349                if (reusable) {
350                    this.available.addFirst(entry);
351                    onRelease(entry);
352                } else {
353                    entry.close();
354                }
355                processNextPendingRequest();
356            }
357        } finally {
358            this.lock.unlock();
359        }
360        fireCallbacks();
361    }
362
363    private void processPendingRequests() {
364        final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
365        while (it.hasNext()) {
366            final LeaseRequest<T, C, E> request = it.next();
367            final BasicFuture<E> future = request.getFuture();
368            if (future.isCancelled()) {
369                it.remove();
370                continue;
371            }
372            final boolean completed = processPendingRequest(request);
373            if (request.isDone() || completed) {
374                it.remove();
375            }
376            if (request.isDone()) {
377                this.completedRequests.add(request);
378            }
379        }
380    }
381
382    private void processNextPendingRequest() {
383        final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
384        while (it.hasNext()) {
385            final LeaseRequest<T, C, E> request = it.next();
386            final BasicFuture<E> future = request.getFuture();
387            if (future.isCancelled()) {
388                it.remove();
389                continue;
390            }
391            final boolean completed = processPendingRequest(request);
392            if (request.isDone() || completed) {
393                it.remove();
394            }
395            if (request.isDone()) {
396                this.completedRequests.add(request);
397            }
398            if (completed) {
399                return;
400            }
401        }
402    }
403
404    private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
405        final T route = request.getRoute();
406        final Object state = request.getState();
407        final long deadline = request.getDeadline();
408
409        final long now = System.currentTimeMillis();
410        if (now > deadline) {
411            request.failed(new TimeoutException());
412            return false;
413        }
414
415        final RouteSpecificPool<T, C, E> pool = getPool(route);
416        E entry;
417        for (;;) {
418            entry = pool.getFree(state);
419            if (entry == null) {
420                break;
421            }
422            if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
423                entry.close();
424                this.available.remove(entry);
425                pool.free(entry, false);
426            } else {
427                break;
428            }
429        }
430        if (entry != null) {
431            this.available.remove(entry);
432            this.leased.add(entry);
433            request.completed(entry);
434            onReuse(entry);
435            onLease(entry);
436            return true;
437        }
438
439        // New connection is needed
440        final int maxPerRoute = getMax(route);
441        // Shrink the pool prior to allocating a new connection
442        final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
443        if (excess > 0) {
444            for (int i = 0; i < excess; i++) {
445                final E lastUsed = pool.getLastUsed();
446                if (lastUsed == null) {
447                    break;
448                }
449                lastUsed.close();
450                this.available.remove(lastUsed);
451                pool.remove(lastUsed);
452            }
453        }
454
455        if (pool.getAllocatedCount() < maxPerRoute) {
456            final int totalUsed = this.pending.size() + this.leased.size();
457            final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
458            if (freeCapacity == 0) {
459                return false;
460            }
461            final int totalAvailable = this.available.size();
462            if (totalAvailable > freeCapacity - 1) {
463                if (!this.available.isEmpty()) {
464                    final E lastUsed = this.available.removeLast();
465                    lastUsed.close();
466                    final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
467                    otherpool.remove(lastUsed);
468                }
469            }
470
471            final SocketAddress localAddress;
472            final SocketAddress remoteAddress;
473            try {
474                remoteAddress = this.addressResolver.resolveRemoteAddress(route);
475                localAddress = this.addressResolver.resolveLocalAddress(route);
476            } catch (final IOException ex) {
477                request.failed(ex);
478                return false;
479            }
480
481            final SessionRequest sessionRequest = this.ioreactor.connect(
482                    remoteAddress, localAddress, route, this.sessionRequestCallback);
483            request.attachSessionRequest(sessionRequest);
484            final long connectTimeout = request.getConnectTimeout();
485            if (connectTimeout >= 0) {
486                sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE);
487            }
488            this.pending.add(sessionRequest);
489            pool.addPending(sessionRequest, request.getFuture());
490            return true;
491        } else {
492            return false;
493        }
494    }
495
496    private void fireCallbacks() {
497        LeaseRequest<T, C, E> request;
498        while ((request = this.completedRequests.poll()) != null) {
499            final BasicFuture<E> future = request.getFuture();
500            final Exception ex = request.getException();
501            final E result = request.getResult();
502            boolean successfullyCompleted = false;
503            if (ex != null) {
504                future.failed(ex);
505            } else if (result != null) {
506                if (future.completed(result)) {
507                    successfullyCompleted = true;
508                }
509            } else {
510                future.cancel();
511            }
512            if (!successfullyCompleted) {
513                release(result, true);
514            }
515        }
516    }
517
518    public void validatePendingRequests() {
519        this.lock.lock();
520        try {
521            final long now = System.currentTimeMillis();
522            final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
523            while (it.hasNext()) {
524                final LeaseRequest<T, C, E> request = it.next();
525                final BasicFuture<E> future = request.getFuture();
526                if (future.isCancelled() && !request.isDone()) {
527                    it.remove();
528                } else {
529                    final long deadline = request.getDeadline();
530                    if (now > deadline) {
531                        request.failed(new TimeoutException());
532                    }
533                    if (request.isDone()) {
534                        it.remove();
535                        this.completedRequests.add(request);
536                    }
537                }
538            }
539        } finally {
540            this.lock.unlock();
541        }
542        fireCallbacks();
543    }
544
545    protected void requestCompleted(final SessionRequest request) {
546        if (this.isShutDown.get()) {
547            return;
548        }
549        @SuppressWarnings("unchecked")
550        final
551        T route = (T) request.getAttachment();
552        this.lock.lock();
553        try {
554            this.pending.remove(request);
555            final RouteSpecificPool<T, C, E> pool = getPool(route);
556            final IOSession session = request.getSession();
557            try {
558                final C conn = this.connFactory.create(route, session);
559                final E entry = pool.createEntry(request, conn);
560                if (pool.completed(request, entry)) {
561                    this.leased.add(entry);
562                    onLease(entry);
563                } else {
564                    this.available.add(entry);
565                    if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
566                        processNextPendingRequest();
567                    }
568                }
569            } catch (final IOException ex) {
570                pool.failed(request, ex);
571            }
572        } finally {
573            this.lock.unlock();
574        }
575        fireCallbacks();
576    }
577
578    protected void requestCancelled(final SessionRequest request) {
579        if (this.isShutDown.get()) {
580            return;
581        }
582        @SuppressWarnings("unchecked")
583        final
584        T route = (T) request.getAttachment();
585        this.lock.lock();
586        try {
587            this.pending.remove(request);
588            final RouteSpecificPool<T, C, E> pool = getPool(route);
589            pool.cancelled(request);
590            if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
591                processNextPendingRequest();
592            }
593        } finally {
594            this.lock.unlock();
595        }
596        fireCallbacks();
597    }
598
599    protected void requestFailed(final SessionRequest request) {
600        if (this.isShutDown.get()) {
601            return;
602        }
603        @SuppressWarnings("unchecked")
604        final
605        T route = (T) request.getAttachment();
606        this.lock.lock();
607        try {
608            this.pending.remove(request);
609            final RouteSpecificPool<T, C, E> pool = getPool(route);
610            pool.failed(request, request.getException());
611            processNextPendingRequest();
612        } finally {
613            this.lock.unlock();
614        }
615        fireCallbacks();
616    }
617
618    protected void requestTimeout(final SessionRequest request) {
619        if (this.isShutDown.get()) {
620            return;
621        }
622        @SuppressWarnings("unchecked")
623        final
624        T route = (T) request.getAttachment();
625        this.lock.lock();
626        try {
627            this.pending.remove(request);
628            final RouteSpecificPool<T, C, E> pool = getPool(route);
629            pool.timeout(request);
630            processNextPendingRequest();
631        } finally {
632            this.lock.unlock();
633        }
634        fireCallbacks();
635    }
636
637    private int getMax(final T route) {
638        final Integer v = this.maxPerRoute.get(route);
639        if (v != null) {
640            return v.intValue();
641        } else {
642            return this.defaultMaxPerRoute;
643        }
644    }
645
646    @Override
647    public void setMaxTotal(final int max) {
648        Args.positive(max, "Max value");
649        this.lock.lock();
650        try {
651            this.maxTotal = max;
652        } finally {
653            this.lock.unlock();
654        }
655    }
656
657    @Override
658    public int getMaxTotal() {
659        this.lock.lock();
660        try {
661            return this.maxTotal;
662        } finally {
663            this.lock.unlock();
664        }
665    }
666
667    @Override
668    public void setDefaultMaxPerRoute(final int max) {
669        Args.positive(max, "Max value");
670        this.lock.lock();
671        try {
672            this.defaultMaxPerRoute = max;
673        } finally {
674            this.lock.unlock();
675        }
676    }
677
678    @Override
679    public int getDefaultMaxPerRoute() {
680        this.lock.lock();
681        try {
682            return this.defaultMaxPerRoute;
683        } finally {
684            this.lock.unlock();
685        }
686    }
687
688    @Override
689    public void setMaxPerRoute(final T route, final int max) {
690        Args.notNull(route, "Route");
691        Args.positive(max, "Max value");
692        this.lock.lock();
693        try {
694            this.maxPerRoute.put(route, Integer.valueOf(max));
695        } finally {
696            this.lock.unlock();
697        }
698    }
699
700    @Override
701    public int getMaxPerRoute(final T route) {
702        Args.notNull(route, "Route");
703        this.lock.lock();
704        try {
705            return getMax(route);
706        } finally {
707            this.lock.unlock();
708        }
709    }
710
711    @Override
712    public PoolStats getTotalStats() {
713        this.lock.lock();
714        try {
715            return new PoolStats(
716                    this.leased.size(),
717                    this.pending.size(),
718                    this.available.size(),
719                    this.maxTotal);
720        } finally {
721            this.lock.unlock();
722        }
723    }
724
725    @Override
726    public PoolStats getStats(final T route) {
727        Args.notNull(route, "Route");
728        this.lock.lock();
729        try {
730            final RouteSpecificPool<T, C, E> pool = getPool(route);
731            int pendingCount = 0;
732            for (final LeaseRequest<T, C, E> request: leasingRequests) {
733                if (LangUtils.equals(route, request.getRoute())) {
734                    pendingCount++;
735                }
736            }
737            return new PoolStats(
738                    pool.getLeasedCount(),
739                    pendingCount + pool.getPendingCount(),
740                    pool.getAvailableCount(),
741                    getMax(route));
742        } finally {
743            this.lock.unlock();
744        }
745    }
746
747    /**
748     * Returns snapshot of all knows routes
749     *
750     * @since 4.4
751     */
752    public Set<T> getRoutes() {
753        this.lock.lock();
754        try {
755            return new HashSet<T>(routeToPool.keySet());
756        } finally {
757            this.lock.unlock();
758        }
759    }
760
761    /**
762     * Enumerates all available connections.
763     *
764     * @since 4.3
765     */
766    protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
767        this.lock.lock();
768        try {
769            final Iterator<E> it = this.available.iterator();
770            while (it.hasNext()) {
771                final E entry = it.next();
772                callback.process(entry);
773                if (entry.isClosed()) {
774                    final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
775                    pool.remove(entry);
776                    it.remove();
777                }
778            }
779            processPendingRequests();
780            purgePoolMap();
781        } finally {
782            this.lock.unlock();
783        }
784    }
785
786    /**
787     * Enumerates all leased connections.
788     *
789     * @since 4.3
790     */
791    protected void enumLeased(final PoolEntryCallback<T, C> callback) {
792        this.lock.lock();
793        try {
794            final Iterator<E> it = this.leased.iterator();
795            while (it.hasNext()) {
796                final E entry = it.next();
797                callback.process(entry);
798            }
799            processPendingRequests();
800        } finally {
801            this.lock.unlock();
802        }
803    }
804
805    /**
806     * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
807     *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
808     *
809     * @deprecated (4.3.2)
810     */
811    @Deprecated
812    protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
813        while (it.hasNext()) {
814            final E entry = it.next();
815            callback.process(entry);
816        }
817        processPendingRequests();
818    }
819
820    private void purgePoolMap() {
821        final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
822        while (it.hasNext()) {
823            final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
824            final RouteSpecificPool<T, C, E> pool = entry.getValue();
825            if (pool.getAllocatedCount() == 0) {
826                it.remove();
827            }
828        }
829    }
830
831    public void closeIdle(final long idletime, final TimeUnit tunit) {
832        Args.notNull(tunit, "Time unit");
833        long time = tunit.toMillis(idletime);
834        if (time < 0) {
835            time = 0;
836        }
837        final long deadline = System.currentTimeMillis() - time;
838        enumAvailable(new PoolEntryCallback<T, C>() {
839
840            @Override
841            public void process(final PoolEntry<T, C> entry) {
842                if (entry.getUpdated() <= deadline) {
843                    entry.close();
844                }
845            }
846
847        });
848    }
849
850    public void closeExpired() {
851        final long now = System.currentTimeMillis();
852        enumAvailable(new PoolEntryCallback<T, C>() {
853
854            @Override
855            public void process(final PoolEntry<T, C> entry) {
856                if (entry.isExpired(now)) {
857                    entry.close();
858                }
859            }
860
861        });
862    }
863
864    @Override
865    public String toString() {
866        final StringBuilder buffer = new StringBuilder();
867        buffer.append("[leased: ");
868        buffer.append(this.leased);
869        buffer.append("][available: ");
870        buffer.append(this.available);
871        buffer.append("][pending: ");
872        buffer.append(this.pending);
873        buffer.append("]");
874        return buffer.toString();
875    }
876
877    class InternalSessionRequestCallback implements SessionRequestCallback {
878
879        @Override
880        public void completed(final SessionRequest request) {
881            requestCompleted(request);
882        }
883
884        @Override
885        public void cancelled(final SessionRequest request) {
886            requestCancelled(request);
887        }
888
889        @Override
890        public void failed(final SessionRequest request) {
891            requestFailed(request);
892        }
893
894        @Override
895        public void timeout(final SessionRequest request) {
896            requestTimeout(request);
897        }
898
899    }
900
901}