public static class WordCountBolt extends BaseRichBolt{
OutputCollector _collector;
Map<String, Integer> counts = new HashMap<>();
@Override
public void prepare(Map conf,TopologyContext context,OutputCollector collector){
_collector=collector;
}
@Override
public void execute(Tuple input){
// 接收一个单词
String word = input.getString(0);
// 获取该单词对应的计数
Integer count = counts.get(word);
// 计数增加
if(count == null) {
count = 0;
} else {
count++;
}
// 将单词和对应的计数加入map中
counts.put(word, count);
System.out.println(word + ":" + count);
// 发送单词和计数(分别对应字段word和count)
_collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
// 定义一个字段
declarer.declare(new Fields("word","count"));
}
}
/**
* 主函数,创建topology
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
//创建一个Topology
TopologyBuilder builder=new TopologyBuilder();
// 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5
builder.setSpout("spout", new WordCountSpout(), 5);
// 设置分词Bolt,并行度为8,它的数据来源是spout的
builder.setBolt("split", new WordSplitBolt(), 8).shuffleGrouping("spout");
// 设置计数Bolt,你并行度为12,它的数据来源是split的word字段
builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
spout和bolt的executor设置为5,task为10,那实际运行的时候,它是会产生多少个实例?
如果是多例的话,在WordCountBolt中的成员变量counts不是会发生并发问题吗?
参考GPT和自己的思路:
根据给出的代码和设置,spout和bolt的executor数量都是5,而task数量是10,每个executor会运行2个task,因此实际运行时会有10个实例。每个实例都是在独立的worker进程中运行,它们之间是并行的。
在WordCountBolt中,counts确实会发生并发问题。因为在多个task同时执行execute方法时,它们会同时访问counts变量,可能出现同时对同一个单词做计数的情况。为了解决这个问题,可以考虑将counts变量放在一个支持并发访问的数据结构中,例如ConcurrentHashMap。这样就可以避免并发问题。