package POEDaemon::TCPServer::Main::Cmds::HTTP;
use strict;
use warnings FATAL => 'all';
no warnings 'redefine';
use Data::Dumper;
use HTTP::Response;
use HTTP::Status qw(:constants);
use HTTP::Date;
use HTML::Entities;
use Protocol::WebSocket::Handshake::Server;
use Protocol::WebSocket::Frame;
use POSIX qw(strftime);
#use Digest::SHA qw(sha256_hex);
use String::Urandom;
use URL::Encode qw(url_encode url_decode);
use POE qw(Filter::HTTPD);
use POEDaemon;
sub debug_http_raw_io () { return undef; }
my $poesession_mode = cfg->{poesession_mode} || '';
my $is_shcp;
$is_shcp = 1 if $poesession_mode eq 'shcp';
sub is_shcp () { return $is_shcp; }
my ($html_template, $shcp_inline_files);
if (is_shcp)
{
use Cpanel::JSON::XS;
$html_template = read_file 'httpd-static/smarthome-controlpanel.html';
if (cfg->{shcp}->{use_inline_files})
{
foreach my $file (@{cfg->{shcp}->{inline_files} || []})
{
$shcp_inline_files->{$file} = read_file $file;
}
}
}
else
{
$html_template = read_file 'httpd-static/poe-httpd.html';
}
my ($have_mime_types, $mime_types);
eval
{
require MIME::Types;
};
unless ($@)
{
$have_mime_types = 1;
$mime_types = MIME::Types->new;
}
{
my $chars;
push @$chars, chr foreach 33 .. 126;
my $string_urandom = String::Urandom->new
(
LENGTH => 128,
CHARS => $chars,
);
sub gen_sessid () { return $string_urandom->rand_string; }
}
sub states
{
return $_[0],
[qw(
tcpserver_client_commands_http_start
tcpserver_client_commands_http
tcpserver_output_http
tcpserver_client_input_websocket
tcpserver_output_websocket
tcpserver_client_switch_to_websocket
tcpserver_client_aio_send_file
tcpserver_client_remove_http_filter
tcpserver_client_add_http_filter
tcpserver_client_http_response_sent
tcpserver_client_aio_send_file_done
tcpserver_periodic1s_client_websocket_color
tcpserver_client_websocket_colorchanger
tcpserver_httpclient_response
tcpserver_periodic3s_session_gc
)];
}
sub tcpserver_client_commands_http_start
{
my ($kernel, $heap) = @_[KERNEL, HEAP];
#$heap->{periodic}->{tcpserver_periodic1s_client_websocket_color} = 1;
$heap->{periodic}->{tcpserver_periodic3s_session_gc} = 3;
$kernel->yield('tcpserver_client_websocket_colorchanger');
}
sub tcpserver_client_commands_http
{
my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
return unless ref $args eq 'HASH';
my $wheel_id = $args->{wheel_id} || return;
my $input = $args->{input};
return unless defined $input;
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client && $client->{http};
if ($client->{websocket})
{
$kernel->call($session => tcpserver_client_input_websocket =>
{
wheel_id => $wheel_id,
input => $input,
});
return;
}
my $sock_wheel_id = $client->{sock_wheel_id};
my $wheel = $client->{wheel};
my $client_addr = $client->{client_addr};
my $client_port = $client->{client_port};
my $ssl = $client->{ssl};
my $cn = $client->{cn};
my $cipher = $client->{cipher};
return unless $wheel;
my $no_output_log;
if (ref $input eq 'HTTP::Response')
{
log_enabled && logline "[tcpserver #%s conn #%s%s%s] invalid HTTP input",
$sock_wheel_id,
$wheel_id,
(cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
(cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : '');
my $response_status = HTTP_BAD_REQUEST;
my $response_content = "invalid request\n";
$wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent');
$kernel->call($session => tcpserver_output_http =>
{
wheel_id => $wheel_id,
status => $response_status,
content => $response_content,
no_log => $no_output_log,
});
return;
}
my $request = $input;
if (debug_http_raw_io)
{
log_enabled && logline "request:\n%s", Dumper $request;
log_enabled && logline "request->as_string:\n%s", Dumper $request->as_string;
}
#log_enabled && logline "request->content:\n%s", Dumper $request->content;
my $request_protocol = $request->protocol;
my $request_method = $request->method;
my $request_uri = $request->uri;
my $request_content = $request->content;
log_enabled && logline "[tcpserver #%s conn #%s%s%s] HTTP input = '%s' '%s' '%s'",
$sock_wheel_id,
$wheel_id,
(cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
(cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : ''),
defined $request_method ? $request_method : 'n/a',
defined $request_uri ? $request_uri : 'n/a',
defined $request_protocol ? $request_protocol : 'n/a';
my $request_headers = {};
$request->headers->scan(sub
{
my ($header, $value) = @_;
$request_headers->{$header} = $value;
});
my ($request_filename, $request_query_string) = split '\?', $request_uri, 2;
my $request_http_cookie = $request_headers->{Cookie};
$request_query_string = '' unless defined $request_query_string;
$request_http_cookie = '' unless defined $request_http_cookie;
my $request_args = {};
my $request_cookies = {};
foreach (split '&', $request_query_string)
{
next unless defined;
my ($key, $val) = split '=', $_, 2;
next unless defined $key && defined $val;
$request_args->{$key} = $val;
}
foreach (split ';', $request_http_cookie)
{
next unless defined;
my ($key, $val) = split '=', $_, 2;
next unless defined $key && defined $val;
foreach ($key, $val)
{
s/^\s+//;
s/\s+$//;
}
$request_cookies->{$key} = $val;
}
my $useragent = $request_headers->{'User-Agent'};
$client->{http_clientinfo}->{useragent} = $useragent if !$client->{http_clientinfo}->{useragent} && $useragent && length $useragent <= 1024;
my $client_useragent = $client->{http_clientinfo}->{useragent};
my ($response_status, $response_content, $response_content_type, $response_headers, $response_rawcontent, $websocket_handshake_request);
if ($request_filename eq '/ws' || ($request_headers->{Connection} && $request_headers->{Connection} =~ /Upgrade/i) || ($request_headers->{Upgrade} && $request_headers->{Upgrade} =~ /WebSocket/i))
{
$websocket_handshake_request = 1;
log_enabled && logline "[tcpserver #%s conn #%s%s%s] WebSocket handshake request received",
$sock_wheel_id,
$wheel_id,
(cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
(cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : '');
my $websocket_handshake = $client->{websocket_handshake};
unless ($websocket_handshake)
{
$websocket_handshake = Protocol::WebSocket::Handshake::Server->new;
$client->{websocket_handshake} = $websocket_handshake;
}
my $request_rawcontent = $request->as_string;
$websocket_handshake->parse($request_rawcontent);
if ($websocket_handshake->is_done)
{
$response_rawcontent = $websocket_handshake->to_string;
delete $client->{websocket_handshake};
$wheel->event(FlushedEvent => 'tcpserver_client_switch_to_websocket');
}
else
{
$response_status = HTTP_BAD_REQUEST;
$response_content = "WebSocket handshake error\n";
}
}
else
{
if ($request_filename eq '/test')
{
my $gmt_time = time2str;
$response_content = sprintf "test @ %s\n", $gmt_time;
}
elsif ($request_filename =~ m|^/static/([\w\./\-]{1,128})$|)
{
my $file = $1;
$file =~ s|^/+||;
$file =~ s|^|httpd-static/|;
if ($file =~ m|\.\.|)
{
$response_status = HTTP_FORBIDDEN;
$response_content = "not allowed\n";
}
else
{
my $content_type;
if ($file =~ /\.([a-z]{1,10})$/i)
{
my $extension = lc $1;
#if ($extension eq 'ico')
#{
# #$content_type = sprintf "application/x-shcp-%s", $extension;
# $content_type = 'image/x-icon';
#}
#elsif ($extension eq 'ogg')
#{
# $content_type = 'audio/ogg';
#}
#els
if ($have_mime_types && ref $mime_types eq 'MIME::Types')
{
my $mime_type = $mime_types->mimeTypeOf($extension);
if (ref $mime_type eq 'MIME::Type')
{
my $type = $mime_type->type;
if ($type)
{
utf8::downgrade $type if utf8::is_utf8 $type;
$content_type = $type;
}
}
}
}
if (cfg->{shcp}->{use_inline_files})
{
my $file_content = $shcp_inline_files->{$file};
if ($file_content)
{
log_enabled && logline "sending inline cached file '%s', content-type '%s'",
$file,
$content_type;
#log_enabled && logline "file '%s', content sha256 hex '%s'",
# $file,
# sha256_hex $file_content;
$response_content_type = $content_type;
$response_headers->{'X-SHCP-Inline-Cached-File'} = $file;
$response_content = $file_content;
}
else
{
log_enabled && logline "no content for inline file '%s' (content-type '%s')",
$file,
$content_type;
$response_status = HTTP_NOT_FOUND;
$response_content = sprintf "inline file '%s' not found\n", $file;
}
}
else
{
log_enabled && logline "sending file '%s', content-type '%s'",
$file,
$content_type;
$kernel->yield(aio_send_file_to_client =>
{
wheel_id => $wheel_id,
content_type => $content_type,
file => $file,
http_protocol => $request_protocol,
http_connection => $request_headers->{Connection},
});
return;
}
}
}
else
{
if (is_shcp)
{
my ($auth, $auth_source, $request_shcp_password, $pass_source, $request_shcp_session_id, $sessid_source);
my $shcp_ip_acl_re = cfg->{shcp}->{ip_acl};
my $shcp_password_re = cfg->{shcp}->{password};
if ($request_cookies->{auth})
{
$pass_source = 'cookie';
$request_shcp_password = url_decode $request_cookies->{auth};
}
elsif ($request_headers->{'X-SHCP-Auth'})
{
$pass_source = 'header';
$request_shcp_password = $request_headers->{'X-SHCP-Auth'};
}
elsif ($request_method eq 'POST' && $request_filename eq '/auth' && $request_content)
{
$pass_source = 'post';
my $jsonhash;
eval
{
$jsonhash = decode_json $request_content;
};
if ($@)
{
log_enabled && logline "'%s' '%s' json decode error",
$request_method,
$request_filename;
}
else
{
if (ref $jsonhash eq 'HASH')
{
my $jsonarg_auth = $jsonhash->{auth};
if ($jsonarg_auth)
{
$request_shcp_password = $jsonarg_auth;
}
}
}
}
if ($pass_source)
{
log_enabled && logline "found password from '%s'", $pass_source;
}
if ($request_cookies->{sess})
{
$sessid_source = 'cookie';
$request_shcp_session_id = url_decode $request_cookies->{sess};
}
elsif ($request_headers->{'X-SHCP-Sess'})
{
$sessid_source = 'header';
$request_shcp_session_id = $request_headers->{'X-SHCP-Sess'};
}
elsif ($request_args->{sess})
{
$sessid_source = 'get';
$request_shcp_session_id = url_decode $request_args->{sess};
}
if ($sessid_source)
{
log_enabled && logline "found session id from '%s', id = %s",
$sessid_source,
dumper_oneline $request_shcp_session_id;
}
if ($client_addr && ref $shcp_ip_acl_re eq 'Regexp' && $client_addr =~ $shcp_ip_acl_re)
{
$auth_source = 'ip';
$auth = 1;
}
elsif ($request_shcp_session_id && $heap->{shcp}->{sessions}->{$request_shcp_session_id})
{
$auth_source = 'session';
$auth = 1;
$response_headers->{'Set-Cookie'} = 'auth=' if exists $request_cookies->{auth};
}
elsif ($request_shcp_password && ref $shcp_password_re eq 'Regexp' && $request_shcp_password =~ $shcp_password_re)
{
$auth_source = 'password';
$auth = 1;
my @sessions = keys %{$heap->{shcp}->{sessions} || {}};
my $session_count = $#sessions + 1;
my $max_session_count = 100;
if ($session_count >= $max_session_count)
{
log_enabled && logline "too many active sessions ('%s' >= '%s')",
$session_count,
$max_session_count;
}
else
{
my $sess_id = gen_sessid;
log_enabled && logline "found '%s' active sessions, create new session '%s'",
$session_count,
$sess_id;
$heap->{shcp}->{sessions}->{$sess_id} =
{
time => time_hires,
client_addr => $client_addr,
client_port => $client_port,
client_useragent => $client_useragent,
ssl => $ssl,
cn => $cn,
cipher => $cipher,
};
log_enabled && logline "force request_shcp_session_id = '%s'", $sess_id;
$request_shcp_session_id = $sess_id;
my $sess_cookie = sprintf "sess=%s", url_encode $sess_id;
log_enabled && logline "set session id cookie '%s'", $sess_cookie;
$response_headers->{'Set-Cookie'} = $sess_cookie;
}
}
if ($auth_source)
{
log_enabled && logline "auth reason '%s'", $auth_source;
}
if ($request_filename eq '/')
{
my $template = $html_template;
my $http_host = $request_headers->{Host};
my $lib_urlbase = 'static';
my $full_url = $http_host ? sprintf "http%s://%s/", ($ssl ? 's' : ''), $http_host : '/';
my $ts = time;
$template =~ s/%lib_urlbase%/$lib_urlbase/g;
$template =~ s/%full_url%/$full_url/g;
$template =~ s/%timestamp_seconds%/$ts/g;
$response_content_type = 'text/html';
$response_content = $template;
}
elsif ($request_filename =~ /^\/powercontrol(?:\/(\w+)\/(on|off))?$/)
{
if ($auth)
{
my $item = $1;
my $status = $2;
my $result = $heap->{shcp}->{powercontrol} || {};
my $output = sprintf "%s shcppowercontrol", POECLIENT_CMD_PREFIX;;
if ($item && $status)
{
$result->{$item} = $status;
$output = sprintf "%s %s=%s",
$output,
$item,
$status;
}
else
{
$output = sprintf "%s all", $output;
}
foreach my $curr_wheel_id (keys %{$heap->{tcpclient}->{connections}})
{
next unless exists $heap->{tcpclient}->{connections}->{$curr_wheel_id};
next unless exists $heap->{tcpclient}->{connections}->{$curr_wheel_id}->{wheel};
$kernel->call($session => tcpclient_output =>
{
wheel_id => $curr_wheel_id,
output => $output,
use_flush => 1,
});
}
$response_content_type = 'application/json';
$response_content = sprintf "%s\n", encode_json $result;
}
else
{
$response_status = HTTP_FORBIDDEN;
$response_content = "power control - auth failed\n";
}
}
elsif ($request_filename =~ m|^/cache/([\w\-\.]{1,128})$|)
{
if ($auth)
{
my $item = $1;
if ($item eq 'uprecords.exec')
{
$response_content = sprintf "uptime: %s\n", duration_exact time - $^T;
}
else
{
my $get_cached_items_map = cfg->{shcp}->{get_cached_items_map} || {};
my $url = $get_cached_items_map->{$item};
if ($url)
{
$kernel->yield(httpclient_request =>
{
url => $url,
response_event => 'tcpserver_httpclient_response',
wheel_id => $wheel_id,
protocol => $request_protocol,
connection => $request_headers->{Connection},
headers =>
{
'X-SHCP-Cached-Item' => $item,
},
client_addr => $client_addr,
client_port => $client_port,
client_useragent => $client_useragent,
});
return;
}
else
{
$response_status = HTTP_NOT_FOUND;
$response_content = "invalid cached item\n";
}
}
}
else
{
$response_status = HTTP_FORBIDDEN;
$response_content = "cached items - auth failed\n";
}
}
elsif ($request_filename eq '/auth')
{
if ($auth)
{
if ($request_shcp_session_id)
{
log_enabled && logline "send client response body '%s'", $request_shcp_session_id;
$response_content = sprintf "%s\n", $request_shcp_session_id;
}
else
{
$response_content = "\n";
}
}
else
{
$response_status = HTTP_FORBIDDEN;
$response_content = "auth failed\n";
}
}
else
{
$response_status = HTTP_NOT_FOUND;
$response_content = "not found\n";
}
}
else
{
my $ts_hires = time_hires;
my $gmt_time = time2str;
my $name = 'POE HTTPD';
my @lines;
push @lines, sprintf "%s reply @", $name;
push @lines, sprintf "%s local", strftime "%F %T %z", localtime $ts_hires;
push @lines, sprintf "%s", $gmt_time;
push @lines, sprintf "hires timestamp = %s", $ts_hires;
push @lines, '';
push @lines, sprintf "client ID #%s from %s port %s",
$wheel_id,
$client->{client_addr},
$client->{client_port};
push @lines, sprintf "using SSL cipher %s", $client->{cipher} || 'n/a';
push @lines, sprintf "connected %s", duration_exact $ts_hires - $client->{connect_time};
$response_content_type = 'text/html';
$response_content = sprintf $html_template,
$name,
join("
\n\n", @lines);
}
}
}
return unless $wheel;
$wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent') unless $websocket_handshake_request;
$kernel->call($session => tcpserver_output_http =>
{
wheel_id => $wheel_id,
status => $response_status,
content => $response_content,
content_type => $response_content_type,
headers => $response_headers,
rawcontent => $response_rawcontent,
protocol => $request_protocol,
connection => $request_headers->{Connection},
no_log => $no_output_log,
});
}
sub tcpserver_output_http
{
my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
return unless ref $args eq 'HASH';
my $wheel_id = $args->{wheel_id} || return;
my $status = $args->{status};
my $content = $args->{content};
my $content_type = $args->{content_type};
my $headers = $args->{headers};
my $content_length = $args->{content_length};
my $last_modified = $args->{last_modified};
my $rawcontent = $args->{rawcontent};
my $protocol = $args->{protocol};
my $connection = $args->{connection};
my $no_log = $args->{no_log};
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client && $client->{http};
my $sock_wheel_id = $client->{sock_wheel_id};
my $wheel = $client->{wheel};
my $cn = $client->{cn};
my $cipher = $client->{cipher};
return unless $wheel;
my $output;
my $gmt_time = time2str;
my $response;
if (defined $content || defined $content_type || defined $content_length || defined $last_modified)
{
$response = HTTP::Response->new(HTTP_OK);
my ($protocol_version, $length, $conn);
if (defined $protocol && $protocol =~ m|^http/(\d+(?:\.\d+)?)$|i)
{
$protocol_version = $1;
}
else
{
$protocol_version = 1.0;
}
if (defined $content_length)
{
$length = $content_length;
}
elsif (defined $content)
{
$length = length $content;
}
if ($protocol_version >= 1.1)
{
if (defined $connection && $connection =~ /close/i)
{
$conn = 'close';
delete $client->{http_keepalive};
}
else
{
$conn = 'keep-alive';
$client->{http_keepalive} = 1;
}
}
else
{
$conn = 'close';
delete $client->{http_keepalive};
}
$response->code($status) if defined $status;
$response->protocol(sprintf "HTTP/%s", $protocol_version) if defined $protocol_version;
$response->push_header
(
'Content-Type' => ($content_type || 'text/plain'),
(defined $length ? ('Content-Length' => $length) : ()),
'Last-Modified' => (defined $last_modified ? $last_modified : $gmt_time),
(defined $conn ? (Connection => $conn) : ()),
'Accept-Ranges' => 'bytes',
(ref $headers eq 'HASH' ? %$headers : ()),
);
#log_enabled && logline "content sha256 hex '%s'", defined $content ? sha256_hex $content : 'n/a';
$response->content($content) if defined $content;
}
elsif (defined $rawcontent)
{
$response = HTTP::Response->parse($rawcontent);
}
if ($response)
{
$response->push_header
(
'Server' => 'POE',
'Date' => $gmt_time,
'Expires' => time2str(0),
'Cache-Control' => 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0',
'Pragma' => 'no-cache',
);
my $response_code = $response->code;
my $response_message = $response->message;
log_enabled && logline "[tcpserver #%s conn #%s%s%s] HTTP output = '%s' '%s'",
$sock_wheel_id,
$wheel_id,
(cfg->{log_sslcn_on_io_lines} ? (defined $cn ? sprintf " cn %s", $cn : '') : ''),
(cfg->{log_sslcipher_on_io_lines} ? (defined $cipher ? sprintf " cipher %s", $cipher : '') : ''),
defined $response_code ? $response_code : 'n/a',
defined $response_message ? $response_message : 'n/a';
if (debug_http_raw_io)
{
log_enabled && logline "response:\n%s", Dumper $response;
log_enabled && logline "response->as_string:\n%s", Dumper $response->as_string;
}
$output = $response;
}
return unless $output && $wheel;
$kernel->call($session => tcpserver_output =>
{
wheel_id => $wheel_id,
output => $output,
no_log => $no_log,
});
}
sub tcpserver_client_input_websocket
{
my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
return unless ref $args eq 'HASH';
my $wheel_id = $args->{wheel_id} || return;
my $input = $args->{input};
return unless defined $input;
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client && $client->{http} && $client->{websocket};
my $sock_wheel_id = $client->{sock_wheel_id};
my $client_addr = $client->{client_addr};
my $cn = $client->{cn};
my $cipher = $client->{cipher};
my ($output, $no_output_log);
my $websocket_frame = $client->{websocket_frame};
unless ($websocket_frame)
{
$websocket_frame = Protocol::WebSocket::Frame->new;
$client->{websocket_frame} = $websocket_frame;
}
$websocket_frame->append($input);
while (my $websocket_input_message = $websocket_frame->next)
{
my $frame_type = websocket_frame_type $websocket_frame;
$client->{input_message_count} = 0 unless defined $client->{input_message_count};
$client->{last_input_message_speed} = 0 unless defined $client->{last_input_message_speed};
$client->{input_message_count}++;
log_enabled && logline "[tcpserver #%s conn #%s] WebSocket input '%s' message = '%s'",
$sock_wheel_id,
$wheel_id,
$frame_type,
($frame_type eq 'text' ? $websocket_input_message : hexval $websocket_input_message);
if ($frame_type eq 'text')
{
if ($websocket_input_message =~ /^\s*ready(?:\s+(.{0,1024}))?\s*$/i)
{
my $ready_args = $1;
if (defined $ready_args)
{
foreach (split /\s+/, $ready_args)
{
next unless /^\s*(rtt|color)\s*$/i;
my $type = lc $1;
$client->{websocket_readyargs}->{$type} = 1;
}
}
}
elsif ($websocket_input_message =~ /^\s*rtt\s+(\d+(?:\.\d+)?)\s*$/i)
{
my $response_time = $1;
my $rtt = time_hires - $response_time;
$client->{websocket_rtt} = $rtt;
if (!defined $client->{websocket_rtt_max} || $client->{websocket_rtt_max} < $rtt)
{
$client->{websocket_rtt_max} = $rtt;
}
if (!defined $client->{websocket_rtt_min} || $client->{websocket_rtt_min} > $rtt)
{
$client->{websocket_rtt_min} = $rtt;
}
log_enabled && logline "[tcpserver #%s conn #%s] client rtt = %.3f ms, one way = ~ %.3f ms",
$sock_wheel_id,
$wheel_id,
$rtt * 1000,
($rtt / 2) * 1000;
}
}
elsif ($frame_type eq 'close')
{
log_enabled && logline "[tcpserver #%s conn #%s%s%s] disconnect, received WebSocket close",
$sock_wheel_id,
$wheel_id,
(defined $cn ? sprintf " cn %s", $cn : ''),
(defined $cipher ? sprintf " cipher %s", $cipher : '');
my $reason = 'received WebSocket close';
$kernel->call($session => tcpserver_client_kick =>
{
wheel_id => $wheel_id,
reason => $reason,
});
return;
}
}
return unless defined $output && $client->{wheel};
$kernel->call($session => tcpserver_output_websocket =>
{
wheel_id => $wheel_id,
output => $output,
no_log => $no_output_log,
});
}
sub tcpserver_output_websocket
{
my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
return unless ref $args eq 'HASH';
my $wheel_id = $args->{wheel_id} || return;
my $output = $args->{output};
my $no_log = $args->{no_log};
return unless defined $output;
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client && $client->{http} && $client->{websocket};
my $sock_wheel_id = $client->{sock_wheel_id};
my $type = 'text';
if (ref $output eq 'HASH' && %$output && $output->{type} && $output->{type} eq 'ping')
{
$type = $output->{type};
$output = ' ';
}
my $websocket_frame = Protocol::WebSocket::Frame->new
(
type => $type,
buffer => $output,
);
my $raw_output = $websocket_frame->to_bytes;
unless ($no_log)
{
log_enabled && logline "[tcpserver #%s conn #%s] WebSocket output '%s' message = '%s'",
$sock_wheel_id,
$wheel_id,
$type,
($type eq 'text' ? $output : hexval $output);
}
return unless defined $raw_output && $client->{wheel};
$client->{output_message_count} = 0 unless defined $client->{output_message_count};
$client->{last_output_message_speed} = 0 unless defined $client->{last_output_message_speed};
$client->{output_message_count}++;
$kernel->call($session => tcpserver_output =>
{
wheel_id => $wheel_id,
output => $raw_output,
no_log => $no_log,
});
}
sub tcpserver_client_switch_to_websocket
{
my ($kernel, $session, $heap, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0];
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client;
my $sock_wheel_id = $client->{sock_wheel_id};
my $http = $client->{http};
my $wheel = $client->{wheel};
return unless $sock_wheel_id && $http && $wheel;
$wheel->event('FlushedEvent');
unless ($kernel->call($session => tcpserver_client_remove_http_filter => { wheel_id => $wheel_id }))
{
log_enabled && logline 'error calling tcpserver_client_remove_http_filter';
return;
}
$client->{websocket} = 1;
$client->{client_max_idle_time} = 10;
$client->{client_max_idle_time_disconnect} = 30;
}
sub tcpserver_client_aio_send_file
{
my ($kernel, $session, $heap, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0];
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client;
my $sock_wheel_id = $client->{sock_wheel_id};
my $http = $client->{http};
my $wheel = $client->{wheel};
return unless $sock_wheel_id && $http && $wheel;
$wheel->event('FlushedEvent');
unless ($kernel->call($session => tcpserver_client_remove_http_filter => { wheel_id => $wheel_id }))
{
log_enabled && logline 'error calling tcpserver_client_remove_http_filter';
return;
}
my $aio_args = $client->{aio_args};
unless (ref $aio_args eq 'HASH')
{
log_enabled && logline 'invalid aio_args';
return;
}
$kernel->yield(aio_read_file => $aio_args);
}
sub tcpserver_client_remove_http_filter
{
my ($heap, $args) = @_[HEAP, ARG0];
unless (ref $args eq 'HASH')
{
log_enabled && logline 'args not HASHref';
return;
}
my $wheel_id = $args->{wheel_id};
unless ($wheel_id)
{
log_enabled && logline 'invalid wheel_id';
return;
}
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
unless ($client)
{
log_enabled && logline 'no client';
return;
}
my $sock_wheel_id = $client->{sock_wheel_id};
my $http = $client->{http};
my $wheel = $client->{wheel};
unless ($sock_wheel_id && $http && $wheel)
{
log_enabled && logline 'no sock_wheel_id|http|wheel';
return;
}
my $input_filter_stack = $wheel->get_input_filter;
if (ref $input_filter_stack eq 'POE::Filter::Stackable')
{
my @input_filters = $input_filter_stack->filters;
if (ref $input_filters[$#input_filters] eq 'POE::Filter::HTTPD')
{
my $last_popped_input_filter = $input_filter_stack->pop;
log_enabled && logline "[tcpserver #%s conn #%s] remove HTTP filter ('%s' -> pop -> '%s')",
$sock_wheel_id,
$wheel_id,
ref $input_filter_stack,
ref $last_popped_input_filter;
return 1;
}
else
{
log_enabled && logline "last element in input_filters is not POE::Filter::HTTPD ref\n%s", Dumper \@input_filters;
return;
}
}
else
{
log_enabled && logline "input_filter_stack is not POE::Filter::Stackable ref\n%s", Dumper $input_filter_stack;
return;
}
}
sub tcpserver_client_add_http_filter
{
my ($heap, $args) = @_[HEAP, ARG0];
return unless ref $args eq 'HASH';
my $wheel_id = $args->{wheel_id} || return;
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client;
my $sock_wheel_id = $client->{sock_wheel_id};
my $http = $client->{http};
my $wheel = $client->{wheel};
return unless $sock_wheel_id && $http && $wheel;
my $input_filter_stack = $wheel->get_input_filter;
if (ref $input_filter_stack eq 'POE::Filter::Stackable')
{
my @input_filters = $input_filter_stack->filters;
if (ref $input_filters[$#input_filters] ne 'POE::Filter::HTTPD')
{
my $poe_filter_httpd_new_params = $client->{poe_filter_httpd_new_params} || {};
my $poe_filter_httpd = POE::Filter::HTTPD->new(%$poe_filter_httpd_new_params);
my $last_popped_input_filter = $input_filter_stack->push($poe_filter_httpd);
log_enabled && logline "[tcpserver #%s conn #%s] add HTTP filter ('%s' -> push -> '%s'), poe_filter_httpd_new_params = %s",
$sock_wheel_id,
$wheel_id,
ref $input_filter_stack,
ref $poe_filter_httpd,
dumper_oneline $poe_filter_httpd_new_params;
return 1;
}
}
}
sub tcpserver_client_http_response_sent
{
my ($kernel, $session, $heap, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0];
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client && $client->{http};
my $wheel = $client->{wheel};
return unless $wheel;
$wheel->event('FlushedEvent');
unless ($client->{http_keepalive})
{
my $reason = 'http close';
$kernel->call($session => tcpserver_client_kick =>
{
wheel_id => $wheel_id,
reason => $reason,
});
return;
}
if (ref $client->{aio_args} eq 'HASH')
{
delete $client->{aio_args};
unless ($kernel->call($session => tcpserver_client_add_http_filter => { wheel_id => $wheel_id }))
{
log_enabled && logline 'error calling tcpserver_client_add_http_filter';
return;
}
}
}
sub tcpserver_client_aio_send_file_done
{
my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
return unless ref $args eq 'HASH';
my $wheel_id = $args->{wheel_id} || return;
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client && $client->{http};
my $wheel = $client->{wheel};
return unless $wheel;
if ($wheel->get_driver_out_octets)
{
log_enabled && logline 'found unflushed data, adding FlushedEvent => tcpserver_client_http_response_sent';
$wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent');
return;
}
unless ($client->{http_keepalive})
{
my $reason = 'http close';
$kernel->call($session => tcpserver_client_kick =>
{
wheel_id => $wheel_id,
reason => $reason,
});
return;
}
if (ref $client->{aio_args} eq 'HASH')
{
delete $client->{aio_args};
unless ($kernel->call($session => tcpserver_client_add_http_filter => { wheel_id => $wheel_id }))
{
log_enabled && logline 'error calling tcpserver_client_add_http_filter';
return;
}
}
}
sub tcpserver_periodic1s_client_websocket_color
{
my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
foreach my $wheel_id (keys %{$heap->{tcpserver}->{connections}})
{
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
next unless $client && $client->{websocket};
for (my $i = 0; $i < 100; $i++)
{
#my $output = 'FF0000';
my $output = sprintf "%02X%02X%02X",
int(rand() * 255),
int(rand() * 255),
int(rand() * 255);
$kernel->call($session => tcpserver_output_websocket =>
{
wheel_id => $wheel_id,
output => $output,
});
}
}
}
sub tcpserver_client_websocket_colorchanger
{
my ($kernel, $session, $heap, $state) = @_[KERNEL, SESSION, HEAP, STATE];
foreach my $wheel_id (keys %{$heap->{tcpserver}->{connections}})
{
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
next unless $client && $client->{websocket} && exists $client->{websocket_readyargs} && $client->{websocket_readyargs}->{color};
$heap->{websocket_colorchanger} = {} unless exists $heap->{websocket_colorchanger};
my $colors = $heap->{websocket_colorchanger};
my $value_min = 0;
my $value_max = 255;
$colors->{value} = $value_min unless defined $colors->{value};
$colors->{inc} = 1 unless defined $colors->{inc};
$colors->{curr_cval} = 0 unless defined $colors->{curr_cval};
$colors->{cvalues} = { 0 => $value_min, 1 => $value_min, 2 => $value_min } unless defined $colors->{cvalues};
my $cvalues = $colors->{cvalues};
my $step_up = 1;
my $step_down = 1;
#if ($cvalues->{$colors->{curr_cval}} == $value_max)
#if ($cvalues->{$colors->{curr_cval}} == 127)
#if ($cvalues->{$colors->{curr_cval}} == int($value_max / 2))
if ($cvalues->{$colors->{curr_cval}} == $value_min || $cvalues->{$colors->{curr_cval}} == int($value_max / 2) || $cvalues->{$colors->{curr_cval}} == $value_max)
#if ($cvalues->{$colors->{curr_cval}} == $value_min)
#if ($cvalues->{$colors->{curr_cval}} =~ /^(?:127|255)$/)
#if ($cvalues->{$colors->{curr_cval}} =~ /^(?:0|127|255)$/)
#if ($cvalues->{$colors->{curr_cval}} == int(rand() * 256))
{
$colors->{inc} = int(rand() * 2);
#$colors->{curr_cval}++;
$colors->{curr_cval} = int(rand() * 3);
#if ($colors->{curr_cval} > 2)
#{
# $colors->{curr_cval} = 0;
#}
}
if ($colors->{inc})
{
$colors->{value} += $step_up;
$cvalues->{$colors->{curr_cval}} += $step_up;
}
else
{
$colors->{value} -= $step_down;
$cvalues->{$colors->{curr_cval}} -= $step_up;
}
if ($colors->{value} < $value_min)
{
$colors->{value} = $value_min;
}
elsif ($colors->{value} > $value_max)
{
$colors->{value} = $value_max;
}
if ($cvalues->{$colors->{curr_cval}} < $value_min)
{
$cvalues->{$colors->{curr_cval}} = $value_min;
}
elsif ($cvalues->{$colors->{curr_cval}} > $value_max)
{
$cvalues->{$colors->{curr_cval}} = $value_max;
}
if ($cvalues->{$colors->{curr_cval}} == $value_min)
{
$colors->{inc} = 1;
}
elsif ($cvalues->{$colors->{curr_cval}} == $value_max)
{
$colors->{inc} = 0;
}
my $output = sprintf "%02X%02X%02X",
#int(rand() * $value_max),
#$value_max,
#$colors->{value},
$cvalues->{0},
#int(rand() * $value_max),
#$value_max,
#$colors->{value},
$cvalues->{1},
#int(rand() * $value_max);
#$value_max;
#$colors->{value};
$cvalues->{2};
$kernel->call($session => tcpserver_output_websocket =>
{
wheel_id => $wheel_id,
output => $output,
});
}
unless ($heap->{shutdown})
{
$kernel->delay($state => 0.025000);
}
}
sub tcpserver_httpclient_response
{
my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
return unless ref $args eq 'HASH';
my $orig_args = $args->{orig_args} || return;
my $wheel_id = $orig_args->{wheel_id} || return;
my $headers = $orig_args->{headers};
my $protocol = $orig_args->{protocol};
my $connection = $orig_args->{connection};
my $content = $args->{content};
my $content_type = $args->{content_type};
return unless defined $content;
#log_enabled && logline Dumper
#{
# headers => $headers,
# content_sha256_hex => sha256_hex $content,
#};
my $client = $heap->{tcpserver}->{connections}->{$wheel_id};
return unless $client && $client->{http};
my $wheel = $client->{wheel};
return unless $wheel;
$wheel->event(FlushedEvent => 'tcpserver_client_http_response_sent');
$kernel->call($session => tcpserver_output_http =>
{
wheel_id => $wheel_id,
content => $content,
content_type => $content_type,
headers => $headers,
protocol => $protocol,
connection => $connection,
no_log => 1,
});
}
sub tcpserver_periodic3s_session_gc
{
my $heap = $_[HEAP];
return unless $heap->{shcp}->{sessions};
my @sessions = keys %{$heap->{shcp}->{sessions} || {}};
my $session_count = $#sessions + 1;
my $max_session_count = 50;
return if $session_count <= $max_session_count;
my $session_times = {};
foreach my $session_id (@sessions)
{
my $session = $heap->{shcp}->{sessions}->{$session_id};
#log_enabled && logline "session '%s':\n%s",
# $session_id,
# Dumper $session;
next unless ref $session eq 'HASH';
my $session_time = $session->{time};
next unless $session_time;
$session_times->{$session_time} = $session_id;
}
my @session_times = sort keys %$session_times;
#log_enabled && logline "session_times:\n%s", Dumper \@session_times;
if (@session_times)
{
my $session_time = $session_times[0];
my $session_id = $session_times->{$session_time};
#log_enabled && logline "oldest session id: '%s'", $session_id;
log_enabled && logline "session count '%s', deleting oldest session '%s'",
$session_count,
$session_id;
delete $heap->{shcp}->{sessions}->{$session_id};
}
}
1;