Files
perl-anyevent-nsq/lib/AnyEvent/NSQ/Reader.pm
Pedro Melo 1c5a3526fd Tidy
Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-01-02 12:46:29 +00:00

114 lines
2.5 KiB
Perl

package AnyEvent::NSQ::Reader;
# ABSTRACT: a NSQ.io asynchronous consumer
# VERSION
# AUTHORITY
use strict;
use warnings;
use Carp 'croak';
use Scalar::Util qw( weaken );
use parent 'AnyEvent::NSQ::Client';
## Constructor.
##
## Every argument is handed over to the parent class constructor; the object
## it returns then gets an empty routing table attached before being
## returned to the caller.
sub new {
    my ($class, @ctor_args) = @_;

    my $reader = $class->SUPER::new(@ctor_args);

    ## Registry of the connection bound to each message
    $reader->{routing} = {};

    return $reader;
}
#### Parameter parsing
## Parse all Reader-specific arguments to new()
##
## Moves the required 'topic', 'channel' and 'message_cb' entries, plus the
## optional 'ready_count' and 'requeue_delay' ones, out of the $args hashref
## and into the object. Whatever is left in $args is passed on to the parent
## class parser, whose return value is returned as-is.
##
## Croaks as soon as one of the required entries is missing or false.
sub _parse_args {
    my ($self, $args) = @_;

    ## Required parameters, checked in this order
    for my $name (qw( topic channel message_cb )) {
        $self->{$name} = delete $args->{$name}
            or croak "FATAL: required '$name' parameter is missing";
    }

    ## Optional parameters: only the ones actually given are copied over
    my @present = grep { exists $args->{$_} } qw( ready_count requeue_delay );
    @{$self}{@present} = delete @{$args}{@present};

    return $self->SUPER::_parse_args($args);
}
### Connection setup and teardown
## Called after a successful reply to IDENTIFY, we are ready to rock & roll.
##
## Runs the parent class hook, subscribes $conn to the configured
## topic/channel with a handler that dispatches each message to the user's
## 'message_cb', and finally calls $conn->ready() with the configured
## 'ready_count' or, when that is unset (or false), a tenth of the
## 'max_rdy_count' found in $info (2000 is assumed when it is absent).
##
##   $conn - the connection that has just been identified
##   $info - hashref; only its 'max_rdy_count' entry is read here
##
## Returns whatever $conn->ready() returns.
sub _identified {
    my $self = shift;
    my ($conn, $info) = @_;

    ## $self was shifted off above, so @_ now holds only ($conn, $info, ...).
    ## Forwarding an unshifted @_ would hand the parent hook the invocant
    ## twice and push $conn/$info one position to the right.
    $self->SUPER::_identified(@_);

    $conn->subscribe(
        $self->{topic},
        $self->{channel},

        ## our message handler...
        ## NOTE(review): this closure keeps strong references to $self and
        ## $conn; if the connection stores the handler this may form a
        ## reference cycle -- confirm against the connection class.
        sub {
            ## FIXME: bless it with the future AnyEvent::NSQ::Message
            my $msg = $_[1];

            ## Keep the connection in the registry in case the user
            ## wants to issue FIN/REQs inside the callback. The entry is
            ## weakened so the registry alone never keeps a connection alive.
            $self->{routing}->{ $msg->{message_id} } = $conn;
            weaken($self->{routing}->{ $msg->{message_id} });

            $self->{message_cb}->($self, $msg);
        }
    );

    return $conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10));
}
## Marks a message as done.
##
## Looks up the connection registered for $msg (asking the lookup to drop the
## routing entry at the same time) and forwards the call to that connection's
## own mark_as_done_msg(). $msg is passed along untouched to both.
##
## Always returns 1; any exception raised by the lookup or by the connection
## propagates to the caller.
sub mark_as_done_msg {
    my $self = shift;
    my $msg  = shift;

    $self->_find_message_connection($msg, { delete => 1 })->mark_as_done_msg($msg);

    return 1;
}
## Requeues a message.
##
##   $msg   - passed untouched to the connection lookup and to the connection
##   $delay - optional requeue delay; when undefined, the 'requeue_delay'
##            given to new() (if any) is used instead
##
## Looks up the connection registered for $msg (asking the lookup to drop the
## routing entry at the same time) and forwards the call to that connection's
## own requeue_msg().
##
## Always returns 1; any exception raised by the lookup or by the connection
## propagates to the caller.
sub requeue_msg {
    my ($self, $msg, $delay) = @_;

    ## 'requeue_delay' is accepted by the constructor, so honour it as the
    ## default. An explicit $delay, including 0, always wins.
    $delay = $self->{requeue_delay} unless defined $delay;

    my $conn = $self->_find_message_connection($msg, {delete => 1});
    $conn->requeue_msg($msg, $delay);

    return 1;
}
## Touches a message.
##
## Looks up the connection registered for $msg, leaving the routing entry in
## place, and forwards the call to that connection's own touch_msg(). $msg is
## passed along untouched to both.
##
## Always returns 1; any exception raised by the lookup or by the connection
## propagates to the caller.
sub touch_msg {
    my $self = shift;
    my ($msg) = @_;

    $self->_find_message_connection($msg)->touch_msg($msg);

    return 1;
}

## touch_message() is an alias for touch_msg()
*touch_message = \&touch_msg;
## Finds the connection registered in the routing table for a message.
##
##   $msg  - either a message hashref (its 'message_id' entry is used) or a
##           bare message id
##   $opts - optional hashref; a true 'delete' entry removes the routing
##           entry as part of the lookup. Anything that is not a reference
##           is treated as "no options".
##
## Returns the connection. Croaks when the message id is undefined or when no
## live connection is registered for it. Routing entries are weakened when
## they are stored, so an entry may still exist with an undefined value once
## its connection is gone; such entries are removed here even without
## 'delete'.
sub _find_message_connection {
    my ($self, $msg, $opts) = @_;
    $opts = {} unless ref $opts;

    my $id = ref($msg) ? $msg->{message_id} : $msg;

    ## An undefined id can never be routed: fail with the usual message
    ## instead of tripping "uninitialized value" warnings further down.
    croak 'WARN: Could not find the connection to route msg <undef>'
        unless defined $id;

    my $routing = $self->{routing};
    my $conn    = $routing->{$id};

    ## Drop the entry when asked to, and also when it is stale
    delete $routing->{$id} if $opts->{delete} || !$conn;

    croak "WARN: Could not find the connection to route msg $id" unless $conn;
    return $conn;
}
1;