Merge pull request #4 from tavaresb/master
Interface to issue message commands
This commit is contained in:
@@ -18,9 +18,6 @@ usage("topic and channel are required parameters") unless $topic and $channel;
|
|||||||
|
|
||||||
my $cv = AE::cv;
|
my $cv = AE::cv;
|
||||||
|
|
||||||
## return undef => mark_as_done_msg()
|
|
||||||
my $message_cb = $print ? sub { print "$_[1]{message}\n"; return } : sub {return};
|
|
||||||
|
|
||||||
my $t = my $p = 0;
|
my $t = my $p = 0;
|
||||||
my $r = AnyEvent::NSQ::Reader->new(
|
my $r = AnyEvent::NSQ::Reader->new(
|
||||||
topic => $topic,
|
topic => $topic,
|
||||||
@@ -28,7 +25,7 @@ my $r = AnyEvent::NSQ::Reader->new(
|
|||||||
nsqd_tcp_addresses => '127.0.0.1',
|
nsqd_tcp_addresses => '127.0.0.1',
|
||||||
client_id => "${channel}_consumer/pid_$$",
|
client_id => "${channel}_consumer/pid_$$",
|
||||||
|
|
||||||
message_cb => sub { $t++; $p++; $message_cb->(@_) },
|
message_cb => \&message_handler,
|
||||||
|
|
||||||
error_cb => sub { warn "$_[1]\n" if $verbose },
|
error_cb => sub { warn "$_[1]\n" if $verbose },
|
||||||
disconnect_cb => sub { warn "Disconnected after $t total messages... exiting...\n" if $verbose; $cv->send },
|
disconnect_cb => sub { warn "Disconnected after $t total messages... exiting...\n" if $verbose; $cv->send },
|
||||||
@@ -85,3 +82,13 @@ Usage: consumer.pl [--help|-h] [--print|-p] topic channel
|
|||||||
|
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub message_handler {
|
||||||
|
my ($reader, $message) = @_;
|
||||||
|
|
||||||
|
if ($print) {
|
||||||
|
print $message->{message}."\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
$reader->mark_as_done_msg($message);
|
||||||
|
}
|
||||||
|
|||||||
@@ -167,7 +167,9 @@ sub mark_as_done_msg {
|
|||||||
my ($self, $msg) = @_;
|
my ($self, $msg) = @_;
|
||||||
return unless my $hdl = $self->{handle};
|
return unless my $hdl = $self->{handle};
|
||||||
|
|
||||||
$hdl->push_write("FIN $msg->{message_id}\012");
|
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
||||||
|
|
||||||
|
$hdl->push_write("FIN $id\012");
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -176,10 +178,13 @@ sub requeue_msg {
|
|||||||
my ($self, $msg, $delay) = @_;
|
my ($self, $msg, $delay) = @_;
|
||||||
return unless my $hdl = $self->{handle};
|
return unless my $hdl = $self->{handle};
|
||||||
|
|
||||||
$delay = 0 unless defined $delay;
|
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
||||||
$delay = $msg->{attempts} * $self->{requeue_delay} if $delay < 0;
|
my $attempts = ref($msg) ? $msg->{attempts} : 1;
|
||||||
|
|
||||||
$hdl->push_write("REQ $msg->{message_id} $delay\012");
|
$delay = 0 unless defined $delay;
|
||||||
|
$delay = $attempts * $self->{requeue_delay} if $delay < 0;
|
||||||
|
|
||||||
|
$hdl->push_write("REQ $id $delay\012");
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -188,7 +193,8 @@ sub touch_msg {
|
|||||||
my ($self, $msg) = @_;
|
my ($self, $msg) = @_;
|
||||||
return unless my $hdl = $self->{handle};
|
return unless my $hdl = $self->{handle};
|
||||||
|
|
||||||
$hdl->push_write("TOUCH $msg->{message_id}\012");
|
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
||||||
|
$hdl->push_write("TOUCH $id\012");
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,9 +7,19 @@ package AnyEvent::NSQ::Reader;
|
|||||||
use strict;
|
use strict;
|
||||||
use warnings;
|
use warnings;
|
||||||
use Carp 'croak';
|
use Carp 'croak';
|
||||||
|
use Scalar::Util qw( weaken );
|
||||||
|
|
||||||
use parent 'AnyEvent::NSQ::Client';
|
use parent 'AnyEvent::NSQ::Client';
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my $class = shift;
|
||||||
|
my $self = $class->SUPER::new(@_);
|
||||||
|
|
||||||
|
# To keep registry of the connection bound to each message
|
||||||
|
$self->{routing} = {};
|
||||||
|
|
||||||
|
return $self;
|
||||||
|
}
|
||||||
|
|
||||||
#### Parameter parsing
|
#### Parameter parsing
|
||||||
|
|
||||||
@@ -46,15 +56,57 @@ sub _identified {
|
|||||||
## FIXME: bless it with the future AnyEvent::NSQ::Message
|
## FIXME: bless it with the future AnyEvent::NSQ::Message
|
||||||
my $msg = $_[1];
|
my $msg = $_[1];
|
||||||
|
|
||||||
my $action = $self->{message_cb}->($self, $msg);
|
## Keep the connection in the registry in case the user
|
||||||
|
## wants to issue FIN/REQs inside the callback
|
||||||
|
$self->{routing}->{$msg->{message_id}} = $conn;
|
||||||
|
weaken( $self->{routing}->{$msg->{message_id}} );
|
||||||
|
|
||||||
## Action below -1 does nothing, we assume the user took care of it himself
|
$self->{message_cb}->($self, $msg);
|
||||||
if (not defined $action) { $conn->mark_as_done_msg($_[1]) }
|
|
||||||
elsif ($action >= -1) { $conn->requeue_msg($_[1], $action) }
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
$conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10));
|
$conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub mark_as_done_msg {
|
||||||
|
my ($self, $msg) = @_;
|
||||||
|
|
||||||
|
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||||
|
|
||||||
|
$conn->mark_as_done_msg($msg);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub requeue_msg {
|
||||||
|
my ($self, $msg, $delay) = @_;
|
||||||
|
|
||||||
|
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||||
|
|
||||||
|
$conn->requeue_msg($msg, $delay);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub touch_message {
|
||||||
|
my ($self, $msg) = @_;
|
||||||
|
|
||||||
|
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||||
|
|
||||||
|
$conn->touch_msg($msg);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _find_and_delete_message_connection {
|
||||||
|
my ($self, $msg) = @_;
|
||||||
|
|
||||||
|
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
||||||
|
|
||||||
|
my $conn = delete($self->{routing}->{$id});
|
||||||
|
|
||||||
|
if ( !$conn ) {
|
||||||
|
croak "WARN: Could not find the connection to route msg $id";
|
||||||
|
}
|
||||||
|
|
||||||
|
return $conn;
|
||||||
|
}
|
||||||
|
|
||||||
1;
|
1;
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ subtest 'basic connection' => sub {
|
|||||||
|
|
||||||
message_cb => sub {
|
message_cb => sub {
|
||||||
print STDERR "!!!! GOT MESSAGE '$_[1]{message}\n";
|
print STDERR "!!!! GOT MESSAGE '$_[1]{message}\n";
|
||||||
|
$_[0]->mark_as_done_msg($_[1]);
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|||||||
Reference in New Issue
Block a user