diff --git a/examples/consumer.pl b/examples/consumer.pl index be4751e..51539dd 100755 --- a/examples/consumer.pl +++ b/examples/consumer.pl @@ -18,9 +18,6 @@ usage("topic and channel are required parameters") unless $topic and $channel; my $cv = AE::cv; -## return undef => mark_as_done_msg() -my $message_cb = $print ? sub { print "$_[1]{message}\n"; return } : sub {return}; - my $t = my $p = 0; my $r = AnyEvent::NSQ::Reader->new( topic => $topic, @@ -28,7 +25,7 @@ my $r = AnyEvent::NSQ::Reader->new( nsqd_tcp_addresses => '127.0.0.1', client_id => "${channel}_consumer/pid_$$", - message_cb => sub { $t++; $p++; $message_cb->(@_) }, + message_cb => \&message_handler, error_cb => sub { warn "$_[1]\n" if $verbose }, disconnect_cb => sub { warn "Disconnected after $t total messages... exiting...\n" if $verbose; $cv->send }, @@ -85,3 +82,13 @@ Usage: consumer.pl [--help|-h] [--print|-p] topic channel exit(1); } + +sub message_handler { + my ($reader, $message) = @_; + + if ($print) { + print $message->{message}."\n"; + } + + $reader->mark_as_done_msg($message); +} diff --git a/lib/AnyEvent/NSQ/Connection.pm b/lib/AnyEvent/NSQ/Connection.pm index 1f00362..a80e9ad 100644 --- a/lib/AnyEvent/NSQ/Connection.pm +++ b/lib/AnyEvent/NSQ/Connection.pm @@ -167,7 +167,9 @@ sub mark_as_done_msg { my ($self, $msg) = @_; return unless my $hdl = $self->{handle}; - $hdl->push_write("FIN $msg->{message_id}\012"); + my $id = ref($msg) ? $msg->{message_id} : $msg; + + $hdl->push_write("FIN $id\012"); return; } @@ -176,10 +178,13 @@ sub requeue_msg { my ($self, $msg, $delay) = @_; return unless my $hdl = $self->{handle}; - $delay = 0 unless defined $delay; - $delay = $msg->{attempts} * $self->{requeue_delay} if $delay < 0; + my $id = ref($msg) ? $msg->{message_id} : $msg; + my $attempts = ref($msg) ? $msg->{attempts} : 1; - $hdl->push_write("REQ $msg->{message_id} $delay\012"); + $delay = 0 unless defined $delay; + $delay = $attempts * $self->{requeue_delay} if $delay < 0; + + $hdl->push_write("REQ $id $delay\012"); return; } @@ -188,7 +193,8 @@ sub touch_msg { my ($self, $msg) = @_; return unless my $hdl = $self->{handle}; - $hdl->push_write("TOUCH $msg->{message_id}\012"); + my $id = ref($msg) ? $msg->{message_id} : $msg; + $hdl->push_write("TOUCH $id\012"); return; } diff --git a/lib/AnyEvent/NSQ/Reader.pm b/lib/AnyEvent/NSQ/Reader.pm index 9076e91..4e0dec1 100644 --- a/lib/AnyEvent/NSQ/Reader.pm +++ b/lib/AnyEvent/NSQ/Reader.pm @@ -7,9 +7,19 @@ package AnyEvent::NSQ::Reader; use strict; use warnings; use Carp 'croak'; +use Scalar::Util qw( weaken ); use parent 'AnyEvent::NSQ::Client'; +sub new { + my $class = shift; + my $self = $class->SUPER::new(@_); + + # To keep registry of the connection bound to each message + $self->{routing} = {}; + + return $self; +} #### Parameter parsing @@ -46,15 +56,57 @@ sub _identified { ## FIXME: bless it with the future AnyEvent::NSQ::Message my $msg = $_[1]; - my $action = $self->{message_cb}->($self, $msg); + ## Keep the connection in the registry in case the user + ## wants to issue FIN/REQs inside the callback + $self->{routing}->{$msg->{message_id}} = $conn; + weaken( $self->{routing}->{$msg->{message_id}} ); - ## Action below -1 does nothing, we assume the user took care of it himself - if (not defined $action) { $conn->mark_as_done_msg($_[1]) } - elsif ($action >= -1) { $conn->requeue_msg($_[1], $action) } + $self->{message_cb}->($self, $msg); } ); $conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10)); } +sub mark_as_done_msg { + my ($self, $msg) = @_; + + my $conn = $self->_find_and_delete_message_connection($msg); + + $conn->mark_as_done_msg($msg); + return 1; +} + +sub requeue_msg { + my ($self, $msg, $delay) = @_; + + my $conn = $self->_find_and_delete_message_connection($msg); + + $conn->requeue_msg($msg, $delay); + return 1; +} + +sub touch_message { + my ($self, $msg) = @_; + + my $conn = $self->_find_and_delete_message_connection($msg); + + $conn->touch_msg($msg); + return 1; +} + +sub _find_and_delete_message_connection { + my ($self, $msg) = @_; + + my $id = ref($msg) ? $msg->{message_id} : $msg; + + my $conn = delete($self->{routing}->{$id}); + + if ( !$conn ) { + croak "WARN: Could not find the connection to route msg $id"; + } + + return $conn; +} + 1; diff --git a/t/10-reader.t b/t/10-reader.t index 1545b29..b686640 100755 --- a/t/10-reader.t +++ b/t/10-reader.t @@ -17,6 +17,7 @@ subtest 'basic connection' => sub { message_cb => sub { print STDERR "!!!! GOT MESSAGE '$_[1]{message}\n"; + $_[0]->mark_as_done_msg($_[1]); return; }, );