package POEDaemon::TCPServer::Main::Cmds::HTTP;

# HTTP and WebSocket state handlers for the POEDaemon TCP server session.
#
# The handlers below are POE states (see states()).  They parse requests
# delivered by POE::Filter::HTTPD, serve static files, implement the "shcp"
# (smart home control panel) endpoints with password/session/IP based auth,
# upgrade connections to WebSocket and build HTTP responses.
#
# Helpers such as cfg, read_file, log_enabled, logline, time_hires,
# duration_exact, dumper_oneline, hexval, websocket_frame_type and
# POECLIENT_CMD_PREFIX are not defined in this file; presumably they are
# exported by POEDaemon -- TODO confirm.

use strict;
use warnings FATAL => 'all';
no warnings 'redefine';

use Data::Dumper;
use HTTP::Response;
use HTTP::Status qw(:constants);
use HTTP::Date;
use HTML::Entities;
use Protocol::WebSocket::Handshake::Server;
use Protocol::WebSocket::Frame;
use POSIX qw(strftime);
#use Digest::SHA qw(sha256_hex);
use String::Urandom;
use URL::Encode qw(url_encode url_decode);

use POE qw(Filter::HTTPD);

use POEDaemon;

# Compile-time switch: return a true value to dump raw requests/responses.
sub debug_http_raw_io () { return undef; }

my $poesession_mode = cfg->{poesession_mode} || '';
my $is_shcp;
$is_shcp = 1 if $poesession_mode eq 'shcp';

# True when the daemon runs in 'shcp' session mode.
sub is_shcp () { return $is_shcp; }

my ($html_template, $shcp_inline_files);

if (is_shcp)
{
    # Note: 'use' runs at compile time, so the module is loaded regardless
    # of the surrounding condition.
    use Cpanel::JSON::XS;

    $html_template = read_file 'httpd-static/smarthome-controlpanel.html';

    if (cfg->{shcp}->{use_inline_files})
    {
        # Preload the configured files so they can be served from memory.
        foreach my $file (@{cfg->{shcp}->{inline_files} || []})
        {
            $shcp_inline_files->{$file} = read_file $file;
        }
    }
}
else
{
    $html_template = read_file 'httpd-static/poe-httpd.html';
}

# MIME::Types is optional; without it static files get no explicit type.
my ($have_mime_types, $mime_types);
eval { require MIME::Types; };
unless ($@)
{
    $have_mime_types = 1;
    $mime_types = MIME::Types->new;
}

{
    # Session ids are 128 characters drawn from printable ASCII 33 .. 126.
    my $chars;
    push @$chars, chr foreach 33 .. 126;

    my $string_urandom = String::Urandom->new
    (
        LENGTH => 128,
        CHARS => $chars,
    );

    # Returns a new random session id string.
    sub gen_sessid () { return $string_urandom->rand_string; }
}

# Returns the invocant followed by an array ref with the names of the POE
# states implemented in this package.
sub states
{
    return $_[0],
    [qw(
        tcpserver_client_commands_http_start
        tcpserver_client_commands_http
        tcpserver_output_http
        tcpserver_client_input_websocket
        tcpserver_output_websocket
        tcpserver_client_switch_to_websocket
        tcpserver_client_aio_send_file
        tcpserver_client_remove_http_filter
        tcpserver_client_add_http_filter
        tcpserver_client_http_response_sent
        tcpserver_client_aio_send_file_done
        tcpserver_periodic1s_client_websocket_color
        tcpserver_client_websocket_colorchanger
        tcpserver_httpclient_response
        tcpserver_periodic3s_session_gc
    )];
}

# Start-up state: registers the session GC entry in $heap->{periodic} and
# kicks off the self-rescheduling WebSocket color changer.
sub tcpserver_client_commands_http_start
{
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    #$heap->{periodic}->{tcpserver_periodic1s_client_websocket_color} = 1;
    $heap->{periodic}->{tcpserver_periodic3s_session_gc} = 3;

    $kernel->yield('tcpserver_client_websocket_colorchanger');
}

# Main HTTP input state.
#
# ARG0 is a hash ref with:
#   wheel_id - id of the client connection in $heap->{tcpserver}->{connections}
#   input    - the parsed request (or an HTTP::Response describing a parse
#              error) when the connection is in HTTP mode; raw data once the
#              connection has been switched to WebSocket mode
#
# Dispatches WebSocket input, answers invalid requests with 400, performs
# the WebSocket handshake, serves /test and /static/..., and in shcp mode
# handles /, /powercontrol, /cache/... and /auth.  The response is emitted
# through the tcpserver_output_http state.
sub tcpserver_client_commands_http
{
    my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];

    return unless ref $args eq 'HASH';

    my $wheel_id = $args->{wheel_id} || return;
    my $input = $args->{input};
    return unless defined $input;

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client && $client->{http};

    # Already upgraded connections bypass all HTTP processing.
    if ($client->{websocket})
    {
        $kernel->call($session => tcpserver_client_input_websocket =>
        {
            wheel_id => $wheel_id,
            input => $input,
        });
        return;
    }

    my $sock_wheel_id = $client->{sock_wheel_id};
    my $wheel = $client->{wheel};
    my $client_addr = $client->{client_addr};
    my $client_port = $client->{client_port};
    my $ssl = $client->{ssl};
    my $cn = $client->{cn};
    my $cipher = $client->{cipher};
    return unless $wheel;

    my $no_output_log;

    # An HTTP::Response as *input* is treated as an invalid request.
    if (ref $input eq 'HTTP::Response')
    {
        log_enabled && logline "[tcpserver #%s conn #%s%s%s] invalid HTTP input",
            $sock_wheel_id,
            $wheel_id,
            (cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
            (cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : '');

        my $response_status = HTTP_BAD_REQUEST;
        my $response_content = "invalid request\n";

        $wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent');

        $kernel->call($session => tcpserver_output_http =>
        {
            wheel_id => $wheel_id,
            status => $response_status,
            content => $response_content,
            no_log => $no_output_log,
        });
        return;
    }

    my $request = $input;

    if (debug_http_raw_io)
    {
        log_enabled && logline "request:\n%s", Dumper $request;
        log_enabled && logline "request->as_string:\n%s", Dumper $request->as_string;
    }
    #log_enabled && logline "request->content:\n%s", Dumper $request->content;

    my $request_protocol = $request->protocol;
    my $request_method = $request->method;
    my $request_uri = $request->uri;
    my $request_content = $request->content;

    log_enabled && logline "[tcpserver #%s conn #%s%s%s] HTTP input = '%s' '%s' '%s'",
        $sock_wheel_id,
        $wheel_id,
        (cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
        (cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : ''),
        defined $request_method ? $request_method : 'n/a',
        defined $request_uri ? $request_uri : 'n/a',
        defined $request_protocol ? $request_protocol : 'n/a';

    # Flatten the headers into a plain hash (last value wins per name).
    my $request_headers = {};
    $request->headers->scan(sub
    {
        my ($header, $value) = @_;
        $request_headers->{$header} = $value;
    });

    # Split the URI into path and query string.  Warnings are fatal in this
    # file, so undefined pieces (missing/empty URI) are normalized to ''
    # before they are compared or split.
    my ($request_filename, $request_query_string) =
        defined $request_uri ? split(/\?/, $request_uri, 2) : ();
    my $request_http_cookie = $request_headers->{Cookie};
    $request_filename = '' unless defined $request_filename;
    $request_query_string = '' unless defined $request_query_string;
    $request_http_cookie = '' unless defined $request_http_cookie;

    my $request_args = {};
    my $request_cookies = {};

    # Query arguments are stored undecoded; consumers url_decode them.
    foreach my $pair (split /&/, $request_query_string)
    {
        next unless defined $pair;
        my ($key, $val) = split /=/, $pair, 2;
        next unless defined $key && defined $val;
        $request_args->{$key} = $val;
    }

    foreach my $pair (split /;/, $request_http_cookie)
    {
        next unless defined $pair;
        my ($key, $val) = split /=/, $pair, 2;
        next unless defined $key && defined $val;
        foreach ($key, $val)
        {
            s/^\s+//;
            s/\s+$//;
        }
        $request_cookies->{$key} = $val;
    }

    # Remember the first (reasonably sized) User-Agent seen on the connection.
    my $useragent = $request_headers->{'User-Agent'};
    $client->{http_clientinfo}->{useragent} = $useragent
        if !$client->{http_clientinfo}->{useragent} && $useragent && length $useragent <= 1024;
    my $client_useragent = $client->{http_clientinfo}->{useragent};

    my ($response_status, $response_content, $response_content_type, $response_headers, $response_rawcontent, $websocket_handshake_request);

    if ($request_filename eq '/ws' ||
        ($request_headers->{Connection} && $request_headers->{Connection} =~ /Upgrade/i) ||
        ($request_headers->{Upgrade} && $request_headers->{Upgrade} =~ /WebSocket/i))
    {
        $websocket_handshake_request = 1;

        log_enabled && logline "[tcpserver #%s conn #%s%s%s] WebSocket handshake request received",
            $sock_wheel_id,
            $wheel_id,
            (cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
            (cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : '');

        my $websocket_handshake = $client->{websocket_handshake};
        unless ($websocket_handshake)
        {
            $websocket_handshake = Protocol::WebSocket::Handshake::Server->new;
            $client->{websocket_handshake} = $websocket_handshake;
        }

        my $request_rawcontent = $request->as_string;
        $websocket_handshake->parse($request_rawcontent);

        if ($websocket_handshake->is_done)
        {
            $response_rawcontent = $websocket_handshake->to_string;
            delete $client->{websocket_handshake};
            # Switch filters only after the handshake reply has been flushed.
            $wheel->event(FlushedEvent => 'tcpserver_client_switch_to_websocket');
        }
        else
        {
            # NOTE(review): no FlushedEvent is registered on this path and
            # the handshake object stays on the client -- confirm intended.
            $response_status = HTTP_BAD_REQUEST;
            $response_content = "WebSocket handshake error\n";
        }
    }
    else
    {
        if ($request_filename eq '/test')
        {
            my $gmt_time = time2str;
            $response_content = sprintf "test @ %s\n", $gmt_time;
        }
        elsif ($request_filename =~ m|^/static/([\w\./\-]{1,128})$|)
        {
            my $file = $1;
            $file =~ s|^/+||;
            $file =~ s|^|httpd-static/|;

            # Refuse any path containing '..' (directory traversal).
            if ($file =~ m|\.\.|)
            {
                $response_status = HTTP_FORBIDDEN;
                $response_content = "not allowed\n";
            }
            else
            {
                my $content_type;

                if ($file =~ /\.([a-z]{1,10})$/i)
                {
                    my $extension = lc $1;

                    #if ($extension eq 'ico')
                    #{
                    #    #$content_type = sprintf "application/x-shcp-%s", $extension;
                    #    $content_type = 'image/x-icon';
                    #}
                    #elsif ($extension eq 'ogg')
                    #{
                    #    $content_type = 'audio/ogg';
                    #}
                    #els
                    if ($have_mime_types && ref $mime_types eq 'MIME::Types')
                    {
                        my $mime_type = $mime_types->mimeTypeOf($extension);
                        if (ref $mime_type eq 'MIME::Type')
                        {
                            my $type = $mime_type->type;
                            if ($type)
                            {
                                utf8::downgrade $type if utf8::is_utf8 $type;
                                $content_type = $type;
                            }
                        }
                    }
                }

                # $content_type stays undef when no MIME type was resolved;
                # log a placeholder instead of passing undef to the formatter.
                my $content_type_log = defined $content_type ? $content_type : 'n/a';

                if (cfg->{shcp}->{use_inline_files})
                {
                    my $file_content = $shcp_inline_files->{$file};
                    if ($file_content)
                    {
                        log_enabled && logline "sending inline cached file '%s', content-type '%s'",
                            $file,
                            $content_type_log;
                        #log_enabled && logline "file '%s', content sha256 hex '%s'",
                        #    $file,
                        #    sha256_hex $file_content;

                        $response_content_type = $content_type;
                        $response_headers->{'X-SHCP-Inline-Cached-File'} = $file;
                        $response_content = $file_content;
                    }
                    else
                    {
                        log_enabled && logline "no content for inline file '%s' (content-type '%s')",
                            $file,
                            $content_type_log;

                        $response_status = HTTP_NOT_FOUND;
                        $response_content = sprintf "inline file '%s' not found\n", $file;
                    }
                }
                else
                {
                    log_enabled && logline "sending file '%s', content-type '%s'",
                        $file,
                        $content_type_log;

                    # The file is delivered asynchronously by another state;
                    # no response is built here.
                    $kernel->yield(aio_send_file_to_client =>
                    {
                        wheel_id => $wheel_id,
                        content_type => $content_type,
                        file => $file,
                        http_protocol => $request_protocol,
                        http_connection => $request_headers->{Connection},
                    });
                    return;
                }
            }
        }
        else
        {
            if (is_shcp)
            {
                my ($auth, $auth_source, $request_shcp_password, $pass_source, $request_shcp_session_id, $sessid_source);
                my $shcp_ip_acl_re = cfg->{shcp}->{ip_acl};
                my $shcp_password_re = cfg->{shcp}->{password};

                # Password candidates: cookie, then header, then JSON body
                # of a POST to /auth.
                if ($request_cookies->{auth})
                {
                    $pass_source = 'cookie';
                    $request_shcp_password = url_decode $request_cookies->{auth};
                }
                elsif ($request_headers->{'X-SHCP-Auth'})
                {
                    $pass_source = 'header';
                    $request_shcp_password = $request_headers->{'X-SHCP-Auth'};
                }
                elsif (defined $request_method && $request_method eq 'POST' && $request_filename eq '/auth' && $request_content)
                {
                    $pass_source = 'post';

                    my $jsonhash;
                    eval { $jsonhash = decode_json $request_content; };
                    if ($@)
                    {
                        log_enabled && logline "'%s' '%s' json decode error",
                            $request_method,
                            $request_filename;
                    }
                    elsif (ref $jsonhash eq 'HASH')
                    {
                        my $jsonarg_auth = $jsonhash->{auth};
                        # Only plain scalars qualify; a nested JSON array or
                        # object would otherwise be matched in its
                        # stringified "HASH(0x...)" form.
                        if ($jsonarg_auth && !ref $jsonarg_auth)
                        {
                            $request_shcp_password = $jsonarg_auth;
                        }
                    }
                }

                if ($pass_source)
                {
                    log_enabled && logline "found password from '%s'", $pass_source;
                }

                # Session id candidates: cookie, then header, then query arg.
                if ($request_cookies->{sess})
                {
                    $sessid_source = 'cookie';
                    $request_shcp_session_id = url_decode $request_cookies->{sess};
                }
                elsif ($request_headers->{'X-SHCP-Sess'})
                {
                    $sessid_source = 'header';
                    $request_shcp_session_id = $request_headers->{'X-SHCP-Sess'};
                }
                elsif ($request_args->{sess})
                {
                    $sessid_source = 'get';
                    $request_shcp_session_id = url_decode $request_args->{sess};
                }

                if ($sessid_source)
                {
                    log_enabled && logline "found session id from '%s', id = %s",
                        $sessid_source,
                        dumper_oneline $request_shcp_session_id;
                }

                # Authorization order: IP ACL, known session, password.
                if ($client_addr && ref $shcp_ip_acl_re eq 'Regexp' && $client_addr =~ $shcp_ip_acl_re)
                {
                    $auth_source = 'ip';
                    $auth = 1;
                }
                elsif ($request_shcp_session_id && $heap->{shcp}->{sessions}->{$request_shcp_session_id})
                {
                    $auth_source = 'session';
                    $auth = 1;

                    # A valid session makes the password cookie obsolete.
                    $response_headers->{'Set-Cookie'} = 'auth=' if exists $request_cookies->{auth};
                }
                elsif ($request_shcp_password && ref $shcp_password_re eq 'Regexp' && $request_shcp_password =~ $shcp_password_re)
                {
                    $auth_source = 'password';
                    $auth = 1;

                    my @sessions = keys %{$heap->{shcp}->{sessions} || {}};
                    my $session_count = scalar @sessions;
                    my $max_session_count = 100;

                    if ($session_count >= $max_session_count)
                    {
                        log_enabled && logline "too many active sessions ('%s' >= '%s')",
                            $session_count,
                            $max_session_count;
                    }
                    else
                    {
                        my $sess_id = gen_sessid;

                        log_enabled && logline "found '%s' active sessions, create new session '%s'",
                            $session_count,
                            $sess_id;

                        $heap->{shcp}->{sessions}->{$sess_id} =
                        {
                            time => time_hires,
                            client_addr => $client_addr,
                            client_port => $client_port,
                            client_useragent => $client_useragent,
                            ssl => $ssl,
                            cn => $cn,
                            cipher => $cipher,
                        };

                        log_enabled && logline "force request_shcp_session_id = '%s'", $sess_id;
                        $request_shcp_session_id = $sess_id;

                        my $sess_cookie = sprintf "sess=%s", url_encode $sess_id;
                        log_enabled && logline "set session id cookie '%s'", $sess_cookie;
                        $response_headers->{'Set-Cookie'} = $sess_cookie;
                    }
                }

                if ($auth_source)
                {
                    log_enabled && logline "auth reason '%s'", $auth_source;
                }

                if ($request_filename eq '/')
                {
                    my $template = $html_template;
                    my $http_host = $request_headers->{Host};
                    my $lib_urlbase = 'static';
                    my $full_url = $http_host ? sprintf "http%s://%s/", ($ssl ? 's' : ''), $http_host : '/';
                    my $ts = time;

                    # The Host header is client controlled: entity-encode it
                    # before it is embedded into the HTML page.
                    my $full_url_html = encode_entities $full_url;

                    $template =~ s/%lib_urlbase%/$lib_urlbase/g;
                    $template =~ s/%full_url%/$full_url_html/g;
                    $template =~ s/%timestamp_seconds%/$ts/g;

                    $response_content_type = 'text/html';
                    $response_content = $template;
                }
                elsif ($request_filename =~ /^\/powercontrol(?:\/(\w+)\/(on|off))?$/)
                {
                    if ($auth)
                    {
                        my $item = $1;
                        my $status = $2;

                        # NOTE(review): when the heap has no powercontrol
                        # hash yet, the update below lands in a temporary
                        # hash only -- confirm this is intended.
                        my $result = $heap->{shcp}->{powercontrol} || {};
                        my $output = sprintf "%s shcppowercontrol", POECLIENT_CMD_PREFIX;

                        if ($item && $status)
                        {
                            $result->{$item} = $status;
                            $output = sprintf "%s %s=%s", $output, $item, $status;
                        }
                        else
                        {
                            $output = sprintf "%s all", $output;
                        }

                        # Broadcast the command to every tcpclient connection
                        # that has a wheel.
                        foreach my $curr_wheel_id (keys %{$heap->{tcpclient}->{connections}})
                        {
                            next unless exists $heap->{tcpclient}->{connections}->{$curr_wheel_id};
                            next unless exists $heap->{tcpclient}->{connections}->{$curr_wheel_id}->{wheel};

                            $kernel->call($session => tcpclient_output =>
                            {
                                wheel_id => $curr_wheel_id,
                                output => $output,
                                use_flush => 1,
                            });
                        }

                        $response_content_type = 'application/json';
                        $response_content = sprintf "%s\n", encode_json $result;
                    }
                    else
                    {
                        $response_status = HTTP_FORBIDDEN;
                        $response_content = "power control - auth failed\n";
                    }
                }
                elsif ($request_filename =~ m|^/cache/([\w\-\.]{1,128})$|)
                {
                    if ($auth)
                    {
                        my $item = $1;

                        if ($item eq 'uprecords.exec')
                        {
                            $response_content = sprintf "uptime: %s\n", duration_exact time - $^T;
                        }
                        else
                        {
                            my $get_cached_items_map = cfg->{shcp}->{get_cached_items_map} || {};
                            my $url = $get_cached_items_map->{$item};

                            if ($url)
                            {
                                # The reply is produced later by
                                # tcpserver_httpclient_response.
                                $kernel->yield(httpclient_request =>
                                {
                                    url => $url,
                                    response_event => 'tcpserver_httpclient_response',
                                    wheel_id => $wheel_id,
                                    protocol => $request_protocol,
                                    connection => $request_headers->{Connection},
                                    headers =>
                                    {
                                        'X-SHCP-Cached-Item' => $item,
                                    },
                                    client_addr => $client_addr,
                                    client_port => $client_port,
                                    client_useragent => $client_useragent,
                                });
                                return;
                            }
                            else
                            {
                                $response_status = HTTP_NOT_FOUND;
                                $response_content = "invalid cached item\n";
                            }
                        }
                    }
                    else
                    {
                        $response_status = HTTP_FORBIDDEN;
                        $response_content = "cached items - auth failed\n";
                    }
                }
                elsif ($request_filename eq '/auth')
                {
                    if ($auth)
                    {
                        if ($request_shcp_session_id)
                        {
                            log_enabled && logline "send client response body '%s'", $request_shcp_session_id;
                            $response_content = sprintf "%s\n", $request_shcp_session_id;
                        }
                        else
                        {
                            $response_content = "\n";
                        }
                    }
                    else
                    {
                        $response_status = HTTP_FORBIDDEN;
                        $response_content = "auth failed\n";
                    }
                }
                else
                {
                    $response_status = HTTP_NOT_FOUND;
                    $response_content = "not found\n";
                }
            }
            else
            {
                # Default status page for non-shcp mode.
                my $ts_hires = time_hires;
                my $gmt_time = time2str;
                my $name = 'POE HTTPD';

                my @lines;
                push @lines, sprintf "%s reply @", $name;
                push @lines, sprintf "%s local", strftime "%F %T %z", localtime $ts_hires;
                push @lines, sprintf "%s", $gmt_time;
                push @lines, sprintf "hires timestamp = %s", $ts_hires;
                push @lines, '';
                push @lines, sprintf "client ID #%s from %s port %s", $wheel_id, $client->{client_addr}, $client->{client_port};
                push @lines, sprintf "using SSL cipher %s", $client->{cipher} || 'n/a';
                push @lines, sprintf "connected %s", duration_exact $ts_hires - $client->{connect_time};

                $response_content_type = 'text/html';
                # NOTE(review): the separator is three newlines as found in
                # the source; an HTML line break may have been intended.
                $response_content = sprintf $html_template, $name, join("\n\n\n", @lines);
            }
        }
    }

    return unless $wheel;

    # The handshake path registers its own FlushedEvent (on success).
    $wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent') unless $websocket_handshake_request;

    $kernel->call($session => tcpserver_output_http =>
    {
        wheel_id => $wheel_id,
        status => $response_status,
        content => $response_content,
        content_type => $response_content_type,
        headers => $response_headers,
        rawcontent => $response_rawcontent,
        protocol => $request_protocol,
        connection => $request_headers->{Connection},
        no_log => $no_output_log,
    });
}

# Builds an HTTP::Response and hands it to the tcpserver_output state.
#
# ARG0 is a hash ref with: wheel_id (required), status, content,
# content_type, headers (hash ref of extra headers), content_length,
# last_modified, rawcontent (a complete raw response, used when none of
# content/content_type/content_length/last_modified is defined), protocol
# and connection (taken from the request) and no_log.
#
# Keep-alive is enabled for protocol versions >= 1.1 unless the request's
# Connection header contains "close"; the decision is stored in
# $client->{http_keepalive}.
sub tcpserver_output_http
{
    my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];

    return unless ref $args eq 'HASH';

    my $wheel_id = $args->{wheel_id} || return;
    my $status = $args->{status};
    my $content = $args->{content};
    my $content_type = $args->{content_type};
    my $headers = $args->{headers};
    my $content_length = $args->{content_length};
    my $last_modified = $args->{last_modified};
    my $rawcontent = $args->{rawcontent};
    my $protocol = $args->{protocol};
    my $connection = $args->{connection};
    my $no_log = $args->{no_log};

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client && $client->{http};

    my $sock_wheel_id = $client->{sock_wheel_id};
    my $wheel = $client->{wheel};
    my $cn = $client->{cn};
    my $cipher = $client->{cipher};
    return unless $wheel;

    my $output;
    my $gmt_time = time2str;
    my $response;

    if (defined $content || defined $content_type || defined $content_length || defined $last_modified)
    {
        $response = HTTP::Response->new(HTTP_OK);

        my ($protocol_version, $length, $conn);

        if (defined $protocol && $protocol =~ m|^http/(\d+(?:\.\d+)?)$|i)
        {
            $protocol_version = $1;
        }
        else
        {
            # Keep this a string: the number 1.0 stringifies to "1" and
            # would produce the status line protocol "HTTP/1".
            $protocol_version = '1.0';
        }

        if (defined $content_length)
        {
            $length = $content_length;
        }
        elsif (defined $content)
        {
            $length = length $content;
        }

        if ($protocol_version >= 1.1)
        {
            if (defined $connection && $connection =~ /close/i)
            {
                $conn = 'close';
                delete $client->{http_keepalive};
            }
            else
            {
                $conn = 'keep-alive';
                $client->{http_keepalive} = 1;
            }
        }
        else
        {
            $conn = 'close';
            delete $client->{http_keepalive};
        }

        $response->code($status) if defined $status;
        $response->protocol(sprintf "HTTP/%s", $protocol_version) if defined $protocol_version;
        $response->push_header
        (
            'Content-Type' => ($content_type || 'text/plain'),
            (defined $length ? ('Content-Length' => $length) : ()),
            'Last-Modified' => (defined $last_modified ? $last_modified : $gmt_time),
            (defined $conn ? (Connection => $conn) : ()),
            'Accept-Ranges' => 'bytes',
            (ref $headers eq 'HASH' ? %$headers : ()),
        );
        #log_enabled && logline "content sha256 hex '%s'", defined $content ? sha256_hex $content : 'n/a';
        $response->content($content) if defined $content;
    }
    elsif (defined $rawcontent)
    {
        $response = HTTP::Response->parse($rawcontent);
    }

    if ($response)
    {
        # Every response is marked as non-cacheable.
        $response->push_header
        (
            'Server' => 'POE',
            'Date' => $gmt_time,
            'Expires' => time2str(0),
            'Cache-Control' => 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0',
            'Pragma' => 'no-cache',
        );

        my $response_code = $response->code;
        my $response_message = $response->message;

        log_enabled && logline "[tcpserver #%s conn #%s%s%s] HTTP output = '%s' '%s'",
            $sock_wheel_id,
            $wheel_id,
            (cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
            (cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : ''),
            defined $response_code ? $response_code : 'n/a',
            defined $response_message ? $response_message : 'n/a';

        if (debug_http_raw_io)
        {
            log_enabled && logline "response:\n%s", Dumper $response;
            log_enabled && logline "response->as_string:\n%s", Dumper $response->as_string;
        }

        $output = $response;
    }

    return unless $output && $wheel;

    $kernel->call($session => tcpserver_output =>
    {
        wheel_id => $wheel_id,
        output => $output,
        no_log => $no_log,
    });
}

# Handles raw input on a connection that has been switched to WebSocket.
#
# ARG0 is a hash ref with wheel_id and input.  Input is appended to a
# per-client Protocol::WebSocket::Frame and every complete message is
# processed:
#   text "ready [rtt|color ...]" - records the flags in
#                                  $client->{websocket_readyargs}
#   text "rtt <timestamp>"       - updates websocket_rtt/_min/_max
#   close                        - kicks the client
sub tcpserver_client_input_websocket
{
    my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];

    return unless ref $args eq 'HASH';

    my $wheel_id = $args->{wheel_id} || return;
    my $input = $args->{input};
    return unless defined $input;

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client && $client->{http} && $client->{websocket};

    my $sock_wheel_id = $client->{sock_wheel_id};
    my $cn = $client->{cn};
    my $cipher = $client->{cipher};

    my ($output, $no_output_log);

    my $websocket_frame = $client->{websocket_frame};
    unless ($websocket_frame)
    {
        $websocket_frame = Protocol::WebSocket::Frame->new;
        $client->{websocket_frame} = $websocket_frame;
    }

    $websocket_frame->append($input);

    while (my $websocket_input_message = $websocket_frame->next)
    {
        my $frame_type = websocket_frame_type $websocket_frame;

        $client->{input_message_count} = 0 unless defined $client->{input_message_count};
        $client->{last_input_message_speed} = 0 unless defined $client->{last_input_message_speed};
        $client->{input_message_count}++;

        log_enabled && logline "[tcpserver #%s conn #%s] WebSocket input '%s' message = '%s'",
            $sock_wheel_id,
            $wheel_id,
            $frame_type,
            ($frame_type eq 'text' ? $websocket_input_message : hexval $websocket_input_message);

        if ($frame_type eq 'text')
        {
            if ($websocket_input_message =~ /^\s*ready(?:\s+(.{0,1024}))?\s*$/i)
            {
                my $ready_args = $1;
                if (defined $ready_args)
                {
                    foreach (split /\s+/, $ready_args)
                    {
                        next unless /^\s*(rtt|color)\s*$/i;
                        my $type = lc $1;
                        $client->{websocket_readyargs}->{$type} = 1;
                    }
                }
            }
            elsif ($websocket_input_message =~ /^\s*rtt\s+(\d+(?:\.\d+)?)\s*$/i)
            {
                # The client sends back a timestamp; the difference to now
                # is taken as the round trip time.
                my $response_time = $1;
                my $rtt = time_hires - $response_time;

                $client->{websocket_rtt} = $rtt;

                if (!defined $client->{websocket_rtt_max} || $client->{websocket_rtt_max} < $rtt)
                {
                    $client->{websocket_rtt_max} = $rtt;
                }

                if (!defined $client->{websocket_rtt_min} || $client->{websocket_rtt_min} > $rtt)
                {
                    $client->{websocket_rtt_min} = $rtt;
                }

                log_enabled && logline "[tcpserver #%s conn #%s] client rtt = %.3f ms, one way = ~ %.3f ms",
                    $sock_wheel_id,
                    $wheel_id,
                    $rtt * 1000,
                    ($rtt / 2) * 1000;
            }
        }
        elsif ($frame_type eq 'close')
        {
            log_enabled && logline "[tcpserver #%s conn #%s%s%s] disconnect, received WebSocket close",
                $sock_wheel_id,
                $wheel_id,
                (defined $cn ? sprintf " cn %s", $cn : ''),
                (defined $cipher ? sprintf " cipher %s", $cipher : '');

            my $reason = 'received WebSocket close';

            $kernel->call($session => tcpserver_client_kick =>
            {
                wheel_id => $wheel_id,
                reason => $reason,
            });
            return;
        }
    }

    # Nothing above assigns $output at present, so this is effectively a
    # hook for future replies.
    return unless defined $output && $client->{wheel};

    $kernel->call($session => tcpserver_output_websocket =>
    {
        wheel_id => $wheel_id,
        output => $output,
        no_log => $no_output_log,
    });
}

# Wraps output into a WebSocket frame and sends it via tcpserver_output.
#
# ARG0 is a hash ref with wheel_id, output and no_log.  Output is sent as a
# text frame, except for a hash ref { type => 'ping' } which is sent as a
# ping frame carrying a single space.
sub tcpserver_output_websocket
{
    my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];

    return unless ref $args eq 'HASH';

    my $wheel_id = $args->{wheel_id} || return;
    my $output = $args->{output};
    my $no_log = $args->{no_log};
    return unless defined $output;

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client && $client->{http} && $client->{websocket};

    my $sock_wheel_id = $client->{sock_wheel_id};

    my $type = 'text';
    if (ref $output eq 'HASH' && %$output && $output->{type} && $output->{type} eq 'ping')
    {
        $type = $output->{type};
        $output = ' ';
    }

    my $websocket_frame = Protocol::WebSocket::Frame->new
    (
        type => $type,
        buffer => $output,
    );
    my $raw_output = $websocket_frame->to_bytes;

    unless ($no_log)
    {
        log_enabled && logline "[tcpserver #%s conn #%s] WebSocket output '%s' message = '%s'",
            $sock_wheel_id,
            $wheel_id,
            $type,
            ($type eq 'text' ? $output : hexval $output);
    }

    return unless defined $raw_output && $client->{wheel};

    $client->{output_message_count} = 0 unless defined $client->{output_message_count};
    $client->{last_output_message_speed} = 0 unless defined $client->{last_output_message_speed};
    $client->{output_message_count}++;

    $kernel->call($session => tcpserver_output =>
    {
        wheel_id => $wheel_id,
        output => $raw_output,
        no_log => $no_log,
    });
}

# FlushedEvent handler used after a successful handshake reply: removes the
# HTTP input filter, marks the client as WebSocket and sets its idle limits.
# ARG0 is the wheel id.
sub tcpserver_client_switch_to_websocket
{
    my ($kernel, $session, $heap, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0];

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client;

    my $sock_wheel_id = $client->{sock_wheel_id};
    my $http = $client->{http};
    my $wheel = $client->{wheel};
    return unless $sock_wheel_id && $http && $wheel;

    # Unregister the FlushedEvent again (one-shot).
    $wheel->event('FlushedEvent');

    unless ($kernel->call($session => tcpserver_client_remove_http_filter => { wheel_id => $wheel_id }))
    {
        log_enabled && logline 'error calling tcpserver_client_remove_http_filter';
        return;
    }

    $client->{websocket} = 1;
    $client->{client_max_idle_time} = 10;
    $client->{client_max_idle_time_disconnect} = 30;
}

# FlushedEvent handler: removes the HTTP input filter and starts the
# asynchronous file read described by $client->{aio_args}.
# ARG0 is the wheel id.
sub tcpserver_client_aio_send_file
{
    my ($kernel, $session, $heap, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0];

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client;

    my $sock_wheel_id = $client->{sock_wheel_id};
    my $http = $client->{http};
    my $wheel = $client->{wheel};
    return unless $sock_wheel_id && $http && $wheel;

    $wheel->event('FlushedEvent');

    unless ($kernel->call($session => tcpserver_client_remove_http_filter => { wheel_id => $wheel_id }))
    {
        log_enabled && logline 'error calling tcpserver_client_remove_http_filter';
        return;
    }

    my $aio_args = $client->{aio_args};
    unless (ref $aio_args eq 'HASH')
    {
        log_enabled && logline 'invalid aio_args';
        return;
    }

    $kernel->yield(aio_read_file => $aio_args);
}

# Pops POE::Filter::HTTPD off the top of the client's stackable input
# filter.  ARG0 is a hash ref with wheel_id.  Returns 1 on success and
# nothing (after logging the reason) otherwise.
sub tcpserver_client_remove_http_filter
{
    my ($heap, $args) = @_[HEAP, ARG0];

    unless (ref $args eq 'HASH')
    {
        log_enabled && logline 'args not HASHref';
        return;
    }

    my $wheel_id = $args->{wheel_id};
    unless ($wheel_id)
    {
        log_enabled && logline 'invalid wheel_id';
        return;
    }

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    unless ($client)
    {
        log_enabled && logline 'no client';
        return;
    }

    my $sock_wheel_id = $client->{sock_wheel_id};
    my $http = $client->{http};
    my $wheel = $client->{wheel};
    unless ($sock_wheel_id && $http && $wheel)
    {
        log_enabled && logline 'no sock_wheel_id|http|wheel';
        return;
    }

    my $input_filter_stack = $wheel->get_input_filter;
    unless (ref $input_filter_stack eq 'POE::Filter::Stackable')
    {
        log_enabled && logline "input_filter_stack is not POE::Filter::Stackable ref\n%s", Dumper $input_filter_stack;
        return;
    }

    my @input_filters = $input_filter_stack->filters;
    unless (ref $input_filters[$#input_filters] eq 'POE::Filter::HTTPD')
    {
        log_enabled && logline "last element in input_filters is not POE::Filter::HTTPD ref\n%s", Dumper \@input_filters;
        return;
    }

    my $last_popped_input_filter = $input_filter_stack->pop;

    log_enabled && logline "[tcpserver #%s conn #%s] remove HTTP filter ('%s' -> pop -> '%s')",
        $sock_wheel_id,
        $wheel_id,
        ref $input_filter_stack,
        ref $last_popped_input_filter;

    return 1;
}

# Pushes a new POE::Filter::HTTPD onto the client's stackable input filter
# unless one is already on top.  ARG0 is a hash ref with wheel_id.  Returns
# 1 when a filter was added and a false value otherwise.
sub tcpserver_client_add_http_filter
{
    my ($heap, $args) = @_[HEAP, ARG0];

    return unless ref $args eq 'HASH';

    my $wheel_id = $args->{wheel_id} || return;

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client;

    my $sock_wheel_id = $client->{sock_wheel_id};
    my $http = $client->{http};
    my $wheel = $client->{wheel};
    return unless $sock_wheel_id && $http && $wheel;

    my $input_filter_stack = $wheel->get_input_filter;
    return unless ref $input_filter_stack eq 'POE::Filter::Stackable';

    my @input_filters = $input_filter_stack->filters;
    return unless ref $input_filters[$#input_filters] ne 'POE::Filter::HTTPD';

    my $poe_filter_httpd_new_params = $client->{poe_filter_httpd_new_params} || {};
    my $poe_filter_httpd = POE::Filter::HTTPD->new(%$poe_filter_httpd_new_params);
    $input_filter_stack->push($poe_filter_httpd);

    log_enabled && logline "[tcpserver #%s conn #%s] add HTTP filter ('%s' -> push -> '%s'), poe_filter_httpd_new_params = %s",
        $sock_wheel_id,
        $wheel_id,
        ref $input_filter_stack,
        ref $poe_filter_httpd,
        dumper_oneline $poe_filter_httpd_new_params;

    return 1;
}

# FlushedEvent handler after a regular HTTP response: kicks the client
# unless keep-alive is active; otherwise, if a file transfer had removed
# the HTTP filter (aio_args present), the filter is re-added.
# ARG0 is the wheel id.
sub tcpserver_client_http_response_sent
{
    my ($kernel, $session, $heap, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0];

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client && $client->{http};

    my $wheel = $client->{wheel};
    return unless $wheel;

    $wheel->event('FlushedEvent');

    unless ($client->{http_keepalive})
    {
        my $reason = 'http close';

        $kernel->call($session => tcpserver_client_kick =>
        {
            wheel_id => $wheel_id,
            reason => $reason,
        });
        return;
    }

    if (ref $client->{aio_args} eq 'HASH')
    {
        delete $client->{aio_args};

        unless ($kernel->call($session => tcpserver_client_add_http_filter => { wheel_id => $wheel_id }))
        {
            log_enabled && logline 'error calling tcpserver_client_add_http_filter';
            return;
        }
    }
}

# Called when an asynchronous file transfer has finished.  ARG0 is a hash
# ref with wheel_id.  If the wheel still has unflushed output, completion
# is deferred to tcpserver_client_http_response_sent via FlushedEvent;
# otherwise the same close / keep-alive handling is done immediately.
sub tcpserver_client_aio_send_file_done
{
    my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];

    return unless ref $args eq 'HASH';

    my $wheel_id = $args->{wheel_id} || return;

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client && $client->{http};

    my $wheel = $client->{wheel};
    return unless $wheel;

    if ($wheel->get_driver_out_octets)
    {
        log_enabled && logline 'found unflushed data, adding FlushedEvent => tcpserver_client_http_response_sent';
        $wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent');
        return;
    }

    unless ($client->{http_keepalive})
    {
        my $reason = 'http close';

        $kernel->call($session => tcpserver_client_kick =>
        {
            wheel_id => $wheel_id,
            reason => $reason,
        });
        return;
    }

    if (ref $client->{aio_args} eq 'HASH')
    {
        delete $client->{aio_args};

        unless ($kernel->call($session => tcpserver_client_add_http_filter => { wheel_id => $wheel_id }))
        {
            log_enabled && logline 'error calling tcpserver_client_add_http_filter';
            return;
        }
    }
}

# Sends 100 random color values to every WebSocket client.  Its entry in
# $heap->{periodic} is commented out in the start state above.
sub tcpserver_periodic1s_client_websocket_color
{
    my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];

    foreach my $wheel_id (keys %{$heap->{tcpserver}->{connections}})
    {
        my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
        next unless $client && $client->{websocket};

        for (my $i = 0; $i < 100; $i++)
        {
            #my $output = 'FF0000';
            my $output = sprintf "%02X%02X%02X",
                int(rand() * 255),
                int(rand() * 255),
                int(rand() * 255);

            $kernel->call($session => tcpserver_output_websocket =>
            {
                wheel_id => $wheel_id,
                output => $output,
            });
        }
    }
}

# Self-rescheduling state (every 0.025 s until $heap->{shutdown} is set)
# that walks one RGB channel at a time up or down and sends the resulting
# "RRGGBB" value to every WebSocket client that announced "ready color".
# The walker state lives in $heap->{websocket_colorchanger} and is advanced
# once per receiving client.
sub tcpserver_client_websocket_colorchanger
{
    my ($kernel, $session, $heap, $state) = @_[KERNEL, SESSION, HEAP, STATE];

    foreach my $wheel_id (keys %{$heap->{tcpserver}->{connections}})
    {
        my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
        next unless $client && $client->{websocket} && exists $client->{websocket_readyargs} && $client->{websocket_readyargs}->{color};

        $heap->{websocket_colorchanger} = {} unless exists $heap->{websocket_colorchanger};
        my $colors = $heap->{websocket_colorchanger};

        my $value_min = 0;
        my $value_max = 255;

        $colors->{value} = $value_min unless defined $colors->{value};
        $colors->{inc} = 1 unless defined $colors->{inc};
        $colors->{curr_cval} = 0 unless defined $colors->{curr_cval};
        $colors->{cvalues} = { 0 => $value_min, 1 => $value_min, 2 => $value_min } unless defined $colors->{cvalues};
        my $cvalues = $colors->{cvalues};

        my $step_up = 1;
        my $step_down = 1;

        # At the minimum, the midpoint and the maximum a new direction and
        # a new channel are picked at random.
        #if ($cvalues->{$colors->{curr_cval}} == $value_max)
        #if ($cvalues->{$colors->{curr_cval}} == 127)
        #if ($cvalues->{$colors->{curr_cval}} == int($value_max / 2))
        if ($cvalues->{$colors->{curr_cval}} == $value_min || $cvalues->{$colors->{curr_cval}} == int($value_max / 2) || $cvalues->{$colors->{curr_cval}} == $value_max)
        #if ($cvalues->{$colors->{curr_cval}} == $value_min)
        #if ($cvalues->{$colors->{curr_cval}} =~ /^(?:127|255)$/)
        #if ($cvalues->{$colors->{curr_cval}} =~ /^(?:0|127|255)$/)
        #if ($cvalues->{$colors->{curr_cval}} == int(rand() * 256))
        {
            $colors->{inc} = int(rand() * 2);
            #$colors->{curr_cval}++;
            $colors->{curr_cval} = int(rand() * 3);
            #if ($colors->{curr_cval} > 2)
            #{
            #    $colors->{curr_cval} = 0;
            #}
        }

        if ($colors->{inc})
        {
            $colors->{value} += $step_up;
            $cvalues->{$colors->{curr_cval}} += $step_up;
        }
        else
        {
            # The decrement uses $step_down for both counters.
            $colors->{value} -= $step_down;
            $cvalues->{$colors->{curr_cval}} -= $step_down;
        }

        # Clamp both counters to [$value_min, $value_max].
        if ($colors->{value} < $value_min)
        {
            $colors->{value} = $value_min;
        }
        elsif ($colors->{value} > $value_max)
        {
            $colors->{value} = $value_max;
        }

        if ($cvalues->{$colors->{curr_cval}} < $value_min)
        {
            $cvalues->{$colors->{curr_cval}} = $value_min;
        }
        elsif ($cvalues->{$colors->{curr_cval}} > $value_max)
        {
            $cvalues->{$colors->{curr_cval}} = $value_max;
        }

        # Bounce off the limits.
        if ($cvalues->{$colors->{curr_cval}} == $value_min)
        {
            $colors->{inc} = 1;
        }
        elsif ($cvalues->{$colors->{curr_cval}} == $value_max)
        {
            $colors->{inc} = 0;
        }

        my $output = sprintf "%02X%02X%02X",
            #int(rand() * $value_max),
            #$value_max,
            #$colors->{value},
            $cvalues->{0},
            #int(rand() * $value_max),
            #$value_max,
            #$colors->{value},
            $cvalues->{1},
            #int(rand() * $value_max);
            #$value_max;
            #$colors->{value};
            $cvalues->{2};

        $kernel->call($session => tcpserver_output_websocket =>
        {
            wheel_id => $wheel_id,
            output => $output,
        });
    }

    unless ($heap->{shutdown})
    {
        $kernel->delay($state => 0.025000);
    }
}

# Response event for httpclient_request (used by /cache/<item>): relays the
# fetched content to the waiting HTTP client.
#
# ARG0 is a hash ref with content, content_type and orig_args (the hash
# originally passed to httpclient_request, providing wheel_id, headers,
# protocol and connection).
sub tcpserver_httpclient_response
{
    my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];

    return unless ref $args eq 'HASH';

    my $orig_args = $args->{orig_args} || return;
    my $wheel_id = $orig_args->{wheel_id} || return;
    my $headers = $orig_args->{headers};
    my $protocol = $orig_args->{protocol};
    my $connection = $orig_args->{connection};
    my $content = $args->{content};
    my $content_type = $args->{content_type};
    return unless defined $content;

    #log_enabled && logline Dumper
    #{
    #    headers => $headers,
    #    content_sha256_hex => sha256_hex $content,
    #};

    my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
    return unless $client && $client->{http};

    my $wheel = $client->{wheel};
    return unless $wheel;

    $wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent');

    $kernel->call($session => tcpserver_output_http =>
    {
        wheel_id => $wheel_id,
        content => $content,
        content_type => $content_type,
        headers => $headers,
        protocol => $protocol,
        connection => $connection,
        no_log => 1,
    });
}

# Periodic session garbage collector: while more than 50 sessions exist,
# each invocation deletes the single oldest one (by creation time).
#
# Sessions are ordered numerically by their 'time' field; ordering the
# session ids (instead of using the times as hash keys) keeps sessions
# with identical timestamps distinct.
sub tcpserver_periodic3s_session_gc
{
    my $heap = $_[HEAP];

    my $sessions = $heap->{shcp}->{sessions};
    return unless $sessions;

    my @session_ids = keys %$sessions;
    my $session_count = scalar @session_ids;
    my $max_session_count = 50;
    return if $session_count <= $max_session_count;

    # Only well-formed sessions with a creation time are candidates.
    my @candidates = grep
    {
        ref $sessions->{$_} eq 'HASH' && $sessions->{$_}->{time}
    } @session_ids;
    return unless @candidates;

    my ($oldest_session_id) = sort
    {
        $sessions->{$a}->{time} <=> $sessions->{$b}->{time} || $a cmp $b
    } @candidates;

    #log_enabled && logline "oldest session id: '%s'", $oldest_session_id;
    log_enabled && logline "session count '%s', deleting oldest session '%s'",
        $session_count,
        $oldest_session_id;

    delete $sessions->{$oldest_session_id};
}

1;