Distributed Systems

Parallel DNS reverse lookups

Need to do lots of reverse DNS lookups for some reason? Maybe b/c you’re trying to get a seed list for a web crawl or hack attempt on a bunch of ISPs. Who cares. Here’s a quick way to generate names from a big list of IPs like:

1.1.1.1
1.1.1.2
[...]
254.254.254.253
254.254.254.254

We can use Hadoop Streaming to chunk the list so we can do the DNS lookups in parallel. Easy, and requires little to no thought:

./bin/hadoop jar contrib/streaming/*-streaming.jar -input /home/aday/classC.dat -output /home/aday/classC_dns.dat -mapper 'perl -ne '\''print `host $_`'\''' -numReduceTasks 0

We wrap the host call in backticks so that a non-zero exit code from host (e.g. for an address with no reverse record) doesn’t fail the map task; perl just prints whatever host wrote to stdout, error message included.
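The same mapper can be written in Java if you’d rather not fork `host` once per line. This is a minimal sketch under a few assumptions of mine: one IPv4 address per input line, output as `ip<TAB>hostname`, and the JDK’s own resolver instead of `host`. `InetAddress.getCanonicalHostName()` returns the address string itself when no name can be found, so a failed lookup still produces a line rather than a failed task. The class name `ReverseDnsMapper` is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical streaming mapper: reads IPs on stdin, writes "ip\thostname" on stdout. */
public final class ReverseDnsMapper {

    /** Default resolver: reverse lookup via the JDK; returns the IP itself if no name is found. */
    static String resolve(String ip) {
        try {
            // getByName on an IP literal only parses it; the reverse lookup happens
            // in getCanonicalHostName().
            return InetAddress.getByName(ip).getCanonicalHostName();
        } catch (UnknownHostException e) {
            // Thrown if the line is not an IP literal and doesn't resolve as a hostname either.
            return "ERROR";
        }
    }

    /** Formats one output record; the resolver is injectable so it can be tested offline. */
    static String mapLine(String line, Function<String, String> resolver) {
        String ip = line.trim();
        if (ip.isEmpty()) {
            return null; // skip blank lines
        }
        return ip + "\t" + resolver.apply(ip);
    }

    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            String out = mapLine(line, ReverseDnsMapper::resolve);
            if (out != null) {
                System.out.println(out);
            }
        }
    }
}
```

To use it as a streaming mapper you would ship the compiled class with the job (streaming’s `-file ReverseDnsMapper.class`) and pass something like `-mapper 'java -cp . ReverseDnsMapper'`, assuming `java` is on the task nodes’ PATH.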

Distributed Systems
Hadoop
Java

Mahout ♥ HBase

Well, more accurately I’m using and liking both Mahout and HBase in my work at BiggerBoat, but this is a more fun title and I like using the more obscure HTML character entities.

Anyway, I posted an adapter class to Apache JIRA on Friday for Mahout / HBase integration that allows HTables to effectively be manipulated as sparse vectors (read through RowResult, write through BatchUpdate) in Mahout using the Vector interface. Check it out: MAHOUT-78.
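The reason an HTable maps so naturally onto a sparse vector is that a row only stores the cells that actually exist. Here’s a toy illustration of that mapping using plain JDK collections. It is not the MAHOUT-78 code and doesn’t touch the real HBase or Mahout APIs: a row is modeled as a map from column name to a double, a hypothetical `columnIndex` dictionary assigns each column a vector index, and missing columns read as zero.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration: a row of (column -> value) cells viewed as a sparse vector. */
public final class SparseRowVector {
    private final Map<String, Integer> columnIndex;            // column name -> vector index (assumed fixed)
    private final Map<Integer, Double> cells = new HashMap<>(); // only non-zero entries are stored

    SparseRowVector(Map<String, Integer> columnIndex, Map<String, Double> row) {
        this.columnIndex = columnIndex;
        // "Read" path: copy each cell present in the row into its vector slot.
        for (Map.Entry<String, Double> e : row.entrySet()) {
            Integer index = columnIndex.get(e.getKey());
            if (index == null) {
                throw new IllegalArgumentException("unknown column: " + e.getKey());
            }
            set(index, e.getValue());
        }
    }

    double get(int index) {
        return cells.getOrDefault(index, 0.0); // absent column == zero
    }

    void set(int index, double value) {
        if (value == 0.0) {
            cells.remove(index); // keep the representation sparse
        } else {
            cells.put(index, value);
        }
    }

    int nonZeroCount() {
        return cells.size();
    }

    /** Dot product that only visits this vector's non-zero entries. */
    double dot(SparseRowVector other) {
        double sum = 0.0;
        for (Map.Entry<Integer, Double> e : cells.entrySet()) {
            sum += e.getValue() * other.get(e.getKey());
        }
        return sum;
    }

    /** "Write" path: turn the non-zero entries back into (column -> value) cells. */
    Map<String, Double> toRow() {
        Map<String, Double> row = new HashMap<>();
        for (Map.Entry<String, Integer> e : columnIndex.entrySet()) {
            Double v = cells.get(e.getValue());
            if (v != null) {
                row.put(e.getKey(), v);
            }
        }
        return row;
    }
}
```

The constructor and `toRow()` roughly play the roles that RowResult (read) and BatchUpdate (write) play in the adapter described above.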

Looks like Mahout needs to generate some HTML for their javadoc.

HBase
Mahout

Thoughts on Hadoop JobTracker/TaskTracker Scheduling

Had a brief, interesting conversation on freenode #hadoop today with Rapleaf engineer Nathan Marz about scheduling in Hadoop.

Pretty much supports my sense that scheduling is not Hadoop’s strong suit. It’s really pretty shitty. Would be great to see some more cross-pollination between the Beowulf (SGE, PBS, Globus) and MapReduce (Hadoop, HBase) communities. The former have more mature scheduling, resource management and permissions models. They don’t really do a good job of providing a framework for distributed, parallel computing at the application level, though: everything is roll-your-own. Perhaps Hadoop could be integrated as a parallel environment to consume resources from an SGE master [1, 2] rather than managing its own mapper/reducer pools.

A less ambitious scheduler improvement is to modify the way the Hadoop scheduler allocates map/reduce resources. The main itch I’m trying to scratch right now has to do with the coupling of map and reduce slot allocation: a job at the head of the queue takes reduce slots while its maps are still running, even when it has nothing to reduce yet. There are some cases where it seems this shouldn’t be done. Read the dialog with Nathan below if you care to know more.

allenday is it possible to decouple mapper and reducer slot allocation for jobs?
allenday i mean, if a job is #1 in the MR queue, but it is not yet ready to reduce, can it be prevented from consuming reducer slots?
nathanmarz allenday: i think that would be hard… reducing starts while the mapping is happening (copy stage)
allenday nathanmarz, i frequently find that while the reduce has “started”, it can just sit there for a long time doing nothing
allenday this is most common with nutch
allenday so there could be a bunch of other jobs further back in the queue that get starved for reduces b/c the head of the queue is squatting on the slots
nathanmarz it just sits there in the reduce phase?
allenday for sure nutch does, yeah
allenday during fetch, when it’s crawling
nathanmarz i see
nathanmarz i don’t have that much familiarity with nutch
nathanmarz is it possible to increase the number of reducers?
allenday yep, but then you can get into i/o trouble later
nathanmarz for the job i mean, not the cluster
allenday oh
allenday it sounds like you propose having these squatters consume minimal # of reducers (e.g. only 1)
nathanmarz actually, the opposite
nathanmarz let’s say you have 16 reduce slots
nathanmarz and the job is set to use 16 reducers
nathanmarz each one of those reducers potentially has to go over a lot of data
nathanmarz if the job is instead set to use a lot more reducers, like 100 or something
nathanmarz then an individual reducer will go a lot faster
nathanmarz and potentially, those freed reduce slots will go to jobs with higher priority
allenday ok, so you introduce priority to bump the jobs further back ahead in the queue
nathanmarz yea
allenday is that settable in jobconf?
nathanmarz you can set num reducers
allenday so let’s suppose the job that squats on reduce slots gets to the head of the queue. regardless of if it has 16 or 100 reducers configured
nathanmarz JobConf#setNumReduceTasks
allenday and that it is still in map phase only. has not begun reducing yet
allenday until one of those reduces finishes (i.e. the map has finished) all slots are still filled
allenday it’s only when the first reduce finishes that the job at #2 can take over a reduce slot
nathanmarz right
nathanmarz yea that’s true
allenday that’s bad
nathanmarz this scheme doesn’t help until mappers finished
allenday you really want this #1 job
allenday when it is allocating reducers
allenday to have low priority in acquiring the slots
nathanmarz right
nathanmarz well you don’t want it to acquire any slots until mappers finish
allenday so you give reduce slots to #2, #3, #4, etc. until everyone who wants slots has them. then you assign to #1
allenday or until #1 is ready…
allenday is it just me or does the queueing system in hadoop kind of suck?
allenday i am coming here from sun grid which puts a lot of emphasis on this aspect
nathanmarz well, the priority system will work if you start job #1 after the other jobs
nathanmarz if you start the other jobs after #1 then they will get starved of reducers
allenday heh, but the whole reason it is in #1 is because it was submitted first, right? isn’t hadoop FIFO wrt jobs?
nathanmarz if they’re the same priority
nathanmarz so maybe decreasing the reducers job #1 uses is the way to go
nathanmarz set it so it doesn’t use all the reduce slots on the cluster
allenday i need to do some research to see if there are jira open for improving the scheduler. or if there are some commercial plugins to improve the scheduling
nathanmarz definitely room for improvement, agreed
allenday yeah, that was what i thought you meant initially. it’s a hack too though, and breaks down when the number of jobs gets large
allenday i’m surprised they are coupled. do you understand how it works when the mapper hands off to the reducer?
allenday b/c i don’t and i need to
nathanmarz yes
allenday can i get the 2min version?
nathanmarz the reason the reducers start while the mappers are running is because there’s some work they can do without all the map data
nathanmarz each reducer needs to copy the relevant outputs from all the mappers to its machine
nathanmarz this is called the “copy” phase and can occur in parallel with mapping
allenday ok, i’ve seen that
allenday so what we need is a flag that indicates there will be no data to copy until maps all finish
nathanmarz yea
nathanmarz a flag that says not to pipeline the process
allenday default behavior is to have the flag off and copy greedily
allenday which is like it does now
allenday turning the flag on says to wait until upstream map finishes before grabbing a reduce slot and kicking off the copy
allenday **all upstream maps
nathanmarz http://hadoop.apache.org/core/docs/current/hadoop-default.html
nathanmarz those are all the hadoop config parameters
nathanmarz you might be able to find something in there
allenday yeah, i find goodies in there every time i read that page :)
allenday i am only ~1mo into hadoop
allenday here’s another scheduling related question/issue i’m having
allenday i find that job i/o and cpu usage tend to synchronize after a while
allenday b/c if there is a slow moving job in the queue, all the others tend to get jammed behind it
allenday have you seen this?
nathanmarz no, i haven’t
nathanmarz but that’s interesting
allenday it comes back to resource (mis)allocation by the scheduler
nathanmarz how are you measuring that?
allenday it’s this same issue where jobs will consume all the slots
allenday so if you have a slow moving thing blocking all the resources
allenday no one else can get past
allenday then when the slow moving job finishes, the others all start getting processed very quickly (high cpu load during map), then as they begin to finish there is a flurry of i/o
allenday it’s like congestion on the freeway where one car slams on the brakes it sends this wave of traffic jam behind it
allenday assuming the freeway is already close to capacity (not sparse)
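
Here’s a toy model of the policy we converged on, just to make it concrete. It’s a sketch, not JobTracker code: jobs sit in a FIFO queue, each has a hypothetical `waitForMaps` flag (the "don’t pipeline" flag from the conversation), and a job with the flag set gets no reduce slots until all of its maps are done, so free slots flow past it to jobs #2, #3, #4 and so on. `Job`, `waitForMaps` and `assignReduceSlots` are illustrative names only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of reduce-slot assignment with an optional "wait for all maps" flag per job. */
public final class ReduceSlotScheduler {

    static final class Job {
        final String name;
        final int wantedReduces;   // e.g. what JobConf#setNumReduceTasks asked for
        final boolean waitForMaps; // hypothetical "don't pipeline the copy phase" flag
        final boolean mapsDone;

        Job(String name, int wantedReduces, boolean waitForMaps, boolean mapsDone) {
            this.name = name;
            this.wantedReduces = wantedReduces;
            this.waitForMaps = waitForMaps;
            this.mapsDone = mapsDone;
        }

        /** Ready unless the job opted to wait and its maps are still running. */
        boolean readyForReduces() {
            return !waitForMaps || mapsDone;
        }
    }

    /** Hands out freeSlots in FIFO order, skipping jobs that aren't ready; returns name -> slots. */
    static Map<String, Integer> assignReduceSlots(List<Job> fifoQueue, int freeSlots) {
        Map<String, Integer> grants = new LinkedHashMap<>();
        for (Job job : fifoQueue) {
            if (freeSlots == 0) {
                break;
            }
            if (!job.readyForReduces()) {
                continue; // don't let a job that is still mapping squat on reduce slots
            }
            int granted = Math.min(job.wantedReduces, freeSlots);
            grants.put(job.name, granted);
            freeSlots -= granted;
        }
        return grants;
    }
}
```

Re-running `assignReduceSlots` whenever a slot frees up or a job’s maps complete approximates "give slots to #2, #3, #4, etc., then to #1 once it’s ready". With the flag off everywhere, the head of the queue takes every free slot, which is the squatting behavior described above.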

Distributed Systems
Hadoop
Java
Random musings
SGE

Cascading for Hadoop

Need to check this out as an alternative to Hadoop Streaming.

Hadoop
Java

SGE / Hadoop integration

Yet another interesting blog post I’ve found today on integrating Hadoop and Sun Grid Engine.

Distributed Systems
Hadoop
Java
SGE

Quality Control and Monitoring at Last.FM

I found the Last.fm blog today. They’re having a lot of fun with QC tools. Worth a read!

Distributed Systems
Java
Scalability

A new HBase blog

HBase committer Jean-Daniel Cryans put up a blog for HBase. Yay, now I can get some distilled project news from my newsreader instead of having to wade through mailing list archives and/or lurk in IRC.

HBase

Hadoop streaming recipes

I started playing with Hadoop Streaming today because I needed to do the equivalent of the shell script

cat /some/input | cut -f 1 | sort | uniq > /some/output

on an HDFS file.

The basic thing you want to do to get a map working follows. The general rule of thumb is that if each line of input can be turned into its output lines on its own (zero or more output lines per input line, with no grouping or aggregation across lines), then you don’t need any reducers, hence the -numReduceTasks 0 option.

$HADOOP_HOME/bin/hadoop jar contrib/streaming/*-streaming.jar -input /some/input -output /some/output -mapper 'cut -f 1' -numReduceTasks 0

In my case though, I wanted to uniqify my list. Putting uniq into the mapper chain would cause the job to fail. Instead I had to drop the -numReduceTasks 0 and do like so:

$HADOOP_HOME/bin/hadoop jar contrib/streaming/*-streaming.jar -input /some/input -output /some/output -mapper 'cut -f 1' -reducer 'uniq'

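Note also that I didn’t need to include the sort from my original shell command. That’s because sorting is implicit in the MapReduce process: the framework sorts the map output by key before handing it to the reducers, so duplicate lines reach uniq next to each other.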
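
To see why uniq has to run in the reducer, it helps to simulate the whole pipeline in memory. uniq only collapses adjacent duplicates, and a mapper sees its own split in input order, so duplicates there usually aren’t adjacent. Between the phases the framework partitions map output by key and sorts each reducer’s input, which puts identical keys next to each other at the same reducer. The sketch below is a local illustration, not Hadoop code: partitioning uses the same `(hash & Integer.MAX_VALUE) % numReducers` scheme as Hadoop’s default `HashPartitioner`, but on Java `String` hashes rather than `Text` hashes, and `Collections.sort` stands in for the framework’s sort.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** In-memory simulation of: -mapper 'cut -f 1' -reducer 'uniq'. */
public final class CutUniqSimulation {

    /** Map phase: equivalent of `cut -f 1` (first tab-separated field). */
    static String map(String line) {
        int tab = line.indexOf('\t');
        return tab < 0 ? line : line.substring(0, tab);
    }

    /** Reduce phase: equivalent of `uniq`, which drops ADJACENT duplicates only. */
    static List<String> uniq(List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            if (out.isEmpty() || !out.get(out.size() - 1).equals(k)) {
                out.add(k);
            }
        }
        return out;
    }

    /** Runs map -> partition -> sort -> reduce; returns each reducer's output (part-0000N). */
    static List<List<String>> run(List<String> input, int numReducers) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String line : input) {
            String key = map(line);
            // Same formula as Hadoop's default HashPartitioner, applied to String.hashCode().
            int p = (key.hashCode() & Integer.MAX_VALUE) % numReducers;
            partitions.get(p).add(key);
        }
        List<List<String>> outputs = new ArrayList<>();
        for (List<String> part : partitions) {
            Collections.sort(part); // stands in for the framework's sort: equal keys become adjacent
            outputs.add(uniq(part));
        }
        return outputs;
    }
}
```

With more than one reducer, each part-0000N file is deduplicated and sorted on its own. Identical keys always land on the same reducer, so concatenating the parts gives no duplicates, but the combined list is only sorted within each part.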

As usual, I’m new to all of this, so if you have any insights leave a comment.

Distributed Systems
Hadoop
Java

Using Nutch to download large binary media and image files

Here’s a recipe for using Nutch to crawl some site(s) and extract the images. I’m blogging this because I couldn’t find (no surprise here, sigh) any documentation or complete examples in the mailing list archives for how to do this.

Step 1: modify Nutch URL filters

Okay, so first thing, modify $NUTCH_HOME/conf/crawl-urlfilter.txt. Assuming you only care about JPEG images, change this line:

-\.(gif|GIF|jpg|JPG|png|PNG|ico|ICO|css|sit|eps|wmf|zip|ppt|mpg|xls|gz|rpm|tgz|mov|MOV|exe|jpeg|JPEG|bmp|BMP)$

to this:

-\.(gif|GIF|png|PNG|ico|ICO|css|sit|eps|wmf|zip|ppt|mpg|xls|gz|rpm|tgz|mov|MOV|exe|bmp|BMP)$

Also update the “MY.DOMAIN.NAME” section appropriately.
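
Before kicking off a long crawl it’s worth checking the edited rule against a few URLs. This sketch runs both versions of the suffix rule through `java.util.regex`. It assumes the URL filter tests each rule with an unanchored `find()`-style search, and it drops the leading `-`, which only says that a match means "reject". The sample URLs are made up.

```java
import java.util.regex.Pattern;

/** Compares the old and new "skip these suffixes" rules from crawl-urlfilter.txt. */
public final class UrlFilterCheck {

    // Rule bodies copied from the two lines above, without the leading '-' (reject) marker.
    static final Pattern OLD_SKIP = Pattern.compile(
        "\\.(gif|GIF|jpg|JPG|png|PNG|ico|ICO|css|sit|eps|wmf|zip|ppt|mpg|xls|gz|rpm|tgz|mov|MOV|exe|jpeg|JPEG|bmp|BMP)$");
    static final Pattern NEW_SKIP = Pattern.compile(
        "\\.(gif|GIF|png|PNG|ico|ICO|css|sit|eps|wmf|zip|ppt|mpg|xls|gz|rpm|tgz|mov|MOV|exe|bmp|BMP)$");

    /** True if the rule would reject (skip) this URL, using an unanchored search. */
    static boolean skipped(Pattern rule, String url) {
        return rule.matcher(url).find();
    }

    public static void main(String[] args) {
        String[] urls = {
            "http://www.example.com/photo.jpg",
            "http://www.example.com/photo.JPEG",
            "http://www.example.com/logo.png",
            "http://www.example.com/index.html",
        };
        for (String url : urls) {
            System.out.printf("%-40s old=%s new=%s%n", url,
                skipped(OLD_SKIP, url) ? "skip" : "keep",
                skipped(NEW_SKIP, url) ? "skip" : "keep");
        }
    }
}
```

Running `main` prints one line per URL: `photo.jpg` and `photo.JPEG` flip from skip to keep, while `logo.png` stays skipped and `index.html` stays kept.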

Step 2: set up crawl configuration

Edit $NUTCH_HOME/conf/nutch-site.xml. You want to update/add properties for http.content.limit and file.content.limit so that your big files don’t get truncated. Look at $NUTCH_HOME/conf/nutch-default.xml for examples of how to do this. You might also want to adjust Protocol.CHECK_ROBOTS ;)

Step 3: crawl

I’m not going to go into this here as it is well covered elsewhere. Basically you just want to make a list of seed URLs, then let Nutch do its thing, e.g.:

$NUTCH_HOME/bin/nutch crawl /home/allenday/urls -dir /home/allenday/crawled -depth 5

This is going to generate some directories under /home/allenday/crawled/segments.

Step 4: massage crawl outputs and extract images

Merge the crawl segments into one big segment. This makes the following steps easier.

$NUTCH_HOME/bin/nutch mergesegs /tmp/merged /home/allenday/crawled/segments

Now dump the segment and list the URLs from the crawl.

$NUTCH_HOME/bin/nutch readseg -dump /tmp/merged/* /tmp/dump
$NUTCH_HOME/bin/hadoop dfs -cat /tmp/dump/dump | grep -aE 'URL'

The grep should show something like this:

URL:: http://spicylogic.com/some-url.html
URL:: http://spicylogic.com/some-url.jpg

Obviously you’re interested in grepping for jpg, jpeg, etc. Do it.
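
If you’d rather not fiddle with grep, here’s a small stdin filter that does the same job. It keeps dump lines of the `URL:: ...` form shown above and prints the ones whose path ends in .jpg or .jpeg (case-insensitively). The extension list and ignoring any query string are my assumptions; adjust to taste.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Reads a `nutch readseg -dump` file on stdin and prints JPEG URLs, one per line. */
public final class ImageUrlFilter {

    // Matches dump lines such as "URL:: http://spicylogic.com/some-url.jpg".
    private static final Pattern URL_LINE = Pattern.compile("^URL::\\s*(\\S+)");

    /** Returns the URL if the line is a URL line pointing at a JPEG, else null. */
    static String imageUrl(String line) {
        Matcher m = URL_LINE.matcher(line);
        if (!m.find()) {
            return null;
        }
        String url = m.group(1);
        // Strip any query string or fragment before checking the extension (assumption).
        String path = url.split("[?#]", 2)[0].toLowerCase(Locale.ROOT);
        return (path.endsWith(".jpg") || path.endsWith(".jpeg")) ? url : null;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            String url = imageUrl(line);
            if (url != null) {
                System.out.println(url);
            }
        }
    }
}
```

For example, `$NUTCH_HOME/bin/hadoop dfs -cat /tmp/dump/dump | java ImageUrlFilter > images.txt`, assuming the compiled class is in the current directory.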

Once you have the image list, you can use this little Java program to pull the images out of the segment one by one.

package com.spicylogic.allenday;

// Hadoop imports
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Nutch imports
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.NutchConfiguration;

/**
 * Writes the raw bytes of a single fetched URL from a Nutch segment to stdout.
 * Based on org.apache.nutch.protocol.Content.
 */
public final class ExtractFile {
  public static void main(String[] argv) throws Exception {

    String usage = "ExtractFile [-local | -dfs <namenode:port>] url segment";

    // url and segment are required; the filesystem option is optional.
    if (argv.length < 2) {
      System.err.println("usage: " + usage);
      return;
    }
    Configuration conf = NutchConfiguration.create();
    // As in the Nutch code this is based on: handles an optional leading
    // -local / -dfs <namenode:port> option, else uses the default filesystem.
    FileSystem fs = FileSystem.parseArgs(argv, 0, conf);
    try {
      // Guard against calls like "-local url" that leave no segment argument.
      if (argv[1] == null) {
        System.err.println("usage: " + usage);
        return;
      }
      Text wanted = new Text(argv[0]); // content records are keyed by URL
      String segment = argv[1];

      // Only part-00000 is read, so a URL stored in another part file won't be found.
      Path file = new Path(segment, Content.DIR_NAME + "/part-00000/data");
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
      try {
        Text key = new Text();
        Content content = new Content();
        boolean found = false;

        // Linear scan until the key matches the requested URL.
        while (reader.next(key, content)) {
          if (key.equals(wanted)) {
            byte[] bytes = content.getContent();
            System.out.write(bytes, 0, bytes.length);
            System.out.flush();
            found = true;
            break;
          }
        }
        if (!found) {
          System.err.println("not found: " + wanted);
        }
      } finally {
        reader.close();
      }
    } finally {
      fs.close();
    }
  }
}

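Compile it, put your classes (plus the Nutch jar, if it isn’t already on the classpath) on HADOOP_CLASSPATH, then you can do like so:

HADOOP_CLASSPATH=YOUR:CLASS:PATH $NUTCH_HOME/bin/hadoop com.spicylogic.allenday.ExtractFile http://spicylogic.com/some-url.jpg /tmp/merged/* > out.jpg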

Hope that helps. Let me know if you have corrections/clarifications (or a complete script!) for this post and I’ll be happy to merge them in with attribution.

Credit for this post goes to Kaz Muzik, who was doing something similar to back up his blog. Also, the com.spicylogic.allenday.ExtractFile class is based on the org.apache.nutch.protocol.Content class.

Java
Nutch

HBase bulk load/import example

Per my earlier post, I’ve almost finished an (actually compilable, functional) bulk loader example. Should be able to post it tomorrow afternoon to the HBase/MapReduce page, assuming I don’t get stuck in interviews/meetings all day.

HBase
Hadoop
Java
