#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Fuse;
use LWP::UserAgent;
use LWP::Simple;
use List::MoreUtils qw(uniq);
use MogileFS::Client;
use Path::Class;
use POSIX qw(ENOENT EISDIR EINVAL);
my $DEBUG = 0;
our (%FILE_CACHE, $CACHE_SIZE, $CACHE_AGE) = (('/' => {size => 0, age => 0}));
# create client object w/ server-configured namespace and IPs of trackers
my $ua = LWP::UserAgent->new;
my $class = 'default';
my $mogilefs = MogileFS::Client->new(
domain => 'ifap',
hosts => [ '10.10.0.100:6001', '10.10.0.101:6001' ],
);
my ($mountpoint) = "";
$mountpoint = shift(@ARGV) if @ARGV;
Fuse::main(
debug => $DEBUG,
mountpoint => $mountpoint,
threaded => 1,
getattr => "main::e_getattr",
getdir => "main::e_getdir",
mknod => "main::e_mknod",
open => "main::e_open",
read => "main::e_read",
rename => "main::e_rename",
statfs => "main::e_statfs",
unlink => "main::e_unlink",
write => "main::e_write",
);
sub e_getattr {
my $filename = shift;
$filename =~ s#^.*/##;
warn "main::e_getattr $filename\n" if $DEBUG;
my ( $size, $modes );
my ( $dev, $ino, $rdev, $blocks, $gid, $uid, $nlink, $blksize )
= ( 0, 0, 0, 1, 0, 0, 1, 1024 );
my ( $atime, $ctime, $mtime ) = ( time, time, time );
if ( $filename !~ m{\.} ) {
# warn "directory!";
$size = 0;
$modes = ( 0040 << 9 ) + 0755;
} else {
# warn "file!";
$size = 123;
$modes = ( 0100 << 9 ) + 0644;
my @paths = $mogilefs->get_paths( $filename, { noverify => 1 } );
my ( $content_type, $document_length, $modified_time, $expires, $server );
if ( scalar( @paths ) ) {
( $content_type, $document_length, $modified_time, $expires, $server ) = head( $paths[0] );
$size = $document_length;
( $atime, $ctime, $mtime ) = ($modified_time) x 3;
}
else {
@paths = $mogilefs->get_paths( '_big_info:' . $filename, { noverify => 1 } );
if ( scalar( @paths ) ) {
my $data = $mogilefs->get_file_data( '_big_info:' . $filename );
my ( $_des, $_type, $_compress, $_filename, $_chunks, $_size, undef, @_parts ) = split /\n/, $$data;
( $size ) = $_size =~ m#^size (\d+)$#;
OUTER: foreach my $_part ( @_parts ) {
$_part =~ m#paths: (.+?)$#;
my @_paths = split ', ', $_part;
foreach my $_path ( @_paths ) {
( $content_type, $document_length, $modified_time, $expires, $server ) = head( $paths[0] );
( $atime, $ctime, $mtime ) = ($modified_time) x 3;
last OUTER if $server;
}
}
}
}
return -ENOENT() unless @paths;
}
warn(
join(
",",
( $dev, $ino, $modes, $nlink, $uid,
$gid, $rdev, $size, $atime, $mtime,
$ctime, $blksize, $blocks
)
),
"\n"
) if $DEBUG;
return (
$dev, $ino, $modes, $nlink, $uid, $gid, $rdev,
$size, $atime, $mtime, $ctime, $blksize, $blocks
);
}
sub e_getdir {
my $prefix = shift;
warn "main::e_getdir $prefix\n" if $DEBUG;
my @filenames;
$mogilefs->foreach_key(
# prefix => $prefix,
sub {
my $filename = shift;
push @filenames, $filename;
push @filenames, file( $filename )->parent();
}
);
@filenames = uniq @filenames;
warn "returning: @filenames\n" if $DEBUG;
return ( @filenames, 0 );
}
sub e_mknod {
my $filename = shift;
$filename =~ s#.*/##;
warn "main::e_mknod $filename\n" if $DEBUG;
my $fh = $mogilefs->new_file($filename, undef);
if ( $fh ) {
print $fh "\n";
unless ($fh->close) {
my ($code, $str) = ($mogilefs->errcode || -1, $mogilefs->errstr || '');
warn "Error creating file:$code: $str" if $DEBUG;
$! = $str;
$? = $code;
return -1;
}
return 0;
}
else {
return -1;
}
}
sub e_open {
my $filename = shift;
$filename =~ s#.*/##;
warn "main::e_open $filename\n" if $DEBUG;
return -EISDIR() unless $filename =~ m{\.};
my @paths = $mogilefs->get_paths( $filename, { noverify => 1 } );
if ( ! scalar( @paths ) ) {
@paths = $mogilefs->get_paths( '_big_info:' . $filename, { noverify => 1 } );
}
return -ENOENT() unless @paths;
return 0;
}
sub e_read {
my ( $filename, $length, $offset ) = @_;
$filename =~ s#.*/##;
warn "main::e_read $filename $length $offset\n" if $DEBUG;
return -EISDIR() if $filename =~ m{/$};
my $maxoff = $offset + ( $length - 1 );
warn "requested offset=$offset length=$length bytes=$maxoff" if $DEBUG;
my @paths = $mogilefs->get_paths( $filename, { noverify => 1 } );
my $size;
my ( $content_type, $document_length, $modified_time, $expires, $server );
if ( scalar( @paths ) ) {
warn "it's a small file" if $DEBUG;
( $content_type, $document_length, $modified_time, $expires, $server ) = head( $paths[0] );
$size = $document_length;
return 0 if $offset == $document_length;
$maxoff = $document_length if $maxoff > $document_length;
my $range = $offset . "-" . $maxoff;
warn " Range: bytes=$range\n" if $DEBUG;
my $response = $ua->get( $paths[0], "Range" => "bytes=$range" );
if ( $response->is_success ) {
return $response->content;
} else {
warn $response->as_string if $DEBUG;
}
}
else {
@paths = $mogilefs->get_paths( '_big_info:' . $filename, { noverify => 1 } );
return -ENOENT() unless @paths;
warn "it's a big file" if $DEBUG;
my $data = $mogilefs->get_file_data( '_big_info:' . $filename );
my ( $_des, $_type, $_compress, $_filename, $_chunks, $_size, undef, @_parts ) = split /\n/, $$data;
( $size ) = $_size =~ m#^size (\d+)$#;
return 0 if $offset == $size;
$maxoff = $size if $maxoff > $size;
my $part_min = -1;
my $part_max = -1;
#part 2 bytes=23080964 md5=af45f7ac80ca34328db3c90de1db1ab0 paths: http://10.10.0.100:7500/dev8/0/000/119/0000119969.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119969.fid
my $buf = '';
foreach my $_part ( @_parts ) {
my ( $_bytes, $_paths ) = $_part =~ m#bytes=(\d+) .+? paths: (.+?)$#;
$part_min = $part_max > 0 ? $part_max + 1 : 0;
$part_max += $_bytes;
warn "examining part $part_min -> $part_max" if $DEBUG;
#chunk too early
next if ( $part_max < $offset );
#chunk too late;
next if ( $part_min > $maxoff );
warn "using part $part_min -> $part_max" if $DEBUG;
my @_paths = split ', ', $_paths;
foreach my $_path ( @_paths ) {
warn "offset=$offset part_min=$part_min maxoff=$maxoff part_max=$part_max" if $DEBUG;
my $range = ($offset-$part_min) . "-" . ($maxoff-$part_min);
warn "getting Range: bytes=$range" if $DEBUG;
my $response = $ua->get( $_path, "Range" => "bytes=$range" );
if ( $response->is_success() ) {
$buf .= $response->content();
warn "data length=".length($buf) if $DEBUG;
if ( length( $buf ) == $length ) {
warn "got all the data (1)!" if $DEBUG;
return $buf;
}
elsif ( $offset + length( $buf ) == $maxoff ) {
warn "got all the data (2)!" if $DEBUG;
return $buf;
}
$offset += length( $buf );
last;
}
}
}
if ( length( $buf ) != $length ) {
return -ENOENT();
}
else {
return $buf;
}
}
}
sub e_rename {
my ( $old, $new ) = @_;
$old =~ s#.*/##;
$new =~ s#.*/##;
warn "main::e_rename: $old -> $new" if $DEBUG;
# Rename this file
$mogilefs->rename($old, $new);
return 0;
}
sub e_statfs { return 255, 1, 1, 1, 1, 2 }
sub e_unlink {
my $filename = shift;
$filename =~ s#.*/##;
warn "main::e_unlink: $filename" if $DEBUG;
$mogilefs->delete($filename);
return 0;
}
sub e_write {
my ( $filename, $buf, $offset ) = @_;
$filename =~ s#.*/##;
warn("main::e_write: $filename pos=$offset len=".length($buf)) if $DEBUG;
my $finfo = get_file_info($filename);
return -ENOENT() unless $finfo;
my $cont = get_file_data($filename);
substr($$cont, $offset, length($buf), $buf);
$mogilefs->store_content($filename, undef, $cont);
rm_file_cache($filename);
return length($buf);
}
#################################################
sub logmsg { warn(join "\t",@_) if $DEBUG }
sub get_file_data {
my ($file) = @_;
my $entry = $FILE_CACHE{$file};
my $meta = $FILE_CACHE{'/'};
if ($entry) {
# See if this data is too old
if ((time - $entry->{created}) < $CACHE_AGE) {
logmsg(1, "-- get_file_data: hit");
# If its still valid, return it
return $entry->{data};
} else {
logmsg(1, "-- get_file_data: miss - expired");
rm_file_cache($file);
}
}
my $cont = $mogilefs->get_file_data($file);
my $size = length($$cont);
if ($meta->{size} + $size > $CACHE_SIZE) {
# If adding this would go beyond our max cache size, delete things until
# we can fit it
foreach my $f (sort {$a->{age} <=> $b->{age}} keys %FILE_CACHE) {
next if $f eq '/';
my $rm_size = rm_file_cache($f);
logmsg(1, "-- get_file_data: purging - $rm_size bytes");
last if $meta->{size} + $size < $CACHE_SIZE;
}
}
logmsg(1, "-- get_file_data: added - $size bytes");
# Create a new entry
$FILE_CACHE{$file} = {created => time,
size => $size,
data => $cont};
$meta->{size} += $size;
return $cont;
}
sub get_file_info {
my ($path) = @_;
if ($path eq '/') {
return {name => '/',
is_directory => 1};
}
else {
foreach my $f ( $mogilefs->list_keys( $path ) ) {
return $f if $f eq $path;
}
}
return undef;
}
sub rm_file_cache {
my ($file) = @_;
my $entry = delete $FILE_CACHE{$file};
return unless $entry;
# Decrement how large our cache size is
my $size = $entry->{size};
$FILE_CACHE{'/'}->{size} -= $size;
return $size;
}
__DATA__
#des no description
#type file
#compressed 0
#filename somefile.mpeg
#chunks 2
#size 90189828
#
#part 1 bytes=67108864 md5=8066369552b71fd49cfbe9ccdce74051 paths: http://10.10.0.100:7500/dev6/0/000/119/0000119968.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119968.fid
#part 2 bytes=23080964 md5=af45f7ac80ca34328db3c90de1db1ab0 paths: http://10.10.0.100:7500/dev8/0/000/119/0000119969.fid, http://10.10.0.101:7500/dev2/0/000/119/0000119969.fid