From 0d8f13b95002a09570c851d9607e9d40c6811591 Mon Sep 17 00:00:00 2001 From: Pedro Melo Date: Mon, 21 Jul 2014 05:13:36 +0100 Subject: [PATCH] Update publish() on both Connection and Writer to use MPUB if need be: We accept a list of messages now, before the trailing optional callback. If multiple messages are present, we switch to MPUB. Signed-off-by: Pedro Melo --- lib/AnyEvent/NSQ/Connection.pm | 18 +++++++++++++----- lib/AnyEvent/NSQ/Writer.pm | 14 ++++++++------ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/lib/AnyEvent/NSQ/Connection.pm b/lib/AnyEvent/NSQ/Connection.pm index 347f451..98a247c 100644 --- a/lib/AnyEvent/NSQ/Connection.pm +++ b/lib/AnyEvent/NSQ/Connection.pm @@ -130,15 +130,23 @@ sub subscribe { } sub publish { - my ($self, $topic, $data, $cb) = @_; + my ($self, $topic, @data) = @_; return unless my $hdl = $self->{handle}; - $cb = sub { } - unless $cb; + my $cb; + $cb = pop @data if ref($data[-1]) eq 'CODE' or !defined($data[-1]); + return unless @data; - $hdl->push_write("PUB $topic\012" . pack('N', length($data)) . $data); + my $body = join('', map { pack('N', length($_)) . $_ } @data); - $self->_on_next_success_frame($cb); + if (@data == 1) { + $hdl->push_write("PUB $topic\012$body"); + $self->_on_next_success_frame($cb); + } + else { + $hdl->push_write("MPUB $topic\012" . pack('N', length($body)) . pack('N', scalar(@data)) . $body); + $self->_on_next_success_frame($cb); + } return; } diff --git a/lib/AnyEvent/NSQ/Writer.pm b/lib/AnyEvent/NSQ/Writer.pm index 6827cf9..230937f 100644 --- a/lib/AnyEvent/NSQ/Writer.pm +++ b/lib/AnyEvent/NSQ/Writer.pm @@ -12,19 +12,21 @@ use parent 'AnyEvent::NSQ::Client'; #### Producer API -## Publish a single message - callback is only called if we succedd +## Publish a single or multiple message - callback is only called if we succedd sub publish { - my ($self, $topic, $data, $cb) = @_; + my ($self, $topic, @data) = @_; my $conn = $self->_random_connected_conn; croak "ERROR: there no active connections at this moment," unless $conn; - $cb = sub { $cb->($self, $topic, $data, @_) } - if $cb; + if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) { + my $cb = pop @data; + push @data, sub { $cb->($self, $topic, \@data, @_) } + if $cb; + } - return $conn->publish($topic, $data, $cb); + return $conn->publish($topic, @data); } - 1;