介绍

使用缓存时候,如果缓存中没有数据,通常会去数据库查询。为了避免多个线程同时查询同一数据导致的重复查询,经常是会使用锁来保证同一时间只有一个线程去查询数据。 但是锁会导致性能下降,尤其是高并发场景下。 因此可以使用ConcurrentHashMapCompletableFuture来实现并发缓存加载。

源码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;

/**
 * 并发缓存加载
 *
 * @author p_x_c
 */
public class CacheServiceTest {
    // 缓存:可以用你自己的缓存组件,比如 Caffeine、Redis 等
    private final ConcurrentHashMap<String, Object> localCache = new ConcurrentHashMap<>();

    // 占位Map,控制只查一次数据库,其他线程等待
    private final ConcurrentHashMap<String, CompletableFuture<Object>> futureMap = new ConcurrentHashMap<>();

    /**
     * 获取数据,key是缓存的标识
     */
    public Object getData(String key) {
        // Step 1: 先尝试从缓存拿数据
        Object cached = localCache.get(key);
        if (cached != null) {
            return cached;
        }

        // Step 2: 尝试占坑(放入一个future)
        CompletableFuture<Object> future = new CompletableFuture<>();
        CompletableFuture<Object> existingFuture = futureMap.putIfAbsent(key, future);

        if (existingFuture == null) {
            // 我是第一个请求,需要查数据库
            try {
                Object value = queryFromDatabase(key);

                // 写入缓存
                localCache.put(key, value);

                // 通知其他线程,其他线程get就可以拿到数据了
                future.complete(value);
                return value;
            } catch (Throwable e) {
                // 出现异常,也要通知其他线程,避免卡住
                future.completeExceptionally(e);
                throw new RuntimeException(e);
            } finally {
                // 移除占位符
                futureMap.remove(key);
            }
        } else {
            // 其他请求,等待第一个请求查库完成
            try {
                return existingFuture.get(500, TimeUnit.MILLISECONDS); // 等待500毫秒返回
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    // 模拟数据库查询
    private Object queryFromDatabase(String key) throws InterruptedException {
        System.out.println("查询数据库:" + key);
        Thread.sleep(490); // 模拟耗时操作
        return "DB-" + key;
    }

    // 测试
    public static void main(String[] args) {
        CacheServiceTest service = new CacheServiceTest();

        @Cleanup ExecutorService executor = Executors.newFixedThreadPool(10);
        String key = "k1";

        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                Object result = service.getData(key);
                System.out.println(Thread.currentThread()
                                         .getName() + " -> " + result);
            });
        }

        executor.shutdown();
    }
}

优点

  1. 高并发友好:多个线程可以同时请求同一数据,但只有第一个线程会去查询数据库,其他线程会等待结果。
  2. 无锁:使用ConcurrentHashMapCompletableFuture 避免了传统锁的开销,减少了线程阻塞。