I have been working with a distributed architecture called Storm for a while. One issue I have with it is that I can not always get accurate metrics from the standard Ui that comes with storm. So I decided that I could implement a thread inside each BaseTopology , to respond to web requests with cluster(topology) information. Here it is:
package ottch.Tcp; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import backtype.storm.LocalCluster; import backtype.storm.generated.TopologySummary; public class TcpServer extends Thread { /** * @param args * @throws IOException */ private boolean isRunning = true; private LocalCluster _cluster; public TcpServer(LocalCluster cluster) { _cluster = cluster; } public void run() { try { String clientSentence; ServerSocket welcomeSocket = new ServerSocket(1443); while (isRunning) { Socket connectionSocket = welcomeSocket.accept(); BufferedReader inFromClient = new BufferedReader( new InputStreamReader(connectionSocket.getInputStream())); boolean autoflush = true; PrintWriter outToClient = new PrintWriter( connectionSocket.getOutputStream(), autoflush); clientSentence = inFromClient.readLine(); if (clientSentence.equals("GET / HTTP/1.1")) { System.out.println("Received: " + clientSentence); TopologySummary ts = _cluster.getClusterInfo() .get_topologies().get(0); StringBuilder text = new StringBuilder(); text.append("<h1>Topology Info</h1>"); text.append(printField("Name", ts.get_name())); text.append(printField("ID", ts.get_id())); text.append(printField("Updtime (seconds)", Integer.toString(ts.get_uptime_secs()))); text.append(printField("Status", ts.get_status())); text.append(printField("# Workers", Integer.toString(ts.get_num_workers()))); outToClient.println(text); } connectionSocket.close(); } } catch (Exception e) { System.out.println(e.getMessage()); } } @Override public void interrupt() { // TODO Auto-generated method stub super.interrupt(); isRunning = false; } private String printField(String field, String val) { return "<br/>" + field + ": " + val; } }
Now for the implementation into the cluster (Line 69-71):
package ottch.storm; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; import ottch.Tcp.TcpServer; import ottch.storm.bo.Weapon; import ottch.storm.bolts.*; import ottch.storm.spouts.*; public class BaseTopology { /** * @param args * @throws InvalidTopologyException * @throws AlreadyAliveException */ public static void main(String[] args) { Config conf = new Config(); // change this to true if error occur. conf.setDebug(false); TopologyBuilder builder = new TopologyBuilder(); /* Used for emitting Strings from Spout to Bolt. */ /* builder.setSpout("stringspout", new StringSpout()); builder.setBolt("stringbolt", new StringBolt()).shuffleGrouping("stringspout");*/ /* Used for emitting a serialized class object from Spout to Bolt. */ /* * builder.setSpout("classspout", new ClassSpout()); * builder.setBolt("classbolt", new ClassBolt()).shuffleGrouping( * "classspout"); */ // register serialization for object. //conf.registerSerialization(Weapon.class); if (args != null && args.length > 0) { conf.setNumWorkers(1); try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); //launch web gui thread Thread newThread = new Thread(new TcpServer(cluster), "UI-THREAD"); newThread.start(); //Run Topology for 20 seconds, then kill. /*Utils.sleep(20000); newThread.interrupt(); cluster.killTopology("test"); cluster.shutdown();*/ } } }