From 7edb6bbfa0c2f5b74fb88c2c7db0f72ddbc5c423 Mon Sep 17 00:00:00 2001 From: Bruno Tavares Date: Wed, 3 Sep 2014 10:19:11 +0100 Subject: [PATCH] Connection routing registry --- lib/AnyEvent/NSQ/Reader.pm | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/lib/AnyEvent/NSQ/Reader.pm b/lib/AnyEvent/NSQ/Reader.pm index 9076e91..b288d5c 100644 --- a/lib/AnyEvent/NSQ/Reader.pm +++ b/lib/AnyEvent/NSQ/Reader.pm @@ -10,6 +10,15 @@ use Carp 'croak'; use parent 'AnyEvent::NSQ::Client'; +sub new { + my $class = shift; + my $self = $class->SUPER::new(@_); + + # To keep registry of the connection bound to each message + $self->{routing} = {}; + + return $self; +} #### Parameter parsing @@ -46,15 +55,45 @@ sub _identified { ## FIXME: bless it with the future AnyEvent::NSQ::Message my $msg = $_[1]; + ## Keep the connection in the registry in case the user + ## wants to issue FIN/REQs inside the callback + $self->{routing}->{$msg->{message_id}} = $conn; + my $action = $self->{message_cb}->($self, $msg); ## Action below -1 does nothing, we assume the user took care of it himself if (not defined $action) { $conn->mark_as_done_msg($_[1]) } elsif ($action >= -1) { $conn->requeue_msg($_[1], $action) } + + delete($self->{routing}->{$msg->{message_id}}); } ); $conn->ready($self->{ready_count} || int(($info->{max_rdy_count} || 2000) / 10)); } +sub mark_as_done_msg { + my $self = shift; + my $message = shift; +} + +sub requeue_msg { + my $self = shift; + my $message = shift; +} + +sub touch_message { + my $self = shift; + my $message = shift; +} + +sub _find_message_connection { + my $self = shift; + my $message = shift; + + my $message_id = ref($message) ? $message->{message_id} : $message; + + return $self->{routing}->{$message_id}; +} + 1;