Distributed Systems

HBase bulk load/import example

Per my earlier post, I’ve almost finished an (actually compilable, functional) bulk loader example. Should be able to post it tomorrow afternoon to the HBase/MapReduce page, assuming I don’t get stuck in interviews/meetings all day.

HBase
Hadoop
Java


More thoughts on EC2 / EBS / Hadoop

I’m still getting up to speed on running Hadoop on EC2. Today I found an AWS post describing how to move data from S3 into a Hadoop cluster, create new Hadoop slaves from AMI system images, and start up and tear down clusters.

I made some comments yesterday about wanting to be able to scale the Hadoop cluster down as well as up, in particular being able to disable cores, which are the really expensive part of running a cluster on AWS.

Now we need to look into the available AWS scripts and AMI images to see how feasible it is to maintain more data volumes than running instances. What I’m (roughly) thinking is that we might set up M data volumes for the DFS but run N data/task nodes, where 1 <= N <= M. When N < M, some of the N nodes simply mount more than one EBS volume.
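A minimal sketch of the N < M idea, assuming volumes are handed out round-robin so that no node carries more than one volume more than any other. The volume and node names are hypothetical placeholders, and `assign_volumes` is a hypothetical helper; the sketch only computes the plan and does not attach or mount any EBS volumes.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Spread M data volumes over N data/task nodes (1 <= N <= M), round-robin,
# so that no node holds more than one volume more than any other node.
# Returns a hashref: node name => arrayref of volume names.
sub assign_volumes {
    my ( $volumes, $nodes ) = @_;
    die "need 1 <= N <= M\n" unless @$nodes >= 1 && @$nodes <= @$volumes;
    my %plan;
    $plan{$_} = [] for @$nodes;
    for my $i ( 0 .. $#$volumes ) {
        # volume i goes to node (i mod N)
        push @{ $plan{ $nodes->[ $i % @$nodes ] } }, $volumes->[$i];
    }
    return \%plan;
}

# Hypothetical names: M = 10 volumes, N = 4 running nodes.
my @volumes = map { "vol-$_" } 1 .. 10;
my @nodes   = map { "node-$_" } 1 .. 4;
my $plan    = assign_volumes( \@volumes, \@nodes );
printf "%s: %s\n", $_, join( ', ', @{ $plan->{$_} } ) for @nodes;
```

With 10 volumes on 4 nodes this prints three volumes each for node-1 and node-2 and two each for node-3 and node-4. Scaling down to fewer nodes just means re-running the plan with a shorter node list.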

We also need to look into how HDFS handles new volumes: will it just start replicating data onto nodes as they are added to the system? Is there a way to hot-add them rather than restarting the data master? Can EBS volumes be hot-added to existing data nodes?

Distributed Systems
Java
Scalability


EC2 + EBS + Hadoop at BiggerBoat

Rodger and I at BiggerBoat got Hadoop and HBase up and running on Amazon EC2 today. We initially set up a cluster of 1 master and 10 slaves. After a quick calculation of how much this costs to keep running 24/7, we started trying to figure out how to scale the thing DOWN as well as UP, and to be able to do so dynamically. Seems like the tricky piece is the Hadoop storage, not so much the compute power available. Amazon just launched their Elastic Block Store a few days ago, so we’re seeing how that fits in. Seems like the EBS I/O is pretty good given our Bonnie++ tests.

Tom White has some architecture scenarios for building this kind of stuff.

Distributed Systems
Java
Scalability


Examples for data import, export, and transport with HBase

I’m in the process of setting up an analytic workflow at BiggerBoat. It’s looking like the main theme in data structures around here will be the sparse matrix, so I’ve been playing with open-source technologies for sparse matrices. Apache Hadoop’s HBase looks like a good choice for now, maybe Hive later.

Right now I’m getting familiar with the former. As part of this, I’m improving the docs on the wiki to make them more user- (as opposed to core developer-) friendly. My documentation goal right now is to add some data transformation example code. There are already lots of Hadoop examples for text-to-text mapping, e.g. grep, cat, etc. For HBase, not so much:

  • text to text (done, many examples)
  • flatfile to HBase table (bulk loader on the HBase wiki; I haven’t tried it yet)
  • HBase table to flatfile
  • HBase table to HBase table

I’ll be adding updated, complete, and simple code for the latter two (three?) in the next few days to the HBase/MapReduce page.

Analytics
Distributed Systems
Java


Hadoop / SGE Grid Engine Convergence

I’m an old hand with SGE and a newer user of Hadoop / Pig. Good to see that there is interest in making these technologies interoperate.

Distributed Systems
Java
Scalability
Science


Sun Grid Engine (SGE) job state letter codes and their meanings

Adapted from here.

Category   State                                           SGE letter code
Pending    pending                                         qw
           pending, user hold                              hqw
           pending, system hold                            hqw
           pending, user and system hold                   hqw
           pending, user hold, re-queue                    hRwq
           pending, system hold, re-queue                  hRwq
           pending, user and system hold, re-queue         hRwq
Running    running                                         r
           transferring                                    t
           running, re-submit                              Rr
           transferring, re-submit                         Rt
Suspended  job suspended                                   s, ts
           queue suspended                                 S, tS
           queue suspended by alarm                        T, tT
           all suspended with re-submit                    Rs, Rts, RS, RtS, RT, RtT
Error      all pending states with error                   Eqw, Ehqw, EhRqw
Deleted    all running and suspended states with deletion  dr, dt, dRr, dRt, ds, dS, dT, dRs, dRS, dRT
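The codes in the table are built from single letters, so a state string can be read one letter at a time. Here is a small Perl sketch of that decoding. It assumes the one-letter meanings implied by the table (for example, h for any hold and R for re-queue/re-submit). `decode_sge_state` and `sge_category` are hypothetical helper names, not part of SGE.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# One meaning per state letter, as implied by the table above.
my %SGE_LETTER = (
    'q' => 'queued',
    'w' => 'waiting',
    'h' => 'on hold (user and/or system)',
    'R' => 're-queued/re-submitted',
    'r' => 'running',
    't' => 'transferring',
    's' => 'job suspended',
    'S' => 'queue suspended',
    'T' => 'queue suspended by alarm',
    'E' => 'error',
    'd' => 'being deleted',
);

# Split a qstat state string (e.g. "hRwq") into per-letter meanings;
# letters that are not in the table are reported as unknown.
sub decode_sge_state {
    my ($code) = @_;
    return map { exists $SGE_LETTER{$_} ? $SGE_LETTER{$_} : "unknown ($_)" }
        split //, $code;
}

# Table category. Checks run from the most to the least overriding letter,
# so e.g. "ts" (transferring + suspended) lands under Suspended as in the table.
sub sge_category {
    my ($code) = @_;
    return 'Deleted'   if $code =~ /d/;
    return 'Error'     if $code =~ /E/;
    return 'Suspended' if $code =~ /[sST]/;
    return 'Running'   if $code =~ /[rt]/;
    return 'Pending'   if $code =~ /q/;
    return 'Unknown';
}

for my $code (qw(hRwq Rts dRr)) {
    printf "%-5s %-10s %s\n", $code, sge_category($code),
        join( ', ', decode_sge_state($code) );
}
```

Reading letters independently avoids listing every combination. The table's own rows (such as the ten deletion codes) fall out of the same few letters.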

Administration
Distributed Systems


MogileFS + FUSE + Bigfile support

Jordan and I have been migrating all the TinyTube data over to a MogileFS storage system.

Seems to be a very easy-to-use solution for scalable storage of lots of little files. Basically just works out of the box, including the alpha-quality mount-filepaths FUSE adapter available for MogileFS in the SixApart SVN.

I have two problems with mount-filepaths though:

  1. Doesn’t recognize data loaded outside FUSE. It assumes I want to see/read files that were loaded via its mapping of FUSE directory/file names to MogileFS keys, and it doesn’t show other keys at all. I have lots of preloaded data, so this is a dealbreaker.
  2. Doesn’t support large files. MogileFS has a 64MB limit per file, and if you want to load a bigger file, it splits it into chunks. This FUSE adapter is not aware of how to deal with big files.
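Supporting chunked files mostly comes down to offset arithmetic: a read at some absolute offset has to be translated into per-chunk HTTP byte ranges. The sketch below shows that mapping in isolation. It assumes the chunk sizes are already known, for instance from the `_big_info` record. The example sizes are the ones in the script's `__DATA__` section (a 64 MiB first chunk plus the remainder). `chunk_ranges` is a hypothetical helper, not part of MogileFS or the original adapter.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Map a read of $length bytes at absolute $offset onto the chunks of a split file.
# Returns [chunk_index, first, last] triples whose byte positions are relative to
# each chunk, i.e. ready for an HTTP "Range: bytes=first-last" header.
sub chunk_ranges {
    my ( $chunk_sizes, $offset, $length ) = @_;
    my $total = 0;
    $total += $_ for @$chunk_sizes;
    return () if $length <= 0 || $offset >= $total;

    my $last = $offset + $length - 1;            # inclusive end of the request
    $last = $total - 1 if $last > $total - 1;    # clamp at end of file

    my @ranges;
    my $chunk_start = 0;
    for my $i ( 0 .. $#$chunk_sizes ) {
        my $chunk_end = $chunk_start + $chunk_sizes->[$i] - 1;
        if ( $chunk_end >= $offset && $chunk_start <= $last ) {    # chunk overlaps request
            my $from = $offset > $chunk_start ? $offset : $chunk_start;
            my $to   = $last < $chunk_end     ? $last   : $chunk_end;
            push @ranges, [ $i, $from - $chunk_start, $to - $chunk_start ];
        }
        $chunk_start = $chunk_end + 1;
    }
    return @ranges;
}

# Chunk sizes from the _big_info example in the script's __DATA__ section.
my @sizes = ( 67108864, 23080964 );

# A 200-byte read straddling the 64 MiB boundary touches both chunks.
for my $r ( chunk_ranges( \@sizes, 67108864 - 100, 200 ) ) {
    printf "chunk %d: Range: bytes=%d-%d\n", @$r;
}
```

This prints `chunk 0: Range: bytes=67108764-67108863` and `chunk 1: Range: bytes=0-99`. HTTP byte ranges are inclusive, so the arithmetic uses inclusive end positions throughout.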

So I did some heavy modification of the script. Here’s my version. No directory support, but it lets me read my big files by the original key (i.e. a stat on “bigfile1.mpg” may trigger a stat on “_big_info:bigfile1.mpg”), and any other file I’ve loaded outside of FUSE (e.g. with mogtool).

Save it as myfuse.pl; then, with the script in the current directory, mount a filesystem like so: mkdir ./myfuse; perl ./myfuse.pl ./myfuse.

Maybe I’ll get around to re-formatting this as a patch for SixApart — but probably not.

#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Fuse;
use LWP::UserAgent;
use LWP::Simple;
use List::MoreUtils qw(uniq);
use MogileFS::Client;
use Path::Class;
use POSIX qw(ENOENT EISDIR EINVAL);
my $DEBUG = 0;
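# In-memory cache used by e_write; the '/' entry tracks the total cached bytes.
our %FILE_CACHE = ( '/' => { size => 0 } );
our $CACHE_SIZE = 64 * 1024 * 1024;    # max bytes to cache (assumed value, tune as needed)
our $CACHE_AGE  = 60;                  # seconds before a cached entry expires (assumed value)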
 
# create client object w/ server-configured namespace and IPs of trackers
my $ua = LWP::UserAgent->new;
my $class = 'default';
my $mogilefs = MogileFS::Client->new(
  domain => 'ifap',
  hosts  => [ '10.10.0.100:6001', '10.10.0.101:6001' ],
);
 
my ($mountpoint) = "";
$mountpoint = shift(@ARGV) if @ARGV;
Fuse::main(
    debug      => $DEBUG,
    mountpoint => $mountpoint,
    threaded   => 1,
 
    getattr    => "main::e_getattr",
    getdir     => "main::e_getdir",
    mknod      => "main::e_mknod",
    open       => "main::e_open",
    read       => "main::e_read",
    rename     => "main::e_rename",
    statfs     => "main::e_statfs",
    unlink     => "main::e_unlink",
    write      => "main::e_write",
);
 
sub e_getattr {
    my $filename = shift;
    $filename =~ s#^.*/##;
    warn "main::e_getattr $filename\n" if $DEBUG;
 
    my ( $size, $modes );
    my ( $dev, $ino, $rdev, $blocks, $gid, $uid, $nlink, $blksize )
        = ( 0, 0, 0, 1, 0, 0, 1, 1024 );
    my ( $atime, $ctime, $mtime ) = ( time, time, time );
 
    if ( $filename !~ m{\.} ) {
        #        warn "directory!";
        $size  = 0;
        $modes = ( 0040 << 9 ) + 0755;
    } else {
        #        warn "file!";
        $size  = 123;
        $modes = ( 0100 << 9 ) + 0644;
 
        my @paths = $mogilefs->get_paths( $filename, { noverify => 1 } );
        my ( $content_type, $document_length, $modified_time, $expires, $server );
 
        if ( scalar( @paths ) ) {
            ( $content_type, $document_length, $modified_time, $expires, $server ) = head( $paths[0] );
            $size = $document_length;
            ( $atime, $ctime, $mtime ) = ($modified_time) x 3;
        }
        else {
            @paths = $mogilefs->get_paths( '_big_info:' . $filename, { noverify => 1 } );
            if ( scalar( @paths ) ) {
                my $data = $mogilefs->get_file_data( '_big_info:' . $filename );
                my ( $_des, $_type, $_compress, $_filename, $_chunks, $_size, undef, @_parts ) = split /\n/, $$data;
                ( $size ) = $_size =~ m#^size (\d+)$#;
                # Take timestamps from the first chunk replica that answers a HEAD request.
                OUTER: foreach my $_part ( @_parts ) {
                    my ($_paths) = $_part =~ m#paths: (.+?)$# or next;
                    foreach my $_path ( split /, /, $_paths ) {
                        ( $content_type, $document_length, $modified_time, $expires, $server ) = head( $_path );
                        next unless defined $modified_time;
                        ( $atime, $ctime, $mtime ) = ($modified_time) x 3;
                        last OUTER;
                    }
                }
            }
        }
        return -ENOENT() unless @paths;
    }
 
    warn(
        join(
            ",",
            (   $dev,   $ino,     $modes, $nlink, $uid,
                $gid,   $rdev,    $size,  $atime, $mtime,
                $ctime, $blksize, $blocks
            )
        ),
        "\n"
    ) if $DEBUG;
 
    return (
        $dev,  $ino,   $modes, $nlink, $uid,     $gid, $rdev,
        $size, $atime, $mtime, $ctime, $blksize, $blocks
    );
}
 
sub e_getdir {
    my $prefix = shift;
    warn "main::e_getdir $prefix\n" if $DEBUG;
    my @filenames;
    $mogilefs->foreach_key(
#        prefix => $prefix,
        sub {
          my $filename = shift;
          push @filenames, $filename;
          push @filenames, file( $filename )->parent();
        }
    );
 
    @filenames = uniq @filenames;
    warn "returning: @filenames\n" if $DEBUG;
    return ( @filenames, 0 );
}
 
sub e_mknod {
    my $filename = shift;
    $filename =~ s#.*/##;
 
    warn "main::e_mknod $filename\n" if $DEBUG;
 
    my $fh = $mogilefs->new_file($filename, undef);
    if ( $fh ) {
        print $fh "\n";
 
        unless ($fh->close) {
            my ($code, $str) = ($mogilefs->errcode || -1, $mogilefs->errstr || '');
            warn "Error creating file:$code: $str" if $DEBUG;
            $! = $str;
            $? = $code;
            return -1;
        }
        return 0;
    }
    else {
        return -1;
    }
}
 
sub e_open {
    my $filename = shift;
    $filename =~ s#.*/##;
    warn "main::e_open $filename\n" if $DEBUG;
 
    return -EISDIR() unless $filename =~ m{\.};
    my @paths = $mogilefs->get_paths( $filename, { noverify => 1 } );
    if ( ! scalar( @paths ) ) {
        @paths = $mogilefs->get_paths( '_big_info:' . $filename, { noverify => 1 } );
    }
    return -ENOENT() unless @paths;
    return 0;
}
 
sub e_read {
    my ( $filename, $length, $offset ) = @_;
    $filename =~ s#.*/##;
    warn "main::e_read $filename $length $offset\n" if $DEBUG;

    return -EISDIR() if $filename =~ m{/$};

    my $maxoff = $offset + ( $length - 1 );    # inclusive end of the requested byte range
    warn "requested offset=$offset length=$length maxoff=$maxoff" if $DEBUG;
    my @paths = $mogilefs->get_paths( $filename, { noverify => 1 } );

    if ( scalar( @paths ) ) {
        # Small file: one MogileFS key, served with a single HTTP Range request.
        warn "it's a small file" if $DEBUG;
        my ( $content_type, $document_length ) = head( $paths[0] );
        return -POSIX::EIO() unless defined $document_length;
        return 0 if $offset >= $document_length;    # at or past EOF

        # HTTP ranges are inclusive, so the last valid byte is length - 1.
        $maxoff = $document_length - 1 if $maxoff > $document_length - 1;
        my $range = $offset . "-" . $maxoff;
        warn "  Range: bytes=$range\n" if $DEBUG;
        my $response = $ua->get( $paths[0], "Range" => "bytes=$range" );
        return $response->content if $response->is_success;
        warn $response->as_string if $DEBUG;
        return -POSIX::EIO();
    }

    # Big file: chunks listed in the "_big_info:" key.
    @paths = $mogilefs->get_paths( '_big_info:' . $filename, { noverify => 1 } );
    return -ENOENT() unless @paths;
    warn "it's a big file" if $DEBUG;

    my $data = $mogilefs->get_file_data( '_big_info:' . $filename );
    my ( $_des, $_type, $_compress, $_filename, $_chunks, $_size, undef, @_parts ) = split /\n/, $$data;
    my ( $size ) = $_size =~ m#^size (\d+)$#;
    return 0 if $offset >= $size;
    $maxoff = $size - 1 if $maxoff > $size - 1;

    #part 2 bytes=23080964 md5=af45f7ac80ca34328db3c90de1db1ab0 paths: http://10.10.0.100:7500/dev8/0/000/119/0000119969.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119969.fid
    my $part_min = 0;    # absolute offset of the next chunk's first byte
    my $buf      = '';
    foreach my $_part ( @_parts ) {
        my ( $_bytes, $_paths ) = $_part =~ m#bytes=(\d+) .+? paths: (.+?)$# or next;
        my $this_min = $part_min;
        my $this_max = $part_min + $_bytes - 1;
        $part_min += $_bytes;
        warn "examining part $this_min -> $this_max" if $DEBUG;

        next if $this_max < $offset;    # chunk ends before the requested range
        last if $this_min > $maxoff;    # chunk starts after the requested range

        # Range relative to this chunk, clipped to the chunk's last byte.
        my $end   = $maxoff < $this_max ? $maxoff : $this_max;
        my $range = ( $offset - $this_min ) . "-" . ( $end - $this_min );
        my $got_chunk = 0;
        foreach my $_path ( split /, /, $_paths ) {    # try each replica until one answers
            warn "getting $_path Range: bytes=$range" if $DEBUG;
            my $response = $ua->get( $_path, "Range" => "bytes=$range" );
            next unless $response->is_success;
            my $chunk = $response->content;
            $buf    .= $chunk;
            $offset += length( $chunk );    # advance by this chunk's bytes only
            $got_chunk = 1;
            last;
        }
        return -POSIX::EIO() unless $got_chunk;
        last if $offset > $maxoff;    # whole request satisfied
    }
    return $buf;
}
 
sub e_rename {
    my ( $old, $new ) = @_;
    $old =~ s#.*/##;
    $new =~ s#.*/##;
 
    warn "main::e_rename: $old -> $new" if $DEBUG;
 
    # Rename this file
    $mogilefs->rename($old, $new);
 
    return 0;
}
 
sub e_statfs { return 255, 1, 1, 1, 1, 2 }
 
sub e_unlink {
    my $filename = shift;
    $filename =~ s#.*/##;
 
    warn "main::e_unlink: $filename" if $DEBUG;
 
    $mogilefs->delete($filename);
 
    return 0;
}
 
sub e_write {
    my ( $filename, $buf, $offset ) = @_;
    $filename =~ s#.*/##;
 
    warn("main::e_write: $filename pos=$offset len=".length($buf)) if $DEBUG;
 
    my $finfo = get_file_info($filename);
 
    return -ENOENT() unless $finfo;
 
    my $cont = get_file_data($filename);
 
    substr($$cont, $offset, length($buf), $buf);
 
    $mogilefs->store_content($filename, undef, $cont);
    rm_file_cache($filename);
 
    return length($buf);
}
 
#################################################
sub logmsg { warn(join "\t",@_) if $DEBUG }
 
sub get_file_data {
    my ($file) = @_;
    my $entry = $FILE_CACHE{$file};
    my $meta  = $FILE_CACHE{'/'};
 
    if ($entry) {
        # See if this data is too old
        if ((time - $entry->{created}) < $CACHE_AGE) {
            logmsg(1, "-- get_file_data: hit");
 
            # If its still valid, return it
            return $entry->{data};
        } else {
            logmsg(1, "-- get_file_data: miss - expired");
 
            rm_file_cache($file);
        }
    }
 
    my $cont = $mogilefs->get_file_data($file);
    my $size = length($$cont);
 
    if ($meta->{size} + $size > $CACHE_SIZE) {
        # If adding this would go beyond our max cache size, delete things until
        # we can fit it
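        # Evict the oldest entries first; '/' is size bookkeeping, not cached data.
        foreach my $f ( sort { $FILE_CACHE{$a}{created} <=> $FILE_CACHE{$b}{created} }
                        grep { $_ ne '/' } keys %FILE_CACHE ) {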
 
            my $rm_size = rm_file_cache($f);
 
            logmsg(1, "-- get_file_data: purging - $rm_size bytes");
 
            last if $meta->{size} + $size < $CACHE_SIZE;
        }
    }
 
    logmsg(1, "-- get_file_data: added - $size bytes");
 
    # Create a new entry
    $FILE_CACHE{$file} = {created => time,
                          size    => $size,
                          data    => $cont};
    $meta->{size} += $size;
 
    return $cont;
}
 
sub get_file_info {
    my ($path) = @_;
 
    if ($path eq '/') {
        return {name         => '/',
                is_directory => 1};
    }
    else {
      # list_keys returns ( $last_key, \@keys ) in list context
      my ( undef, $keys ) = $mogilefs->list_keys( $path );
      foreach my $f ( @{ $keys || [] } ) {
        return $f if $f eq $path;
      }
    }
    return undef;
}
 
sub rm_file_cache {
    my ($file) = @_;
    my $entry = delete $FILE_CACHE{$file};
    return unless $entry;
 
    # Decrement how large our cache size is
    my $size = $entry->{size};
    $FILE_CACHE{'/'}->{size} -= $size;
 
    return $size;
}
 
__DATA__
#des no description
#type file
#compressed 0
#filename somefile.mpeg
#chunks 2
#size 90189828
#
#part 1 bytes=67108864 md5=8066369552b71fd49cfbe9ccdce74051 paths: http://10.10.0.100:7500/dev6/0/000/119/0000119968.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119968.fid
#part 2 bytes=23080964 md5=af45f7ac80ca34328db3c90de1db1ab0 paths: http://10.10.0.100:7500/dev8/0/000/119/0000119969.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119969.fid
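The `__DATA__` section above documents the layout of a `_big_info` record. Here is a minimal standalone parser for that layout. It assumes the stored record has the same lines without the leading `#`, which is what the script's own `m#^size (\d+)$#` match implies. `parse_big_info` is a hypothetical helper name, and the sample text is copied from the `__DATA__` example.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Parse a "_big_info:<key>" record into header fields plus a list of chunks.
sub parse_big_info {
    my ($text) = @_;
    my %info = ( parts => [] );
    for my $line ( split /\n/, $text ) {
        if ( my ( $num, $bytes, $md5, $paths ) =
            $line =~ /^part (\d+) bytes=(\d+) md5=([0-9a-f]+) paths: (.+)$/ )
        {
            push @{ $info{parts} },
                { num => $num, bytes => $bytes, md5 => $md5, paths => [ split /, /, $paths ] };
        }
        elsif ( $line =~ /^(\w+) (.*)$/ ) {    # des, type, compressed, filename, chunks, size
            $info{$1} = $2;
        }
    }
    return \%info;
}

my $sample = join "\n",
    'des no description',
    'type file',
    'compressed 0',
    'filename somefile.mpeg',
    'chunks 2',
    'size 90189828',
    '',
    'part 1 bytes=67108864 md5=8066369552b71fd49cfbe9ccdce74051 paths: http://10.10.0.100:7500/dev6/0/000/119/0000119968.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119968.fid',
    'part 2 bytes=23080964 md5=af45f7ac80ca34328db3c90de1db1ab0 paths: http://10.10.0.100:7500/dev8/0/000/119/0000119969.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119969.fid';

my $info = parse_big_info($sample);
my $sum  = 0;
$sum += $_->{bytes} for @{ $info->{parts} };
printf "%s: size=%d chunks=%d sum=%d\n",
    $info->{filename}, $info->{size}, scalar @{ $info->{parts} }, $sum;
```

For the sample this prints `somefile.mpeg: size=90189828 chunks=2 sum=90189828`. Checking that the part sizes add up to the `size` line is a cheap sanity test before serving reads from the chunks.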

Administration
Distributed Systems
Perl
Scalability
