aboutsummaryrefslogtreecommitdiff
path: root/mapreduce/MapReduce.java
blob: f993f091fda22bf441dd12171b4ff6064269c0f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package mapreduce;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

/**
 * Driver for a simple MapReduce computation.
 *
 * <p>The intended flow (see the TODOs in {@link #call()}) is:
 * <ol>
 *   <li>one mapper, obtained from the mapper supplier, is created and executed per input element;</li>
 *   <li>the mapper results are joined and merged into a per-key list of values;</li>
 *   <li>one reducer, obtained from the reducer supplier, is created and executed per key;</li>
 *   <li>the reducer results are joined into the key-to-output map returned by {@link #call()}.</li>
 * </ol>
 * Mappers and reducers are meant to be executed on a {@link ForkJoinPool} owned by this instance.
 *
 * @author mph
 * @param <IN>  input type
 * @param <K>   key type
 * @param <V>   value type
 * @param <OUT> output type
 */
public class MapReduce<IN, K, V, OUT>
    implements Callable<Map<K, OUT>> {

  private List<IN> inputList;
  private Supplier<Mapper<IN, K, V>> mapperSupplier;
  private Supplier<Reducer<K, V, OUT>> reducerSupplier;
  // Instance-scoped (not static): a static field reassigned by every constructor call would let
  // each new MapReduce silently swap out the pool used by every other instance, and the
  // replaced pool would never be shut down.
  private final ForkJoinPool pool;

  /**
   * Creates a driver with its own {@link ForkJoinPool} and placeholder suppliers.
   *
   * <p>Until {@link #setMapperSupplier} / {@link #setReducerSupplier} are called, asking either
   * supplier for an instance throws {@link UnsupportedOperationException}. The input list is
   * unset ({@code null}) until {@link #setInput} is called.
   */
  public MapReduce() {
    pool = new ForkJoinPool();
    mapperSupplier = () -> {
      throw new UnsupportedOperationException("No mapper supplier");
    };
    reducerSupplier = () -> {
      throw new UnsupportedOperationException("No reducer supplier");
    };
  }

  /**
   * Runs the map and reduce phases over the configured input.
   *
   * <p>NOTE: not implemented yet. Every phase below is still a TODO, and this method currently
   * always throws.
   *
   * @return the reduced output for each key produced by the mappers
   * @throws UnsupportedOperationException always, until the TODOs below are implemented
   */
  @Override
  public Map<K, OUT> call() {
    Set<Mapper<IN, K, V>> mappers = new HashSet<>();
    // TODO: for each element in inputList, create a mapper thread, set its input, and execute it
    // helpful methods: Supplier.get() ForkJoinPool.execute()

    Map<K, List<V>> mapResults = new HashMap<>();
    // TODO: for each mapper thread, join and merge the results
    // helpful methods: ForkJoinTask.join()

    Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>();
    // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it
    // helpful methods: Supplier.get() ForkJoinPool.execute()

    Map<K, OUT> result = new HashMap<>();
    // TODO: for each reducer thread, join and merge the result to the final result
    // helpful methods: ForkJoinTask.join()

    throw new UnsupportedOperationException("MapReduce.call() not implemented yet.");
  }

  /**
   * Sets the factory used to create the mapper for each input element.
   *
   * @param aMapperSupplier supplier of mapper instances; must not be {@code null}
   * @throws NullPointerException if {@code aMapperSupplier} is {@code null}
   */
  public void setMapperSupplier(Supplier<Mapper<IN, K, V>> aMapperSupplier) {
    // Reject null up front; otherwise call() would fail later with an opaque NPE instead of the
    // descriptive placeholder exception installed by the constructor.
    mapperSupplier = Objects.requireNonNull(aMapperSupplier, "aMapperSupplier");
  }

  /**
   * Sets the factory used to create the reducer for each key.
   *
   * @param aReducerSupplier supplier of reducer instances; must not be {@code null}
   * @throws NullPointerException if {@code aReducerSupplier} is {@code null}
   */
  public void setReducerSupplier(Supplier<Reducer<K, V, OUT>> aReducerSupplier) {
    reducerSupplier = Objects.requireNonNull(aReducerSupplier, "aReducerSupplier");
  }

  /**
   * Sets the inputs to be mapped. The list is stored by reference, not copied.
   *
   * @param anInput the task's list of inputs; must not be {@code null}
   * @throws NullPointerException if {@code anInput} is {@code null}
   */
  public void setInput(List<IN> anInput) {
    inputList = Objects.requireNonNull(anInput, "anInput");
  }

}