package POEDaemon::TCPClient; use strict; use warnings FATAL => 'all'; no warnings 'redefine'; use Data::Dumper; use Socket qw(PF_INET PF_INET6 IPPROTO_TCP TCP_NODELAY sockaddr_family inet_ntop); use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::SSL); use POEDaemon; sub IPPROTO_SCTP () { return getprotobyname 'sctp'; } sub SCTP_NODELAY () { return 0x00000004; } sub states { return $_[0], [qw( tcpclient_start tcpclient_init tcpclient_find_target tcpclient_stop tcpclient_connect_success tcpclient_connect_error tcpclient_error tcpclient_periodic3s_idle_check tcpclient_shutdown_all tcpclient_shutdown )]; } sub tcpclient_start { my ($kernel, $heap) = @_[KERNEL, HEAP]; log_enabled && logline "using POE::Filter::SSL v%s", $POE::Filter::SSL::VERSION || 'n/a'; $heap->{periodic}->{tcpclient_periodic3s_idle_check} = 3; } sub tcpclient_init { my ($heap, $args) = @_[HEAP, ARG0]; my $socket_domain = $args->{socketdomain}; my $address = $args->{remoteaddress}; my $port = cfg->{tcpclient_port}; return unless defined $socket_domain && $address && $port; my $socket_domain_number; if ($socket_domain =~ /^\d+$/) { $socket_domain_number = $socket_domain; } elsif ($socket_domain =~ /^inet6?$/) { if ($socket_domain eq 'inet') { $socket_domain_number = PF_INET; } elsif ($socket_domain eq 'inet6') { $socket_domain_number = PF_INET6; } } else { log_enabled && logline "invalid socketdomain '%s'", $socket_domain; return; } my $socket_protocol = 'tcp'; $socket_protocol = 'sctp' if $heap->{tcpclient_connretry_count} <= 5 && !$heap->{tcpclient_disable_sctp}; log_enabled && logline "using POE::Filter::SSL v%s", $POE::Filter::SSL::VERSION || 'n/a'; my $wheel = POE::Wheel::SocketFactory->new ( SocketDomain => $socket_domain_number, SocketProtocol => $socket_protocol, RemoteAddress => $address, RemotePort => $port, (cfg->{nodelay_disabled} ? () : (NoDelay => 1)), SuccessEvent => 'tcpclient_connect_success', FailureEvent => 'tcpclient_connect_error', ); my $wheel_id = $wheel->ID; my $sockinfo_local = get_sockinfo $wheel->getsockname; log_enabled && logline "[tcpclient #%s] trying connect to [%s]:%s %s (local: %s %s %s)", $wheel_id, $address, $port, uc $socket_protocol, uc($sockinfo_local->{family} || 'n/a'), $sockinfo_local->{addr} || 'n/a', $sockinfo_local->{port} || 'n/a'; $heap->{tcpclient}->{clients}->{$wheel_id} = { wheel => $wheel, wheel_id => $wheel_id, socket_domain => $socket_domain, socket_protocol => $socket_protocol, address => $address, port => $port, sockinfo_local => $sockinfo_local, }; } sub tcpclient_find_target { my ($kernel, $heap, $state) = @_[KERNEL, HEAP, STATE]; return if $heap->{shutdown}; $heap->{tcpclient_connretry_count} = 0 unless defined $heap->{tcpclient_connretry_count}; $heap->{tcpclient_connretry_count}++; if ($heap->{tcpclient_connretry_count} >= 3) { $heap->{tcpclient_disable_sctp} = 1; } my $tcpclient_address = cfg->{tcpclient_address}; if (ref $tcpclient_address eq 'ARRAY') { $tcpclient_address = $tcpclient_address->[int rand($#{$tcpclient_address} + 1)]; } if ($tcpclient_address =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/) { if ($heap->{tcpclient_connretry_count} <= 10) { log_enabled && logline 'inet addr found... trying only inet6 first 10 times, retry'; $kernel->delay($state => connretry_rand); return; } $kernel->delay(tcpclient_init => connretry_rand() => { socketdomain => 'inet', remoteaddress => $tcpclient_address, }); } elsif ($tcpclient_address =~ /:/) { $kernel->delay(tcpclient_init => connretry_rand() => { socketdomain => 'inet6', remoteaddress => $tcpclient_address, }); } else { unless ($heap->{dns} || $heap->{wheel_run}->{children_by_name}->{dnschild}) { log_enabled && logline 'dns resolver not found yet, retry'; $kernel->delay($state => connretry_rand); return; } $kernel->delay(dns_query => connretry_rand() => { host => $tcpclient_address, }); } } sub tcpclient_stop { my ($kernel, $heap) = @_[KERNEL, HEAP]; log_enabled && logline 'tcpclient stop'; #delete $heap->{tcpclient}; $kernel->yield('tcpclient_shutdown_all'); } sub tcpclient_connect_success { my ($kernel, $heap, $socket, $addr_packed, $port, $sock_wheel_id) = @_[KERNEL, HEAP, ARG0 .. ARG3]; my $socket_protocol = $heap->{tcpclient}->{clients}->{$sock_wheel_id}->{socket_protocol}; unless (defined $socket) { log_enabled && logline 'invalid (undef) socket'; return; } unless (defined $addr_packed) { log_enabled && logline 'invalid (undef) addr_packed'; return; } my $sockaddr = getpeername $socket; unless (defined $sockaddr) { log_enabled && logline "invalid (undef) sockaddr, error = '%s'", $!; $kernel->delay(tcpclient_find_target => connretry_rand); return; } my $addr; if ($addr_packed =~ /^[\w\.\-\:]+$/i) { log_enabled && logline "using workaround for addr, because addr = '%s'", hexval $addr_packed; $addr = get_sockinfo($sockaddr)->{addr}; } else { #log_enabled && logline "binary addr = '%s'", hexval $addr_packed; my $sockaddr_family = sockaddr_family $sockaddr; unless (defined $sockaddr_family) { log_enabled && logline "invalid (undef) sockaddr_family, error = '%s'", $!; $kernel->delay(tcpclient_find_target => connretry_rand); return; } $addr = inet_ntop $sockaddr_family, $addr_packed; } unless ($port =~ /^\d+$/) { log_enabled && logline "using workaround for port, because port = '%s'", hexval $port; my $sockaddr_family = sockaddr_family $sockaddr; unless (defined $sockaddr_family) { log_enabled && logline "invalid (undef) sockaddr_family, error = '%s'", $!; $kernel->delay(tcpclient_find_target => connretry_rand); return; } $port = get_sockinfo($sockaddr_family)->{port}; } my $line_filter = POE::Filter::Line->new ( MaxBuffer => 8192, MaxLength => 4096, InputLiteral => "\n", OutputLiteral => "\n", ); my $ssl_filter = POE::Filter::SSL->new(%{cfg->{tcpclient_poe_filter_ssl_new_params}}); my ($tcp_nodelay, $sctp_nodelay); if ($socket_protocol eq 'tcp') { my $tcp_nodelay_packed = getsockopt $socket, IPPROTO_TCP, TCP_NODELAY; $tcp_nodelay = unpack 'I', $tcp_nodelay_packed if defined $tcp_nodelay_packed; } elsif ($socket_protocol eq 'sctp') { my $sctp_nodelay_packed = getsockopt $socket, IPPROTO_SCTP, SCTP_NODELAY; $sctp_nodelay = unpack 'I', $sctp_nodelay_packed if defined $sctp_nodelay_packed; } my $wheel = POE::Wheel::ReadWrite->new ( Handle => $socket, Filter => POE::Filter::Stackable->new ( Filters => [ $ssl_filter, $line_filter, ], ), InputEvent => 'tcpclient_input', ErrorEvent => 'tcpclient_error', ); my $wheel_id = $wheel->ID; log_enabled && logline "[tcpclient #%s conn #%s] connected to [%s]:%s %s%s%s", $sock_wheel_id, $wheel_id, $addr, $port, uc $socket_protocol, $tcp_nodelay ? ' (TCP_NODELAY)' : '', $sctp_nodelay ? ' (SCTP_NODELAY)' : ''; $heap->{tcpclient}->{connections}->{$wheel_id} = { wheel => $wheel, sock_wheel_id => $sock_wheel_id, ($ssl_filter ? (ssl_filter => $ssl_filter) : ()), addr => $addr, port => $port, connect_time => time_hires, }; delete $heap->{tcpclient_connretry_count}; $kernel->yield(tcpclient_onconnect => $wheel_id); } sub tcpclient_connect_error { my ($kernel, $heap, $operation, $errnum, $errstr, $sock_wheel_id) = @_[KERNEL, HEAP, ARG0 .. ARG3]; log_enabled && logline "[tcpclient #%s] %s error %s: %s", $sock_wheel_id, $operation, $errnum, $errstr; delete $heap->{tcpclient}->{clients}->{$sock_wheel_id}; $kernel->yield('tcpclient_ondisconnect'); unless ($heap->{shutdown}) { $kernel->delay(tcpclient_find_target => connretry_rand); } } sub tcpclient_error { my ($kernel, $heap, $operation, $errnum, $errstr, $wheel_id) = @_[KERNEL, HEAP, ARG0 .. ARG3]; my $sock_wheel_id = $heap->{tcpclient}->{connections}->{$wheel_id}->{sock_wheel_id}; log_enabled && logline "[tcpclient #%s conn #%s] disconnected: %s error %s: %s", $sock_wheel_id, $wheel_id, $operation, $errnum, $errstr; delete $heap->{tcpclient}->{clients}->{$sock_wheel_id}; delete $heap->{tcpclient}->{connections}->{$wheel_id}; $kernel->yield('tcpclient_ondisconnect'); unless ($heap->{shutdown}) { $kernel->delay(tcpclient_find_target => connretry_rand); } } sub tcpclient_periodic3s_idle_check { my ($kernel, $heap) = @_[KERNEL, HEAP]; foreach my $wheel_id (keys %{$heap->{tcpclient}->{connections}}) { my $client = $heap->{tcpclient}->{connections}->{$wheel_id}; next unless $client; my $sock_wheel_id = $client->{sock_wheel_id}; my $max_idle_time = 20; my $max_idle_time_disconnect = 60; my ($connected, $input_idle, $output_idle); if ($client->{connect_time}) { $connected = time_hires - $client->{connect_time}; } if ($client->{last_input_time}) { $input_idle = time_hires - $client->{last_input_time}; } if ($client->{last_output_time}) { $output_idle = time_hires - $client->{last_output_time}; } #log_enabled && logline "WID #%s max_idle_time=%s connected=%s input_idle=%s output_idle=%s", # $wheel_id, # $max_idle_time || 'n/a', # defined $connected ? concise_duration $connected : 'n/a', # defined $input_idle ? concise_duration $input_idle : 'n/a', # defined $output_idle ? concise_duration $output_idle : 'n/a'; if (defined $connected && $connected >= $max_idle_time_disconnect && (!defined $input_idle || $input_idle >= $max_idle_time_disconnect || !defined $output_idle || $output_idle >= $max_idle_time_disconnect)) { log_enabled && logline "[tcpclient #%s conn #%s] killed, idle %s (max before disconnect %s)", $sock_wheel_id, $wheel_id, (defined $input_idle ? concise_duration $input_idle : 'n/a'), concise_duration $max_idle_time_disconnect; delete $heap->{tcpclient}->{clients}->{$sock_wheel_id}; delete $heap->{tcpclient}->{connections}->{$wheel_id}; $kernel->yield('tcpclient_ondisconnect'); unless ($heap->{shutdown}) { $kernel->delay(tcpclient_find_target => connretry_rand); } } elsif ($max_idle_time && (!defined $input_idle || $input_idle >= $max_idle_time)) { log_enabled && logline "[tcpclient #%s conn #%s] idle for more than %s (max %s)", $sock_wheel_id, $wheel_id, (defined $input_idle ? concise_duration $input_idle : 'n/a'), concise_duration $max_idle_time; $kernel->yield(tcpclient_idle => $wheel_id); } } } sub tcpclient_shutdown_all { my ($kernel, $heap) = @_[KERNEL, HEAP]; foreach my $wheel_id (keys %{$heap->{tcpclient}->{connections}}) { my $client = $heap->{tcpclient}->{connections}->{$wheel_id}; next unless $client; $kernel->yield(tcpclient_graceful_disconnect => $wheel_id); } } sub tcpclient_shutdown { my ($heap, $wheel_id) = @_[HEAP, ARG0]; my $sock_wheel_id = $heap->{tcpclient}->{connections}->{$wheel_id}->{sock_wheel_id}; log_enabled && logline "[tcpclient #%s conn #%s] shutdown", $sock_wheel_id, $wheel_id; delete $heap->{tcpclient}->{clients}->{$sock_wheel_id}; delete $heap->{tcpclient}->{connections}->{$wheel_id}; #log_enabled && logline "heap->{tcpclient}:\n%s", Dumper $heap->{tcpclient}; } 1;