package POEDaemon::TCPServer::Main; use strict; use warnings FATAL => 'all'; no warnings 'redefine'; use Data::Dumper; #use Digest::SHA qw(sha256_hex); use POE; use POEDaemon; sub states { return $_[0], [qw( tcpserver_main_start tcpserver_client_input tcpserver_output tcpserver_client_idle tcpserver_client_rtt tcpserver_client_disconnected tcpserver_periodic30s_get_adc_all_allclients tcpserver_periodic22s_get_data_allclients tcpserver_periodic7s_get_rtt_allclients )]; } sub tcpserver_main_start { my $heap = $_[HEAP]; #$heap->{periodic}->{tcpserver_periodic30s_get_adc_all_allclients} = 30; $heap->{periodic}->{tcpserver_periodic22s_get_data_allclients} = 22; $heap->{periodic}->{tcpserver_periodic7s_get_rtt_allclients} = 7; } sub tcpserver_client_input { my ($kernel, $session, $heap, $input, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0, ARG1]; my $client = $heap->{tcpserver}->{connections}->{$wheel_id}; return unless $client; my $sock_wheel_id = $client->{sock_wheel_id}; my $ssl_filter = $client->{ssl_filter}; my $http = $client->{http}; my $websocket = $client->{websocket}; $client->{last_input_time} = time_hires; if ($http) { unless ($websocket) { $client->{httprequest_count} = 0 unless defined $client->{httprequest_count}; $client->{last_httprequest_speed} = 0 unless defined $client->{last_httprequest_speed}; $client->{httprequest_count}++; } } else { $client->{input_line_count} = 0 unless defined $client->{input_line_count}; $client->{last_input_line_speed} = 0 unless defined $client->{last_input_line_speed}; $client->{input_line_count}++; } my $ssl_certdebug = 1; if (!$client->{ssl_ok} && $ssl_filter) { unless ($ssl_filter->handshakeDone) { log_enabled && logline "[tcpserver #%s conn #%s] ssl handshake not done yet", $sock_wheel_id, $wheel_id; return; } unless ($ssl_filter->clientCertValid) { if ($http) { log_enabled && logline "[tcpserver #%s conn #%s] ssl client cert invalid (allowed for http client)", $sock_wheel_id, $wheel_id; } else { my $reason = 'ssl client cert invalid'; $kernel->call($session => tcpserver_client_kick => { wheel_id => $wheel_id, reason => $reason, }); return; } } my $ssl_version = $ssl_filter->getVersion; $client->{ssl_version} = $ssl_version if $ssl_version; my $ssl_cipher = $ssl_filter->getCipher; if ($ssl_cipher =~ /none/i) { my $reason = 'ssl client cipher = none'; $kernel->call($session => tcpserver_client_kick => { wheel_id => $wheel_id, reason => $reason, }); return; } $client->{cipher} = $ssl_cipher if $ssl_cipher; my $ssl_client_cert_ids = parse_ssl_cert_ids $ssl_filter->clientCertIds; my $ssl_cn; $ssl_cn = $ssl_client_cert_ids->[$#$ssl_client_cert_ids]->{subject}->{CN} if $ssl_client_cert_ids; if ($ssl_cn) { if ($heap->{tcpserver}->{connections_by_cn}->{$ssl_cn}) { my $reason = sprintf "ssl duplicate cn '%s'", $ssl_cn; $kernel->call($session => tcpserver_client_kick => { wheel_id => $wheel_id, reason => $reason, }); return; } $client->{cn} = $ssl_cn; $heap->{tcpserver}->{connections_by_cn}->{$ssl_cn} = $client; my $admin_acl_re = cfg->{admin_cn_acl}; if (ref $admin_acl_re eq 'Regexp' && $ssl_cn =~ $admin_acl_re) { $client->{admin} = 1; } my $poeserver_acl_re = cfg->{poeserver_cn_acl}; if (ref $poeserver_acl_re eq 'Regexp' && $ssl_cn =~ $poeserver_acl_re) { $client->{poeclient} = 1; $client->{client_max_idle_time} = 10; $client->{client_max_idle_time_disconnect} = 30; } my $fastcgi_acl_re = cfg->{fastcgi_cn_acl}; if (ref $fastcgi_acl_re eq 'Regexp' && $ssl_cn =~ $fastcgi_acl_re) { $client->{fastcgi} = 1; } $kernel->yield(eventsystem_input => { type => 'clientsslhandshake', cn => $client->{cn}, ip => $client->{client_addr}, time => time_hires, }); } if (log_enabled && $ssl_certdebug) { my @certids_list; foreach my $certid (@{$ssl_client_cert_ids || []}) { push @certids_list, sprintf "subject=%s issuer=%s serial=%s", $certid->{subject_string}, $certid->{issuer_string}, $certid->{serial}; } log_enabled && logline "[tcpserver #%s conn #%s]%s sslver=%s cipher=%s%s", $sock_wheel_id, $wheel_id, (defined $ssl_cn ? sprintf " cn='%s'", $ssl_cn : ''), $ssl_version, $ssl_cipher, (@certids_list ? sprintf(" certs %s", join(', ', @certids_list)) : ''); #log_enabled && logline Dumper \@cid; # #log_enabled && logline "client CN = '%s'", $cid[$#cid]->{subject}->{CN}; } $client->{client_cert_ids} = $ssl_client_cert_ids if $ssl_client_cert_ids; my @certids_list; foreach my $certid (@{$ssl_client_cert_ids || []}) { push @certids_list, sprintf "subject=%s issuer=%s serial=%s", dumper_oneline($certid->{subject_string}), dumper_oneline($certid->{issuer_string}), dumper_oneline($certid->{serial}); } prod_log_enabled && prod_logline "ssl handshake from %s - cipher=%s%s%s", dumper_oneline($client->{client_addr}), dumper_oneline($client->{cipher}), (defined $client->{cn} ? sprintf " cn=%s", dumper_oneline $client->{cn} : ''), (@certids_list ? sprintf " certs %s", join(', ', @certids_list) : ''); $client->{ssl_handshake_time} = time_hires; $client->{ssl_ok} = 1; } return unless $client->{admin} || $client->{poeclient} || $client->{fastcgi} || $http || $websocket; my $cipher = $client->{cipher}; my $cn = $client->{cn}; my $no_log; if ($http) { if ($websocket) { $no_log = 1 unless cfg->{log_raw_websocket_io}; } else { $no_log = 1 unless cfg->{log_httpfilter_io}; } } unless ($no_log) { log_enabled && logline "[tcpserver #%s conn #%s%s%s%s] input = '%s'", $sock_wheel_id, $wheel_id, (cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''), (cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : ''), ($websocket ? ' WebSocket' : ''), ($websocket ? hexval $input : $input); } #log_enabled && logline "input:\n%s", Dumper $input; if ($http) { $kernel->call($session => tcpserver_client_commands_http => { wheel_id => $wheel_id, input => $input, }); return; } return unless $client->{ssl_ok}; foreach my $client_type (qw(fastcgi poeclient admin)) { next unless $client->{$client_type}; $kernel->call($session => sprintf("tcpserver_client_commands_%s", $client_type) => { wheel_id => $wheel_id, input => $input, }); return; } } sub tcpserver_output { my ($kernel, $heap, $args) = @_[KERNEL, HEAP, ARG0]; my $wheel_id = $args->{wheel_id}; my $output = $args->{output}; my $no_log = $args->{no_log}; my $use_flush = $args->{use_flush}; return unless $wheel_id && defined $output; my $client = $heap->{tcpserver}->{connections}->{$wheel_id}; return unless $client; my $sock_wheel_id = $client->{sock_wheel_id}; my $cipher = $client->{cipher}; my $cn = $client->{cn}; my $http = $client->{http}; my $websocket = $client->{websocket}; my $line_count; unless ($http) { if (ref $output eq 'ARRAY') { return if $#$output == -1; $line_count = $#$output + 1; $output = join "\n", @$output; } else { $line_count = substr_count($output, "\n") + 1; } } if ($http) { if ($websocket) { $no_log = 1 unless cfg->{log_raw_websocket_io}; } else { $no_log = 1 unless cfg->{log_httpfilter_io}; } } unless ($no_log) { log_enabled && logline "[tcpserver #%s conn #%s%s%s%s]%s output = '%s'", (defined $sock_wheel_id ? $sock_wheel_id : 'undef'), $wheel_id, (cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''), (cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : ''), ($websocket ? ' WebSocket' : ''), ($use_flush ? ' flush' : ''), ($websocket ? hexval $output : $output); } my $wheel = $client->{wheel}; return unless $wheel; $client->{last_output_time} = time_hires; unless ($http) { $client->{output_line_count} = 0 unless defined $client->{output_line_count}; $client->{last_output_line_speed} = 0 unless defined $client->{last_output_line_speed}; $client->{output_line_count} += $line_count; } #if (ref $output eq 'HTTP::Response') #{ # log_enabled && logline "HTTP::Response->content sha256 hex '%s'", sha256_hex $output->content; #} $wheel->put($output); $wheel->flush if $use_flush; } sub tcpserver_client_idle { my ($kernel, $heap, $wheel_id) = @_[KERNEL, HEAP, ARG0]; my $client = $heap->{tcpserver}->{connections}->{$wheel_id}; return unless $client; my $poeclient = $client->{poeclient}; my $websocket = $client->{websocket}; if ($poeclient) { my $output = sprintf "%s ping", POESERVER_CMD_PREFIX; $kernel->yield(tcpserver_output => { wheel_id => $wheel_id, output => $output, }); } elsif ($websocket) { if ($heap->{statelist}->{tcpserver_output_websocket}) { my $output = { type => 'ping' }; $kernel->yield(tcpserver_output_websocket => { wheel_id => $wheel_id, output => $output, }); } } } sub tcpserver_client_rtt { my ($kernel, $heap, $wheel_id) = @_[KERNEL, HEAP, ARG0]; my $client = $heap->{tcpserver}->{connections}->{$wheel_id}; return unless $client; my $poeclient = $client->{poeclient}; my $websocket = $client->{websocket}; if ($poeclient) { my $output = sprintf "%s rtt %s", POESERVER_CMD_PREFIX, time_hires; $kernel->yield(tcpserver_output => { wheel_id => $wheel_id, output => $output, }); } elsif ($websocket) { if ($heap->{statelist}->{tcpserver_output_websocket}) { my $output = sprintf "rtt %s", time_hires; $kernel->yield(tcpserver_output_websocket => { wheel_id => $wheel_id, output => $output, }); } } } sub tcpserver_client_disconnected { my ($kernel, $heap, $caller_state, $args) = @_[KERNEL, HEAP, CALLER_STATE, ARG0]; if (exists $heap->{statelist}->{eventsystem_input_clientdisconnect} && $heap->{statelist}->{eventsystem_input_clientdisconnect}) { $kernel->yield(eventsystem_input_clientdisconnect => { orig_caller_state => $caller_state, orig_args => $args, }); } } sub tcpserver_periodic30s_get_adc_all_allclients { my ($kernel, $heap) = @_[KERNEL, HEAP]; foreach my $wheel_id (keys %{$heap->{tcpserver}->{connections}}) { my $client = $heap->{tcpserver}->{connections}->{$wheel_id}; next unless $client; next unless $client->{poeclient}; my $output = sprintf "%s adc all", POESERVER_CMD_PREFIX; $kernel->yield(tcpserver_output => { wheel_id => $wheel_id, output => $output, }); } } sub tcpserver_periodic22s_get_data_allclients { my ($kernel, $heap) = @_[KERNEL, HEAP]; my @cmds = ( 'loadavg', 'sysuptime', 'ntpsyspeer', 'adc all', 'i2csysutil', ); foreach my $wheel_id (keys %{$heap->{tcpserver}->{connections}}) { my $client = $heap->{tcpserver}->{connections}->{$wheel_id}; next unless $client; next unless $client->{poeclient}; my @client_cmds = (); @client_cmds = (@client_cmds, @cmds); if ($client->{systemupgrade_requested}) { push @client_cmds, 'systemupgrade status'; } foreach my $cmd (@client_cmds) { my $output = sprintf "%s %s", POESERVER_CMD_PREFIX, $cmd; $kernel->delay_set(tcpserver_output => client_periodic_cmd_delay_rand() => { wheel_id => $wheel_id, output => $output, }); } } } sub tcpserver_periodic7s_get_rtt_allclients { my ($kernel, $heap) = @_[KERNEL, HEAP]; foreach my $wheel_id (keys %{$heap->{tcpserver}->{connections}}) { my $client = $heap->{tcpserver}->{connections}->{$wheel_id}; next unless $client; next unless $client->{poeclient} || ($client->{websocket} && $heap->{statelist}->{tcpserver_output_websocket}); $kernel->delay_set(tcpserver_client_rtt => client_periodic_cmd_delay_rand() => $wheel_id); } } 1;