package mapreduce; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; /** * * @author mph * @param input type * @param key type * @param value type * @param output type */ public class MapReduce implements Callable> { private List inputList; private Supplier> mapperSupplier; private Supplier> reducerSupplier; private static ForkJoinPool pool; /** * */ public MapReduce() { pool = new ForkJoinPool(); mapperSupplier = () -> { throw new UnsupportedOperationException("No mapper supplier"); }; reducerSupplier = () -> { throw new UnsupportedOperationException("No reducer supplier"); }; } @Override public Map call() { Set> mappers = new HashSet<>(); // TODO: for each element in inputList, create a mapper thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() Map> mapResults = new HashMap<>(); // TODO: for each mapper thread, join and merge the results // helpful methods: ForkJoinTask.join() Map> reducers = new HashMap<>(); // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() Map result = new HashMap<>(); // TODO: for each reducer thread, join and merge the result to the final result // helpful methods: ForkJoinTask.join() throw new UnsupportedOperationException("MapReduce.call() not implemented yet."); } /** * @param aMapperSupplier construct mapper thread */ public void setMapperSupplier(Supplier> aMapperSupplier) { mapperSupplier = aMapperSupplier; } /** * @param aReducerSupplier construct reducer thread */ public void setReducerSupplier(Supplier> aReducerSupplier) { reducerSupplier = aReducerSupplier; } /** * @param anInput the task's set of inputs */ public void setInput(List anInput) { inputList = anInput; } }