aboutsummaryrefslogtreecommitdiff
path: root/mapreduce/MapReduce.java
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce/MapReduce.java')
-rw-r--r--mapreduce/MapReduce.java84
1 files changed, 84 insertions, 0 deletions
diff --git a/mapreduce/MapReduce.java b/mapreduce/MapReduce.java
new file mode 100644
index 0000000..f993f09
--- /dev/null
+++ b/mapreduce/MapReduce.java
@@ -0,0 +1,84 @@
+package mapreduce;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
+
+/**
+ *
+ * @author mph
+ * @param <IN> input type
+ * @param <K> key type
+ * @param <V> value type
+ * @param <OUT> output type
+ */
+public class MapReduce<IN, K, V, OUT>
+ implements Callable<Map<K, OUT>> {
+
+ private List<IN> inputList;
+ private Supplier<Mapper<IN, K, V>> mapperSupplier;
+ private Supplier<Reducer<K, V, OUT>> reducerSupplier;
+ private static ForkJoinPool pool;
+
+ /**
+ *
+ */
+ public MapReduce() {
+ pool = new ForkJoinPool();
+ mapperSupplier = () -> {
+ throw new UnsupportedOperationException("No mapper supplier");
+ };
+ reducerSupplier = () -> {
+ throw new UnsupportedOperationException("No reducer supplier");
+ };
+ }
+
+ @Override
+ public Map<K, OUT> call() {
+ Set<Mapper<IN, K, V>> mappers = new HashSet<>();
+ // TODO: for each element in inputList, create a mapper thread, set its input, and execute it
+ // helpful methods: Supplier.get() ForkJoinPool.execute()
+
+ Map<K, List<V>> mapResults = new HashMap<>();
+ // TODO: for each mapper thread, join and merge the results
+ // helpful methods: ForkJoinTask.join()
+
+ Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>();
+ // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it
+ // helpful methods: Supplier.get() ForkJoinPool.execute()
+
+ Map<K, OUT> result = new HashMap<>();
+ // TODO: for each reducer thread, join and merge the result to the final result
+ // helpful methods: ForkJoinTask.join()
+
+ throw new UnsupportedOperationException("MapReduce.call() not implemented yet.");
+ }
+
+ /**
+ * @param aMapperSupplier construct mapper thread
+ */
+ public void setMapperSupplier(Supplier<Mapper<IN, K, V>> aMapperSupplier) {
+ mapperSupplier = aMapperSupplier;
+ }
+
+ /**
+ * @param aReducerSupplier construct reducer thread
+ */
+ public void setReducerSupplier(Supplier<Reducer<K, V, OUT>> aReducerSupplier) {
+ reducerSupplier = aReducerSupplier;
+ }
+
+ /**
+ * @param anInput the task's set of inputs
+ */
+ public void setInput(List<IN> anInput) {
+ inputList = anInput;
+ }
+
+}