From cb491e82b5ce3dcb7e3c41973a46cb7dcbaa9008 Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Sun, 10 Dec 2023 19:07:21 +0000 Subject: Initial commit --- mapreduce/MapReduce.java | 84 ++++++++++++++++++++++++++++++++++++++++++++++++ mapreduce/Mapper.java | 22 +++++++++++++ mapreduce/Reducer.java | 26 +++++++++++++++ 3 files changed, 132 insertions(+) create mode 100644 mapreduce/MapReduce.java create mode 100644 mapreduce/Mapper.java create mode 100644 mapreduce/Reducer.java (limited to 'mapreduce') diff --git a/mapreduce/MapReduce.java b/mapreduce/MapReduce.java new file mode 100644 index 0000000..f993f09 --- /dev/null +++ b/mapreduce/MapReduce.java @@ -0,0 +1,84 @@ +package mapreduce; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; + +/** + * + * @author mph + * @param input type + * @param key type + * @param value type + * @param output type + */ +public class MapReduce + implements Callable> { + + private List inputList; + private Supplier> mapperSupplier; + private Supplier> reducerSupplier; + private static ForkJoinPool pool; + + /** + * + */ + public MapReduce() { + pool = new ForkJoinPool(); + mapperSupplier = () -> { + throw new UnsupportedOperationException("No mapper supplier"); + }; + reducerSupplier = () -> { + throw new UnsupportedOperationException("No reducer supplier"); + }; + } + + @Override + public Map call() { + Set> mappers = new HashSet<>(); + // TODO: for each element in inputList, create a mapper thread, set its input, and execute it + // helpful methods: Supplier.get() ForkJoinPool.execute() + + Map> mapResults = new HashMap<>(); + // TODO: for each mapper thread, join and merge the results + // helpful methods: ForkJoinTask.join() + + Map> reducers = new HashMap<>(); + // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it + // helpful methods: Supplier.get() ForkJoinPool.execute() + + Map result = new HashMap<>(); + // TODO: for each reducer thread, join and merge the result to the final result + // helpful methods: ForkJoinTask.join() + + throw new UnsupportedOperationException("MapReduce.call() not implemented yet."); + } + + /** + * @param aMapperSupplier construct mapper thread + */ + public void setMapperSupplier(Supplier> aMapperSupplier) { + mapperSupplier = aMapperSupplier; + } + + /** + * @param aReducerSupplier construct reducer thread + */ + public void setReducerSupplier(Supplier> aReducerSupplier) { + reducerSupplier = aReducerSupplier; + } + + /** + * @param anInput the task's set of inputs + */ + public void setInput(List anInput) { + inputList = anInput; + } + +} diff --git a/mapreduce/Mapper.java b/mapreduce/Mapper.java new file mode 100644 index 0000000..c2da326 --- /dev/null +++ b/mapreduce/Mapper.java @@ -0,0 +1,22 @@ +package mapreduce; + +import java.util.Map; +import java.util.concurrent.RecursiveTask; + +/** + * + * @author mph + * @param input type + * @param key type + * @param accumulator type + */ +public abstract class Mapper extends RecursiveTask> { + protected IN input; + + /** + * @param anInput list of input items + */ + public void setInput(IN anInput) { + input = anInput; + } +} diff --git a/mapreduce/Reducer.java b/mapreduce/Reducer.java new file mode 100644 index 0000000..982ddfc --- /dev/null +++ b/mapreduce/Reducer.java @@ -0,0 +1,26 @@ +package mapreduce; + +import java.util.List; +import java.util.concurrent.RecursiveTask; + +/** + * + * @author mph + * @param key + * @param valueListumulator + * @param output value + */ +public abstract class Reducer extends RecursiveTask { + + protected K key; + protected List valueList; + + /** + * @param aKey key for this reducer + * @param aList list of values + */ + public void setInput(K aKey, List aList) { + key = aKey; + valueList = aList; + } +}; -- cgit v1.2.3-70-g09d2