/*  Prolog Machine Query Interface
    Author:        Eric Zinda
    E-mail:        ericz@inductorsoftware.com
    WWW:           http://www.inductorsoftware.com
    Copyright (c)  2021, Eric Zinda
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(mqi,
          [ mqi_start/0,
            mqi_start/1,                % +Options
            mqi_stop/1                  % ?Thread
          ]).
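The MQI is normally started automatically by a library built for a particular programming language, such as the swiplserver Python library, but starting it manually can be useful when debugging Prolog code in some scenarios. See the documentation on "Standalone Mode" for more information.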
Once started, the MQI listens for TCP/IP or Unix Domain Socket connections and authenticates them using the password provided (or created depending on options) before processing any messages. The messages processed by the MQI are described below.
For debugging, the server outputs traces using the debug/3 predicate so that the server operation can be observed by using the debug/1 predicate. Run the following commands to see them:
debug(mqi(protocol))
: Traces protocol messages to show the flow of commands and connections. It is designed to avoid filling the screen with large queries and results to make it easier to read.
debug(mqi(query))
: Traces messages that involve each query and its results. Therefore it can be quite verbose depending on the query.
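As a small sketch of the above, both topics can be switched on from a Prolog session before starting the MQI. It assumes only library(debug), which ships with SWI-Prolog. The helper names enable_mqi_tracing/0 and disable_mqi_tracing/0 are hypothetical and are not part of the MQI.

```prolog
:- use_module(library(debug)).

% Hypothetical helper: enable both MQI debug topics named above.
% Trace messages are printed to user_error.
enable_mqi_tracing :-
    debug(mqi(protocol)),
    debug(mqi(query)).

% Hypothetical helper: switch both topics off again.
disable_mqi_tracing :-
    nodebug(mqi(protocol)),
    nodebug(mqi(query)).
```

Enabling only mqi(protocol) is usually the quieter starting point, since mqi(query) prints every query and its results.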
__Options__
Options is a list containing any combination of the following options. When used in the Prolog top level (i.e. in Standalone Mode), these are specified as normal Prolog options like this:
mqi_start([unix_domain_socket(Socket), password('a password')])
When using "Embedded Mode" they are passed using the same name but as normal command line arguments like this:
swipl --quiet -g mqi_start -t halt -- --write_connection_values=true --password="a password" --create_unix_domain_socket=true
Note the quotes around values that could confuse command line processing, such as values containing spaces (e.g. "a password"), and that unix_domain_socket(Variable) is written as --create_unix_domain_socket=true on the command line. See below for more information.
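The mapping from the command line form to the Prolog option can be sketched as follows. It mirrors what the mqi_start/0 source further down does with the parsed options. The predicate name embedded_options/2 is hypothetical and exists only for illustration.

```prolog
:- use_module(library(option)).

% embedded_options(+CommandLineOptions, -MqiOptions)
% Hypothetical helper: rewrites create_unix_domain_socket(true) into
% unix_domain_socket(_) (an unbound variable), which asks mqi_start/1
% to generate the socket file. The flag itself is removed either way.
embedded_options(Options0, Options) :-
    select_option(create_unix_domain_socket(Create), Options0, Options1, false),
    (   Create == true
    ->  merge_options(Options1, [unix_domain_socket(_)], Options)
    ;   Options = Options1
    ).
```

The rewrite is needed because a command line argument is always text, so there is no way to pass an unbound variable directly.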
If the option write_connection_values(true) is set, the selected port is output to STDOUT followed by \n on startup to allow the client language library to retrieve it in "Embedded Mode".
To have one generated instead (recommended), pass Unix_Domain_Socket_Path_And_File as a variable when calling from the Prolog top level and the variable will be unified with a created filename. If launching in "Embedded Mode", instead pass --create_unix_domain_socket=true since there isn't a way to specify variables from the command line. When generating the file, a temporary directory will be created using tmp_file/2 and a socket file will be created within that directory following the below requirements. If the directory or file cannot be created for some reason, mqi_start/1 fails.
Regardless of whether the file is specified or generated, if the option write_connection_values(true) is set, the fully qualified path to the generated file is output to STDOUT followed by \n on startup to allow the client language library to retrieve it.
Specifying a file to use should follow the same guidelines as the generated file:
If the option write_connection_values(true) is set, the password is output to STDOUT followed by \n on startup to allow the client language library to retrieve it. This is the recommended way to integrate the MQI with a language as it avoids including the password as source code. This option is only included so that a known password can be supplied when the MQI is running in Standalone Mode.
query_timeout(+Seconds)
Sets the default time in seconds that a query is allowed to run before it is cancelled. This can be overridden on a query by query basis. If not set, the default is no timeout (-1).
pending_connections(+Count)
Sets the number of pending connections allowed for the MQI as in tcp_listen/2. If not provided, the default is 5.
run_server_on_thread(+Run_Server_On_Thread)
Determines whether mqi_start/1 runs in the background on its own thread or blocks until the MQI shuts down. Must be missing or set to true when running in "Embedded Mode" so that the SWI Prolog process can exit properly. If not set, the default is true.
server_thread(?Server_Thread)
Passing in an atom for Server_Thread will only set the server thread name if run_server_on_thread(true). If Server_Thread is a variable, it is unified with a generated name.
write_connection_values(+Write_Connection_Values)
Determines whether the server writes the port (or generated Unix Domain Socket) and password to STDOUT as it initializes. Used by language libraries to retrieve this information for connecting. If not set, the default is false.
write_output_to_file(+File)
Redirects STDOUT and STDERR to the file path specified. Useful for debugging the MQI when it is being used in "Embedded Mode". If using multiple MQI instances in one SWI Prolog instance, only set this on the first one. Each time it is set the output will be redirected.
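A minimal sketch of Standalone Mode using the options above, assuming library(mqi) is installed with the running SWI-Prolog. Port, Password and Thread are left unbound so that the MQI selects a free port, generates a password and generates a thread name. The wrapper name start_local_mqi/3 and the 30 second timeout are illustrative choices, not recommendations from the source.

```prolog
:- use_module(library(mqi)).

% start_local_mqi(-Port, -Password, -Thread)
% Hypothetical wrapper: starts an MQI on a background thread.
% The unbound arguments are unified by mqi_start/1 with the selected
% port, the generated password (a string) and the server thread name.
start_local_mqi(Port, Password, Thread) :-
    mqi_start([ port(Port),
                password(Password),
                server_thread(Thread),
                query_timeout(30),          % seconds, illustrative value
                run_server_on_thread(true)
              ]).
```

A session might then run `start_local_mqi(Port, Password, Thread)`, connect a client using Port and Password, and finally call `mqi_stop(Thread)` to shut the server down.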
*/
:- use_module(library(socket)).
:- use_module(library(http/json)).
:- use_module(library(http/json_convert)).
:- use_module(library(option)).
:- use_module(library(term_to_json)).
:- use_module(library(debug)).
:- use_module(library(filesex)).
:- use_module(library(gensym)).
:- use_module(library(lists)).
:- use_module(library(main)).
:- use_module(library(make)).
:- use_module(library(prolog_source)).
:- use_module(library(time)).
:- use_module(library(uuid)).

% One for every Machine Query Interface running
:- dynamic(mqi_thread/3).

% One for every active connection
:- dynamic(mqi_worker_threads/3).
:- dynamic(mqi_socket/5).

% Indicates that a query is in progress on the goal thread or hasn't had its results drained
% Deleted once the last result from the queue has been drained
% Only deleted by the communication thread to avoid race conditions
:- dynamic(query_in_progress/1).

% Indicates to the communication thread that we are in a place
% that can be cancelled
:- dynamic(safe_to_cancel/1).


% Password is carefully constructed to be a string (not an atom) so that it is not
% globally visible
% Add ".\n" to the password since it will be added by the message when received
mqi_start(Options) :-
    Encoding = utf8,
    option(pending_connections(Connection_Count), Options, 5),
    option(query_timeout(Query_Timeout), Options, -1),
    option(port(Port), Options, _),
    option(run_server_on_thread(Run_Server_On_Thread), Options, true),
    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
    option(write_connection_values(Write_Connection_Values), Options, false),
    option(unix_domain_socket(Unix_Domain_Socket_Path_And_File), Options, _),
    (   (   memberchk(unix_domain_socket(_), Options),
            var(Unix_Domain_Socket_Path_And_File)
        )
    ->  unix_domain_socket_path(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File)
    ;   true
    ),
    option(server_thread(Server_Thread_ID), Options, _),
    (   var(Server_Thread_ID)
    ->  gensym(mqi, Server_Thread_ID)
    ;   true
    ),
    option(password(Password), Options, _),
    (   var(Password)
    ->  (   current_prolog_flag(bounded, false)
        ->  uuid(UUID, [format(integer)])
        ;   UUID is random(1<<62)
        ),
        format(string(Password), '~d', [UUID])
    ;   true
    ),
    string_concat(Password, '.\n', Final_Password),
    bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address),
    send_client_startup_data(Write_Connection_Values, user_output, Unix_Domain_Socket_Path_And_File, Client_Address, Password),
    option(write_output_to_file(File), Options, _),
    (   var(File)
    ->  true
    ;   write_output_to_file(File)
    ),
    Server_Goal = (
        catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure), error(E1, E2), true),
        debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(E1, E2)])
    ),
    start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File).

opt_type(port,                      port,                      natural).
opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
opt_type(password,                  password,                  string).
opt_type(pending_connections,       pending_connections,       nonneg).
opt_type(query_timeout,             query_timeout,             float).
opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
opt_type(write_connection_values,   write_connection_values,   boolean).
opt_type(write_output_to_file,      write_output_to_file,      file(write)).

opt_help(port,                      "TCP/IP port for clients to connect to").
opt_help(create_unix_domain_socket, "Create a Unix domain socket for clients to connect to").
opt_help(unix_domain_socket,        "File path for the Unix domain socket").
opt_help(password,                  "Connection password").
opt_help(pending_connections,       "Max number of queued connections (5)").
opt_help(query_timeout,             "Max query runtime in seconds (default infinite)").
opt_help(run_server_on_thread,      "Run server in a background thread (true)").
opt_help(exit_main_on_failure,      "Exit the process on a failure").
opt_help(write_connection_values,   "Print info for clients to connect").
opt_help(write_output_to_file,      "Write stdout and stderr to file").
To launch embedded mode:
swipl --quiet -g mqi_start -t halt -- --write_connection_values=true
This will start SWI Prolog and invoke the mqi_start/0 predicate and exit the process when that predicate stops. Any command line arguments after the standalone -- will be passed as Options. These are the same Options that mqi_start/1 accepts and are passed to it directly. Some options are expressed differently due to command line limitations; see mqi_start/1 Options for more information.
Any Option values that cause issues during command line parsing (such as values containing spaces) should be enclosed in double quotes ("") like this:
swipl --quiet -g mqi_start -t halt -- --write_connection_values=true \
                                      --password="HGJ SOWLWW WNDSJD"
For help on command line options run
swipl -g mqi_start -- --help
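To illustrate how a client might consume Embedded Mode, here is a sketch that launches the command shown above as a child process and reads the startup values. It assumes swipl is on the PATH and that TCP/IP (not a Unix domain socket) is used, in which case the source further down prints the port on the first line and the password on the second. The predicate name launch_embedded_mqi/4 is hypothetical.

```prolog
:- use_module(library(process)).
:- use_module(library(readutil)).

% launch_embedded_mqi(-Port, -Password, -PID, -Out)
% Hypothetical helper: starts a child swipl in Embedded Mode and reads
% the two lines that --write_connection_values=true prints on startup.
% Out is the pipe connected to the child's STDOUT; the caller closes it.
launch_embedded_mqi(Port, Password, PID, Out) :-
    process_create(path(swipl),
                   [ '--quiet', '-g', mqi_start, '-t', halt, '--',
                     '--write_connection_values=true' ],
                   [ stdout(pipe(Out)), process(PID) ]),
    read_line_to_string(Out, PortLine),     % first line: selected port
    read_line_to_string(Out, Password),     % second line: password
    number_string(Port, PortLine).
```

When the client is finished, the child can be ended with process_kill(PID) followed by close(Out). Reading the values from the pipe keeps the password out of the client's source code.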
% Turn off int signal when running in embedded mode so the client language
% debugger signal doesn't put Prolog into debug mode
% run_server_on_thread must be missing or true (the default) so we can exit
% properly
% create_unix_domain_socket=true/false is only used as a command line argument
% since it doesn't seem possible to pass create_unix_domain_socket=_ on the command line
% and have it interpreted as a variable.
mqi_start :-
    current_prolog_flag(argv, Argv),
    argv_options(Argv, _Args, Options),
    merge_options(Options, [exit_main_on_failure(true)], Options1),
    select_option(create_unix_domain_socket(Create_Unix_Domain_Socket), Options1, Options2, false),
    (   Create_Unix_Domain_Socket == true
    ->  merge_options(Options2, [unix_domain_socket(_)], FinalOptions)
    ;   FinalOptions = Options2
    ),
    option(run_server_on_thread(Run_Server_On_Thread), FinalOptions, true),
    (   Run_Server_On_Thread == true
    ->  true
    ;   throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
    ),
    mqi_start(FinalOptions),
    on_signal(int, _, quit),
    thread_get_message(quit_mqi).


quit(_) :-
    thread_send_message(main, quit_mqi).
Always succeeds.
% tcp_close_socket(Socket) will shut down the server thread cleanly so the socket is released and can be used again in the same session
% Closes down any pending connections using abort even if there were no matching server threads since the server thread could have died.
% At this point only threads associated with live connections (or potentially a goal thread that hasn't detected its missing communication thread)
% should be left so seeing abort warning messages in the console seems OK
mqi_stop(Server_Thread_ID) :-
    % First shut down any matching servers to stop new connections
    forall(retract(mqi_thread(Server_Thread_ID, _, Socket)),
        (
            debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
            catch(tcp_close_socket(Socket), Socket_Exception, true),
            abortSilentExit(Server_Thread_ID, Server_Thread_Exception),
            debug(mqi(protocol), "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)", [Server_Thread_ID, Socket_Exception, Server_Thread_Exception])
        )),
    forall(retract(mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)),
        (
            abortSilentExit(Communication_Thread_ID, CommunicationException),
            debug(mqi(protocol), "Stopped server: ~w communication thread: ~w, exception(~w)", [Server_Thread_ID, Communication_Thread_ID, CommunicationException]),
            abortSilentExit(Goal_Thread_ID, Goal_Exception),
            debug(mqi(protocol), "Stopped server: ~w goal thread: ~w, exception(~w)", [Server_Thread_ID, Goal_Thread_ID, Goal_Exception])
        )).


% Either run the server goal on its own thread or run it directly
% (blocking) and clean up the socket file once it finishes
start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   Run_Server_On_Thread
    ->  (   thread_create(Server_Goal, _, [ alias(Server_Thread_ID),
                                            at_exit((delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
                                                     detach_if_expected(Server_Thread_ID)
                                                    ))
                                          ]),
            debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
        )
    ;   (   Server_Goal,
            delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
            debug(mqi(protocol), "Halting.", [])
        )
    ).


% Unix domain sockets create a file that needs to be cleaned up
% If mqi generated it, there is also a directory that needs to be cleaned up
% that will only contain that file
delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   nonvar(Unix_Domain_Socket_Path)
    ->  catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true)
    ;   (   nonvar(Unix_Domain_Socket_Path_And_File)
        ->  catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true)
        ;   true
        )
    ).

:- if(current_predicate(unix_domain_socket/1)).
    optional_unix_domain_socket(Socket) :-
        unix_domain_socket(Socket).
:- else.
    optional_unix_domain_socket(_).
:- endif.

% Always bind only to localhost for security reasons
% Delete the socket file in case it is already around so that the same name can be reused
bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
    (   nonvar(Unix_Domain_Socket_Path_And_File)
    ->  debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
        optional_unix_domain_socket(Socket),
        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
        Client_Address = Unix_Domain_Socket_Path_And_File
    ;   (   tcp_socket(Socket),
            tcp_setopt(Socket, reuseaddr),
            tcp_bind(Socket, '127.0.0.1':Port),
            debug(mqi(protocol), "Using TCP/IP port: ~w", ['127.0.0.1':Port]),
            Client_Address = Port
        )
    ),
    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).

% Communicates the used port (or Unix domain socket path) and password to the
% client via STDOUT so the client language library can use them to connect
send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
    (   Write_Connection_Values
    ->  (   (   var(Unix_Domain_Socket_Path_And_File)
            ->  format(Stream, "~d\n", [Port])
            ;   format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
            ),
            format(Stream, "~w\n", [Password]),
            flush_output(Stream)
        )
    ;   true
    ).


% Server thread worker predicate
% Listen for connections and create a connection for each in its own communication thread
% Uses tail recursion to ensure the stack doesn't grow
server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Listening on address: ~w", [Address]),
    tcp_listen(Socket, Connection_Count),
    tcp_open_socket(Socket, AcceptFd, _),
    create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
    server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).


% Wait for the next connection and create communication and goal threads to support it
% Create known IDs for the threads so we can pass them along before the threads are created
% First create the goal thread to avoid a race condition where the communication
% thread tries to queue a goal before it is created
create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Waiting for client connection...", []),
    tcp_accept(AcceptFd, Socket, _Peer),
    debug(mqi(protocol), "Client connected", []),
    gensym('conn', Connection_Base),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_comm'], Thread_Alias),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_goal'], Goal_Alias),
    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
    assert(mqi_worker_threads(Server_Thread_ID, Thread_Alias, Goal_Alias)),
    thread_create(goal_thread(Thread_Alias),
        _,
        [alias(Goal_Alias), at_exit(detach_if_expected(Goal_Alias))]),
    thread_create(communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
        _,
        [alias(Thread_Alias), at_exit(detach_if_expected(Thread_Alias))]).


% The worker predicate for the Goal thread.
% Looks for a message from the connection thread, processes it, then recurses.
%
% Goals always run in the same thread in case the user is setting thread local information.
% For each answer to the user's query (including an exception), the goal thread will queue a message
% to the communication thread of the form result(Answer, Find_All), where Find_All == true if the user wants all answers at once
% Tail recurse to avoid growing the stack
goal_thread(Respond_To_Thread_ID) :-
    thread_self(Self_ID),
    throw_if_testing(Self_ID),
    thread_get_message(Self_ID, goal(Goal, Binding_List, Query_Timeout, Find_All)),
    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Goal]),
    (   Find_All
    ->  One_Answer_Goal = findall(Binding_List, @(user:Goal, user), Answers)
    ;
        One_Answer_Goal = ( @(user:Goal, user),
                            Answers = [Binding_List],
                            send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
                          )
    ),
    All_Answers_Goal = run_cancellable_goal(Self_ID, findall(Answers, One_Answer_Goal, [Find_All_Answers | _])),
    (   Query_Timeout == -1
    ->  catch(All_Answers_Goal, Top_Exception, true)
    ;   catch(call_with_time_limit(Query_Timeout, All_Answers_Goal), Top_Exception, true)
    ),
    (
        var(Top_Exception)
    ->  (
            Find_All
        ->
            send_next_result(Respond_To_Thread_ID, Find_All_Answers, _, Find_All)
        ;
            send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
        )
    ;
        send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
    ),
    goal_thread(Respond_To_Thread_ID).


% Used only for testing unhandled exceptions outside of the "safe zone"
throw_if_testing(Self_ID) :-
    (   thread_peek_message(Self_ID, testThrow(Test_Exception))
    ->  (   debug(mqi(query), "TESTING: Throwing test exception: ~w", [Test_Exception]),
            throw(Test_Exception)
        )
    ;   true
    ).


% run_cancellable_goal handles the communication
% to ensure the cancel exception from the communication thread
% is injected at a place we are prepared to handle in the goal_thread
% Before the goal is run, sets a fact to indicate we are in the "safe to cancel"
% zone for the communication thread.
% Then it doesn't exit this "safe to cancel" zone if the
% communication thread is about to cancel
run_cancellable_goal(Mutex_ID, Goal) :-
    thread_self(Self_ID),
    setup_call_cleanup(
        assert(safe_to_cancel(Self_ID), Assertion),
        Goal,
        with_mutex(Mutex_ID, erase(Assertion))
    ).


% Worker predicate for the communication thread.
% Processes messages and sends goals to the goal thread.
% Continues processing messages until communication_thread_listen() throws or ends with true/false
%
% Catches all exceptions from communication_thread_listen so that it can do an orderly shutdown of the goal
% thread if there is a communication failure.
%
% True means user explicitly called close or there was an exception
% only exit the main thread if there was an exception and we are supposed to Exit_Main_On_Failure
% otherwise just exit the session
communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
    thread_self(Self_ID),
    (   (
            catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout), error(Serve_Exception1, Serve_Exception2), true),
            debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [error(Serve_Exception1, Serve_Exception2)]),
            abortSilentExit(Goal_Thread_ID, _),
            retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
        )
    ->  Halt = (nonvar(Serve_Exception1), Exit_Main_On_Failure)
    ;   Halt = true
    ),
    (   Halt
    ->  (   debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, error(Serve_Exception1, Serve_Exception2)]),
            quit(_)
        )
    ;   (   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
            catch(tcp_close_socket(Socket), error(_, _), true)
        )
    ).


% Open socket and begin processing the streams for a connection using the Encoding if the password matches
% true: session ended
% exception: communication failure or an internal failure (like a thread threw or shutdown unexpectedly)
% false: halt
communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
    tcp_open_socket(Socket, Read_Stream, Write_Stream),
    thread_self(Communication_Thread_ID),
    assert(mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)),
    set_stream(Read_Stream, encoding(Encoding)),
    set_stream(Write_Stream, encoding(Encoding)),
    read_message(Read_Stream, Sent_Password),
    (   Password == Sent_Password
    ->  (   debug(mqi(protocol), "Password matched.", []),
            thread_self(Self_ID),
            reply(Write_Stream, true([[threads(Self_ID, Goal_Thread_ID)]]))
        )
    ;   (   debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
            reply_error(Write_Stream, password_mismatch),
            throw(password_mismatch)
        )
    ),
    process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout),
    debug(mqi(protocol), "Session finished.", []).


% process_mqi_messages implements the main interface to the Machine Query Interface.
% Continuously reads a Machine Query Interface message from Read_Stream and writes a response to Write_Stream,
% until the connection fails or a `quit` or `close` message is sent.
%
% Read_Stream and Write_Stream can be any valid stream using any encoding.
%
% Goal_Thread_ID must be the threadID of a thread started on the goal_thread predicate
%
% uses tail recursion to ensure the stack doesn't grow
%
% true: indicates we should terminate the session (clean termination)
% false: indicates we should exit the process if running in embedded mode
% exception: indicates we should terminate the session (communication failure termination) or
%    thread was asked to halt
process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
    (   Command == close
    ->  (   debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", []),
            true
        )
    ;   (   Command == quit
        ->  (   debug(mqi(protocol), "Command: quit.", []),
                false
            )
        ;
            process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
        )
    ).

% process_mqi_message manages the protocol for the connection: receive message, parse it, process it.
% - Reads a single message from Read_Stream.
% - Processes it and issues a response on Write_Stream.
% - The message will be unified with Command to allow the caller to handle it.
%
% Read_Stream and Write_Stream can be any valid stream using any encoding.
%
% True if the message was understood. A response will always be sent.
% False if the message was malformed.
% Exceptions will be thrown by the underlying stream if there are communication failures writing to Write_Stream or the thread was asked to exit.
%
% state_* predicates manage the state transitions of the protocol
% They only bubble up exceptions if there is a communication failure
%
% state_process_command will never return false
% since errors should be sent to the client
% It can throw if there are communication failures, though.
process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
    debug(mqi(protocol), "Waiting for next message ...", []),
    (   state_receive_raw_message(Read_Stream, Message_String)
    ->  (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
        ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
        ;   true
        )
    ;   false
    ).


% state_receive_raw_message: receive a raw message, which is simply a string
%   true: valid message received
%   false: invalid message format
%   exception: communication failure OR thread asked to exit
state_receive_raw_message(Read, Command_String) :-
    read_message(Read, Command_String),
    debug(mqi(protocol), "Valid message: ~w", [Command_String]).


% state_parse_command: attempt to parse the message string into a valid command
%
% Use read_term_from_atom instead of read_term(stream) so that we don't hang
% indefinitely if the caller didn't properly finish the term
% parse in the context of module 'user' to properly bind operators, do term expansion, etc
%
%   true: command could be parsed
%   false: command cannot be parsed. An error is sent to the client in this case
%   exception: communication failure on sending a reply
state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
    (   catch(read_term_from_atom(Command_String, Parsed_Command, [variable_names(Binding_List), module(user)]), Parse_Exception, true)
    ->  (   var(Parse_Exception)
        ->  debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
        ;   (   reply_error(Write_Stream, Parse_Exception),
                fail
            )
        )
    ;   (   reply_error(Write_Stream, error(couldNotParseCommand, _)),
            fail
        )
    ).


% state_process_command(): execute the requested Command
%
% First wait until we have removed all results from any previous query.
% If query_in_progress(Goal_Thread_ID) exists then there is at least one
% more result to drain, by definition, because the predicate is
% deleted by get_next_result in the communication thread when the last result is drained
%
%   true: if the command itself succeeded, failed or threw an exception.
%         In that case, the outcome is sent to the client
%   exception: only communication or thread failures are allowed to bubble up
% See mqi(Options) documentation
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
    repeat_until_false((
            query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
        )),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
    reply_with_result(Goal_Thread_ID, Stream, Answers).


% See mqi(Options) documentation for documentation
% See notes in run(Goal, Timeout) re: draining previous query
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run_async/1.", []),
    debug(mqi(query), "   Goal: ~w", [Goal]),
    repeat_until_false((
            query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
        )),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
    reply(Stream, true([[]])).


% See mqi(Options) documentation for documentation
state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
    !,
    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
    (   once((var(Timeout) ; Timeout == -1))
    ->  Options = []
    ;   Options = [timeout(Timeout)]
    ),
    (   query_in_progress(Goal_Thread_ID)
    ->  (   (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
                get_next_result(Goal_Thread_ID, Stream, Options, Result)
            )
        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
        ;   reply_error(Stream, result_not_available)
        )
    ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
            reply_error(Stream, no_query)
        )
    ).


% See mqi(Options) documentation for documentation
% To ensure the goal thread is in a place it is safe to cancel,
% we lock a mutex first that the goal thread checks before exiting
% the "safe to cancel" zone.
% If it is not in the safe zone, it either finished
% or was never running.
state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
    !,
    debug(mqi(protocol), "Command: cancel_async/0.", []),
    with_mutex(Goal_Thread_ID, (
        (   safe_to_cancel(Goal_Thread_ID)
        ->  (   thread_signal(Goal_Thread_ID, throw(cancel_goal)),
                reply(Stream, true([[]]))
            )
        ;   (   query_in_progress(Goal_Thread_ID)
            ->  (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
                    reply(Stream, true([[]]))
                )
            ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
                    reply_error(Stream, no_query)
                )
            )
        )
    )).


% Used for testing how the system behaves when the goal thread is killed unexpectedly
% Needs to run a bogus command `run(true, -1)` to
% get the goal thread to process the exception
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
    !,
    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(true, -1), Binding_List).


state_process_command(Stream, _, _, close, _) :-
    !,
    reply(Stream, true([[]])).


state_process_command(Stream, _, _, quit, _) :-
    !,
    reply(Stream, true([[]])).


% Send an exception if the command is not known
state_process_command(Stream, _, _, Command, _) :-
    debug(mqi(protocol), "Unknown command ~w", [Command]),
    reply_error(Stream, unknownCommand).


% Wait for a result (and put in Answers) from the goal thread, but send a heartbeat message
% every so often until it arrives to detect if the socket is broken.
% Throws if the heartbeat fails, which
% then shuts down the communication thread
% Tail recurse to not grow the stack
heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
    ;   (   debug(mqi(protocol), "heartbeat...", []),
            write_heartbeat(Stream),
            heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
        )
    ).


% True if write succeeded, otherwise throws as that
% indicates that heartbeat failed because the other
% end of the pipe terminated
write_heartbeat(Stream) :-
    put_char(Stream, '.'),
    flush_output(Stream).
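

% Illustrative sketch, not part of the MQI itself: because write_heartbeat/1
% puts a bare '.' on the stream while a query is still running, a client
% reading a reply may see any number of '.' characters before the length
% header of the actual message. One way a Prolog client could discard them
% is shown here. example_skip_heartbeats/1 is a hypothetical name chosen for
% this example, and it assumes Stream is the client's input side of the
% connection.
example_skip_heartbeats(Stream) :-
    peek_char(Stream, Char),
    (   Char == '.'
    ->  get_char(Stream, _),                % consume one heartbeat character
        example_skip_heartbeats(Stream)
    ;   true                                % start of the length, or end_of_file
    ).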


% Send a goal to the goal thread in its queue
%
% Remember that we are now running a query using assert.
% This will be retracted once all the answers have been drained.
%
% If Goal_Thread_ID died, thread_send_message throws and, if we don't respond,
% the client could hang so catch and give them a good message before propagating
% the exception
send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
    (   var(Timeout)
    ->  Timeout = Default_Timeout
    ;   true
    ),
    (   var(Binding_List)
    ->  Binding_List = []
    ;   true
    ),
    debug(mqi(query), "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
    assert(query_in_progress(Goal_Thread_ID)),
    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)), Send_Message_Exception, true),
    (   var(Send_Message_Exception)
    ->  true
    ;   (   reply_error(Stream, connection_failed),
            throw(Send_Message_Exception)
        )
    ).


% Send a result from the goal thread to the communication thread in its queue
send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
    (   var(Exception_In_Goal)
    ->  (   (   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
                Answer == []
            )
        ->  thread_send_message(Respond_To_Thread_ID, result(false, Find_All))
        ;   thread_send_message(Respond_To_Thread_ID, result(true(Answer), Find_All))
        )
    ;   (   debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
            thread_send_message(Respond_To_Thread_ID, result(error(Exception_In_Goal), Find_All))
        )
    ).


% Gets the next result from the goal thread in the communication thread queue,
% and retracts query_in_progress/1 when the last result has been sent.
% Find_All == true only returns one message, so delete query_in_progress
% no matter what it is
% \+ Find_All: There may be more than one result. The first one we hit with any exception
% (note that no_more_results is also returned as an exception) means we are done
get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
    (   thread_property(Goal_Thread_ID, status(running))
    ->  true
    ;   (   reply_error(Stream, connection_failed),
            throw(connection_failed)
        )
    ),
    thread_self(Self_ID),
    thread_get_message(Self_ID, result(Answers, Find_All), Options),
    (   Find_All == true
    ->  (   debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
            retractall(query_in_progress(Goal_Thread_ID))
        )
    ;   (   Answers = error(_)
        ->  (   debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
                retractall(query_in_progress(Goal_Thread_ID))
            )
        ;   true
        )
    ).


% reply_with_result predicates are used to consistently return
% answers for a query from either run() or run_async()
reply_with_result(_, Stream, error(Error)) :-
    !,
    reply_error(Stream, Error).
reply_with_result(_, Stream, Result) :-
    !,
    reply(Stream, Result).


% Reply with a normal term
% Convert term to an actual JSON string
reply(Stream, Term) :-
    debug(mqi(query), "Responding with Term: ~w", [Term]),
    term_to_json_string(Term, Json_String),
    write_message(Stream, Json_String).


% Special handling for exceptions since they can have parts that are not
% "serializable".
% Ensures they are always returned in an exception/1 term
reply_error(Stream, Error_Term) :-
    (   error(Error_Value, _) = Error_Term
    ->  Response = exception(Error_Value)
    ;   (   atom(Error_Term)
        ->  Response = exception(Error_Term)
        ;   (   compound_name_arity(Error_Term, Name, _),
                Response = exception(Name)
            )
        )
    ),
    reply(Stream, Response).


% Send and receive messages are simply strings preceded by their length + ".\n"
% i.e. "<stringlength>.\n<string>"
% The desired encoding must be set on the Stream before calling this predicate


% Writes the next message.
% Throws if there is an unexpected exception
write_message(Stream, String) :-
    write_string_length(Stream, String),
    write(Stream, String),
    flush_output(Stream).


% Reads the next message.
% Throws if there is an unexpected exception or thread has been requested to quit
% the length passed must match the actual number of bytes in the stream
% in whatever encoding is being used
read_message(Stream, String) :-
    read_string_length(Stream, Length),
    read_string(Stream, Length, String).


% Terminate with '.\n' so we know that's the end of the count
write_string_length(Stream, String) :-
    stream_property(Stream, encoding(Encoding)),
    string_encoding_length(String, Encoding, Length),
    format(Stream, "~d.\n", [Length]).


% Note: read_term requires ".\n" after the length
% ... but does not consume the "\n"
read_string_length(Stream, Length) :-
    read_term(Stream, Length, []),
    get_char(Stream, _).


% Computes the number of bytes String occupies when written using Encoding
string_encoding_length(String, Encoding, Length) :-
    setup_call_cleanup(
        open_null_stream(Out),
        (   set_stream(Out, encoding(Encoding)),
            write(Out, String),
            byte_count(Out, Length)
        ),
        close(Out)).
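

% Illustrative sketch, not part of the MQI itself: the
% "<stringlength>.\n<string>" framing described above, built in memory
% instead of being written to a socket. example_framed_message/2 is a
% hypothetical helper introduced only for this example, and utf8 is an
% assumed encoding; the real code takes the encoding from the stream.
example_framed_message(String, Framed) :-
    string_encoding_length(String, utf8, Length),       % length in bytes, not characters
    format(string(Framed), "~d.\n~w", [Length, String]).
%
% Possible usage, assuming this file is loaded as module mqi:
%   ?- mqi:example_framed_message("hello", Framed).
%   Framed = "5.\nhello".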


% Convert a Prolog Term to a Prolog JSON term and write it as a JSON string
% Add a final \n so that using netcat to debug works well
term_to_json_string(Term, Json_String) :-
    term_to_json(Term, Json),
    with_output_to(string(Json_String),
        (   current_output(Stream),
            json_write(Stream, Json),
            put(Stream, '\n')
        )).


% Execute the goal as once() without binding any variables
% and keep executing it until it returns false (or throws)
repeat_until_false(Goal) :-
    (\+ (\+ Goal)), !, repeat_until_false(Goal).
repeat_until_false(_).


% Used to kill a thread in an "expected" way so it doesn't leave around traces in thread_property/2 afterwards
%
% If the thread is alive OR it was already aborted (expected cases) then attempt to join
% the thread so that no warnings are sent to the console. Other cases leave the thread for debugging.
% There are some fringe cases (like calling external code)
% where the call might not return for a long time. Do a timeout for those cases.
abortSilentExit(Thread_ID, Exception) :-
    catch(thread_signal(Thread_ID, abort), error(Exception, _), true),
    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
% Workaround SWI Prolog bug: https://github.com/SWI-Prolog/swipl-devel/issues/852 by not joining
% The workaround just stops joining the aborted thread, so an inert record will be left if thread_property/2 is called.
%    ,
%    (   once((var(Exception) ; catch(thread_property(Thread_ID, status(exception('$aborted'))), error(_, _), true)))
%    ->  (   catch(call_with_time_limit(4, thread_join(Thread_ID)), error(JoinException1, JoinException2), true),
%            debug(mqi(protocol), "thread_join attempted because thread: ~w exit was expected, exception: ~w", [Thread_ID, error(JoinException1, JoinException2)])
%        )
%    ;   true
%    ).
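

% Illustrative sketch, not used by the MQI: repeat_until_false/1 (defined
% above) calls its goal inside a double negation, so only side effects
% survive each iteration and no bindings do. That makes it a fit for
% "drain until empty" loops. example_drain_queue/1 is a hypothetical helper
% that removes every message from Queue; timeout(0) makes
% thread_get_message/3 fail instead of blocking once the queue is empty.
example_drain_queue(Queue) :-
    repeat_until_false(thread_get_message(Queue, _, [timeout(0)])).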


% Detach a thread that exits with true or false so that it doesn't leave around a record in thread_property/2 afterwards
% Don't detach a thread if it exits because of an exception so we can debug using thread_property/2 afterwards
%
% However, `abort` is an expected exception but detaching a thread that aborts will leave an unwanted
% thread_property/2 record *and* print a message to the console. To work around this,
% the goal thread is always aborted by the communication thread using abortSilentExit.
detach_if_expected(Thread_ID) :-
    thread_property(Thread_ID, status(Status)),
    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
    (   once((Status = true ; Status = false))
    ->  (   debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
            thread_detach(Thread_ID)
        )
    ;   true
    ).


write_output_to_file(File) :-
    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
    open(File, write, Stream, [buffer(false)]),
    set_prolog_IO(user_input, Stream, Stream).


% Creates a Unix Domain Socket file in a secured directory.
% Throws if the directory or file cannot be created in /tmp for any reason
% Requirements for this file are:
%    - The Prolog process will attempt to create and, if Prolog exits cleanly,
%      delete this file when the server closes. This means the directory
%      must have the appropriate permissions to allow the Prolog process
%      to do so.
%    - For security reasons, the filename should not be predictable and the
%      directory it is contained in should have permissions set so that files
%      created are only accessible to the current user.
%    - The path must be below 92 *bytes* long (including null terminator) to
%      be portable according to the Linux documentation
%
% tmp_file finds the right /tmp directory, even on Mac OS, so the path is small
% Set 700 (rwx------) permission so it is only accessible by current user
% Create a secure tmp file in the new directory
% {set,current}_prolog_flag is copied to a thread, so no need to use a mutex.
% Close the stream so sockets can use it
unix_domain_socket_path(Created_Directory, File_Path) :-
    tmp_file(udsock, Created_Directory),
    make_directory(Created_Directory),
    catch(  chmod(Created_Directory, urwx),
            Exception,
            (   catch(delete_directory(Created_Directory), error(_, _), true),
                throw(Exception)
            )
    ),
    setup_call_cleanup( (   current_prolog_flag(tmp_dir, Save_Tmp_Dir),
                            set_prolog_flag(tmp_dir, Created_Directory)
                        ),
                        tmp_file_stream(File_Path, Stream, []),
                        set_prolog_flag(tmp_dir, Save_Tmp_Dir)
                      ),
    close(Stream).


% Helper for installing the mqi.pl file to the right
% library directory.
% Call using swipl -s mqi.pl -g "mqi:install_to_library('mqi.pl')" -t halt
install_to_library(File) :-
    once(find_library(Path)),
    copy_file(File, Path),
    make.


% Find the base library path, i.e. the one that ends in
% "library/"
find_library(Path) :-
    file_alias_path(library, Path),
    atomic_list_concat(Parts, '/', Path),
    reverse(Parts, Parts_Reverse),
    nth0(0, Parts_Reverse, ''),
    nth0(1, Parts_Reverse, Library),
    string_lower(Library, 'library')