diff --git a/lib/AnyEvent/NSQ/Connection.pm b/lib/AnyEvent/NSQ/Connection.pm new file mode 100644 index 0000000..7f53be5 --- /dev/null +++ b/lib/AnyEvent/NSQ/Connection.pm @@ -0,0 +1,313 @@ +package AnyEvent::NSQ::Connection; + +# ABSTRACT: NSQd TCP connection +# VERSION +# AUTHORITY + +use strict; +use warnings; +use AnyEvent::Handle; +use Carp; +use JSON::XS (); +use Sys::Hostname; + +# Options from the python client, we might want to support later: +# timeout=1.0 +# heartbeat_interval=30 +# requeue_delay=90 +# tls_v1=False +# tls_options=None +# snappy=False +# deflate=False +# deflate_level=6 +# output_buffer_size=16384 +# output_buffer_timeout=250 +# sample_rate=0 +# auth_secret=None + +sub new { + my ($class, %args) = @_; + + my $self = bless( + { hostname => hostname(), + connect_timeout => undef, ## use kernel default + error_cb => sub { + croak(qq{FATAL: error from host '$_[0]->{host}' port $_[0]->{port}: $_[1]}); + }, + }, + $class + ); + + $self->{host} = delete $args{host} or croak q{FATAL: required 'host' parameter is missing}; + $self->{port} = delete $args{port} or croak q{FATAL: required 'port' parameter is missing}; + + for my $p (qw( client_id hostname connect_cb )) { + $self->{$p} = delete $args{$p} if exists $args{$p} and defined $args{$p}; + } + + croak(q{FATAL: required 'connect_cb' parameter is missing}) unless $self->{connect_cb}; + croak(q{FATAL: parameter 'connect_cb' must be a CodeRef}) unless ref($self->{connect_cb}) eq 'CODE'; + croak(q{FATAL: parameter 'error_cb' must be a CodeRef}) unless ref($self->{error_cb}) eq 'CODE'; + + $self->connect; + + return $self; +} + +sub connect { + my ($self) = @_; + return if $self->{handle}; + + my $err_cb = $self->{error_cb}; + + $self->{handle} = AnyEvent::Handle->new( + connect => [$self->{host}, $self->{port}], + + on_prepare => sub { $self->{connect_timeout} }, + on_connect => sub { $self->_connected(@_) }, + + on_connect_error => sub { + $self->_disconnected; + $err_cb->($self, '(connect) ' . ($_[1] || $!)); + }, + on_error => sub { + $self->_disconnected; + $err_cb->($self, '(read) ' . ($_[2] || $!)); + }, + on_eof => sub { + $self->_disconnected; + }, + ); + + return; +} + + +## Protocol + +sub identify { + my ($self, @rest) = @_; + my $cb = pop @rest; + return unless my $hdl = $self->{handle}; + + my $data = JSON::XS::encode_json($self->_build_identity_payload(@rest)); + $hdl->push_write("IDENTIFY\012"); + $hdl->push_write(pack('N', length($data))); + $hdl->push_write($data); + + $self->_on_success_frame(sub { $cb->($self, $self->{identify_info} = $_[1]) }); + + return; +} + +sub subscribe { + my ($self, $topic, $chan, $cb) = @_; + return unless my $hdl = $self->{handle}; + + $self->{message_cb} = $cb; + + $hdl->push_write("SUB $topic $chan\012"); + $self->_on_success_frame(sub { }); ## We don't care about the success ok + + return; +} + +sub ready { + my ($self, $n) = @_; + return unless my $hdl = $self->{handle}; + + $self->{ready_count} = $n; + $self->{in_flight} = 0; + $hdl->push_write("RDY $n\012"); + # print STDERR ">>>> READY SET $self->{in_flight} / $self->{ready_count}\n"; + + return; +} + +sub mark_as_done_msg { + my ($self, $msg) = @_; + return unless my $hdl = $self->{handle}; + + $hdl->push_write("FIN $msg->{message_id}\012"); + + return; +} + +sub requeue_msg { + my ($self, $msg, $delay) = @_; + return unless my $hdl = $self->{handle}; + + $hdl->push_write("FIN $msg->{message_id}\012"); + + return; +} + +sub nop { + my ($self, $n) = @_; + return unless my $hdl = $self->{handle}; + + $hdl->push_write("NOP\012"); + + return; +} + + +## Protocol helpers + +sub _build_identity_payload { + my ($self, @rest) = @_; + + my %data = ( + client_id => $self->{client_id}, + short_id => $self->{client_id}, + hostname => $self->{hostname}, + long_id => $self->{hostname}, + ## TODO: heartbeat_interval => ..., ## milliseconds between heartbeats + ## TODO: output_buffer_size => ..., + ## TODO: output_buffer_timeout => ..., + ## TODO: sample_rate => ..., + ## TODO: msg_timeout => ..., + @rest, + feature_negotiation => \1, + ); + + my $ua = "AnyEvent::NSQ::Connection/" . ($AnyEvent::NSQ::Connection::VERSION || 'developer'); + if (!$data{user_agent}) { $data{user_agent} = $ua } + elsif (substr($data{user_agent}, -1) eq ' ') { $data{user_agent} .= $ua } + + for my $k (keys %data) { + delete $data{$k} unless defined $data{$k}; + } + + return \%data; +} + + +## Connection setup and cleanup + +sub _connected { + my ($self) = @_; + + $self->{connected} = 1; + + $self->_send_magic_identifier; + $self->_start_recv_frames; + + $self->{connect_cb}->($self); +} + +sub _disconnected { + my ($self) = @_; + + $self->{handle}->destroy; + delete $self->{$_} for qw(conn connected); +} + +sub _force_disconnect { + my ($self) = @_; + return unless my $hdl = $self->{handle}; + + $hdl->push_shutdown; + $hdl->on_read(sub { }); + $hdl->on_eof(undef); + $hdl->on_error( + sub { + delete $hdl->{rbuf}; + $hdl->destroy; + $self->_disconnected; + } + ); +} + + +## low-level protocol details + +sub _send_magic_identifier { $_[0]{handle}->push_write(' V2') } + +sub _start_recv_frames { + my ($self) = @_; + my $hdl = $self->{handle}; + my $err_cb = $self->{error_cb}; + + my @read_frame; ## on a separate line, we need it circular + @read_frame = ( + chunk => 8, + sub { + my ($size, $frame_type) = unpack('NN', $_[1]); + $hdl->unshift_read( + chunk => $size - 4, ## remove size of frame_type... + sub { + my ($msg) = $_[1]; +# print STDERR ">>>> FRAME $size, $frame_type"; + + if ($frame_type == 0) { ## OK frame +# print STDERR ", success: $msg\n"; + my $info = {}; + if ($msg eq '_heartbeat_') { + $self->nop; + } + else { + if ($msg ne 'OK') { + $info = eval { JSON::XS::decode_json($msg) }; + unless ($info) { + $err_cb->($self, qq{unexpected/invalid JSON response '$msg'}); + $self->_force_disconnect; + @read_frame = (); + return; + } + my $cb = shift @{ $self->{success_cb_queue} || [] }; + $cb->($self, $info) if $cb; + } + } + } + elsif ($frame_type == 1) { ## error frame +# print STDERR ", error: $msg\n"; + $self->{error_cb}->($self, qq{received error '$msg'}); + $self->_force_disconnect; + @read_frame = (); + return; + } + elsif ($frame_type == 2) { ## message frame + my ($t1, $t2, $attempts, $message_id) = unpack('NNnA16', substr($msg, 0, 26, '')); + $msg = { + attempts => $attempts, + message_id => $message_id, + tstamp => ($t2 | ($t1 << 32)), + message => $msg, + }; +# print STDERR ", msg: $attempts, $message_id, $msg\n"; + $self->{in_flight}++; + $self->{message_cb}->($self, $msg) if $self->{message_cb}; + + ## FIXME: this logic was more of infered than learned, but I remember seeing 25% somewhere +# print STDERR ">>>> READY CHECK? $self->{in_flight} / $self->{ready_count}\n"; + $self->ready($self->{ready_count}) + if $self->{ready_count} and $self->{in_flight} / $self->{ready_count} > .25; + } + else { + $err_cb->($self, qq{unexpected frame type '$frame_type'}); + $self->_force_disconnect; + @read_frame = (); + return; + } + + ## ... and keep reading those frames +# print STDERR ">>>> RESET FRAME READER\n"; + $hdl->push_read(@read_frame); + } + ); + } + ); + + ## Start with first frame... + $hdl->push_read(@read_frame); +} + +sub _on_success_frame { + my ($self, $cb) = @_; + + push @{ $self->{success_cb_queue} }, $cb; +} + + +1; diff --git a/lib/AnyEvent/NSQ/Reader.pm b/lib/AnyEvent/NSQ/Reader.pm new file mode 100644 index 0000000..4652e2e --- /dev/null +++ b/lib/AnyEvent/NSQ/Reader.pm @@ -0,0 +1,118 @@ +package AnyEvent::NSQ::Reader; + +# ABSTRACT: a NSQ.io asynchronous consumer +# VERSION +# AUTHORITY + +use strict; +use warnings; +use AnyEvent; +use AnyEvent::Socket (); +use Carp 'croak'; +use AnyEvent::NSQ::Connection; + +sub new { + my ($class, %args) = @_; + my $self = bless {}, $class; + + $self->{topic} = delete $args{topic} or croak q{FATAL: required 'topic' parameter is missing}; + $self->{channel} = delete $args{channel} or croak q{FATAL: required 'channel' parameter is missing}; + $self->{message_cb} = delete($args{message_cb}) or croak q{FATAL: required 'message_cb' parameter is missing}; + + $self->{disconnect_cb} = delete($args{disconnect_cb}) || sub { }; + $self->{error_cb} = delete($args{error_cb}) || sub { croak($_[1]) }; + + $self->{ready_count} = delete($args{ready_count}) if exists $args{ready_count}; + + if (my $lookupd_http_addresses = delete $args{lookupd_http_addresses}) { + $lookupd_http_addresses = [$lookupd_http_addresses] unless ref($lookupd_http_addresses) eq 'ARRAY'; + $self->{lookupd_http_addresses} = $lookupd_http_addresses; + $self->{use_lookupd} = 1; + } + + if (my $nsqd_tcp_addresses = delete $args{nsqd_tcp_addresses}) { + croak(q{FATAL: only one of 'lookupd_http_addresses' and 'nsqd_tcp_addresses' is allowed}) if $self->{use_lookupd}; + + $nsqd_tcp_addresses = [$nsqd_tcp_addresses] unless ref($nsqd_tcp_addresses) eq 'ARRAY'; + $self->{nsqd_tcp_addresses} = $nsqd_tcp_addresses; + $self->{use_lookupd} = 0; + } + + ## There can be only one, there must be at least one + croak(q{FATAL: one of 'nsqd_tcp_addresses' or 'lookup'}) unless defined $self->{use_lookupd}; + + $self->connect(); + + return $self; +} + +sub connect { + my $self = shift; + + if ($self->{use_lookupd}) { + $self->_start_lookupd_poolers; + } + else { + $self->_start_nsqd_connections; + } + + return; +} + +sub _start_nsqd_connections { + my ($self) = @_; + + for my $nsqd_tcp_address (@{ $self->{nsqd_tcp_addresses} }) { + $self->_start_nsqd_connection($nsqd_tcp_address, reconnect => 1); + } +} + +sub _start_nsqd_connection { + my ($self, $nsqd_tcp_address, %args) = @_; + + my $conns = $self->{nsqd_conns} ||= {}; + return if $conns->{$nsqd_tcp_address}; + + my ($host, $port) = AnyEvent::Socket::parse_hostport($nsqd_tcp_address, 4150); ## 4150 is the default port for nsqd + croak(qq{FATAL: could not parse '$nsqd_tcp_address' as a valid address/port combination}) unless $host and $port; + + $conns->{$nsqd_tcp_address}{conn} = AnyEvent::NSQ::Connection->new( + host => $host, + port => $port, + error_cb => $self->{error_cb}, + connect_cb => sub { + my ($conn) = @_; + $conn->identify( + sub { + $conn->subscribe( + $self->{topic}, + $self->{channel}, + sub { + ## FIXME: bless it with the future AnyEvent::NSQ::Message + my $msg = $_[1]; + + my $action = $self->{message_cb}->($self, $msg); + + if (not defined $action) { $conn->mark_as_done_msg($_[1]) } + elsif ($action >= 0) { $conn->requeue_msg($_[1], $action) } + else { $conn->touch_msg($_[1], -$action) } + } + ); + + $conn->ready($self->{ready_count} || int(($_[1]{max_rdy_count} || 2000) / 10)); + } + ); + }, + disconnect_cb => sub { + ## FIXME: deal with reconnects here + $self->{disconnect_cb}->(@_) if $self->{disconnect_cb}; + }, + ); + $conns->{$nsqd_tcp_address}{state} = 'connecting'; + + return; +} + +sub _start_lookupd_poolers { } + +1; diff --git a/t/10-reader.t b/t/10-reader.t new file mode 100755 index 0000000..acadbd8 --- /dev/null +++ b/t/10-reader.t @@ -0,0 +1,28 @@ +#!perl + +use strict; +use warnings; +use Test::More; +use AnyEvent; +use AnyEvent::NSQ::Reader; + +subtest 'basic connection' => sub { + my $r = AnyEvent::NSQ::Reader->new( + topic => 'test', + channel => 'anyevent-nsq-reader-test', + nsqd_tcp_addresses => '127.0.0.1', + client_id => "10-reader.t/$$", + + ready_count => 5, + + message_cb => sub { + print STDERR "!!!! GOT MESSAGE '$_[1]{message}\n"; + return; + }, + ); + + AE::cv->recv; +}; + + +done_testing();