Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions lld/flipCache/src/main/java/com/example/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,99 @@
import com.example.application.ports.outbound.EvictionPolicy;
import com.example.application.service.FlipCache;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
public static void main(String[] args) {
try {
multiThreadExecutor();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}

public static void multiThreadExecutor() throws ExecutionException, InterruptedException {
// Use fixed thread pool for better thread safety testing
ExecutorService executorService = Executors.newFixedThreadPool(10);
try {
// add code to measure how long it takes to execute the test
long startTime = System.currentTimeMillis();
testCacheThreadSafety(executorService);
long endTime = System.currentTimeMillis();
System.out.println("======Test completed in " + (endTime - startTime) + " ms =======");
} finally {
executorService.shutdown();
if (!executorService.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
}
}

private static void testCacheThreadSafety(ExecutorService executorService) throws ExecutionException, InterruptedException {
// Setup cache
DataSource<String, String> dataSource = new InMemoryDataSource<>();
EvictionPolicy<String> evictionPolicy = new SimpleEvictionPolicy<>();
FlipCache<String, String> flipCache = new FlipCache<>(5, dataSource, evictionPolicy);

CacheHook<String, String> cacheHitMetricCollector = new CacheHitMetricCollector<>();
flipCache.registerHook(cacheHitMetricCollector);

// Submit multiple concurrent tasks
List<Future<?>> futures = new ArrayList<>();

// Concurrent writes
int concurrentHitCount = 20_000;
for (int i = 0; i < concurrentHitCount; i++) {
final int threadId = i;
futures.add(executorService.submit(() -> {
String key = "key" + (threadId % 10);
String value = "value" + threadId;
flipCache.setCache(key, value);
System.out.println("Thread " + threadId + " set: " + key + " = " + value);
}));
}

// Concurrent reads
for (int i = 0; i < concurrentHitCount; i++) {
final int threadId = i;
futures.add(executorService.submit(() -> {
String key = "key" + (threadId % 10);
String value = flipCache.getCache(key);
System.out.println("Thread " + threadId + " got: " + key + " = " + value);
}));
}

// Wait for all tasks to complete
for (Future<?> future : futures) {
future.get();
}

System.out.println("All concurrent operations completed");
System.out.println("Final cache state and metrics:");

// Check final state
for (int i = 0; i < 10; i++) {
String key = "key" + i;
String value = flipCache.getCache(key);
int hitCount = ((CacheHitMetricCollector) cacheHitMetricCollector).getCacheHitCount(key);
System.out.println(key + " = " + value + ", hits: " + hitCount);
}
}

public static void singleThreadExecutor() {
List<String> keys = List.of("A", "B", "C");
int maxSizeOfCache = 2;

DataSource<String, String> dataSource = new InMemoryDataSource<>();
EvictionPolicy<String> evictionPolicy = new SimpleEvictionPolicy<>();
FlipCache<String, String> flipCache = new FlipCache<>(maxSizeOfCache, dataSource, evictionPolicy);

CacheHook<String, String> cacheHitMetricCollector = new CacheHitMetricCollector<>(keys.getFirst());
CacheHook<String, String> cacheHitMetricCollector = new CacheHitMetricCollector<>();
flipCache.registerHook(cacheHitMetricCollector);

flipCache.setCache(keys.get(0), "ValueA");
Expand All @@ -30,6 +111,6 @@ public static void main(String[] args) {
key -> System.out.println(String.format("Got Value: {%s} from cache for key: {%s}", flipCache.getCache(key), key))
);

// System.out.println(String.format("Metric collected: Number of hits: {%s}",));
System.out.println(String.format("Metric collected: Number of hits: {%s}", ((CacheHitMetricCollector)cacheHitMetricCollector).getCacheHitCount("A")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@
import com.example.application.models.Event;
import com.example.application.ports.outbound.CacheHook;

public class CacheHitMetricCollector <K, V> implements CacheHook <K, V> {
private int cacheHitCount = 0;
private final K key;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public CacheHitMetricCollector(K key) {
this.key = key;
}
public class CacheHitMetricCollector <K, V> implements CacheHook <K, V> {
private final Map<K, Integer> hitCounts = new ConcurrentHashMap<>();

public int getCacheHitCount() {
return cacheHitCount;
public int getCacheHitCount(K key) {
return hitCounts.getOrDefault(key, 0);
}

@Override
public void onEvent(Event event, K key, V value) {
if (event == Event.HIT && key.equals(this.key)) {
cacheHitCount += 1;
if (event == Event.HIT) {
hitCounts.put(key, getCacheHitCount(key) + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,36 @@

import com.example.application.ports.outbound.DataSource;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryDataSource <K, V> implements DataSource <K, V> {
private final Map<K, V> data;

public InMemoryDataSource() {
this.data = new HashMap<>();
this.data = new ConcurrentHashMap<>();
}

@Override
public V persist(K key, V value) {
data.put(key, value);
return value;
return data.put(key, value);
/*
For Testing multi-threaded access and exception handling
System.out.println("Persisted key: " + key + ", value: " + value + " in InMemoryDataSource. Size: " + data.size());
if (data.size() > 5) {
throw new RuntimeException("InMemoryDataSource capacity exceeded");
}
*/
}

@Override
public V retrieve(K key) {
if (!this.contains(key)) {
return null;
}
return data.get(key);
}

@Override
public V remove(K key) {
if (!this.contains(key)) {
return null;
}

V value = this.retrieve(key);
data.remove(key);
return value;
return data.remove(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SimpleEvictionPolicy <K> implements EvictionPolicy <K> {
private final List<K> keys;

public SimpleEvictionPolicy() {
this.keys = new ArrayList<>();
this.keys = new CopyOnWriteArrayList<>();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.example.application.models.Event;

// Note: Though this interface expects only one method,
// in general, it would need to maintain state (e.g., count of hits, misses, etc.)
// Hence, it is defined as an interface rather than a functional interface.
public interface CacheHook<K, V> {
void onEvent(Event event, K key, V value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class FlipCache<K, V> implements GetCache<K, V>, SetCache<K, V> {
private final int maxSize;
private int currentSize;
private final AtomicInteger currentSize;
private final DataSource<K, V> dataSource;
private final EvictionPolicy<K> evictionPolicy;
private final List<CacheHook<K, V>> hooks;
Expand All @@ -21,8 +23,8 @@ public FlipCache(int maxSize, DataSource<K, V> dataSource, EvictionPolicy<K> evi
this.maxSize = maxSize;
this.dataSource = dataSource;
this.evictionPolicy = evictionPolicy;
this.hooks = new ArrayList<>();
this.currentSize = 0;
this.hooks = new CopyOnWriteArrayList<>();
this.currentSize = new AtomicInteger(0);
}

private void triggerHooks(Event event, K key, V value) {
Expand All @@ -31,9 +33,8 @@ private void triggerHooks(Event event, K key, V value) {
);
}

public CacheHook<K, V> registerHook(CacheHook<K, V> cacheHook) {
public void registerHook(CacheHook<K, V> cacheHook) {
this.hooks.add(cacheHook);
return cacheHook;
}

@Override
Expand All @@ -49,26 +50,26 @@ public V getCache(K key) {
}

@Override
public V setCache(K key, V value) {
public synchronized V setCache(K key, V value) {
// if key already exists
if (dataSource.contains(key)) {
evictionPolicy.keyAccessed(key);
return dataSource.retrieve(key);
}

// If the cache is full, evict an item
if (currentSize == maxSize) {
if (currentSize.intValue() == maxSize) {
K keyToEvict = evictionPolicy.evictionCandidate();
triggerHooks(Event.EVICT, keyToEvict, dataSource.retrieve(keyToEvict));
dataSource.remove(keyToEvict);
evictionPolicy.keyRemoved(keyToEvict);
currentSize -= 1;
currentSize.getAndDecrement();
}

// persist the data
dataSource.persist(key, value);
evictionPolicy.keyAdded(key);
currentSize += 1;
currentSize.getAndIncrement();

// Add appropriate metrics
triggerHooks(Event.CREATE, key, value);
Expand Down