diff options
Diffstat (limited to 'mapreduce')
-rw-r--r-- | mapreduce/MapReduce.java | 84 | ||||
-rw-r--r-- | mapreduce/Mapper.java | 22 | ||||
-rw-r--r-- | mapreduce/Reducer.java | 26 |
3 files changed, 132 insertions, 0 deletions
// File: mapreduce/MapReduce.java (new file, mode 100644)
package mapreduce;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

/**
 * Fork/join driver for a MapReduce computation.
 *
 * <p>One {@link Mapper} task is forked per input item. The per-mapper results
 * are grouped by key, one {@link Reducer} task is forked per distinct key, and
 * the reducers' outputs are collected into the map returned by {@link #call()}.
 *
 * @author mph
 * @param <IN> input type
 * @param <K> key type
 * @param <V> value type
 * @param <OUT> output type
 */
public class MapReduce<IN, K, V, OUT>
    implements Callable<Map<K, OUT>> {

  /**
   * Pool shared by every {@code MapReduce} instance. Created exactly once;
   * re-creating it in the constructor would replace the pool under other
   * instances and orphan the previous pool's worker threads.
   */
  private static final ForkJoinPool pool = new ForkJoinPool();

  private List<IN> inputList;
  private Supplier<Mapper<IN, K, V>> mapperSupplier;
  private Supplier<Reducer<K, V, OUT>> reducerSupplier;

  /**
   * Creates a driver with no input and with placeholder suppliers that throw
   * {@link UnsupportedOperationException} until real suppliers are installed
   * through {@link #setMapperSupplier} and {@link #setReducerSupplier}.
   */
  public MapReduce() {
    mapperSupplier = () -> {
      throw new UnsupportedOperationException("No mapper supplier");
    };
    reducerSupplier = () -> {
      throw new UnsupportedOperationException("No reducer supplier");
    };
  }

  /**
   * Runs the map phase, groups the intermediate values by key, runs the reduce
   * phase, and returns one output per key.
   *
   * @return map from each key produced by the mappers to its reducer's output;
   *     empty when the input list is empty
   * @throws IllegalStateException if no input list has been set
   * @throws UnsupportedOperationException if a needed supplier was never set
   */
  @Override
  public Map<K, OUT> call() {
    if (inputList == null) {
      throw new IllegalStateException("No input list");
    }

    // Map phase: fork one mapper per input item. A list (not a hash set of
    // identity-hashed tasks) keeps the join order equal to the input order, so
    // the order of values handed to each reducer is deterministic.
    List<Mapper<IN, K, V>> mappers = new ArrayList<>(inputList.size());
    for (IN input : inputList) {
      Mapper<IN, K, V> mapper = mapperSupplier.get();
      mapper.setInput(input);
      pool.execute(mapper);
      mappers.add(mapper);
    }

    // Shuffle: join every mapper and group its values by key.
    Map<K, List<V>> mapResults = new HashMap<>();
    for (Mapper<IN, K, V> mapper : mappers) {
      for (Map.Entry<K, V> entry : mapper.join().entrySet()) {
        mapResults
            .computeIfAbsent(entry.getKey(), unused -> new ArrayList<>())
            .add(entry.getValue());
      }
    }

    // Reduce phase: fork one reducer per distinct key.
    Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>();
    for (Map.Entry<K, List<V>> entry : mapResults.entrySet()) {
      Reducer<K, V, OUT> reducer = reducerSupplier.get();
      reducer.setInput(entry.getKey(), entry.getValue());
      pool.execute(reducer);
      reducers.put(entry.getKey(), reducer);
    }

    // Collect: join every reducer into the final result.
    Map<K, OUT> result = new HashMap<>();
    for (Map.Entry<K, Reducer<K, V, OUT>> entry : reducers.entrySet()) {
      result.put(entry.getKey(), entry.getValue().join());
    }
    return result;
  }

  /**
   * @param aMapperSupplier factory invoked once per input item to create a mapper task
   */
  public void setMapperSupplier(Supplier<Mapper<IN, K, V>> aMapperSupplier) {
    mapperSupplier = aMapperSupplier;
  }

  /**
   * @param aReducerSupplier factory invoked once per distinct key to create a reducer task
   */
  public void setReducerSupplier(Supplier<Reducer<K, V, OUT>> aReducerSupplier) {
    reducerSupplier = aReducerSupplier;
  }

  /**
   * @param anInput the task's list of inputs; the list is stored, not copied
   */
  public void setInput(List<IN> anInput) {
    inputList = anInput;
  }

}

// File: mapreduce/Mapper.java (new file, mode 100644)
package mapreduce;

import java.util.Map;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/join task that turns one input item into a map of key/value pairs.
 * Subclasses implement {@code compute()} and read the item from {@link #input}.
 *
 * @author mph
 * @param <IN> input type
 * @param <K> key type
 * @param <V> value type
 */
public abstract class Mapper<IN, K, V> extends RecursiveTask<Map<K, V>> {

  /** The input item assigned through {@link #setInput}. */
  protected IN input;

  /**
   * @param anInput the input item this mapper processes
   */
  public void setInput(IN anInput) {
    input = anInput;
  }
}

// File: mapreduce/Reducer.java (new file, mode 100644)
package mapreduce;

import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/join task that combines all values collected for one key into a single
 * output. Subclasses implement {@code compute()} and read {@link #key} and
 * {@link #valueList}.
 *
 * @author mph
 * @param <K> key type
 * @param <V> value type (element type of the value list)
 * @param <OUT> output type
 */
public abstract class Reducer<K, V, OUT> extends RecursiveTask<OUT> {

  /** The key assigned through {@link #setInput}. */
  protected K key;

  /** The values assigned through {@link #setInput}. */
  protected List<V> valueList;

  /**
   * @param aKey key for this reducer
   * @param aList list of values associated with the key
   */
  public void setInput(K aKey, List<V> aList) {
    key = aKey;
    valueList = aList;
  }
}