diff --git a/examples/publisher.pl b/examples/publisher.pl new file mode 100755 index 0000000..0128d8e --- /dev/null +++ b/examples/publisher.pl @@ -0,0 +1,52 @@ +#!/usr/bin/env perl + +use strict; +use warnings; +use FindBin; +use lib "$FindBin::Bin/../lib"; +use AnyEvent; +use AnyEvent::Handle; +use AnyEvent::NSQ::Writer; + +my ($topic, $factor) = @ARGV; +die "Usage: publisher.pl topic [msgs_per_call]\n" unless $topic; + +$factor = 1 unless $factor; + +my $cv = AE::cv; + +## Our Publisher +my $r = AnyEvent::NSQ::Writer->new( + nsqd_tcp_addresses => '127.0.0.1', + client_id => "${topic}_producer/pid_$$", + + error_cb => sub { warn "$_[1]\n" }, + disconnect_cb => sub { warn "Got disconnected... exiting...\n"; $cv->send }, +); + +## Publish each input line +my $hdl; +$hdl = AnyEvent::Handle->new(fh => \*STDIN, on_error => sub { $r->disconnect; $hdl->destroy; undef $hdl }); + +my $c = 1; +my @readline; +@readline = ( + line => sub { + my $line = $_[1]; + chomp($line); + return unless length($line); + + $r->publish($topic, "$c: $line"); + $c++; + $_[0]->push_read(@readline); + } +); +$hdl->push_read(@readline); + +if (-t \*STDOUT) { + print "Type lines, to publish"; + print " (multiplication factor $factor)" if $factor > 1; + print ", type Ctrl-D to end:\n"; +} + +$cv->recv;