aggregate - report event counts from a stream

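Another shell utility. This one counts how many times each distinct line appears in a stream, reporting once per window of N lines or N seconds. It’s useful for, e.g., counting 404, 500, 200 and 302 HTTP status codes from a log file.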
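aggregate counts identical whole lines, so the status code has to be cut out of each log line first. Here is a minimal sketch of that step. It assumes Apache Common or Combined Log Format, where the status immediately follows the quoted request line. The status_code name is only for illustration.

```perl
use strict;
use warnings;

# Pull the three-digit HTTP status code out of an access-log line.
# Assumes Common/Combined Log Format: the status directly follows the
# quoted request, e.g. ... "GET / HTTP/1.1" 404 209
sub status_code {
    my ($line) = @_;
    return $line =~ /"[^"]*" (\d{3})\b/ ? $1 : undef;
}

# prints 200
print status_code('127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326'), "\n";
```

In a live pipeline, the same regex can run as a one-liner. The log path below is an assumption about your setup:

    tail -f /var/log/httpd/access_log | perl -ne 'print "$1\n" if /"[^"]*" (\d{3})\b/' | aggregate -m time -t 60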

#!/usr/bin/perl
$|++;
use strict;
use Getopt::Long;
 
my $mode = 'line';
my $tick = 100;
my $help = undef;
my $keysfile = undef;
my %keys = ();
 
GetOptions(
  'mode|m=s' => \$mode,
  'tick|t=i' => \$tick,
  'help|h'   => \$help,
  'keys|k=s' => \$keysfile,   # a file name, so a string (=s), not a float (=f)
);
 
if ( $help || ( $mode ne 'line' && $mode ne 'time' ) || $tick <= 0 || ( defined($keysfile) && !-f $keysfile ) ) {
  my $USAGE = join '', <DATA>;
  print STDERR $USAGE and exit(1);
}
 
if ( $keysfile ) {
  open(K, $keysfile) or die "Couldn't open keys file '$keysfile': $!";
  while ( my $line = <K> ) {
    chomp $line;
    $keys{ $line }++;
  }
  close(K);
}
 
# every key from the keys file starts each window at zero
my %count = map { $_ => 0 } keys %keys;
my $offset = 0;
my $mark = 0;
 
if ( $mode eq 'time' ) {
  $mark = time();
}
 
while ( my $element = <> ) {
  chomp $element;
  if ( scalar( %keys ) ) {
    $count{ $element }++ if $keys{ $element };
  }
  else {
    $count{ $element }++;
  }
 
  if ( $mode eq 'line' ) {
    $offset++;
    $mark++;
    if ( $mark >= $tick ) {
      $mark = 0;
      flush();
    }
  }
  elsif ( $mode eq 'time' ) {
    if ( $mark + $tick < time() ) {
      $offset = time();
      $mark = time();
      flush();
    }
  }
}
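# report whatever is left; in time mode, stamp the final summary with the current time
$offset = time() if $mode eq 'time';
flush();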
 
sub flush {
  print "summary/$tick @ $offset\n";
  foreach my $k ( sort keys %count ) {
    print "\t", $count{ $k }, "\t", $k, "\n";
  }
  %count = map { $_ => 0 } keys %keys;
}
 
__DATA__
Usage: aggregate [-h] [-m <time|line>] [-t <# of seconds or lines>] [-k <keys file>]
 
Read lines from STDIN.  Every <tick> lines ('line' mode) or seconds ('time' mode), print a count of each distinct line.
 
  -h    show help (this message)
  -m    mode.  one of 'time' or 'line'.  defaults to 'line'.
  -t    aggregation size.  an integer.  value is # of lines ('line' mode) or # of
        seconds ('time' mode) after which an aggregation is triggered.  defaults to 100.
  -k    keys file.  a text file of strings to *exactly* match in the input, one per line.
        if a keys file is provided, lines not present in the keys file will be silently
        ignored.
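The windowing in 'line' mode is easiest to reason about as a pure function over a list. Below is a sketch that assumes every key from a keys file should be reported in every window, starting from zero. count_windows is a hypothetical helper, not part of the script.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring aggregate's 'line' mode: split @$lines into
# windows of $tick lines and return one hashref of counts per window.
# If $keys (an arrayref) is given, only those exact strings are counted,
# and each key appears in every window, starting from zero.
sub count_windows {
    my ( $lines, $tick, $keys ) = @_;
    my %wanted = map { $_ => 1 } @{ $keys || [] };
    my @windows;
    for ( my $i = 0 ; $i < @$lines ; $i += $tick ) {
        my %count = map { $_ => 0 } keys %wanted;
        my $end = $i + $tick - 1;
        $end = $#$lines if $end > $#$lines;    # last window may be short
        for my $element ( @{$lines}[ $i .. $end ] ) {
            next if %wanted && !$wanted{$element};    # silently ignore non-keys
            $count{$element}++;
        }
        push @windows, \%count;
    }
    return @windows;
}

my @w = count_windows( [qw(200 404 200 500 200)], 3 );
for my $window (@w) {
    print join( ' ', map { "$_=$window->{$_}" } sort keys %$window ), "\n";
}
```

Each hashref corresponds to one "summary/$tick" block that aggregate prints.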

Administration
Analytics
Perl

shuffle - randomize a stream of data

Here’s another little shell utility I’ve been sitting on for a while. This one shuffles the line-oriented data read from a pipe. It has a notion of buffering and partial flushing, so it can handle streams and very large data sets.
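Each flush is a Fisher-Yates (Knuth) shuffle underneath the buffering. Here is a standalone sketch. The swap index is drawn from 0..$j inclusive, i.e. int(rand($j + 1)). Drawing from 0..$j-1 instead gives Sattolo's algorithm, which only produces cyclic permutations, so no element ever stays in place. The fisher_yates name is just for illustration.

```perl
use strict;
use warnings;

# In-place Fisher-Yates (Knuth) shuffle of an array reference.
# Walk from the last slot down, swapping each slot with a uniformly chosen
# slot at or below it; every permutation is equally likely.
sub fisher_yates {
    my ($array) = @_;
    for ( my $j = $#$array ; $j > 0 ; $j-- ) {
        my $swap = int( rand( $j + 1 ) );    # 0 .. $j inclusive
        @$array[ $j, $swap ] = @$array[ $swap, $j ];
    }
    return $array;
}

# Sanity check: shuffling (a, b, c) many times should produce all
# 3! = 6 orderings, each roughly 1/6 of the time.
my %seen;
$seen{ join '', @{ fisher_yates( [qw(a b c)] ) } }++ for 1 .. 6000;
printf "%s %d\n", $_, $seen{$_} for sort keys %seen;
```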

#!/usr/bin/perl
$|++;
use strict;
use Getopt::Long;
 
my $USAGE = join '', <DATA>;
 
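my $B = 0;       # buffer size; 0 means buffer the whole stream
my $D = undef;   # draw size; defaults to the buffer size
my $H = 0;
 
GetOptions ("buffer|b=i"   => \$B,
            "draw|d=i"     => \$D,
            "help|h"       => \$H,
           ); 
 
# default the draw size here, so an explicit "-d 1" is still honored
$D = ( $B > 0 ? $B : 1 ) unless defined $D;
 
# draw size must be between 1 and the buffer size
if (
  ($B < 0) ||
  ($D < 1) ||
  ($B > 0 && $D > $B) ||
  ($H)
) {
  print $USAGE and exit(1);
}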
 
 
my @buf = ();
 
while ( my $element = <> ) {
  #buffer whole stream
  if ( $B == 0 ) {
    push @buf, $element;
  }
  #no-op
  elsif ( $B == 1 ) {
    print $element;
  }
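  #buffer window: when the buffer is full, shuffle it and emit $D elements
  else {
    push @buf, $element;
    if ( scalar( @buf ) >= $B ) {
      flush( $B - $D );
    }
  }
}
#end of stream: shuffle and emit everything left in the buffer
flush( 0 );
 
sub flush {
  my $keep = shift;   # number of elements to leave in the buffer
  #Fisher-Yates (Knuth) shuffle: swap index is drawn from 0..$j inclusive
  for ( my $j = scalar( @buf ) - 1 ; $j > 0 ; $j-- ) {
    my $swap = int(rand($j + 1));
    ($buf[ $j ], $buf[ $swap ]) = ($buf[ $swap ], $buf[ $j ]);
  }
  while ( scalar( @buf ) > $keep ) {
    print shift @buf;
  }
}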
 
 
__DATA__
Usage: shuffle [-h] [-b <buffer size>] [-d <draw size>]
 
Shuffle lines from a stream on STDIN.  Write lines to STDOUT.
 
  -h    show help (this message)
  -b    buffer size
        (default 0.  indicates shuffle whole stream, then write)
        range: 0..
  -d    draw size
        (defaults to value of -b.  number of items to remove from the
        buffer when it fills)
        range: 1..buffer size
 
You have two parameters available (besides -h for help).
 
* buffer size (-b).  Determines how many elements to temporarily hold
before shuffling.  The advantage of this buffer is to allow shuffling on
very long streams that would not fit into system memory.  The
disadvantage is that it is not a truly random shuffle, as each input
element can appear at most buffer-size positions away from the original
position.  Buffer size defaults to zero, so make sure to set it if your
data set size is large.
 
* draw size (-d).  Determines how frequently the buffer is shuffled and
flushed.  Rather than emitting every element in the buffer each time it
fills, emit only D of them.  The advantage here is that elements can
appear more than buffer-size positions away from the original position.
The disadvantage is that shuffling is done B/D times more frequently.
Draw size defaults to buffer size, in which case it has no effect.  Set
it to 1 to maximize randomness.
 
Copyright/License:
 
  Allen Day <allenday@ucla.edu>, licensed under GPL 2006-2008
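Below is a sketch of the windowed scheme as a pure function over a list, to show what -b and -d do to the output order. window_shuffle is a hypothetical helper, not the script itself. It assumes the semantics described above: the buffer fills to B elements, is shuffled, and D elements are emitted. Whatever remains at the end of input is shuffled and emitted too, and B = 0 shuffles the whole list.

```perl
use strict;
use warnings;

# Fisher-Yates shuffle of an array reference, in place.
sub shuffle_in_place {
    my ($buf) = @_;
    for ( my $j = $#$buf ; $j > 0 ; $j-- ) {
        my $swap = int( rand( $j + 1 ) );
        @$buf[ $j, $swap ] = @$buf[ $swap, $j ];
    }
}

# Hypothetical helper: pass @$items through a buffer of $size elements,
# emitting $draw of them each time the buffer fills. $size == 0 means
# "buffer everything, then shuffle once".
sub window_shuffle {
    my ( $items, $size, $draw ) = @_;
    my ( @buf, @out );
    for my $item (@$items) {
        push @buf, $item;
        if ( $size > 0 && @buf >= $size ) {
            shuffle_in_place( \@buf );
            push @out, splice( @buf, 0, $draw );    # keep $size - $draw
        }
    }
    shuffle_in_place( \@buf );    # end of input: shuffle and drain the rest
    push @out, @buf;
    return @out;
}

# Largest distance any element moved from its input position.
sub max_displacement {
    my ( $in, $out ) = @_;
    my %pos_in;
    @pos_in{@$in} = 0 .. $#$in;
    my $max = 0;
    for my $i ( 0 .. $#$out ) {
        my $dist = abs( $i - $pos_in{ $out->[$i] } );
        $max = $dist if $dist > $max;
    }
    return $max;
}

my @input = ( 1 .. 1000 );
my @out   = window_shuffle( \@input, 10, 10 );
print "max displacement, -b 10 -d 10: ", max_displacement( \@input, \@out ), "\n";
my @drip = window_shuffle( \@input, 10, 1 );
print "max displacement, -b 10 -d 1:  ", max_displacement( \@input, \@drip ), "\n";
```

With the draw size equal to the buffer size, nothing moves more than B - 1 places. With a draw size of 1, an element can sit in the buffer through many draws, which is how it ends up further from its original position.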

Administration
Analytics
Perl

sample - probabilistic sampling from a stream of lines

I’m frequently monitoring webservers, cache servers, database servers, etc. by tailing their log files. See my previous post on making logs easier to monitor with color.

Sometimes you also have too much data, and you don’t want to look at all of it. Use this to sample.
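The -k mode keeps a fixed-size sample from a stream of unknown length. This is the classic reservoir-sampling problem. Below is a minimal sketch of Algorithm R as a standalone function; the name reservoir_sample is illustrative. It keeps the first K items. After that, the n-th item replaces a random slot with probability K/n, so every item ends up in the sample with the same probability K/N.

```perl
use strict;
use warnings;

# Algorithm R: uniform sample of $k items from a list read once, in order.
sub reservoir_sample {
    my ( $items, $k ) = @_;
    my @reservoir;
    my $n = 0;    # items seen so far
    for my $item (@$items) {
        $n++;
        if ( @reservoir < $k ) {
            push @reservoir, $item;    # fill the reservoir first
        }
        elsif ( rand() < $k / $n ) {    # keep item n with probability k/n,
            $reservoir[ int( rand($k) ) ] = $item;    # evicting a random slot
        }
    }
    return @reservoir;
}

# Each of 1..10 should land in a 3-element sample about 30% of the time.
my %hits;
for ( 1 .. 20000 ) {
    $hits{$_}++ for reservoir_sample( [ 1 .. 10 ], 3 );
}
printf "%2d %.3f\n", $_, $hits{$_} / 20000 for 1 .. 10;
```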

sample source:

#!/usr/bin/perl
$|++;
use strict;
use Getopt::Long;
 
my $USAGE = join '', <DATA>;
 
my $T = 0;
my $K = 0;
my $P = 1;
my $H = 0;
my $N = 0;
my $S = 0;
 
GetOptions ("time|t=i"     => \$T,
            "number|n=i"   => \$N,
            "count|k=i"    => \$K,
            "prob|p=f"     => \$P,
            "shuffle|s"    => \$S,
            "help|h"       => \$H,
           ); 
 
if (
  ($T > 0 && $P != 1) ||
  ($K > 0 && $P != 1) ||
  ($K < 0 || $P < 0 || $T < 0 || $N < 0 || $P > 1 ) ||
  ($T > 0 && $N > 0) ||
  ($N > 0 && $K == 0) ||
  ($H)
) {
  print $USAGE and exit(1);
}
 
my $position = 0;   # elements seen in the current window
my @buf = ();
my $before = time();
 
while ( my $element = <> ) {
  $position++;
  # sample K elements from the full stream (report at the end),
  # every T seconds, or every N elements
  if ( $K > 0 ) {
    #reservoir sampling: keep the first K elements, then keep element
    #number $position with probability K/$position, evicting a random slot
    if ( scalar( @buf ) < $K ) {
      push @buf, [$position, $element];
    }
    elsif ( rand() < $K / $position ) {
      my $index = int(rand($K));
      $buf[ $index ] = [$position, $element]; #save position for sort
    }
    #time-based K-sampling
    if ( $T > 0 && time() > $before + $T ) {
      flush();
    }
    #event-based K-sampling
    elsif ( $N > 0 && $position >= $N ) {
      flush();
    }
  }
  # sample with probability P (the default P = 1 passes every line through)
  elsif ( rand() < $P ) {
    print $element;
  }
}
flush();
 
sub flush {
  $before = time();
  #Fisher-Yates (Knuth) shuffle, printing each slot once its value is final
  if ( $S ) {
    for ( my $j = scalar( @buf ) - 1 ; $j >= 0 ; $j-- ) {
      my $swap = int(rand($j + 1));
      if ( $swap != $j ) {
        ($buf[ $j ], $buf[ $swap ]) = ($buf[ $swap ], $buf[ $j ]);
      }
      print $buf[ $j ]->[ 1 ];
    }
  }
  #otherwise restore input order
  else {
    foreach my $item ( sort {$a->[0] <=> $b->[0]} @buf ) {
      print $item->[1];
    }
  }
  @buf = ();
  $position = 0;
}
 
 
__DATA__
Usage: sample [-h] [-s] [-p <probability>] [-k <count> [-t <seconds> | -n <elements>]]
 
Sample lines from a stream on STDIN.  Write lines to STDOUT.
 
  -h    show help (this message)
  -k    sample K elements from stream
        (default 0)
        range: 0..
  -p    sample elements from stream with probability
        (default 1)
        range: 0 <= p <= 1
  -n    sample over windows of N elements
        (default 0)
        range: 0..
  -t    sample over windows of T seconds
        (default 0, instantaneous with -p, infinity with -k)
        range: 0..
  -s    shuffle outputs
        (default false)
 
There are two modes of sampling:
 
  * sample with probability (-p)
  * sample a fixed number of elements (-k)
 
-k samples over a window of T seconds (-t) or N elements (-n), but
not both.  -t and -n default to zero (sample the full stream).  -p
can only be used alone, and -n can only be used with -k.
 
Examples:
 
  * sample K elements from a stream:
    cat /etc/passwd | sample -k 5
 
  * sample 1% of elements from a stream:
    tail -f /var/logs/httpd/access_log | sample -p 0.01
 
  * sample K elements from a stream every 30 seconds:
    tail -f /var/logs/httpd/access_log | sample -k 5 -t 30
 
  * sample K elements from a stream every 30 seconds, shuffled:
    tail -f /var/logs/httpd/access_log | sample -k 5 -t 30 -s
 
  * sample K elements from a stream every 100 elements:
    tail -f /var/logs/httpd/access_log | sample -k 5 -n 100
 
Copyright/License:
 
  Allen Day <allenday@ucla.edu>, licensed under GPL 2006-2008
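Combining -k with -n means drawing an independent reservoir sample from each window of N lines. Unless -s is given, each window's sample is then printed in input order. Below is a sketch of that under the same assumptions. The sample_windows helper is hypothetical. It tags each kept line with its position so the sample can be sorted back.

```perl
use strict;
use warnings;

# Hypothetical helper: split @$items into windows of $n and draw a uniform
# sample of $k from each window with Algorithm R, returning each window's
# sample in original input order.
sub sample_windows {
    my ( $items, $k, $n ) = @_;
    my @windows;
    for ( my $start = 0 ; $start < @$items ; $start += $n ) {
        my $end = $start + $n - 1;
        $end = $#$items if $end > $#$items;
        my @reservoir;    # [position, item] pairs
        my $seen = 0;
        for my $pos ( $start .. $end ) {
            $seen++;
            my $entry = [ $pos, $items->[$pos] ];
            if ( @reservoir < $k ) {
                push @reservoir, $entry;
            }
            elsif ( rand() < $k / $seen ) {
                $reservoir[ int( rand($k) ) ] = $entry;
            }
        }
        # restore input order before reporting
        push @windows, [ map { $_->[1] } sort { $a->[0] <=> $b->[0] } @reservoir ];
    }
    return @windows;
}

my @lines = map { "line $_" } 1 .. 250;
my @w = sample_windows( \@lines, 5, 100 );
print join( ', ', @$_ ), "\n" for @w;
```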

Administration
Analytics
Perl

Hiromi’s Sonic Bloom @ the Jazz Bakery

I saw Hiromi Uehara’s Sonicbloom at the Jazz Bakery Saturday. Wow! I’m now a fan.

Check out this bootleg of Hiromi covering Gershwin on YouTube. She played this one last night.

Fun
Life

Some Kickass Wordpress Plugins

I was looking for a plugin to count unique views per post and found Lester Chan’s collection of WordPress plugins. I’ll be adding some of these shortly.

One of the most viewed posts on this blog is one of my first: A review of Costco stainless steel cookware. Would you have guessed that? I wonder if it’s highly rated… we’ll find out.

I’m still surprised it’s my #1 post. Perhaps it’s more evidence that I should be posting more non-technical stuff, and some kind of argument against making a super-niche blog.

Life
PHP
Random musings

Mahout ♥ HBase

Well, more accurately I’m using and liking both Mahout and HBase in my work at BiggerBoat, but this is a more fun title and I like using the more obscure HTML character entities.

Anyway, on Friday I posted an adapter class for Mahout/HBase integration to the Apache JIRA. It allows HTables to be manipulated as sparse vectors in Mahout through the Vector interface, reading through RowResult and writing through BatchUpdate. Check it out: MAHOUT-78.

Looks like Mahout needs to generate some HTML for their javadoc.

HBase
Mahout

Life beyond tech

I noticed my blog has been really technically heavy lately. This probably has a lot to do with the fact that I took a position as a “Computer Scientist” at BiggerBoat recently, and I’m on a very steep learning curve of new technologies (Hadoop, HBase, Nutch, Mahout to name a few).

I’m beginning to see this blog as really being about documenting/recording what I’ve been doing so I can remember later. Who knows, maybe even some friends/family are reading! Technical HOWTOs and pasted scripts fit the blog’s manifesto nicely, but so do some other things, like:

Check out some of Ling’s and Sing’s recent photos on Flickr. I haven’t been running with them too much (working), but they’ve been having a good time.

Some photos from the Getty Villa in Malibu:
Ling Lee, Sing Lee, Chih Fen Lee

Family
Fun
Life

Thoughts on Hadoop JobTracker/TaskTracker Scheduling

Had a brief, interesting conversation today on freenode #hadoop with Rapleaf engineer Nathan Marz about scheduling in Hadoop.

Pretty much supports my sense that scheduling is not Hadoop’s strong suit. It’s really pretty shitty. It would be great to see some more cross-pollination between the Beowulf (SGE, PBS, Globus) and MapReduce (Hadoop, HBase) communities. The former have more mature scheduling, resource management and permissions models. They don’t really do a good job, though, of providing a framework for distributed, parallel computing at the application level; everything is roll-your-own. Perhaps Hadoop could be integrated as a parallel environment that consumes resources from an SGE master [1, 2] rather than managing its own mapper/reducer pools.

A less ambitious scheduler improvement is to modify the way the Hadoop scheduler allocates map/reduce resources. The main itch I’m trying to scratch right now has to do with the coupling of map/reduce allocation. There are some cases where it seems this shouldn’t be done. Read the dialog with Nathan below if you care to know more.

allenday is it possible to decouple mapper and reducer slot allocation for jobs?
allenday i mean, if a job is #1 in the MR queue, but it is not yet ready to reduce, can it be prevented from consuming reducer slots?
nathanmarz allenday: i think that would be hard… reducing starts while the mapping is happening (copy stage)
allenday nathanmarz, i frequently find that while the reduce has “started”, it can just sit there for a long time doing nothing
allenday this is most common with nutch
allenday so there could be a bunch of other jobs further back in the queue that get starved for reduces b/c the head of the queue is squatting on the slots
nathanmarz it just sits there in the reduce phase?
allenday for sure nutch does, yeah
allenday during fetch, when it crawling
nathanmarz i see
nathanmarz i don’t have that much familiarity with nutch
nathanmarz is it possible to increase the number of reducers?
allenday yep, but then you can get into i/o trouble later
nathanmarz for the job i mean, not the cluster
allenday oh
allenday it sounds like you propose having these squatters consume minimal # of reducers (e.g. only 1)
nathanmarz actually, the opposite
nathanmarz let’s say you have 16 reduce slots
nathanmarz and the job i set to use 16 reducers
nathanmarz each one of those reducers potentially has to go over a lot of data
nathanmarz if the job is instead set to use a lot more reducers, like 100 or something
nathanmarz than an individual reducer will go a lot faster
nathanmarz and potentially, those freed reduce slots will go to jobs with higher priority
allenday ok, so you introduce priority to bump the further back ahead in the queue
nathanmarz yea
allenday is that settable in jobconf?
nathanmarz you can set num reducers
allenday so let’s suppose the job that squats on reduce slots gets to the head of the queue. regardless of if it has 16 or 100 reducers configured
nathanmarz JobConf#setNumReduceTasks
allenday and that it it still in map phase only. has not begun reducing yet
allenday until one of those reduces finishes (i.e. the map has finished) all slots are still filled
allenday it’s only when the first reduce finishes that the job at #2 can take over a reduce slot
nathanmarz right
nathanmarz yea that’s true
allenday that’s bad
nathanmarz this scheme doesn’t help until mappers finished
allenday you really want this #1 job
allenday when it is allocating reducers
allenday to have low priority in acquiring the slots
nathanmarz right
nathanmarz well you don’t want it to acquire any slots until mappers finish
allenday so you give reduce slots to #2, #3, #4, etc. until everyone who wants slots has them. then you assign to #1
allenday or until #1 is ready…
allenday is it just me or does the queueing system in hadoop kind of suck?
allenday i am coming here from sun grid which puts a lot of emphasis on this aspect
nathanmarz well, the priority system will work if you start job #1 after the other jobs
nathanmarz if you start the other jobs after #1 then they will get starved of reducers
allenday heh, but the whole reason it is in #1 is because it was submitted first, right? isn’t hadoop FIFO wrt jobs?
nathanmarz if they’re the same priority
nathanmarz so maybe decreasing the reducers job #1 uses is the way to go
nathanmarz set it so it doesn’t use all the reduce slots on the cluster
allenday i need to do some research to see if there are jira open for improving the scheduler. or if there are some commercial plugins to improve the scheduling
nathanmarz definitely room for improvement, agreed
allenday yeah, that was what i thought you meant initially. it’s a hack too though, and breaks down when the number of jobs gets large
allenday i’m surprised they are coupled. do you understand how it works when the mapper hands off to the reducer?
allenday b/c i don’t and i need to
nathanmarz yes
allenday can i get the 2min version?
nathanmarz the reason the reducers start while the mappers are running is because there’s some work they can do without all the map data
nathanmarz each reducer needs to copy the relevant outputs from all the mappers to its machine
nathanmarz this is called the “copy” phase and can occur in parallel with mapping
allenday ok, i’ve seen that
allenday so what we need is a flag taht indicates there will be no data to copy until maps all finish
nathanmarz yea
nathanmarz a flag that says not to pipeline the process
allenday default behavior is to have the flag off and copy greedily
allenday which is like it does now
allenday turn the flag on says to wait until upstream map finishes before grabbing a reduce slot and kicking off the copy
allenday **all upstream maps
nathanmarz http://hadoop.apache.org/core/docs/current/hadoop-default.html
nathanmarz those are all the hadoop config parameters
nathanmarz you might be able to find something in there
allenday yeah, i fiind goodies in there every time i read that page :)
allenday i am only ~1mo into hadoop
allenday here’s another scheduling related question/issue i’m having
allenday i find that job i/o and cpu usage tend to synchronize after a while
allenday b/c if there is a slow moving job in the queue, all the others tend to get jammed behind it
allenday have you seen this?
nathanmarz no, i haven’t
nathanmarz but that’s interesting
allenday it comes back to resource (mis)allocation by the scheduler
nathanmarz how are you measuring that?
allenday it’s this same issue where jobs will consume all the slots
allenday so if you have a slow moving thing blocking all the resources
allenday no one else can get past
allenday then when the slow moving job finishes, the others all start getting processed very quickly (high cpu load during map), then as they begin to finish there is a flurry of i/o
allenday it’s like congestion on the freeway where one car slams on the breaks it sends this wave of traffic jam behind it
allenday assuming the freeway is already close to capacity (not sparse)
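The policy floated in the chat can be written as a short function. It first hands reduce slots, in queue order, to jobs that are actually ready to reduce. Only after that does it give slots to jobs still mapping. This is a toy model in Perl, not Hadoop's scheduler or any of its APIs. The job fields wants and maps_done are made up for illustration.

```perl
use strict;
use warnings;

# Toy model of the proposed reduce-slot policy. @$jobs is in FIFO queue
# order; each job is a hashref with a name, the number of reduce slots it
# wants, and whether all of its maps have finished. Returns name => slots.
sub assign_reduce_slots {
    my ( $free, $jobs ) = @_;
    my %granted;
    # Pass 1: jobs whose maps are done; pass 2: jobs still mapping.
    for my $ready ( 1, 0 ) {
        for my $job (@$jobs) {
            next if ( $job->{maps_done} ? 1 : 0 ) != $ready;
            my $give = $job->{wants} < $free ? $job->{wants} : $free;
            $granted{ $job->{name} } += $give;
            $free -= $give;
        }
    }
    return \%granted;
}

# job1 was submitted first but is still mapping; job2 and job3 can reduce now.
my $g = assign_reduce_slots(
    16,
    [
        { name => 'job1', wants => 16, maps_done => 0 },
        { name => 'job2', wants => 4,  maps_done => 1 },
        { name => 'job3', wants => 6,  maps_done => 1 },
    ]
);
print "$_ => $g->{$_}\n" for sort keys %$g;
```

Under plain FIFO, job1 would take all 16 slots. Here it gets only the 6 left over. The catch, per Nathan's explanation of the copy phase, is that a deferred job loses the overlap between copying and mapping. That is why the chat frames this as an opt-in flag rather than the default.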

Distributed Systems
Hadoop
Java
Random musings
SGE

Cascading for Hadoop

Need to check this out as an alternative to Hadoop Streaming.

Hadoop
Java

Ajax “busy, please wait” activity indicator images

Remember how awesome Hampster Dance (sp) was back in the pre-YouTube, pre-Star Wars Kid, pre-dot-bomb late 90s? Well, I have found an up-to-date, even geekier version!

Check out the AJAX Activity Indicators.


Personally, I think we should all just use the hamsters instead of spinning wheels.

[via coliss.com]

Fun
Random musings
