class PhusionPassenger::RequestHandler::ThreadHandler
This class encapsulates the logic of a single RequestHandler thread.
Constants
- CONTENT_LENGTH
- GC_SUPPORTS_CLEAR_STATS
- GC_SUPPORTS_TIME
- MAX_HEADER_SIZE
- OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS
- OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS
- OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS
- OOBW
- PASSENGER_CONNECT_PASSWORD
- PING
- REQUEST_METHOD
- TRANSFER_ENCODING
Attributes
interruptable[R]
iteration[R]
stats_mutex[R]
thread[R]
Public Class Methods
new(request_handler, options = {})
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 62
#
# Builds the handler for a single request-handling thread.
#
# request_handler - stored unchanged in @request_handler.
# options         - Hash. :server_socket, :socket_name, :protocol and
#                   :app_group_name are read through Utils.require_option;
#                   :app, :analytics_logger and :connect_password are
#                   installed as instance variables through
#                   Utils.install_options_as_ivars.
#
# Depending on :protocol (:session or :http), +parse_request+ is aliased on
# this object's singleton class to the matching parser method.
#
# Raises ArgumentError if :protocol is neither :session nor :http.
def initialize(request_handler, options = {})
  @request_handler = request_handler
  @server_socket   = Utils.require_option(options, :server_socket)
  @socket_name     = Utils.require_option(options, :socket_name)
  @protocol        = Utils.require_option(options, :protocol)
  @app_group_name  = Utils.require_option(options, :app_group_name)
  Utils.install_options_as_ivars(self, options,
    :app, :analytics_logger, :connect_password)

  @stats_mutex   = Mutex.new
  @interruptable = false
  @iteration     = 0

  # Pick the parser first, then alias it once on the singleton class so
  # only this instance is affected.
  parser_name =
    if @protocol == :session
      :parse_session_request
    elsif @protocol == :http
      :parse_http_request
    else
      raise ArgumentError, "Unknown protocol specified"
    end
  metaclass = class << self; self; end
  metaclass.send(:alias_method, :parse_request, parser_name)
end
Public Instance Methods
install()
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 93
#
# Binds this handler to the calling thread: remembers the thread in @thread,
# stores the handler in the thread-local :passenger_thread_handler, and then
# fires the :starting_request_handler_thread event.
def install
  current = Thread.current
  @thread = current
  current[:passenger_thread_handler] = self
  PhusionPassenger.call_event(:starting_request_handler_thread)
end
main_loop(finish_callback)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 99
#
# Runs the accept/process loop for this thread.
#
# finish_callback - object responding to #call; invoked once, before the
#                   first request is accepted.
#
# The loop only ends when Interrupted is raised (which is swallowed here) or
# when another exception propagates. A fresh UnseekableSocket wrapper is
# created whenever accept_and_process_next_request reports a hijacked
# socket. On every exit path @interruptable is set to true under
# @stats_mutex.
def main_loop(finish_callback)
  wrapper         = Utils::UnseekableSocket.new
  message_channel = MessageChannel.new
  read_buffer     = ''
  if read_buffer.respond_to?(:force_encoding)
    read_buffer.force_encoding('binary')
  end

  begin
    finish_callback.call
    while true
      if accept_and_process_next_request(wrapper, message_channel, read_buffer)
        # The previous wrapper now belongs to whoever hijacked the socket.
        wrapper = Utils::UnseekableSocket.new
      end
    end
  rescue Interrupted
    # Normal shutdown signal; nothing to do.
  end
  debug("Thread handler main loop exited normally")
ensure
  @stats_mutex.synchronize { @interruptable = true }
end
Private Instance Methods
accept_and_process_next_request(socket_wrapper, channel, buffer)
click to toggle source
Returns true if the socket has been hijacked, false otherwise.
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 121
#
# Accepts one connection on @server_socket, parses the request and
# dispatches it to process_ping, process_oobw or process_request.
#
# socket_wrapper - wrapper used to wrap the accepted socket; also asked
#                  (source_of_exception?) whether an error came from the
#                  client socket.
# channel        - its +io+ is pointed at the accepted connection before
#                  parsing.
# buffer         - scratch buffer handed to parse_request.
#
# Returns true if the socket has been hijacked (the request headers contain
# RACK_HIJACK_IO after processing), false otherwise. The value is now an
# explicit boolean; previously the result of the process_* call leaked out,
# so e.g. the byte count returned by a ping's write was treated as
# "hijacked" by the caller.
#
# Re-raises Interrupted. Other StandardErrors are printed (client-socket
# errors other than EPIPE), or logged to analytics and re-raised when
# should_reraise_error? says so.
def accept_and_process_next_request(socket_wrapper, channel, buffer)
  hijacked = false

  # Only the blocking accept call is marked as interruptable.
  @stats_mutex.synchronize do
    @interruptable = true
  end
  connection = socket_wrapper.wrap(@server_socket.accept)
  @stats_mutex.synchronize do
    @interruptable = false
    @iteration    += 1
  end
  trace(3, "Accepted new request on socket #{@socket_name}")
  channel.io = connection
  headers = parse_request(connection, channel, buffer)
  if headers
    prepare_request(connection, headers)
    begin
      if headers[REQUEST_METHOD] == PING
        process_ping(headers, connection)
      elsif headers[REQUEST_METHOD] == OOBW
        process_oobw(headers, connection)
      else
        process_request(headers, connection, socket_wrapper, @protocol == :http)
      end
    rescue Exception
      # Only recorded for finalize_request; the exception itself is
      # re-raised untouched. has_error stays nil when nothing was raised.
      has_error = true
      raise
    ensure
      if headers[RACK_HIJACK_IO]
        # The socket now belongs to the application: forget it so that
        # neither finalize_request nor the outer ensure touches or closes it.
        hijacked       = true
        socket_wrapper = nil
        connection     = nil
      end
      finalize_request(connection, headers, has_error)
      trace(3, "Request done.")
    end
  else
    trace(2, "No headers parsed; disconnecting client.")
  end
  hijacked
rescue Interrupted
  raise
rescue => e
  if socket_wrapper && socket_wrapper.source_of_exception?(e)
    # EPIPE is harmless, it just means that the client closed the connection.
    # Other errors might indicate a problem so we print them, but they're
    # probably not bad enough to warrant stopping the request handler.
    if !e.is_a?(Errno::EPIPE)
      print_exception("Passenger RequestHandler's client socket", e)
    end
  else
    if @analytics_logger && headers && headers[PASSENGER_TXN_ID]
      log_analytics_exception(headers, e)
    end
    raise e if should_reraise_error?(e)
  end
  hijacked
ensure
  # The 'close_write' here prevents forked child
  # processes from unintentionally keeping the
  # connection open.
  if connection && !connection.closed?
    begin
      connection.close_write
    rescue SystemCallError
    end
    begin
      connection.close
    rescue SystemCallError
    end
  end
end
finalize_request(connection, headers, has_error)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 311
#
# Cleans up after a request has been processed.
#
# connection - the client connection, or nil (e.g. when it was hijacked);
#              when present its EOF simulation is switched off.
# headers    - request headers; an analytics log may be stored under
#              PASSENGER_ANALYTICS_WEB_LOG.
# has_error  - passed to log.end_measure to flag a failed request.
#
# If an open analytics log is present, the final measurements are written
# and the log is closed. The thread-local reference to the log is always
# cleared once the log has been handled, including when writing the
# measurements raises, so a closed log is never left attached to the
# thread.
def finalize_request(connection, headers, has_error)
  connection.stop_simulating_eof! if connection

  log = headers[PASSENGER_ANALYTICS_WEB_LOG]
  return unless log && !log.closed?

  exception_occurred = false
  begin
    log.end_measure("app request handler processing", has_error)
    if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS
      log.message("Final objects on heap: #{ObjectSpace.live_objects}")
    end
    if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS
      log.message("Final objects allocated so far: #{ObjectSpace.allocated_objects}")
    elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS
      count = ObjectSpace.count_objects
      log.message("Final objects allocated so far: #{count[:TOTAL] - count[:FREE]}")
    end
    if GC_SUPPORTS_TIME
      log.message("Final GC time: #{GC.time}")
    end
    if GC_SUPPORTS_CLEAR_STATS
      # Clear statistics to avoid integer wraps.
      GC.clear_stats
    end
  rescue Exception
    # Maybe this exception was raised while communicating
    # with the logging agent. If that is the case then
    # log.close may also raise an exception, but we're only
    # interested in the original exception. So if this
    # situation occurs we must ignore any exceptions raised
    # by log.close.
    exception_occurred = true
    raise
  ensure
    # Drop the thread-local reference on every path, not just on success.
    Thread.current[PASSENGER_ANALYTICS_WEB_LOG] = nil
    # It is important that the following call receives an ACK
    # from the logging agent and that we don't close the socket
    # connection until the ACK has been received, otherwise
    # the helper agent may close the transaction before this
    # process's openTransaction command is processed.
    begin
      log.close
    rescue
      raise if !exception_occurred
    end
  end
end
log_analytics_exception(env, exception)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 364
#
# Records an exception in a new :exceptions analytics transaction.
#
# env       - request headers; PASSENGER_TXN_ID and
#             PASSENGER_UNION_STATION_KEY are read from it.
# exception - the exception to report.
#
# The message and backtrace are Base64-encoded with line breaks removed, so
# each one is written as a single log line. An exception that was never
# raised has no backtrace (Exception#backtrace is nil); it is logged with an
# empty backtrace instead of failing. The transaction is closed on every
# path.
def log_analytics_exception(env, exception)
  log = @analytics_logger.new_transaction(
    @app_group_name,
    :exceptions,
    env[PASSENGER_UNION_STATION_KEY])
  begin
    request_txn_id = env[PASSENGER_TXN_ID]
    message = exception.message
    message = exception.to_s if message.empty?
    message = [message].pack('m')
    message.gsub!("\n", "")
    backtrace_lines = exception.backtrace || []
    backtrace_string = [backtrace_lines.join("\n")].pack('m')
    backtrace_string.gsub!("\n", "")

    log.message("Request transaction ID: #{request_txn_id}")
    log.message("Message: #{message}")
    log.message("Class: #{exception.class.name}")
    log.message("Backtrace: #{backtrace_string}")
  ensure
    log.close
  end
end
parse_http_request(connection, channel, buffer)
click to toggle source
Like #parse_session_request, but parses an HTTP request. This is a very minimalistic HTTP parser and is not intended to be complete, fast or secure, since the HTTP server socket is intended to be used for debugging purposes only.
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 211
#
# Minimal HTTP request parser for the debugging HTTP socket.
#
# connection - must respond to readpartial and addr.
# channel    - unused here; kept for signature parity with
#              parse_session_request.
# buffer     - unused here; kept for signature parity with
#              parse_session_request.
#
# Returns a CGI-style headers Hash, or nil when
# * the connection hits EOF before the header is complete,
# * the header section exceeds MAX_HEADER_SIZE,
# * the request line is malformed, or
# * a connect password is configured and the request does not supply it.
#
# NOTE(review): body bytes that arrive in the same read as the header are
# discarded here and are not available to the request processor.
def parse_http_request(connection, channel, buffer)
  headers = {}

  data = ""
  while (header_end = data.index("\r\n\r\n")).nil? && data.size < MAX_HEADER_SIZE
    data << connection.readpartial(16 * 1024)
  end
  # Judge the size of the header section only, not of whatever body bytes
  # happened to be read along with it.
  if header_end.nil? || header_end >= MAX_HEADER_SIZE
    warn("*** Passenger RequestHandler warning: " \
      "HTTP header size exceeded maximum.")
    return
  end

  # Cut at the first blank line. A regexp without the multiline flag would
  # leave body lines behind and fold them into the last header.
  header_lines = data[0, header_end].split("\r\n")

  # GET / HTTP/1.1
  request_line = header_lines.shift.to_s
  match = /\A([A-Za-z]+) (.+?) (HTTP\/\d\.\d)\z/.match(request_line)
  if match.nil?
    warn("*** Passenger RequestHandler warning: " \
      "malformed HTTP request line.")
    return
  end
  request_method, request_uri, protocol = match.captures
  path_info, query_string = request_uri.split("?", 2)

  headers[REQUEST_METHOD]    = request_method
  headers["REQUEST_URI"]     = request_uri
  headers["QUERY_STRING"]    = query_string || ""
  headers["SCRIPT_NAME"]     = ""
  headers["PATH_INFO"]       = path_info
  headers["SERVER_NAME"]     = "127.0.0.1"
  headers["SERVER_PORT"]     = connection.addr[1].to_s
  headers["SERVER_PROTOCOL"] = protocol

  header_lines.each do |line|
    header, value = line.split(/\s*:\s*/, 2)
    # A line with no header name cannot be mapped to a key.
    next if header.nil? || header.empty?
    header = header.upcase      # "Foo-Bar" => "FOO-BAR"
    header.gsub!("-", "_")      #           => "FOO_BAR"
    if header == CONTENT_LENGTH || header == "CONTENT_TYPE"
      headers[header] = value
    else
      headers["HTTP_#{header}"] = value
    end
  end

  if @connect_password && headers["HTTP_X_PASSENGER_CONNECT_PASSWORD"] != @connect_password
    warn "*** Passenger RequestHandler warning: " \
      "someone tried to connect with an invalid connect password."
    return
  else
    return headers
  end
rescue EOFError
  return
end
parse_session_request(connection, channel, buffer)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 189
#
# Reads one session-protocol header block from +channel+ and turns it into
# a Hash via Utils.split_by_null_into_hash.
#
# connection - unused here; kept for signature parity with
#              parse_http_request.
# channel    - must respond to read_scalar(buffer, max_size).
# buffer     - scratch buffer handed to read_scalar.
#
# Returns the headers Hash, or nil when no scalar could be read, when the
# configured connect password does not match, or when read_scalar raises
# SecurityError (reported as an oversized header).
def parse_session_request(connection, channel, buffer)
  raw_headers = channel.read_scalar(buffer, MAX_HEADER_SIZE)
  return if raw_headers.nil?

  parsed = Utils.split_by_null_into_hash(raw_headers)
  if @connect_password && parsed[PASSENGER_CONNECT_PASSWORD] != @connect_password
    warn "*** Passenger RequestHandler warning: " \
      "someone tried to connect with an invalid connect password."
    return
  end
  return parsed
rescue SecurityError
  warn("*** Passenger RequestHandler warning: " \
    "HTTP header size exceeded maximum.")
  return
end
prepare_request(connection, headers)
click to toggle source
# Placeholder for the actual request processing; meant to be overridden.
#
# env                - the parsed request headers.
# connection         - the wrapped client connection.
# socket_wrapper     - the wrapper that produced +connection+.
# full_http_response - the caller in this class passes @protocol == :http.
#
# Always raises NotImplementedError in this base version.
def process_request(env, connection, socket_wrapper, full_http_response)
  raise(NotImplementedError, "Override with your own implementation!")
end
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 277
#
# Prepares the connection and the analytics log before a request is
# processed.
#
# connection - the client connection; EOF simulation is switched on when
#              the request carries no body.
# headers    - the parsed request headers. When analytics logging is active
#              the continued transaction log is stored in it under
#              PASSENGER_ANALYTICS_WEB_LOG.
#
# A request is treated as body-less when it has neither a content length
# nor a transfer encoding, or when its content length is zero. Header
# values arrive as strings, so "0" is recognised as well as Integer 0.
def prepare_request(connection, headers)
  content_length = headers[CONTENT_LENGTH]
  declares_body  = headers.has_key?(CONTENT_LENGTH) ||
    headers.has_key?(TRANSFER_ENCODING)
  if !declares_body || content_length == 0 || content_length.to_s.strip == "0"
    connection.simulate_eof!
  end

  if @analytics_logger && headers[PASSENGER_TXN_ID]
    txn_id = headers[PASSENGER_TXN_ID]
    union_station_key = headers[PASSENGER_UNION_STATION_KEY]
    log = @analytics_logger.continue_transaction(txn_id,
      @app_group_name,
      :requests, union_station_key)
    headers[PASSENGER_ANALYTICS_WEB_LOG] = log
    # Also expose the transaction through thread-locals.
    Thread.current[PASSENGER_ANALYTICS_WEB_LOG] = log
    Thread.current[PASSENGER_TXN_ID] = txn_id
    Thread.current[PASSENGER_UNION_STATION_KEY] = union_station_key
    if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS
      log.message("Initial objects on heap: #{ObjectSpace.live_objects}")
    end
    if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS
      log.message("Initial objects allocated so far: #{ObjectSpace.allocated_objects}")
    elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS
      count = ObjectSpace.count_objects
      log.message("Initial objects allocated so far: #{count[:TOTAL] - count[:FREE]}")
    end
    if GC_SUPPORTS_TIME
      log.message("Initial GC time: #{GC.time}")
    end
    log.begin_measure("app request handler processing")
  end
end
process_oobw(env, connection)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 268
#
# Handles an OOBW request: fires the :oob_work event and then writes
# "oobw done" to the connection.
#
# env        - request headers (unused).
# connection - object responding to #write.
#
# Returns whatever connection.write returns.
def process_oobw(env, connection)
  PhusionPassenger.call_event(:oob_work)
  reply = "oobw done"
  connection.write(reply)
end
process_ping(env, connection)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 264
#
# Handles a PING request by writing "pong" to the connection.
#
# env        - request headers (unused).
# connection - object responding to #write.
#
# Returns whatever connection.write returns.
def process_ping(env, connection)
  reply = "pong"
  connection.write(reply)
end
should_reraise_app_error?(e, socket_wrapper)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 392
#
# Whether an error raised by the application should be re-raised.
# This version ignores both arguments and always answers false.
def should_reraise_app_error?(e, socket_wrapper)
  false
end
should_reraise_error?(e)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 387
#
# Whether a non-socket error caught while handling a request should be
# re-raised. Always true here; exists as a separate method so that unit
# tests can stub it.
def should_reraise_error?(e)
  true
end
should_swallow_app_error?(e, socket_wrapper)
click to toggle source
# File lib/phusion_passenger/request_handler/thread_handler.rb, line 396
#
# Whether an application error can be silently dropped: only when a socket
# wrapper is present, it identifies itself as the source of +e+, and +e+ is
# an Errno::EPIPE.
#
# Returns the first falsy operand (nil or false) when a check fails,
# otherwise the result of the EPIPE type check.
def should_swallow_app_error?(e, socket_wrapper)
  return socket_wrapper unless socket_wrapper

  from_socket = socket_wrapper.source_of_exception?(e)
  return from_socket unless from_socket

  e.is_a?(Errno::EPIPE)
end