diff --git a/lib/AnyEvent/NSQ/Client.pm b/lib/AnyEvent/NSQ/Client.pm index 5ea01ad..f5c9852 100644 --- a/lib/AnyEvent/NSQ/Client.pm +++ b/lib/AnyEvent/NSQ/Client.pm @@ -11,7 +11,9 @@ use AnyEvent::Socket (); use Carp 'croak'; use AnyEvent::NSQ::Connection; +#### Public API +## constructor sub new { my ($class, %args) = @_; my $self = bless {}, $class; @@ -22,12 +24,29 @@ sub new { return $self; } +## disconnect from our pool of nsqd connection sub disconnect { my ($self, $cb) = @_; $_->{conn}->disconnect($cb) for values %{ $self->{nsqd_conns} }; } +## Publish a single or multiple message - callback is only called if we succedd +sub publish { + my ($self, $topic, @data) = @_; + + my $conn = $self->_random_connected_conn; + croak "ERROR: there no active connections at this moment," unless $conn; + + if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) { + my $cb = pop @data; + push @data, sub { $cb->($self, $topic, \@data, @_) } + if $cb; + } + + return $conn->publish($topic, @data); +} + #### Argument parsing diff --git a/lib/AnyEvent/NSQ/Writer.pm b/lib/AnyEvent/NSQ/Writer.pm index 230937f..90294ed 100644 --- a/lib/AnyEvent/NSQ/Writer.pm +++ b/lib/AnyEvent/NSQ/Writer.pm @@ -10,23 +10,5 @@ use Carp 'croak'; use parent 'AnyEvent::NSQ::Client'; -#### Producer API - -## Publish a single or multiple message - callback is only called if we succedd -sub publish { - my ($self, $topic, @data) = @_; - - my $conn = $self->_random_connected_conn; - croak "ERROR: there no active connections at this moment," unless $conn; - - if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) { - my $cb = pop @data; - push @data, sub { $cb->($self, $topic, \@data, @_) } - if $cb; - } - - return $conn->publish($topic, @data); -} - 1;