33
34:- module(mqi,
35 [ mqi_start/0,
36 mqi_start/1, 37 mqi_stop/1 38 ]). 39
109:- use_module(library(socket)). 110:- use_module(library(http/json)). 111:- use_module(library(http/json_convert)). 112:- use_module(library(option)). 113:- use_module(library(term_to_json)). 114:- use_module(library(debug)). 115:- use_module(library(filesex)). 116:- use_module(library(gensym)). 117:- use_module(library(lists)). 118:- use_module(library(main)). 119:- use_module(library(make)). 120:- use_module(library(prolog_source)). 121:- use_module(library(time)). 122:- use_module(library(uuid)). 123
% Run-time bookkeeping facts maintained by the server.  Argument layout
% as asserted elsewhere in this file:
%   mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)
%   mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)
%   mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)
%   query_in_progress(Goal_Thread_ID)
%   safe_to_cancel(Goal_Thread_ID)
:- dynamic
    mqi_thread/3,
    mqi_worker_threads/3,
    mqi_socket/5,
    query_in_progress/1,
    safe_to_cancel/1.
140
%!  mqi_start(+Options)
%
%   Bind a listening socket and start an MQI server configured by the
%   option list Options.  Options read here and the default used when
%   the option is absent:
%
%     - pending_connections(Count)      5
%     - query_timeout(Seconds)          -1
%     - port(Port)                      left unbound
%     - run_server_on_thread(Bool)      true
%     - exit_main_on_failure(Bool)      false
%     - write_connection_values(Bool)   false
%     - unix_domain_socket(File)        left unbound
%     - server_thread(Alias)            generated with gensym(mqi, Alias)
%     - password(Password)              random integer rendered as a string
%     - write_output_to_file(File)      left unbound (no redirection)
mqi_start(Options) :-
    Encoding = utf8,
    option(pending_connections(Connection_Count), Options, 5),
    option(query_timeout(Query_Timeout), Options, -1),
    option(port(Port), Options, _),
    option(run_server_on_thread(Run_Server_On_Thread), Options, true),
    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
    option(write_connection_values(Write_Connection_Values), Options, false),
    option(unix_domain_socket(Socket_File), Options, _),
    % unix_domain_socket(_) was requested but no file was named:
    % create a fresh private directory and socket file name.
    (   memberchk(unix_domain_socket(_), Options),
        var(Socket_File)
    ->  unix_domain_socket_path(Socket_Directory, Socket_File)
    ;   true
    ),
    option(server_thread(Server_Thread_ID), Options, _),
    (   nonvar(Server_Thread_ID)
    ->  true
    ;   gensym(mqi, Server_Thread_ID)
    ),
    option(password(Password), Options, _),
    (   nonvar(Password)
    ->  true
    ;   % No password supplied: generate one.
        (   current_prolog_flag(bounded, false)
        ->  uuid(Secret, [format(integer)])
        ;   Secret is random(1<<62)
        ),
        format(string(Password), '~d', [Secret])
    ),
    % The stored password carries the trailing '.\n'; it is compared
    % with ==/2 against the first message the client sends.
    string_concat(Password, '.\n', Final_Password),
    bind_socket(Server_Thread_ID, Socket_File, Port, Socket, Client_Address),
    % Connection values are written to user_output before any output
    % redirection below takes effect.
    send_client_startup_data(Write_Connection_Values, user_output, Socket_File, Client_Address, Password),
    option(write_output_to_file(File), Options, _),
    (   nonvar(File)
    ->  write_output_to_file(File)
    ;   true
    ),
    Server_Goal = (
        catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure), error(Formal, Context), true),
        debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(Formal, Context)])
    ),
    start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Socket_Directory, Socket_File).
186
%   opt_type(Flag, Option, Type)
%
%   Command line option table: for every long flag, the option name it
%   maps to and the type of its value.  mqi_start/0 parses argv with
%   argv_options/3.
opt_type(port,                      port,                      natural).
opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
opt_type(password,                  password,                  string).
opt_type(pending_connections,       pending_connections,       nonneg).
opt_type(query_timeout,             query_timeout,             float).
opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
opt_type(write_connection_values,   write_connection_values,   boolean).
opt_type(write_output_to_file,      write_output_to_file,      file(write)).
197
%   opt_help(Option, HelpText)
%
%   One-line help text for each command line option listed in
%   opt_type/3.
opt_help(port,                      "TCP/IP port for clients to connect to").
opt_help(create_unix_domain_socket, "Create a Unix domain socket for clients to connect to").
% Fixed spelling in the user-visible text: "domin" -> "domain".
opt_help(unix_domain_socket,        "File path for the Unix domain socket").
opt_help(password,                  "Connection password").
opt_help(pending_connections,       "Max number of queued connections (5)").
opt_help(query_timeout,             "Max query runtime in seconds (default infinite)").
opt_help(run_server_on_thread,      "Run server in a background thread (true)").
opt_help(exit_main_on_failure,      "Exit the process on a failure").
opt_help(write_connection_values,   "Print info for clients to connect").
opt_help(write_output_to_file,      "Write stdout and stderr to file").
208
245
246
%!  mqi_start
%
%   Command line entry point.  Reads options from the argv flag,
%   defaults exit_main_on_failure to true, translates
%   create_unix_domain_socket(true) into unix_domain_socket(_), starts
%   the server with mqi_start/1 and then blocks the calling thread
%   until a quit_mqi message arrives (sent by quit/1).
%
%   @throws domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread)
%           if run_server_on_thread is given and is not `true`.
mqi_start :-
    current_prolog_flag(argv, Argv),
    argv_options(Argv, _Positional, Cli_Options),
    % Options from the command line take precedence over this default.
    merge_options(Cli_Options, [exit_main_on_failure(true)], With_Defaults),
    select_option(create_unix_domain_socket(Create_Socket), With_Defaults, Remaining, false),
    (   Create_Socket == true
    ->  merge_options(Remaining, [unix_domain_socket(_)], FinalOptions)
    ;   FinalOptions = Remaining
    ),
    option(run_server_on_thread(On_Thread), FinalOptions, true),
    (   On_Thread \== true
    ->  throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
    ;   true
    ),
    mqi_start(FinalOptions),
    % Interrupt signal asks the main thread to stop waiting.
    on_signal(int, _, quit),
    thread_get_message(quit_mqi).
271
272
%   quit(+Ignored)
%
%   Post quit_mqi to the message queue of thread `main`.  Installed as
%   the `int` signal handler by mqi_start/0 and also called directly by
%   communication_thread/7; the argument is ignored.
quit(_Signal) :-
    thread_send_message(main, quit_mqi).
275
276
282
%!  mqi_stop(+Server_Thread_ID)
%
%   Stop the server registered under Server_Thread_ID.  First every
%   matching mqi_thread/3 fact is retracted, its listening socket
%   closed and the server thread signalled to abort; then every
%   matching mqi_worker_threads/3 fact is retracted and both worker
%   threads of that connection are signalled to abort.  Exceptions
%   raised while closing or signalling are only reported through
%   debug/3.
mqi_stop(Server_Thread_ID) :-
    % Listening side first.
    forall(retract(mqi_thread(Server_Thread_ID, _, Listen_Socket)),
           (   debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
               catch(tcp_close_socket(Listen_Socket), Close_Error, true),
               abortSilentExit(Server_Thread_ID, Abort_Error),
               debug(mqi(protocol), "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)", [Server_Thread_ID, Close_Error, Abort_Error])
           )),
    % Then the per-connection worker threads.
    forall(retract(mqi_worker_threads(Server_Thread_ID, Comm_Thread, Goal_Thread)),
           (   abortSilentExit(Comm_Thread, Comm_Error),
               debug(mqi(protocol), "Stopped server: ~w communication thread: ~w, exception(~w)", [Server_Thread_ID, Comm_Thread, Comm_Error]),
               abortSilentExit(Goal_Thread, Goal_Error),
               debug(mqi(protocol), "Stopped server: ~w goal thread: ~w, exception(~w)", [Server_Thread_ID, Goal_Thread, Goal_Error])
           )).
303
304
%   start_server_thread(+Run_Server_On_Thread, +Server_Thread_ID, :Server_Goal,
%                       ?Unix_Domain_Socket_Path, ?Unix_Domain_Socket_Path_And_File)
%
%   Run Server_Goal.  When Run_Server_On_Thread succeeds as a goal
%   (i.e. is `true`) the goal runs in a new thread whose alias is
%   Server_Thread_ID; when that thread exits, the Unix domain socket
%   file/directory is removed and the thread is detached if its exit
%   status was expected.  Otherwise Server_Goal runs in the calling
%   thread and the socket file/directory is removed once it returns.
start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   Run_Server_On_Thread
    ->  Exit_Hook = ( delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
                      detach_if_expected(Server_Thread_ID)
                    ),
        thread_create(Server_Goal, _, [alias(Server_Thread_ID), at_exit(Exit_Hook)]),
        debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
    ;   Server_Goal,
        delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
        debug(mqi(protocol), "Halting.", [])
    ).
319
320
%   delete_unix_domain_socket_file(?Unix_Domain_Socket_Path, ?Unix_Domain_Socket_Path_And_File)
%
%   Best-effort cleanup of the Unix domain socket.  If the directory
%   argument is bound, the directory and its contents are deleted;
%   otherwise, if the file argument is bound, only that file is
%   deleted.  error(_, _) exceptions from deletion are ignored and the
%   predicate succeeds when both arguments are unbound.
delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   nonvar(Unix_Domain_Socket_Path)
    ->  catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true)
    ;   nonvar(Unix_Domain_Socket_Path_And_File)
    ->  catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true)
    ;   true
    ).
332
% optional_unix_domain_socket(-Socket)
%
% Create a Unix domain socket when unix_domain_socket/1 is available at
% load time; otherwise compile a stub that succeeds and leaves Socket
% unbound.
:- if(current_predicate(unix_domain_socket/1)).
optional_unix_domain_socket(Socket) :-
    unix_domain_socket(Socket).
:- else.
optional_unix_domain_socket(_).
:- endif.
%   bind_socket(+Server_Thread_ID, ?Unix_Domain_Socket_Path_And_File, ?Port, -Socket, -Client_Address)
%
%   Create and bind the listening socket and record it as
%   mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket).
%   If a socket file is given, a Unix domain socket is bound to it (any
%   existing file of that name is removed first) and Client_Address is
%   the file.  Otherwise a TCP socket is bound to '127.0.0.1':Port with
%   reuseaddr set and Client_Address is Port.
bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
    (   var(Unix_Domain_Socket_Path_And_File)
    ->  Local_Endpoint = '127.0.0.1':Port,
        tcp_socket(Socket),
        tcp_setopt(Socket, reuseaddr),
        tcp_bind(Socket, Local_Endpoint),
        debug(mqi(protocol), "Using TCP/IP port: ~w", [Local_Endpoint]),
        Client_Address = Port
    ;   debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
        optional_unix_domain_socket(Socket),
        % A stale file from an earlier run would make the bind fail.
        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
        Client_Address = Unix_Domain_Socket_Path_And_File
    ),
    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).
357
%   send_client_startup_data(+Write_Connection_Values, +Stream,
%                            ?Unix_Domain_Socket_Path_And_File, +Port, +Password)
%
%   When Write_Connection_Values succeeds as a goal (i.e. is `true`),
%   write two lines to Stream and flush it: first the socket file if
%   one is bound, else Port as an integer; then Password.  Otherwise
%   nothing is written.
send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
    (   Write_Connection_Values
    ->  (   nonvar(Unix_Domain_Socket_Path_And_File)
        ->  format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
        ;   format(Stream, "~d\n", [Port])
        ),
        format(Stream, "~w\n", [Password]),
        flush_output(Stream)
    ;   true
    ).
371
372
%   server_thread(+Server_Thread_ID, +Socket, +Address, +Password, +Connection_Count,
%                 +Encoding, +Query_Timeout, +Exit_Main_On_Failure)
%
%   Accept loop of the server.  Each iteration listens on Socket with a
%   backlog of Connection_Count, opens the accept descriptor, hands one
%   incoming connection to create_connection/6 and then recurses; it
%   never terminates on its own.
server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Listening on address: ~w", [Address]),
    tcp_listen(Socket, Connection_Count),
    tcp_open_socket(Socket, Accept_Descriptor, _),
    create_connection(Server_Thread_ID, Accept_Descriptor, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
    server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).
382
383
%   create_connection(+Server_Thread_ID, +AcceptFd, +Password, +Encoding,
%                     +Query_Timeout, +Exit_Main_On_Failure)
%
%   Block until a client connects on AcceptFd, then create the pair of
%   threads that serve it: a goal thread (alias
%   <Server_Thread_ID>_<connN>_goal) and a communication thread (alias
%   <Server_Thread_ID>_<connN>_comm).  A mutex named after the goal
%   thread alias is created and the pair is recorded in
%   mqi_worker_threads/3 before the threads start.
create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Waiting for client connection...", []),
    tcp_accept(AcceptFd, Client_Socket, _Peer),
    debug(mqi(protocol), "Client connected", []),
    gensym('conn', Connection_Base),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Base], Alias_Prefix),
    atom_concat(Alias_Prefix, '_comm', Comm_Alias),
    atom_concat(Alias_Prefix, '_goal', Goal_Alias),
    % The mutex shares its name with the goal thread alias.
    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
    assert(mqi_worker_threads(Server_Thread_ID, Comm_Alias, Goal_Alias)),
    thread_create(goal_thread(Comm_Alias),
                  _,
                  [ alias(Goal_Alias),
                    at_exit(detach_if_expected(Goal_Alias))
                  ]),
    thread_create(communication_thread(Password, Client_Socket, Encoding, Server_Thread_ID, Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
                  _,
                  [ alias(Comm_Alias),
                    at_exit(detach_if_expected(Comm_Alias))
                  ]).
403
404
%   goal_thread(+Respond_To_Thread_ID)
%
%   Main loop of a goal thread.  Each iteration takes one
%   goal(Goal, Binding_List, Query_Timeout, Find_All) message from this
%   thread's queue, runs Goal in module `user`, and reports to
%   Respond_To_Thread_ID through send_next_result/4:
%
%     - Find_All true:  one message carrying the list of all bindings.
%     - Find_All false: one message per solution as it is found,
%       followed by a no_more_results error message.
%     - Any exception (including a timeout when Query_Timeout is not
%       -1): a single error message, sent with Find_All = true.
%
%   The goal runs inside run_cancellable_goal/2 so that it can be
%   cancelled while executing.
goal_thread(Respond_To_Thread_ID) :-
    thread_self(Self_ID),
    throw_if_testing(Self_ID),
    thread_get_message(Self_ID, goal(Goal, Binding_List, Query_Timeout, Find_All)),
    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Goal]),
    (   Find_All
    ->  One_Answer_Goal = findall(Binding_List, @(user:Goal, user), Answers)
    ;   One_Answer_Goal = ( @(user:Goal, user),
                            Answers = [Binding_List],
                            send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
                          )
    ),
    % The outer findall/3 drives One_Answer_Goal through all solutions.
    % Its result is collected in a plain variable: when Find_All is
    % false and Goal has no solutions the list is [], and unifying it
    % with [_|_] inside the call would make this clause fail and the
    % goal thread exit instead of reporting no_more_results.
    All_Answers_Goal = run_cancellable_goal(Self_ID, findall(Answers, One_Answer_Goal, Collected)),
    (   Query_Timeout == -1
    ->  catch(All_Answers_Goal, Top_Exception, true)
    ;   catch(call_with_time_limit(Query_Timeout, All_Answers_Goal), Top_Exception, true)
    ),
    (   nonvar(Top_Exception)
    ->  send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
    ;   Find_All
    ->  % The inner findall/3 succeeds exactly once, so Collected has
        % exactly one element here.
        Collected = [Find_All_Answers|_],
        send_next_result(Respond_To_Thread_ID, Find_All_Answers, _, Find_All)
    ;   send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
    ),
    goal_thread(Respond_To_Thread_ID).
443
444
%   throw_if_testing(+Self_ID)
%
%   Test hook: if the queue of thread Self_ID holds a
%   testThrow(Exception) message, throw Exception; otherwise succeed.
%   The message is only peeked at, not removed.
throw_if_testing(Self_ID) :-
    (   thread_peek_message(Self_ID, testThrow(Requested_Exception))
    ->  debug(mqi(query), "TESTING: Throwing test exception: ~w", [Requested_Exception]),
        throw(Requested_Exception)
    ;   true
    ).
453
454
%   run_cancellable_goal(+Mutex_ID, :Goal)
%
%   Run Goal while a safe_to_cancel(Self) fact for the calling thread
%   exists.  The fact is asserted before Goal starts and erased, while
%   holding mutex Mutex_ID, when Goal finishes in any way.
run_cancellable_goal(Mutex_ID, Goal) :-
    thread_self(Current_Thread),
    setup_call_cleanup(
        assert(safe_to_cancel(Current_Thread), Marker),
        Goal,
        with_mutex(Mutex_ID, erase(Marker))).
469
470
%   communication_thread(+Password, +Socket, +Encoding, +Server_Thread_ID,
%                        +Goal_Thread_ID, +Query_Timeout, +Exit_Main_On_Failure)
%
%   Body of a communication thread.  Runs the session with
%   communication_thread_listen/6, catching error(_, _) exceptions,
%   then aborts the paired goal thread and removes this connection's
%   mqi_worker_threads/3 fact.  Afterwards:
%
%     - if the session failed (e.g. the client sent `quit`), the main
%       thread is told to quit;
%     - if the session ended with an error(_, _) exception, the main
%       thread is told to quit only when Exit_Main_On_Failure is true;
%     - otherwise the client socket is closed.
communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
    thread_self(Self_ID),
    (   catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout),
              error(Formal, Context),
              true),
        debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [error(Formal, Context)]),
        abortSilentExit(Goal_Thread_ID, _),
        retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
    ->  % Halt is a goal, evaluated below: true only when an exception
        % was caught and the caller asked to exit on failure.
        Halt = (nonvar(Formal), Exit_Main_On_Failure)
    ;   Halt = true
    ),
    (   Halt
    ->  debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, error(Formal, Context)]),
        quit(_)
    ;   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
        catch(tcp_close_socket(Socket), error(_, _), true)
    ).
500
501
%   communication_thread_listen(+Password, +Socket, +Encoding, +Server_Thread_ID,
%                               +Goal_Thread_ID, +Query_Timeout)
%
%   Serve one client session on Socket.  Opens the socket streams,
%   records them in mqi_socket/5, sets Encoding on both, and reads the
%   first message, which must be identical (==/2) to Password.  On a
%   match the client receives the ids of this thread and the goal
%   thread and messages are processed until the session ends.  On a
%   mismatch a password_mismatch error is sent to the client and
%   password_mismatch is thrown.
communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
    tcp_open_socket(Socket, Read_Stream, Write_Stream),
    thread_self(Communication_Thread_ID),
    assert(mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)),
    set_stream(Read_Stream, encoding(Encoding)),
    set_stream(Write_Stream, encoding(Encoding)),
    read_message(Read_Stream, Sent_Password),
    (   Password == Sent_Password
    ->  debug(mqi(protocol), "Password matched.", []),
        reply(Write_Stream, true([[threads(Communication_Thread_ID, Goal_Thread_ID)]]))
    ;   debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
        reply_error(Write_Stream, password_mismatch),
        throw(password_mismatch)
    ),
    process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout),
    debug(mqi(protocol), "Session finished.", []).
525
526
%   process_mqi_messages(+Read_Stream, +Write_Stream, +Goal_Thread_ID, +Query_Timeout)
%
%   Message loop of a session.  Handles one message at a time until
%   the command is `close` (succeeds) or `quit` (fails); any other
%   command continues the loop.  It also fails when
%   process_mqi_message/5 fails.
process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
    (   Command == close
    ->  debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", [])
    ;   Command == quit
    ->  debug(mqi(protocol), "Command: quit.", []),
        % Failing here is how `quit` is signalled to the caller.
        fail
    ;   process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
    ).
555
%   process_mqi_message(+Read_Stream, +Write_Stream, +Goal_Thread_ID, +Query_Timeout, -Command)
%
%   Receive, parse and execute one message.  Fails if no raw message
%   can be received.  If the message cannot be parsed the predicate
%   still succeeds (the parser has already replied with the error), and
%   Command is whatever the parser left it as.
process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
    debug(mqi(protocol), "Waiting for next message ...", []),
    once(state_receive_raw_message(Read_Stream, Message_String)),
    (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
    ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
    ;   true
    ).
582
583
%   state_receive_raw_message(+Read, -Command_String)
%
%   Read the next length-prefixed message from stream Read as a string
%   and log it.
state_receive_raw_message(Read, Command_String) :-
    read_message(Read, Command_String),
    debug(mqi(protocol), "Valid message: ~w", [Command_String]).
591
592
%   state_parse_command(+Write_Stream, +Command_String, -Parsed_Command, -Binding_List)
%
%   Parse Command_String into the term Parsed_Command, reading in
%   module `user`; Binding_List receives the Name=Var pairs of its
%   variables.  If parsing throws, the exception is reported to the
%   client; if parsing fails, error(couldNotParseCommand, _) is
%   reported.  In both cases the predicate then fails.
state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
    Read_Options = [variable_names(Binding_List), module(user)],
    (   catch(read_term_from_atom(Command_String, Parsed_Command, Read_Options), Parse_Error, true)
    ->  (   nonvar(Parse_Error)
        ->  reply_error(Write_Stream, Parse_Error),
            fail
        ;   debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
        )
    ;   reply_error(Write_Stream, error(couldNotParseCommand, _)),
        fail
    ).
614
615
%   drain_unretrieved_results(+Goal_Thread_ID, +Stream)
%
%   Private helper shared by the run/2 and run_async/3 commands.  While
%   a query is still marked as in progress for Goal_Thread_ID, wait for
%   its next result (sending heartbeats on Stream) and discard it, so
%   that a new query starts with no stale results queued.
drain_unretrieved_results(Goal_Thread_ID, Stream) :-
    repeat_until_false((
        query_in_progress(Goal_Thread_ID),
        debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
        heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
        debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
        debug(mqi(query), " Discarded answer: ~w", [Unused_Answer])
    )),
    debug(mqi(protocol), "All previous results drained", []).


%   state_process_command(+Stream, +Goal_Thread_ID, +Query_Timeout, +Command, +Binding_List)
%
%   Execute one parsed Command and write the reply to Stream.
%
%   run(Goal, Timeout): synchronous query.  Pending results of earlier
%   queries are discarded, Goal is sent to the goal thread in find-all
%   mode, and the reply is written once the result arrives (heartbeats
%   are written while waiting).
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
    drain_unretrieved_results(Goal_Thread_ID, Stream),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
    reply_with_result(Goal_Thread_ID, Stream, Answers).


%   run_async(Goal, Timeout, Find_All): asynchronous query.  Pending
%   results of earlier queries are discarded, Goal is sent to the goal
%   thread and the client is answered immediately with true([[]]);
%   results are fetched later with async_result.
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run_async/1.", []),
    debug(mqi(query), " Goal: ~w", [Goal]),
    drain_unretrieved_results(Goal_Thread_ID, Stream),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
    reply(Stream, true([[]])).
659
660
%   async_result(Timeout): fetch the next result of an asynchronous
%   query.  An unbound Timeout or -1 waits without limit; any other
%   value is passed on as timeout(Timeout).  Replies with the result,
%   with result_not_available if none arrived in time, or with
%   no_query if no query is in progress.
state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
    !,
    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
    (   nonvar(Timeout),
        Timeout \== -1
    ->  Wait_Options = [timeout(Timeout)]
    ;   Wait_Options = []
    ),
    (   query_in_progress(Goal_Thread_ID)
    ->  debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
        (   get_next_result(Goal_Thread_ID, Stream, Wait_Options, Result)
        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
        ;   reply_error(Stream, result_not_available)
        )
    ;   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
        reply_error(Stream, no_query)
    ).
680
681
%   cancel_async: cancel the running asynchronous query.  The check
%   and the signal happen while holding the mutex named after the goal
%   thread, the same mutex under which run_cancellable_goal/2 erases
%   safe_to_cancel/1.  If the goal is currently cancellable it is
%   signalled with throw(cancel_goal); if a query is merely marked in
%   progress the request is acknowledged without signalling; otherwise
%   no_query is reported.
state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
    !,
    debug(mqi(protocol), "Command: cancel_async/0.", []),
    with_mutex(Goal_Thread_ID,
        (   safe_to_cancel(Goal_Thread_ID)
        ->  thread_signal(Goal_Thread_ID, throw(cancel_goal)),
            reply(Stream, true([[]]))
        ;   query_in_progress(Goal_Thread_ID)
        ->  debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
            reply(Stream, true([[]]))
        ;   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
            reply_error(Stream, no_query)
        )).
706
707
%   testThrowGoalThread(Exception): test hook.  Queues a
%   testThrow(Exception) message for the goal thread and then runs the
%   trivial query run(true, -1).
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Requested_Exception), Binding_List) :-
    !,
    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
    thread_send_message(Goal_Thread_ID, testThrow(Requested_Exception)),
    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(true, -1), Binding_List).


%   close: acknowledge with true([[]]); the caller ends the session.
state_process_command(Stream, _, _, close, _) :-
    !,
    reply(Stream, true([[]])).


%   quit: acknowledge with true([[]]); the caller ends the session.
state_process_command(Stream, _, _, quit, _) :-
    !,
    reply(Stream, true([[]])).


%   Anything else: report unknownCommand to the client.
state_process_command(Stream, _, _, Unrecognised, _) :-
    debug(mqi(protocol), "Unknown command ~w", [Unrecognised]),
    reply_error(Stream, unknownCommand).
732
733
%   heartbeat_until_result(+Goal_Thread_ID, +Stream, -Answers)
%
%   Wait for the next result from the goal thread.  Every time a
%   2 second wait passes without a result, a heartbeat is written to
%   Stream and the wait is repeated.
heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
    ;   debug(mqi(protocol), "heartbeat...", []),
        write_heartbeat(Stream),
        heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
    ).
747
748
%   write_heartbeat(+Stream)
%
%   Write a single '.' character to Stream and flush it.
write_heartbeat(Stream) :-
    put_char(Stream, '.'),
    flush_output(Stream).
755
756
%   send_goal_to_thread(+Stream, +Goal_Thread_ID, +Default_Timeout, ?Timeout,
%                       +Goal, ?Binding_List, +Find_All)
%
%   Mark a query as in progress for Goal_Thread_ID and post
%   goal(Goal, Binding_List, Timeout, Find_All) to that thread.  An
%   unbound Timeout becomes Default_Timeout and an unbound Binding_List
%   becomes [].  If posting the message throws, connection_failed is
%   reported on Stream and the exception is rethrown.
send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
    (   nonvar(Timeout)
    ->  true
    ;   Timeout = Default_Timeout
    ),
    (   nonvar(Binding_List)
    ->  true
    ;   Binding_List = []
    ),
    debug(mqi(query), "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
    assert(query_in_progress(Goal_Thread_ID)),
    Request = goal(Goal, Binding_List, Timeout, Find_All),
    catch(thread_send_message(Goal_Thread_ID, Request), Send_Error, true),
    (   nonvar(Send_Error)
    ->  reply_error(Stream, connection_failed),
        throw(Send_Error)
    ;   true
    ).
783
784
%   send_next_result(+Respond_To_Thread_ID, +Answer, ?Exception_In_Goal, +Find_All)
%
%   Post one result(Outcome, Find_All) message to
%   Respond_To_Thread_ID.  Outcome is error(Exception_In_Goal) when
%   that argument is bound; otherwise `false` when Answer is the empty
%   list and true(Answer) for any other Answer.
send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
    (   nonvar(Exception_In_Goal)
    ->  debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
        Outcome = error(Exception_In_Goal)
    ;   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
        (   Answer == []
        ->  Outcome = false
        ;   Outcome = true(Answer)
        )
    ),
    thread_send_message(Respond_To_Thread_ID, result(Outcome, Find_All)).
798
799
%   get_next_result(+Goal_Thread_ID, +Stream, +Options, -Answers)
%
%   Take the next result(Answers, Find_All) message from the calling
%   thread's queue, passing Options (e.g. a timeout) to
%   thread_get_message/3; fails if no such message is obtained.  If
%   the goal thread is not running, connection_failed is reported on
%   Stream and thrown.  The query_in_progress/1 mark is cleared when
%   the result was produced in find-all mode or is an error(_) result.
get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
    (   \+ thread_property(Goal_Thread_ID, status(running))
    ->  reply_error(Stream, connection_failed),
        throw(connection_failed)
    ;   true
    ),
    thread_self(Receiver),
    thread_get_message(Receiver, result(Answers, Find_All), Options),
    (   Find_All
    ->  debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
        retractall(query_in_progress(Goal_Thread_ID))
    ;   Answers = error(_)
    ->  debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
        retractall(query_in_progress(Goal_Thread_ID))
    ;   true
    ).
826
827
%   reply_with_result(+Goal_Thread_ID, +Stream, +Result)
%
%   Send Result to the client: error(Error) results go through
%   reply_error/2, everything else through reply/2.  The first
%   argument is not used.
reply_with_result(_, Stream, Result) :-
    (   Result = error(Error)
    ->  reply_error(Stream, Error)
    ;   reply(Stream, Result)
    ).
836
837
%   reply(+Stream, +Term)
%
%   Serialise Term as a JSON string and write it to Stream as one
%   length-prefixed message.
reply(Stream, Term) :-
    debug(mqi(query), "Responding with Term: ~w", [Term]),
    term_to_json_string(Term, Payload),
    write_message(Stream, Payload).
844
845
%   reply_error(+Stream, +Error_Term)
%
%   Reply to the client with exception(Value), where Value is derived
%   from Error_Term:
%
%     - error(Formal, _)  -> Formal
%     - any atomic term   -> the term itself
%     - any other compound -> its functor name
%
%   Non-atom atomic terms (strings, numbers), which a user goal can
%   throw, are reported as-is rather than being passed to
%   compound_name_arity/3, which would raise a type error in the
%   communication thread.
reply_error(Stream, Error_Term) :-
    (   error(Error_Value, _) = Error_Term
    ->  Response = exception(Error_Value)
    ;   atomic(Error_Term)
    ->  Response = exception(Error_Term)
    ;   compound_name_arity(Error_Term, Name, _),
        Response = exception(Name)
    ),
    reply(Stream, Response).
860
861
865
866
%   write_message(+Stream, +String)
%
%   Write String to Stream as one message: the length prefix produced
%   by write_string_length/2, then the text itself, then flush.
write_message(Stream, String) :-
    write_string_length(Stream, String),
    write(Stream, String),
    flush_output(Stream).
873
874
%   read_message(+Stream, -String)
%
%   Read one message from Stream: first the length prefix, then that
%   many characters as String.
read_message(Stream, String) :-
    read_string_length(Stream, Announced_Length),
    read_string(Stream, Announced_Length, String).
882
883
%   write_string_length(+Stream, +String)
%
%   Write the message header for String: its length in bytes under the
%   encoding of Stream, followed by ".\n".
write_string_length(Stream, String) :-
    stream_property(Stream, encoding(Stream_Encoding)),
    string_encoding_length(String, Stream_Encoding, Byte_Length),
    format(Stream, "~d.\n", [Byte_Length]).
889
890
%   read_string_length(+Stream, -Length)
%
%   Read the message header from Stream: a Prolog term (the length)
%   terminated by a full stop, then discard one more character
%   (presumably the newline that follows the full stop).
read_string_length(Stream, Length) :-
    read_term(Stream, Length, []),
    get_char(Stream, _Separator).
896
897
%   string_encoding_length(+String, +Encoding, -Length)
%
%   Length is the number of bytes String occupies when written in
%   Encoding.  Measured by writing String to a null stream set to
%   Encoding and reading that stream's byte count; the stream is
%   always closed.
string_encoding_length(String, Encoding, Length) :-
    setup_call_cleanup(
        open_null_stream(Sink),
        (   set_stream(Sink, encoding(Encoding)),
            write(Sink, String),
            byte_count(Sink, Length)
        ),
        close(Sink)).
907
908
%   term_to_json_string(+Term, -Json_String)
%
%   Convert Term with term_to_json/2 and render the result with
%   json_write/2 into Json_String, followed by a newline character.
term_to_json_string(Term, Json_String) :-
    term_to_json(Term, Json),
    with_output_to(string(Json_String),
                   (   current_output(Out),
                       json_write(Out, Json),
                       put(Out, '\n')
                   )).
918
919
%   repeat_until_false(:Goal)
%
%   Call Goal again and again until it fails, then succeed.  Goal runs
%   under double negation, so none of its bindings survive a round;
%   only its side effects do.
repeat_until_false(Goal) :-
    (   \+ \+ Goal
    ->  repeat_until_false(Goal)
    ;   true
    ).
925
926
%   abortSilentExit(+Thread_ID, -Exception)
%
%   Signal Thread_ID to run `abort`.  If signalling raises
%   error(Formal, _), Exception is bound to Formal; otherwise it stays
%   unbound.  Always succeeds and logs the attempt.
abortSilentExit(Thread_ID, Exception) :-
    catch(thread_signal(Thread_ID, abort),
          error(Exception, _),
          true),
    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
945
946
%   detach_if_expected(+Thread_ID)
%
%   Exit hook for server and worker threads.  If the thread's status
%   is `true` or `false` it is detached; for any other status the
%   thread is left as it is.
detach_if_expected(Thread_ID) :-
    thread_property(Thread_ID, status(Exit_Status)),
    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Exit_Status]),
    (   memberchk(Exit_Status, [true, false])
    ->  debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
        thread_detach(Thread_ID)
    ;   true
    ).
962
963
%   write_output_to_file(+File)
%
%   Open File for writing, unbuffered, and install the resulting
%   stream as both the output and the error stream of the process via
%   set_prolog_IO/3; user_input is kept as the input stream.
write_output_to_file(File) :-
    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
    open(File, write, Log_Stream, [buffer(false)]),
    set_prolog_IO(user_input, Log_Stream, Log_Stream).
968
969
%   unix_domain_socket_path(-Created_Directory, -File_Path)
%
%   Create a new temporary directory, set its mode to `urwx`, and
%   create a temporary file inside it whose name is returned as
%   File_Path.  If changing the mode throws, the directory is removed
%   again and the exception rethrown.  The tmp_dir flag is pointed at
%   the new directory only while the file is created and is restored
%   afterwards.
unix_domain_socket_path(Created_Directory, File_Path) :-
    tmp_file(udsock, Created_Directory),
    make_directory(Created_Directory),
    catch(chmod(Created_Directory, urwx),
          Chmod_Error,
          (   catch(delete_directory(Created_Directory), error(_, _), true),
              throw(Chmod_Error)
          )),
    setup_call_cleanup(
        (   current_prolog_flag(tmp_dir, Previous_Tmp_Dir),
            set_prolog_flag(tmp_dir, Created_Directory)
        ),
        tmp_file_stream(File_Path, Placeholder_Stream, []),
        set_prolog_flag(tmp_dir, Previous_Tmp_Dir)),
    % Only the file name is needed; close the stream that came with it.
    close(Placeholder_Stream).
1004
1005
%   install_to_library(+File)
%
%   Copy File into the first directory reported by find_library/1 and
%   then run make/0.
install_to_library(File) :-
    once(find_library(Library_Directory)),
    copy_file(File, Library_Directory),
    make.
1013
1014
1017find_library(Path) :-
1018 file_alias_path(library, Path),
1019 atomic_list_concat(Parts, '/', Path),
1020 reverse(Parts, Parts_Reverse),
1021 nth0(0, Parts_Reverse, ''),
1022 nth0(1, Parts_Reverse, Library),
1023 string_lower(Library, 'library')