package mapreduce; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; /** * * @author mph * @param input type * @param key type * @param value type * @param output type */ public class MapReduce implements Callable> { private List inputList; private Supplier> mapperSupplier; private Supplier> reducerSupplier; private static ForkJoinPool pool; /** * */ public MapReduce() { pool = new ForkJoinPool(); mapperSupplier = () -> { throw new UnsupportedOperationException("No mapper supplier"); }; reducerSupplier = () -> { throw new UnsupportedOperationException("No reducer supplier"); }; } @Override public Map call() { Set> mappers = new HashSet<>(); // TODO: for each element in inputList, create a mapper thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() for (IN input : inputList) { Mapper mapper = mapperSupplier.get(); mapper.setInput(input); pool.execute(mapper); mappers.add(mapper); } Map> mapResults = new HashMap<>(); // TODO: for each mapper thread, join and merge the results // helpful methods: ForkJoinTask.join() for (Mapper mapper : mappers) { Map map = mapper.join(); for (K key : map.keySet()) { mapResults.putIfAbsent(key, new LinkedList<>()); mapResults.get(key).add(map.get(key)); } } Map> reducers = new HashMap<>(); // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() mapResults.forEach((key, value) -> { Reducer reducer = reducerSupplier.get(); reducer.setInput(key, value); pool.execute(reducer); reducers.put(key, reducer); }); Map result = new HashMap<>(); // TODO: for each reducer thread, join and merge the result to the final result // helpful methods: ForkJoinTask.join() reducers.forEach((key, reducer) -> { result.put(key, reducer.join()); }); return result; } /** * @param aMapperSupplier construct mapper thread */ public void setMapperSupplier(Supplier> aMapperSupplier) { mapperSupplier = aMapperSupplier; } /** * @param aReducerSupplier construct reducer thread */ public void setReducerSupplier(Supplier> aReducerSupplier) { reducerSupplier = aReducerSupplier; } /** * @param anInput the task's set of inputs */ public void setInput(List anInput) { inputList = anInput; } }