
Watching an HDFS folder

hadoop hdfs java monitor folder watch folder


#1 Champion of Cyrodiil


Posted 17 April 2013 - 01:26 PM

I am working on an OS service that will need to watch an HDFS folder for new files.  If a new file appears, it gets moved to a staging area for further processing...

 

Anyway, I just need to figure out a good way to pull files that show up.  One big requirement is that a file has to have finished copying to the HDFS mount point before I can move it.  So I am thinking something along these lines:

 

1.) Look for new files (every 5000 ms).

2.) If a new file is found, go to step 3; otherwise repeat step 1.

3.) Check the file size every 250 ms.

4.) If the file size after 250 ms equals the file size before, assume the copy is complete; otherwise repeat step 3.

5.) Move the file to the staging area for processing.

 

Of course, last night I was considering how long the 'checkFileSize()' method itself takes, and subtracting that from the 250 ms to make up the difference.  I plan on making my timers configurable through an argument to the main method or some conf YAML file external to the compiled jar.
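
Here is a minimal sketch of steps 3 through 5, assuming the standard org.apache.hadoop.fs.FileSystem API (the waitUntilStable name and STABLE_CHECK_MS constant are just illustrative, and as my update below shows, plain size polling turns out not to be enough):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SizeStabilitySketch {

	// Illustrative default; would come from an argument or conf file.
	private static final long STABLE_CHECK_MS = 250;

	// Blocks until two consecutive size reads agree (steps 3 and 4).
	static void waitUntilStable(FileSystem fs, Path file) throws Exception {
		long previous = -1;
		long current = fs.getFileStatus(file).getLen();
		while (current != previous) {
			previous = current;
			Thread.sleep(STABLE_CHECK_MS);
			current = fs.getFileStatus(file).getLen();
		}
		// Step 5 would move the file here, e.g. fs.rename(file, stagingPath).
	}
}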

 

Anyway, thoughts would be appreciated. Here is the main class I will be using: FileSystem

http://archive.cloud...FileSystem.html

 

 



#2 Champion of Cyrodiil


Posted 17 April 2013 - 09:46 PM

Update:

 

The file size is not updated during the write process. It is only updated when a data block has finished being written.  So with a 64 MB block size, a file only reports a new size when the write is finished or when it progresses past a block boundary, at which point the reported size jumps to 128 MB, and so on...

 

I discovered this just by running the command # cat /dev/urandom > /hdfs/path/random.bin

 

This starts writing random data to the file specified after the > until you break out (Ctrl + C).  I let it run while checking the file size.  It started at 0 KB, then 64 MB, then 128 MB.  When I stopped the file creation, it flushed the byte array, closed the data stream, and reported ~144.7 MB.

 

 


Thus, I need to monitor the status of each file's associated data block(s) for growth/writes.
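
In code, the two numbers involved are just these (a sketch against the standard FileSystem API; the path is the test file from above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReportedSize {
	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new Configuration());
		Path file = new Path("/hdfs/path/random.bin");
		// Mid-write this jumps 0 -> 67108864 -> 134217728, not byte by byte.
		long len = fs.getFileStatus(file).getLen();
		long blockSize = fs.getDefaultBlockSize(); // 64 MB by default here
		System.out.println(len + " bytes reported, block size " + blockSize);
	}
}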



#3 K_N


Posted 18 April 2013 - 01:58 AM

Do you have access to the service that uploads the files? If so, just tag files that haven't finished uploading with .part or similar. Otherwise it's pretty ridiculous to have to monitor a file for completion when you don't know the expected size or upload speed of the user.
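
For example, if the uploader wrote to file.part and renamed it on completion, the monitor could skip in-flight files entirely with a PathFilter (a sketch, assuming Hadoop's listStatus(Path, PathFilter) overload):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class CompletedFilesOnly {

	// Skip anything still carrying the in-progress suffix.
	static final PathFilter NOT_PART = new PathFilter() {
		public boolean accept(Path path) {
			return !path.getName().endsWith(".part");
		}
	};

	static FileStatus[] listCompleted(FileSystem fs, Path dir) throws Exception {
		// listStatus only returns entries the filter accepts.
		return fs.listStatus(dir, NOT_PART);
	}
}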




#4 Champion of Cyrodiil


Posted 23 April 2013 - 06:04 AM

Unfortunately I do not.  Several different services will be dropping files through cross-domain guards and whatnot.  I actually came up with an interesting solution.  I'll post the code when I get to the office today (if I can remember).



#5 Champion of Cyrodiil


Posted 23 April 2013 - 07:49 AM

package com.saic.hdfs.monitor;

import java.util.HashMap;
import java.util.Properties;

import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class Main {

	// environment settings.
	private static String zooHost = "gmdevlocal";
	private static String zooPort = "2181";
	private static String nameNode = "hdfs://namenodeHostname";
	private static String nameNodePort = "8020";
	private static String targetFolder = "/blah/attachments";
	private static String kafkaTopic = "EMBus";

	// default settings.
	private static long waitTime = 10000;
	private static Configuration hdfsConfig;
	private static boolean isRunning = true;
	static Producer<String, String> kafkaProducer;

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		// Instantiate HDFS Configuration.
		hdfsConfig = new Configuration();
		hdfsConfig.set("fs.default.name", nameNode + ":" + nameNodePort);

		// Instantiate Kafka Producer.
		setupKafka();

		// HashMap for File Checking.
		HashMap<String, HdfsFileEntry> fileMap = new HashMap<String, HdfsFileEntry>();

		// Start 'monitor' loop.
		while (isRunning) {

			// Wait n time.
			try {
				Thread.sleep(waitTime);

				// Get the FileSystem handle (cached by Hadoop).
				FileSystem fs = FileSystem.get(hdfsConfig);

				// Poll the target folder.
				FileStatus[] files = fs.listStatus(new Path(targetFolder));

				// Process each file returned by the listing.
				for (FileStatus fileStatus : files) {

					String uniquePath = fileStatus.getPath().toString();

					// If the file is new, add to the Map and continue.
					if (!fileMap.containsKey(uniquePath)) {

						fileMap.put(uniquePath, makeEntry(fileStatus));
						continue;

						// otherwise the file was there last loop.
					} else if (!fileMap.get(uniquePath).isProcessed()) {

						// if the file size is 0 or evenly divisible by
						// the block size, it is still growing
						if (fileStatus.getLen() == 0
								|| fileStatus.getLen()
										% fs.getDefaultBlockSize() == 0) {

							// Update with new size.
							fileMap.get(uniquePath).setLastReportedSize(
									fileStatus.getLen());

						} else {

							// Send notification to Kafka!
							sendKafkaString(fileStatus.getPath().getName());
							fileMap.get(uniquePath).setProcessed(true);

						}

					}

				}

				// Close the FS.
				fs.close();

			} catch (Exception e) {
				e.printStackTrace();
			}

		}
		// End of Main Thread.
		kafkaProducer.close();
	}

	private static void setupKafka() {

		Properties props = new Properties();
		props.put("zk.connect", zooHost + ":" + zooPort);
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		try {
			ProducerConfig config = new ProducerConfig(props);
			kafkaProducer = new Producer<String, String>(config);
		} catch (ZkTimeoutException e) {
			System.out.println("Unable to connect to zookeeper host ("
					+ zooHost + ") within 6 seconds.");
			isRunning = false;
		}
	}

	private static HdfsFileEntry makeEntry(FileStatus fileStatus) {

		HdfsFileEntry newFile = new HdfsFileEntry();
		newFile.setAge(System.currentTimeMillis());
		newFile.setLastReportedSize(fileStatus.getLen());
		newFile.setName(fileStatus.getPath().toString());
		newFile.setProcessed(false);
		return newFile;

	}

	private static boolean sendKafkaString(String message) {

		try {

			// The message is sent to a randomly selected partition registered
			// in ZK
			ProducerData<String, String> data = new ProducerData<String, String>(
					kafkaTopic, message);
			kafkaProducer.send(data);
			System.out.println("Sent: " + message);

		} catch (Exception e) {
			System.out.println("Error sending Kafka Message: " + message);
			return false;
		}
		return true;
	}

}


#6 Champion of Cyrodiil


Posted 23 April 2013 - 07:53 AM

package com.saic.hdfs.monitor;

public class HdfsFileEntry {

	private String name;            // full HDFS path of the file
	private long age;               // time first seen, in ms since epoch
	private long lastReportedSize;  // length from the previous poll
	private boolean processed;      // true once a notification has been sent
	
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public long getAge() {
		return age;
	}
	public void setAge(long age) {
		this.age = age;
	}
	public long getLastReportedSize() {
		return lastReportedSize;
	}
	public void setLastReportedSize(long lastReportedSize) {
		this.lastReportedSize = lastReportedSize;
	}
	public boolean isProcessed() {
		return processed;
	}
	public void setProcessed(boolean processed) {
		this.processed = processed;
	}
	
}
 

 

 

 



#7 Champion of Cyrodiil


Posted 23 April 2013 - 08:01 AM

So the cached HDFS file system only reports sizes in whole blocks while copying...  the default block size being 64 MB, that is 1024*1024*64 bytes = 67108864.

 

So the code above makes one assumption that I still have to address.

 

If your file's reported size is EXACTLY 67108864 bytes, or a number evenly divisible by 67108864 (134217728... and so on), then we assume that we are looking at a reported block size and the file has not finished copying.  The next step will be to add a counter for the number of times the file size has been reported as evenly divisible by the 64 MB block size.  If after 2 or 3 loops (20 or 30 seconds) it is still EXACTLY 64 MB, then we assume that 64 MB is the actual size of the file... and we send the file name message to our message bus.
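
A sketch of that counter idea (the stableCount parameter and STABLE_POLLS threshold are planned additions; the HdfsFileEntry class posted above does not track them yet):

package com.saic.hdfs.monitor;

public class BlockBoundaryCheck {

	// ~30 seconds at the 10 second poll interval used in Main.
	static final int STABLE_POLLS = 3;

	/**
	 * True if the file can be treated as finished: either its size is not
	 * an exact multiple of the block size, or it has sat unchanged on a
	 * block boundary for STABLE_POLLS consecutive polls.
	 */
	static boolean looksComplete(long reportedLen, long blockSize, int stableCount) {
		if (reportedLen == 0) {
			return false; // nothing written yet
		}
		if (reportedLen % blockSize != 0) {
			return true; // past a block boundary, so the copy is done
		}
		return stableCount >= STABLE_POLLS; // exactly on a boundary: wait it out
	}
}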

 

Most files being processed are less than 10 MB, so for my problem, this actually works well.  The process size sits around 60MB when running, which is also acceptable.

 

I would really like to hear suggestions on a different technique.



#8 K_N


Posted 23 April 2013 - 10:12 PM

That seems like a fairly reasonable solution. The memory use is a little crazy, but it's Java, so that's expected.




#9 Champion of Cyrodiil


Posted 24 April 2013 - 07:59 PM

I was doing some work with moving files on HDFS and I came across this article about small files being an issue for heap space in the namenode (where the file system metadata is stored).  However, it takes A LOT of files and directory entries, as each is ~100 bytes.  Thus a heap of 6 GB or more will accommodate quite a few entries.

 

Which got me thinking about this entire process further.  The file system metadata is stored in the namenode as an 'entry' which points to a set of data blocks.  When you perform a 'move' on HDFS, you're actually just changing the name of the data block pointer in the namenode, not moving the actual file data blocks on the datanodes.  So why would it matter if the original file's hadoop.fs.Path changed while the data blocks are being populated?
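
For what it's worth, the move itself is a single metadata call (a sketch; the paths are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MetadataMove {
	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new Configuration());
		// rename() updates the namenode entry only; no blocks move on the
		// datanodes.
		boolean ok = fs.rename(new Path("/blah/attachments/file.bin"),
				new Path("/staging/file.bin"));
		System.out.println("moved: " + ok);
	}
}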

 

 

There could be some kind of boolean 'lease' on the file system entry in the namenode that prohibits another client from modifying it during writes.

 

 

I guess I'll have to take a closer look at the hadoop.fs package if I really care that much.  The code above works, but K_N is right: it is heavy for a task that could be much better optimized.



#10 K_N


Posted 24 April 2013 - 08:18 PM

Just remember, sometimes the effort to write something incredibly efficient isn't worth the time spent over using a perfectly acceptable kludge.

 

If no one else is going to copy or evaluate this code, you may not need to bother.







