001// License: GPL. For details, see LICENSE file.
002package org.openstreetmap.josm.data.cache;
003
004import java.io.IOException;
005import java.net.URL;
006import java.util.Iterator;
007import java.util.Map;
008import java.util.concurrent.ConcurrentHashMap;
009import java.util.concurrent.LinkedBlockingDeque;
010import java.util.concurrent.Semaphore;
011import java.util.concurrent.TimeUnit;
012
013import org.openstreetmap.josm.tools.Logging;
014
015/**
016 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
017 * and it will set a runnable task with semaphore release, when job has finished.
018 * <p>
019 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
020 * guarantee that all threads will be busy, when there is work for them[2]. <br>
021 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
022 *     tasks do not go through the Queue <br>
023 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
024 *     take the first available job and wait for semaphore. It might be the case, that semaphore was released
025 *     for some task further in queue, but this implementation doesn't try to detect such situation
026 *
027 * @author Wiktor Niesiobędzki
028 */
029public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
030    private static final long serialVersionUID = 1L;
031
032    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
033    private final int hostLimit;
034
035    /**
036     * Creates an unbounded queue
037     * @param hostLimit how many parallel calls to host to allow
038     */
039    public HostLimitQueue(int hostLimit) {
040        super(); // create unbounded queue
041        this.hostLimit = hostLimit;
042    }
043
044    /**
045     * Creates bounded queue
046     * @param hostLimit how many parallel calls to host to allow
047     * @param queueLimit how deep the queue should be
048     */
049    public HostLimitQueue(int hostLimit, int queueLimit) {
050        super(queueLimit); // create bounded queue
051        this.hostLimit = hostLimit;
052    }
053
054    private JCSCachedTileLoaderJob<?, ?> findJob() {
055        for (Iterator<Runnable> it = iterator(); it.hasNext();) {
056            Runnable r = it.next();
057            if (r instanceof JCSCachedTileLoaderJob) {
058                JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
059                if (tryAcquireSemaphore(job)) {
060                    if (remove(job)) {
061                        return job;
062                    } else {
063                        // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
064                        // release the semaphore and look for another candidate
065                        releaseSemaphore(job);
066                    }
067                } else {
068                    URL url = null;
069                    try {
070                        url = job.getUrl();
071                    } catch (IOException e) {
072                        Logging.debug(e);
073                    }
074                    Logging.debug("TMS - Skipping job {0} because host limit reached", url);
075                }
076            }
077        }
078        return null;
079    }
080
081    @Override
082    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
083        Runnable job = findJob();
084        if (job != null) {
085            return job;
086        }
087        job = pollFirst(timeout, unit);
088        if (job != null) {
089            try {
090                boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
091                return gotLock ? job : null;
092            } catch (InterruptedException e) {
093                // acquire my got interrupted, first offer back what was taken
094                if (!offer(job)) {
095                    Logging.warn("Unable to offer back " + job);
096                }
097                throw e;
098            }
099        }
100        return job;
101    }
102
103    @Override
104    public Runnable take() throws InterruptedException {
105        Runnable job = findJob();
106        if (job != null) {
107            return job;
108        }
109        job = takeFirst();
110        try {
111            acquireSemaphore(job);
112        } catch (InterruptedException e) {
113            // acquire my got interrupted, first offer back what was taken
114            if (!offer(job)) {
115                Logging.warn("Unable to offer back " + job);
116            }
117            throw e;
118        }
119        return job;
120    }
121
122    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
123        String host;
124        try {
125            host = job.getUrl().getHost();
126        } catch (IOException e) {
127            // do not pass me illegal URL's
128            throw new IllegalArgumentException(e);
129        }
130        Semaphore limit = hostSemaphores.get(host);
131        if (limit == null) {
132            limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit));
133        }
134        return limit;
135    }
136
137    private void acquireSemaphore(Runnable job) throws InterruptedException {
138        if (job instanceof JCSCachedTileLoaderJob) {
139            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
140            getSemaphore(jcsJob).acquire();
141            jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
142        }
143    }
144
145    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
146        boolean ret = true;
147        Semaphore limit = getSemaphore(job);
148        if (limit != null) {
149            ret = limit.tryAcquire();
150            if (ret) {
151                job.setFinishedTask(() -> releaseSemaphore(job));
152            }
153        }
154        return ret;
155    }
156
157    private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
158        boolean ret = true;
159        if (job instanceof JCSCachedTileLoaderJob) {
160            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
161            Semaphore limit = getSemaphore(jcsJob);
162            if (limit != null) {
163                ret = limit.tryAcquire(timeout, unit);
164                if (ret) {
165                    jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
166                }
167            }
168        }
169        return ret;
170    }
171
172    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
173        Semaphore limit = getSemaphore(job);
174        if (limit != null) {
175            limit.release();
176            if (limit.availablePermits() > hostLimit) {
177                Logging.warn("More permits than it should be");
178            }
179        }
180    }
181}