diff --git a/lib/AnyEvent/NSQ/Client.pm b/lib/AnyEvent/NSQ/Client.pm new file mode 100644 index 0000000..6fd00ea --- /dev/null +++ b/lib/AnyEvent/NSQ/Client.pm @@ -0,0 +1,127 @@ +package AnyEvent::NSQ::Client; + +# ABSTRACT: base class for NSQ.io consumers and producers +# VERSION +# AUTHORITY + +use strict; +use warnings; +use AnyEvent; +use AnyEvent::Socket (); +use Carp 'croak'; +use AnyEvent::NSQ::Connection; + + +sub new { + my ($class, %args) = @_; + my $self = bless {}, $class; + + $self->_parse_args(\%args); + $self->_connect(); + + return $self; +} + + +#### Argument parsing + +## Parse all common arguments +sub _parse_args { + my ($self, $args) = @_; + + $self->{disconnect_cb} = delete($args->{disconnect_cb}) || sub { }; + $self->{error_cb} = delete($args->{error_cb}) || sub { croak($_[1]) }; + + for my $arg (qw( client_id hostname connect_timeout )) { + $self->{$arg} = delete($args->{$arg}) if exists $args->{$arg}; + } + + if (my $lookupd_http_addresses = delete $args->{lookupd_http_addresses}) { + $lookupd_http_addresses = [$lookupd_http_addresses] unless ref($lookupd_http_addresses) eq 'ARRAY'; + $self->{lookupd_http_addresses} = $lookupd_http_addresses; + $self->{use_lookupd} = 1; + } + + if (my $nsqd_tcp_addresses = delete $args->{nsqd_tcp_addresses}) { + croak(q{FATAL: only one of 'lookupd_http_addresses' and 'nsqd_tcp_addresses' is allowed}) if $self->{use_lookupd}; + + $nsqd_tcp_addresses = [$nsqd_tcp_addresses] unless ref($nsqd_tcp_addresses) eq 'ARRAY'; + $self->{nsqd_tcp_addresses} = $nsqd_tcp_addresses; + $self->{use_lookupd} = 0; + } + + ## There can be only one, there must be at least one + croak(q{FATAL: one of 'nsqd_tcp_addresses' or 'lookup'}) unless defined $self->{use_lookupd}; +} + + +#### Connection management + +## support both modes of operation, direct or with lookupd discovery +sub _connect { + my $self = shift; + + if ($self->{use_lookupd}) { + $self->_start_lookupd_poolers; + } + else { + $self->_start_nsqd_connections; + } + + return; +} + +## direct nsqd connection +sub _start_nsqd_connections { + my ($self) = @_; + + for my $nsqd_tcp_address (@{ $self->{nsqd_tcp_addresses} }) { + $self->_start_nsqd_connection($nsqd_tcp_address, reconnect => 1); + } +} + +## nsqlookupd support - not there yet +sub _start_lookupd_poolers { } + + +#### nsqd pool connection management + +## connect to a single element of the pool +sub _start_nsqd_connection { + my ($self, $nsqd_tcp_address, %args) = @_; + + my $conns = $self->{nsqd_conns} ||= {}; + return if $conns->{$nsqd_tcp_address}; + + my ($host, $port) = AnyEvent::Socket::parse_hostport($nsqd_tcp_address, 4150); ## 4150 is the default port for nsqd + croak(qq{FATAL: could not parse '$nsqd_tcp_address' as a valid address/port combination}) unless $host and $port; + + my %conn = (host => $host, port => $port); + for my $arg (qw( client_id hostname error_cb connect_timeout )) { + $conn{$arg} = $self->{$arg} if exists $self->{$arg}; + } + + $conn{connect_cb} = sub { $self->_connected(@_) }; + $conn{disconnect_cb} = sub { $self->_disconnected(@_) }; + + $conns->{$nsqd_tcp_address}{conn} = AnyEvent::NSQ::Connection->new(%conn); + $conns->{$nsqd_tcp_address}{state} = 'connecting'; + + return; +} + + +#### Hooks for the main states of the connection +sub _connected { + my $self = shift; + + ## FIXME: access $nsqd_tcp_address, and update state + + $self->{connect_cb}->(@_) if $self->{connect_cb}; + $_[0]->identify(sub { $self->_identified(@_) }); +} + +sub _identified { $_[0]->{identify_cb}->(@_) if $_[0]->{identify_cb} } +sub _disconnected { $_[0]->{disconnect_cb}->(@_) if $_[0]->{disconnect_cb} } + +1;