Connection routing registry
This commit is contained in:
@@ -10,6 +10,15 @@ use Carp 'croak';
|
|||||||
|
|
||||||
use parent 'AnyEvent::NSQ::Client';
|
use parent 'AnyEvent::NSQ::Client';
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my $class = shift;
|
||||||
|
my $self = $class->SUPER::new(@_);
|
||||||
|
|
||||||
|
# To keep registry of the connection bound to each message
|
||||||
|
$self->{routing} = {};
|
||||||
|
|
||||||
|
return $self;
|
||||||
|
}
|
||||||
|
|
||||||
#### Parameter parsing
|
#### Parameter parsing
|
||||||
|
|
||||||
@@ -46,15 +55,45 @@ sub _identified {
|
|||||||
## FIXME: bless it with the future AnyEvent::NSQ::Message
|
## FIXME: bless it with the future AnyEvent::NSQ::Message
|
||||||
my $msg = $_[1];
|
my $msg = $_[1];
|
||||||
|
|
||||||
|
## Keep the connection in the registry in case the user
|
||||||
|
## wants to issue FIN/REQs inside the callback
|
||||||
|
$self->{routing}->{$msg->{message_id}} = $conn;
|
||||||
|
|
||||||
my $action = $self->{message_cb}->($self, $msg);
|
my $action = $self->{message_cb}->($self, $msg);
|
||||||
|
|
||||||
## Action below -1 does nothing, we assume the user took care of it himself
|
## Action below -1 does nothing, we assume the user took care of it himself
|
||||||
if (not defined $action) { $conn->mark_as_done_msg($_[1]) }
|
if (not defined $action) { $conn->mark_as_done_msg($_[1]) }
|
||||||
elsif ($action >= -1) { $conn->requeue_msg($_[1], $action) }
|
elsif ($action >= -1) { $conn->requeue_msg($_[1], $action) }
|
||||||
|
|
||||||
|
delete($self->{routing}->{$msg->{message_id}});
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
$conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10));
|
$conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub mark_as_done_msg {
|
||||||
|
my $self = shift;
|
||||||
|
my $message = shift;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub requeue_msg {
|
||||||
|
my $self = shift;
|
||||||
|
my $message = shift;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub touch_message {
|
||||||
|
my $self = shift;
|
||||||
|
my $message = shift;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _find_message_connection {
|
||||||
|
my $self = shift;
|
||||||
|
my $message = shift;
|
||||||
|
|
||||||
|
my $message_id = ref($message) ? $message->{message_id} : $message;
|
||||||
|
|
||||||
|
return $self->{routing}->{$message_id};
|
||||||
|
}
|
||||||
|
|
||||||
1;
|
1;
|
||||||
|
|||||||
Reference in New Issue
Block a user