001// License: GPL. For details, see LICENSE file. 002package org.openstreetmap.josm.data.cache; 003 004import java.io.IOException; 005import java.net.URL; 006import java.util.Iterator; 007import java.util.Map; 008import java.util.concurrent.ConcurrentHashMap; 009import java.util.concurrent.LinkedBlockingDeque; 010import java.util.concurrent.Semaphore; 011import java.util.concurrent.TimeUnit; 012 013import org.openstreetmap.josm.tools.Logging; 014 015/** 016 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task 017 * and it will set a runnable task with semaphore release, when job has finished. 018 * <p> 019 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't 020 * guarantee that all threads will be busy, when there is work for them[2]. <br> 021 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus 022 * tasks do not go through the Queue <br> 023 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread 024 * take the first available job and wait for semaphore. It might be the case, that semaphore was released 025 * for some task further in queue, but this implementation doesn't try to detect such situation 026 * 027 * @author Wiktor Niesiobędzki 028 */ 029public class HostLimitQueue extends LinkedBlockingDeque<Runnable> { 030 private static final long serialVersionUID = 1L; 031 032 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>(); 033 private final int hostLimit; 034 035 /** 036 * Creates an unbounded queue 037 * @param hostLimit how many parallel calls to host to allow 038 */ 039 public HostLimitQueue(int hostLimit) { 040 super(); // create unbounded queue 041 this.hostLimit = hostLimit; 042 } 043 044 /** 045 * Creates bounded queue 046 * @param hostLimit how many parallel calls to host to allow 047 * @param queueLimit how deep the queue should be 048 */ 049 public HostLimitQueue(int hostLimit, int queueLimit) { 050 super(queueLimit); // create bounded queue 051 this.hostLimit = hostLimit; 052 } 053 054 private JCSCachedTileLoaderJob<?, ?> findJob() { 055 for (Iterator<Runnable> it = iterator(); it.hasNext();) { 056 Runnable r = it.next(); 057 if (r instanceof JCSCachedTileLoaderJob) { 058 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r; 059 if (tryAcquireSemaphore(job)) { 060 if (remove(job)) { 061 return job; 062 } else { 063 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did 064 // release the semaphore and look for another candidate 065 releaseSemaphore(job); 066 } 067 } else { 068 URL url = null; 069 try { 070 url = job.getUrl(); 071 } catch (IOException e) { 072 Logging.debug(e); 073 } 074 Logging.debug("TMS - Skipping job {0} because host limit reached", url); 075 } 076 } 077 } 078 return null; 079 } 080 081 @Override 082 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 083 Runnable job = findJob(); 084 if (job != null) { 085 return job; 086 } 087 job = pollFirst(timeout, unit); 088 if (job != null) { 089 try { 090 boolean gotLock = tryAcquireSemaphore(job, timeout, unit); 091 return gotLock ? job : null; 092 } catch (InterruptedException e) { 093 // acquire my got interrupted, first offer back what was taken 094 if (!offer(job)) { 095 Logging.warn("Unable to offer back " + job); 096 } 097 throw e; 098 } 099 } 100 return job; 101 } 102 103 @Override 104 public Runnable take() throws InterruptedException { 105 Runnable job = findJob(); 106 if (job != null) { 107 return job; 108 } 109 job = takeFirst(); 110 try { 111 acquireSemaphore(job); 112 } catch (InterruptedException e) { 113 // acquire my got interrupted, first offer back what was taken 114 if (!offer(job)) { 115 Logging.warn("Unable to offer back " + job); 116 } 117 throw e; 118 } 119 return job; 120 } 121 122 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 123 String host; 124 try { 125 host = job.getUrl().getHost(); 126 } catch (IOException e) { 127 // do not pass me illegal URL's 128 throw new IllegalArgumentException(e); 129 } 130 Semaphore limit = hostSemaphores.get(host); 131 if (limit == null) { 132 limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit)); 133 } 134 return limit; 135 } 136 137 private void acquireSemaphore(Runnable job) throws InterruptedException { 138 if (job instanceof JCSCachedTileLoaderJob) { 139 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 140 getSemaphore(jcsJob).acquire(); 141 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob)); 142 } 143 } 144 145 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) { 146 boolean ret = true; 147 Semaphore limit = getSemaphore(job); 148 if (limit != null) { 149 ret = limit.tryAcquire(); 150 if (ret) { 151 job.setFinishedTask(() -> releaseSemaphore(job)); 152 } 153 } 154 return ret; 155 } 156 157 private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException { 158 boolean ret = true; 159 if (job instanceof JCSCachedTileLoaderJob) { 160 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 161 Semaphore limit = getSemaphore(jcsJob); 162 if (limit != null) { 163 ret = limit.tryAcquire(timeout, unit); 164 if (ret) { 165 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob)); 166 } 167 } 168 } 169 return ret; 170 } 171 172 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 173 Semaphore limit = getSemaphore(job); 174 if (limit != null) { 175 limit.release(); 176 if (limit.availablePermits() > hostLimit) { 177 Logging.warn("More permits than it should be"); 178 } 179 } 180 } 181}