122 lines
2.8 KiB
Perl
122 lines
2.8 KiB
Perl
package AnyEvent::NSQ::Reader;
|
|
|
|
# ABSTRACT: a NSQ.io asynchronous consumer
|
|
# VERSION
|
|
# AUTHORITY
|
|
|
|
use strict;
|
|
use warnings;
|
|
use Carp 'croak';
|
|
use Scalar::Util qw( weaken );
|
|
|
|
use parent 'AnyEvent::NSQ::Client';
|
|
|
|
sub new {
|
|
my $class = shift;
|
|
my $self = $class->SUPER::new(@_);
|
|
|
|
# To keep registry of the connection bound to each message
|
|
$self->{routing} = {};
|
|
|
|
return $self;
|
|
}
|
|
|
|
#### Parameter parsing
|
|
|
|
## Parse all Reader specific arguments to new()
|
|
sub _parse_args {
|
|
my ($self, $args) = @_;
|
|
|
|
$self->{topic} = delete $args->{topic} or croak q{FATAL: required 'topic' parameter is missing};
|
|
$self->{channel} = delete $args->{channel} or croak q{FATAL: required 'channel' parameter is missing};
|
|
$self->{message_cb} = delete $args->{message_cb} or croak q{FATAL: required 'message_cb' parameter is missing};
|
|
|
|
for my $arg (qw( ready_count requeue_delay )) {
|
|
$self->{$arg} = delete($args->{$arg}) if exists $args->{$arg};
|
|
}
|
|
|
|
return $self->SUPER::_parse_args($args);
|
|
}
|
|
|
|
|
|
### Connection setup and teardown
|
|
|
|
## called after a successful reply to IDENTIFY, we are ready to rock & roll
|
|
sub _identified {
|
|
my ($self, $conn, $info) = @_;
|
|
|
|
$self->SUPER::_identified(@_);
|
|
|
|
$conn->subscribe(
|
|
$self->{topic},
|
|
$self->{channel},
|
|
|
|
## our message handler...
|
|
sub {
|
|
## FIXME: bless it with the future AnyEvent::NSQ::Message
|
|
my $msg = $_[1];
|
|
|
|
## Keep the connection in the registry in case the user
|
|
## wants to issue FIN/REQs inside the callback
|
|
$self->{routing}->{$msg->{message_id}} = $conn;
|
|
weaken( $self->{routing}->{$msg->{message_id}} );
|
|
|
|
my $action = $self->{message_cb}->($self, $msg);
|
|
|
|
## Action below -1 does nothing, we assume the user took care of it himself
|
|
if (not defined $action) { $conn->mark_as_done_msg($_[1]) }
|
|
elsif ($action >= -1) { $conn->requeue_msg($_[1], $action) }
|
|
|
|
delete($self->{routing}->{$msg->{message_id}});
|
|
}
|
|
);
|
|
|
|
$conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10));
|
|
}
|
|
|
|
sub mark_as_done_msg {
|
|
my ($self, $msg) = @_;
|
|
|
|
my $conn = $self->_find_message_connection($msg);
|
|
return 0 unless $conn;
|
|
|
|
$conn->mark_as_done_msg($msg);
|
|
return 1;
|
|
}
|
|
|
|
sub requeue_msg {
|
|
my ($self, $msg, $delay) = @_;
|
|
|
|
my $conn = $self->_find_message_connection($msg);
|
|
return 0 unless $conn;
|
|
|
|
$conn->requeue_msg($msg, $delay);
|
|
return 1;
|
|
}
|
|
|
|
sub touch_message {
|
|
my ($self, $msg) = @_;
|
|
|
|
my $conn = $self->_find_message_connection($msg);
|
|
return 0 unless $conn;
|
|
|
|
$conn->touch_msg($msg);
|
|
return 1;
|
|
}
|
|
|
|
sub _find_message_connection {
|
|
my ($self, $msg) = @_;
|
|
|
|
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
|
|
|
my $conn = $self->{routing}->{$id};
|
|
|
|
if ( !$conn ) {
|
|
warn "WARN: Could not find the connection to route msg $id";
|
|
}
|
|
|
|
return $conn;
|
|
}
|
|
|
|
1;
|