server端:
import java.util.Map;
import org.apache.thrift7.TException; import backtype.storm.Config; import backtype.storm.ILocalDRPC; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.DRPCSpout; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.generated.DRPCExecutionException; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class RemoteDRCPTopology { public static class drcpBolt extends BaseRichBolt { private Map stormConf;private TopologyContext context;private OutputCollector collector;public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) { this.stormConf = stormConf;this.context = context;this.collector = collector;}/*** 第一个 :函数名 第二个 :參数*/public void execute(Tuple input) { String value = input.getString(1);value = "hello" + value;this.collector.emit(new Values(input.getValue(0), value));}public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name", "value"));}}public static void main(String[] args) throws Exception, TException { LinearDRPCTopologyBuilder topologyBuilder = new LinearDRPCTopologyBuilder("aaa");topologyBuilder.addBolt(new drcpBolt());Map stormConf;StormSubmitter.submitTopology(RemoteDRCPTopology.class.getSimpleName(),new Config(), topologyBuilder.createRemoteTopology());} }client:
import backtype.storm.utils.DRPCClient; public class ClientRemoteDRPC { public static void main(String[] args) throws Exception{ DRPCClient drpcClient = new DRPCClient("192.168.80.20", 3772);String res = drpcClient.execute("aaa", "哈哈哈444我被远程调用了");System.out.println(res); } }结果输出为:
hello哈哈哈444我被远程调用了