1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
package mapreduce;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
/**
*
* @author mph
* @param <IN> input type
* @param <K> key type
* @param <V> value type
* @param <OUT> output type
*/
public class MapReduce<IN, K, V, OUT>
implements Callable<Map<K, OUT>> {
private List<IN> inputList;
private Supplier<Mapper<IN, K, V>> mapperSupplier;
private Supplier<Reducer<K, V, OUT>> reducerSupplier;
private static ForkJoinPool pool;
/**
*
*/
public MapReduce() {
pool = new ForkJoinPool();
mapperSupplier = () -> {
throw new UnsupportedOperationException("No mapper supplier");
};
reducerSupplier = () -> {
throw new UnsupportedOperationException("No reducer supplier");
};
}
@Override
public Map<K, OUT> call() {
Set<Mapper<IN, K, V>> mappers = new HashSet<>();
// TODO: for each element in inputList, create a mapper thread, set its input, and execute it
// helpful methods: Supplier.get() ForkJoinPool.execute()
Map<K, List<V>> mapResults = new HashMap<>();
// TODO: for each mapper thread, join and merge the results
// helpful methods: ForkJoinTask.join()
Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>();
// TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it
// helpful methods: Supplier.get() ForkJoinPool.execute()
Map<K, OUT> result = new HashMap<>();
// TODO: for each reducer thread, join and merge the result to the final result
// helpful methods: ForkJoinTask.join()
throw new UnsupportedOperationException("MapReduce.call() not implemented yet.");
}
/**
* @param aMapperSupplier construct mapper thread
*/
public void setMapperSupplier(Supplier<Mapper<IN, K, V>> aMapperSupplier) {
mapperSupplier = aMapperSupplier;
}
/**
* @param aReducerSupplier construct reducer thread
*/
public void setReducerSupplier(Supplier<Reducer<K, V, OUT>> aReducerSupplier) {
reducerSupplier = aReducerSupplier;
}
/**
* @param anInput the task's set of inputs
*/
public void setInput(List<IN> anInput) {
inputList = anInput;
}
}
|