Jump to content

Photo

Thread-able java web server in clustered process

Threadable java webserver code storm

  • Please log in to reply
No replies to this topic

#1 Champion of Cyrodiil

Champion of Cyrodiil

    Gigabyte

  • Members
  • 776 posts
  • LocationVirginia

Posted 17 May 2013 - 08:10 AM

I have been working with a distributed architecture called Storm for a while.  One issue I have with it is that I can not always get accurate metrics from the standard Ui that comes with storm.  So I decided that I could implement a thread inside each BaseTopology , to respond to web requests with cluster(topology) information.  Here it is:

 

package ottch.Tcp;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

import backtype.storm.LocalCluster;
import backtype.storm.generated.TopologySummary;

public class TcpServer extends Thread {

	/**
	 * @param args
	 * @throws IOException
	 */
	private boolean isRunning = true;
	private LocalCluster _cluster;

	public TcpServer(LocalCluster cluster) {
		_cluster = cluster;
	}

	public void run() {
		try {
			String clientSentence;

			ServerSocket welcomeSocket = new ServerSocket(1443);

			while (isRunning) {
				Socket connectionSocket = welcomeSocket.accept();
				BufferedReader inFromClient = new BufferedReader(
						new InputStreamReader(connectionSocket.getInputStream()));
				boolean autoflush = true;
				PrintWriter outToClient = new PrintWriter(
						connectionSocket.getOutputStream(), autoflush);

				clientSentence = inFromClient.readLine();

				if (clientSentence.equals("GET / HTTP/1.1")) {
					System.out.println("Received: " + clientSentence);

					TopologySummary ts = _cluster.getClusterInfo()
							.get_topologies().get(0);

					StringBuilder text = new StringBuilder();
					text.append("<h1>Topology Info</h1>");
					text.append(printField("Name", ts.get_name()));
					text.append(printField("ID", ts.get_id()));
					text.append(printField("Updtime (seconds)",
							Integer.toString(ts.get_uptime_secs())));
					text.append(printField("Status", ts.get_status()));
					text.append(printField("# Workers",
							Integer.toString(ts.get_num_workers())));

					outToClient.println(text);

				}

				connectionSocket.close();
			}

		} catch (Exception e) {
			System.out.println(e.getMessage());
		}

	}

	@Override
	public void interrupt() {
		// TODO Auto-generated method stub
		super.interrupt();
		isRunning = false;
	}

	private String printField(String field, String val) {
		return "<br/>" + field + ": " + val;
	}

}

 

 

Now for the implementation into the cluster (Line 69-71):

package ottch.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

import ottch.Tcp.TcpServer;
import ottch.storm.bo.Weapon;
import ottch.storm.bolts.*;
import ottch.storm.spouts.*;

public class BaseTopology {

	/**
	 * @param args
	 * @throws InvalidTopologyException
	 * @throws AlreadyAliveException
	 */
	public static void main(String[] args) {

		Config conf = new Config();
		// change this to true if error occur.
		conf.setDebug(false);

		
		TopologyBuilder builder = new TopologyBuilder();

		
		 /* Used for emitting Strings from Spout to Bolt. */
		 /*
		  builder.setSpout("stringspout", new StringSpout());
		  builder.setBolt("stringbolt", new
		  StringBolt()).shuffleGrouping("stringspout");*/
		 

		/* Used for emitting a serialized class object from Spout to Bolt. */
		/*
		 * builder.setSpout("classspout", new ClassSpout());
		 * builder.setBolt("classbolt", new ClassBolt()).shuffleGrouping(
		 * "classspout");
		 */
		// register serialization for object.
		//conf.registerSerialization(Weapon.class);

		
		if (args != null && args.length > 0) {
			conf.setNumWorkers(1);

			try {
				StormSubmitter.submitTopology(args[0], conf,
						builder.createTopology());
			} catch (AlreadyAliveException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		} else {

			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("test", conf, builder.createTopology());
			
			//launch web gui thread
			Thread newThread = new Thread(new TcpServer(cluster), "UI-THREAD");
			newThread.start();
			
			//Run Topology for 20 seconds, then kill.
			/*Utils.sleep(20000);			
			newThread.interrupt();
			cluster.killTopology("test");
			cluster.shutdown();*/
		}
	}
}

 

 







Also tagged with one or more of these keywords: Threadable, java, webserver, code, storm