- 浏览: 272850 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
付小忠:
牛逼,解释到了点子上.
JAVA CAS原理深度分析 -
yhxf_ie:
csdn那些鬼转载都不注明出处的,这应该是原文了。
JAVA CAS原理深度分析 -
qq569349370:
终于找到一篇说得比较清楚的了,其他好多都是扰乱视听
JAVA CAS原理深度分析 -
lovemelong:
nice
JAVA CAS原理深度分析 -
Tyrion:
写的不错!
JAVA CAS原理深度分析
zz from:http://blog.sina.com.cn/s/blog_406d9bb00100ui5p.html
Storm简介
Storm
是一个分布式的、容错的实时计算系统,遵循Eclipse Public License
1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个
消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。可以使用任意编程语言来做开发。
主要商业应用及案例:Twitter
Storm的优点
1. 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
2. 服务化,一个服务框架,支持热部署,即时上线或下线App.
3. 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
4. 容错性。Storm会管理工作进程和节点的故障。
5. 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
6. 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
7. 快速。系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列。
8. 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
Storm目前存在的问题
1. 目前的开源版本中只是单节点Nimbus,挂掉只能自动重启,可以考虑实现一个双nimbus的布局。
2. Clojure是一个在JVM平台运行的动态函数式编程语言,优势在于流程计算, Storm的部分核心内容由Clojure编写,虽然性能上提高不少但同时也提升了维护成本。
Storm架构
Storm 集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名 为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来 它们就变得十分健壮,两者的协调工作是由Zookeeper来完成的。ZooKeeper用于管理集群中的不同组件,ZeroMQ是内部消息系 统,JZMQ是ZeroMQMQ的Java Binding。有个名为storm-deploy的子项目,可以在AWS上一键部署Storm集群.
Storm术语解释
Storm 的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络.下面进行详细介绍:
Topologies 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce Job
Stream 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理
Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple
Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作
Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.
Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks.
stream grouping分类
1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同.
2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts.
3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到.
4. Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.
5. Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行.
6.
Direct Grouping:
直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct
Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过
TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
Storm如何保证消息被处理
storm 保证每个tuple会被topology完整的执行。storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而可以形成树状结构), 并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置, 如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会会重新发射这个tuple。
一个tuple能根据新获取到的spout而触发创建基于此的上千个tuple
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10)
.shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20)
.fieldsGrouping(2, new Fields("word"));
这个topology从kestrel queue读取句子,并把句子划分成单词,然后汇总每个单词出现的次数,一个tuple负责读取句子,每一个tuple分别对应计算每一个单词出现的次数,大概样子如下所示:
一个tuple的生命周期:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
首 先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面我们通过这个tuple-id来追踪这个tuple。举例来说, KestrelSpout从kestrel队列里面读取一个消息,并且把kestrel提供的消息id作为message-id, 看例子:
collector.emit(new Values("field1", "field2", 3) , msgId);
接
下来, 这个发射的tuple被传送到消息处理者bolt那里,
storm会跟踪这个消息的树形结构是否创建,根据messageid调用Spout里面的ack函数以确认tuple是否被完全处理。如果tuple超
时就会调用spout的fail方法。由此看出同一个tuple不管是acked还是fail都是由创建他的那个spout发出的,所以即使spout在
集群环境中执行了很多的task,这个tule也不会被不同的task去acked或failed.
当kestrelspout从kestrel
队列中得到一个消息后会打开这个他,这意味着他并不会把此消息拿走,消息的状态会显示为pending,直到等待确认此消息已经处理完成,处于
pending状态直到ack或者fail被调用,处于"Pending"的消息不会再被其他队列消费者使用.如果在这过程中spout中处理此消息的
task断开连接或失去响应则此pending的消息会回到"等待处理"状态.
Storm的一些常用应用场景
1.流聚合
流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段。
builder.setBolt(5, new MyJoiner(), parallelism)
.fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))
.fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))
.fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))
2.批处理
有时候为了性能或者一些别的原因, 你可能想把一组tuple一起处理, 而不是一个个单独处理。
3.BasicBolt
1. 读一个输入tuple
2. 根据这个输入tuple发射一个或者多个tuple
3. 在execute的方法的最后ack那个输入tuple
遵循这类模式的bolt一般是函数或者是过滤器, 这种模式太常见,storm为这类模式单独封装了一个接口: IbasicBolt
4.内存内缓存+Fields grouping组合
在
bolt的内存里面缓存一些东西非常常见。缓存在和fields
grouping结合起来之后就更有用了。比如,你有一个bolt把短链接变成长链接(bit.ly,
t.co之类的)。你可以把短链接到长链接的对应关系利用LRU算法缓存在内存里面以避免重复计算。比如组件一发射短链接,组件二把短链接转化成长链接并
缓存在内存里面。看一下下面两段代码有什么不一样:
builder.setBolt(2, new ExpandUrl(), parallelism)
.shuffleGrouping(1);
builder.setBolt(2, new ExpandUrl(), parallelism)
.fieldsGrouping(1, new Fields("url"));
5.计算top N
比如你有一个bolt发射这样的tuple: "value", "count"并且你想一个bolt基于这些信息算出top N的tuple。最简单的办法是有一个bolt可以做一个全局的grouping的动作并且在内存里面保持这top N的值。
这个方式对于大数据量的流显然是没有扩展性的, 因为所有的数据会被发到同一台机器。一个更好的方法是在多台机器上面并行的计算这个流每一部分的top N, 然后再有一个bolt合并这些机器上面所算出来的top N以算出最后的top N, 代码大概是这样的:
builder.setBolt(2, new RankObjects(), parallellism)
.fieldsGrouping(1, new Fields("value"));
builder.setBolt(3, new MergeObjects())
.globalGrouping(2);
这个模式之所以可以成功是因为第一个bolt的fields grouping使得这种并行算法在语义上是正确的。
用TimeCacheMap来高效地保存一个最近被更新的对象的缓存
6.用TimeCacheMap来高效地保存一个最近被更新的对象的缓存
有时候你想在内存里面保存一些最近活跃的对象,以及那些不再活跃的对象。 TimeCacheMap 是一个非常高效的数据结构,它提供了一些callback函数使得我们在对象不再活跃的时候我们可以做一些事情.
7.分布式RPC:CoordinatedBolt和KeyedFairBolt
用
storm做分布式RPC应用的时候有两种比较常见的模式:它们被封装在CoordinatedBolt和KeyedFairBolt里面.
CoordinatedBolt包装你的bolt,并且确定什么时候你的bolt已经接收到所有的tuple,它主要使用Direct
Stream来做这个.
KeyedFairBolt同样包装你的bolt并且保证你的topology同时处理多个DRPC调用,而不是串行地一次只执行一个。
一.构建maven开发环境
为
了开发storm topology, 你需要把storm相关的jar包添加到classpath里面去:要么手动添加所有相关的jar包,
要么使用maven来管理所有的依赖。storm的jar包发布在Clojars(一个maven库),
如果你使用maven的话,把下面的配置添加在你项目的pom.xml里面。
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.5.3</version>
<scope>test</scope>
</dependency>
二.代码范例
1.Topology 入口点 RollingTopWords ------------类似于hadoop的Job定义
本地模式(嵌入Local):
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.starter.bolt.MergeObjects;
import storm.starter.bolt.RankObjects;
import storm.starter.bolt.RollingCountObjects;
public class RollingTopWords {
public static void main(String[] args) throws Exception {
final int TOP_N = 3;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new TestWordSpout(), 5);
builder.setBolt(2, new RollingCountObjects(60, 10), 4)
.fieldsGrouping(1, new Fields("word"));
builder.setBolt(3, new RankObjects(TOP_N), 4)
.fieldsGrouping(2, new Fields("obj"));
builder.setBolt(4, new MergeObjects(TOP_N))
.globalGrouping(3);
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster(); // 本地模式启动集群
cluster.submitTopology("rolling-demo", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
部署模式:
package storm.starter;
import storm.starter.bolt.MergeObjects;
import storm.starter.bolt.RankObjects;
import storm.starter.bolt.RollingCountObjects;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class RollingTopWords {
public static void main(String[] args) throws Exception {
final int TOP_N = 3;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new TestWordSpout(), 5);
builder.setBolt(2, new RollingCountObjects(60, 10), 4).fieldsGrouping(
1, new Fields("word"));
builder.setBolt(3, new RankObjects(TOP_N), 4).fieldsGrouping(2,
new Fields("obj"));
builder.setBolt(4, new MergeObjects(TOP_N)).globalGrouping(3);
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("demo", conf,
builder.createTopology());
Thread.sleep(10000);
}
}
2. 直接使用内置的TestWordSpout(随机产生一个word)
TestWordSpout
package backtype.storm.testing;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Random;
import org.apache.log4j.Logger;
public class TestWordSpout implements IRichSpout {
public static Logger LOG = Logger.getLogger(TestWordSpout.class);
boolean _isDistributed;
SpoutOutputCollector _collector;
public TestWordSpout() {
this(true);
}
public TestWordSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public boolean isDistributed() {
return _isDistributed;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {
}
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
3.各环节处理Bolt
RollingCountObjects 滚动计数word,并通过定时触发时间,清空计数列表
package storm.starter.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@SuppressWarnings("serial")
public class RollingCountObjects implements IRichBolt {
private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>();
private int _numBuckets;
private transient Thread cleaner;
private OutputCollector _collector;
private int _trackMinutes;
public RollingCountObjects(int numBuckets, int trackMinutes) {
_numBuckets = numBuckets;
_trackMinutes = trackMinutes;
}
public long totalObjects (Object obj) {
long[] curr = _objectCounts.get(obj);
long total = 0;
for (long l: curr) {
total+=l;
}
return total;
}
public int currentBucket (int buckets) {
return (currentSecond() / secondsPerBucket(buckets)) % buckets;
}
public int currentSecond() {
return (int) (System.currentTimeMillis() / 1000);
}
public int secondsPerBucket(int buckets) {
return (_trackMinutes * 60 / buckets);
}
public long millisPerBucket(int buckets) {
return (long) secondsPerBucket(buckets) * 1000;
}
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
cleaner = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
public void run() {
Integer lastBucket = currentBucket(_numBuckets);
while(true) {
int currBucket = currentBucket(_numBuckets);
if(currBucket!=lastBucket) {
int bucketToWipe = (currBucket + 1) % _numBuckets;
synchronized(_objectCounts) {
Set objs = new HashSet(_objectCounts.keySet());
for (Object obj: objs) {
long[] counts = _objectCounts.get(obj);
long currBucketVal = counts[bucketToWipe];
counts[bucketToWipe] = 0; // *这行代码很关键*
long total = totalObjects(obj);
if(currBucketVal!=0) {
_collector.emit(new Values(obj, total));
}
if(total==0) {
_objectCounts.remove(obj);
}
}
}
lastBucket = currBucket;
}
long delta = millisPerBucket(_numBuckets) - (System.currentTimeMillis() % millisPerBucket(_numBuckets));
Utils.sleep(delta);
}
}
});
cleaner.start();
}
public void execute(Tuple tuple) {
Object obj = tuple.getValue(0);
int bucket = currentBucket(_numBuckets);
synchronized(_objectCounts) {
long[] curr = _objectCounts.get(obj);
if(curr==null) {
curr = new long[_numBuckets];
_objectCounts.put(obj, curr);
}
curr[bucket]++;
_collector.emit(new Values(obj, totalObjects(obj)));
_collector.ack(tuple);
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("obj", "count"));
}
}
RankObjects
package storm.starter.bolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;
@SuppressWarnings("serial")
public class RankObjects implements IBasicBolt {
@SuppressWarnings("rawtypes")
List<List> _rankings = new ArrayList<List>();
int _count;
Long _lastTime = null;
public RankObjects(int n) {
_count = n;
}
@SuppressWarnings("rawtypes")
private int _compare(List one, List two) {
long valueOne = (Long) one.get(1);
long valueTwo = (Long) two.get(1);
long delta = valueTwo - valueOne;
if(delta > 0) {
return 1;
} else if (delta < 0) {
return -1;
} else {
return 0;
}
} //end compare
private Integer _find(Object tag) {
for(int i = 0; i < _rankings.size(); ++i) {
Object cur = _rankings.get(i).get(0);
if (cur.equals(tag)) {
return i;
}
}
return null;
}
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context) {
}
@SuppressWarnings("rawtypes")
public void execute(Tuple tuple, BasicOutputCollector collector) {
Object tag = tuple.getValue(0);
Integer existingIndex = _find(tag);
if (null != existingIndex) {
_rankings.set(existingIndex, tuple.getValues());
} else {
_rankings.add(tuple.getValues());
}
Collections.sort(_rankings, new Comparator<List>() {
public int compare(List o1, List o2) {
return _compare(o1, o2);
}
});
if (_rankings.size() > _count) {
_rankings.remove(_count);
}
long currentTime = System.currentTimeMillis();
if(_lastTime==null || currentTime >= _lastTime + 2000) {
collector.emit(new Values(JSONValue.toJSONString(_rankings)));
_lastTime = currentTime;
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("list"));
}
}
MergeObjects 对排序结果进行归并
package storm.starter.bolt;
import org.apache.log4j.Logger;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;
@SuppressWarnings("serial")
public class MergeObjects implements IBasicBolt {
public static Logger LOG = Logger.getLogger(MergeObjects.class);
@SuppressWarnings({ "rawtypes", "unchecked" })
private List<List> _rankings = new ArrayList();
int _count = 10;
Long _lastTime;
public MergeObjects(int n) {
_count = n;
}
@SuppressWarnings("rawtypes")
private int _compare(List one, List two) {
long valueOne = (Long) one.get(1);
long valueTwo = (Long) two.get(1);
long delta = valueTwo - valueOne;
if(delta > 0) {
return 1;
} else if (delta < 0) {
return -1;
} else {
return 0;
}
} //end compare
private Integer _find(Object tag) {
for(int i = 0; i < _rankings.size(); ++i) {
Object cur = _rankings.get(i).get(0);
if (cur.equals(tag)) {
return i;
}
}
return null;
}
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context) {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void execute(Tuple tuple, BasicOutputCollector collector) {
List<List> merging = (List) JSONValue.parse(tuple.getString(0));
for(List pair : merging) {
Integer existingIndex = _find(pair.get(0));
if (null != existingIndex) {
_rankings.set(existingIndex, pair);
} else {
_rankings.add(pair);
}
Collections.sort(_rankings, new Comparator<List>() {
public int compare(List o1, List o2) {
return _compare(o1, o2);
}
});
if (_rankings.size() > _count) {
_rankings.subList(_count, _rankings.size()).clear();
}
}
long currentTime = System.currentTimeMillis();
if(_lastTime==null || currentTime >= _lastTime + 2000) {
String fullRankings = JSONValue.toJSONString(_rankings);
collector.emit(new Values(fullRankings));
LOG.info("Rankings: " + fullRankings);
_lastTime = currentTime;
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("list"));
}
}
相关推荐
1、课程中完整开发3个Storm项目,均为企业实际项目,其中一个是完全由Storm Trident开发。 项目源码均可以直接运行,也可直接用于商用或企业。 2、Storm全面、系统、深入讲解 3、注重实践,对较抽象难懂的技术点如...
Storm流计算项目(文档中含有视频下载地址和解压密码),内容包含 storm、trident、kafka、hbase、cdh、hightcharts 等内容
1、全面掌握Storm技术开发、运维、调优; 2、掌握Storm完整项目开发思路和架构设计,陡直提升经验值! 3、掌握Storm Trident项目开发模式; 4、掌握Storm集成Kafka开发及项目实战; 5、掌握HighCharts各类图表...
Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...
STORM的TOPOLOGY在线上运行时,随着数据量的增加,在一定的服务器性能及集群规模下,会渐渐达到一个极限,到达极限后,服务器的load、io、cpu、mem等可能会出现耗尽,系统很卡,storm吞吐量骤降的情况。本文档中截图...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...