36
37:- module(thread_httpd,
38 [ http_current_server/2, 39 http_server_property/2, 40 http_server/2, 41 http_workers/2, 42 http_add_worker/2, 43 http_current_worker/2, 44 http_stop_server/2, 45 http_spawn/2, 46
47 http_requeue/1, 48 http_close_connection/1, 49 http_enough_workers/3 50 ]). 51:- use_module(library(debug)). 52:- use_module(library(error)). 53:- use_module(library(option)). 54:- use_module(library(socket)). 55:- use_module(library(thread_pool)). 56:- use_module(library(gensym)). 57:- use_module(http_wrapper). 58:- use_module(http_path). 59
60:- autoload(library(uri), [uri_resolve/3]). 61
62:- predicate_options(http_server/2, 2,
63 [ port(any),
64 unix_socket(atom),
65 entry_page(atom),
66 tcp_socket(any),
67 workers(positive_integer),
68 timeout(number),
69 keep_alive_timeout(number),
70 silent(boolean),
71 ssl(list(any)), 72 pass_to(system:thread_create/3, 3)
73 ]). 74:- predicate_options(http_spawn/2, 2,
75 [ pool(atom),
76 pass_to(system:thread_create/3, 3),
77 pass_to(thread_pool:thread_create_in_pool/4, 4)
78 ]). 79:- predicate_options(http_add_worker/2, 2,
80 [ timeout(number),
81 keep_alive_timeout(number),
82 max_idle_time(number),
83 pass_to(system:thread_create/3, 3)
84 ]). 85
111
112:- meta_predicate
113 http_server(1, :),
114 http_current_server(1, ?),
115 http_spawn(0, +). 116
117:- dynamic
118 current_server/6, 119 queue_worker/2, 120 queue_options/2. 121
122:- multifile
123 make_socket_hook/3,
124 accept_hook/2,
125 close_hook/1,
126 open_client_hook/6,
127 http:create_pool/1,
128 http:schedule_workers/1. 129
130:- meta_predicate
131 thread_repeat_wait(0). 132
195
196http_server(Goal, M:Options0) :-
197 server_address(Address, Options0),
198 !,
199 make_socket(Address, M:Options0, Options),
200 create_workers(Options),
201 create_server(Goal, Address, Options),
202 ( option(silent(true), Options0)
203 -> true
204 ; print_message(informational,
205 httpd_started_server(Address, Options0))
206 ).
207http_server(_Goal, _:Options0) :-
208 existence_error(server_address, Options0).
209
210server_address(Address, Options) :-
211 ( option(port(Port), Options)
212 -> Address = Port
213 ; option(unix_socket(Path), Options)
214 -> Address = unix_socket(Path)
215 ).
216
217address_port(_IFace:Port, Port) :- !.
218address_port(unix_socket(Path), Path) :- !.
219address_port(Address, Address) :- !.
220
221tcp_address(Port) :-
222 var(Port),
223 !.
224tcp_address(Port) :-
225 integer(Port),
226 !.
227tcp_address(_Iface:_Port).
228
236
237make_socket(Address, M:Options0, Options) :-
238 tcp_address(Address),
239 make_socket_hook(Address, M:Options0, Options),
240 !.
241make_socket(Address, _:Options0, Options) :-
242 option(tcp_socket(_), Options0),
243 !,
244 make_addr_atom('httpd', Address, Queue),
245 Options = [ queue(Queue)
246 | Options0
247 ].
248make_socket(Address, _:Options0, Options) :-
249 tcp_address(Address),
250 !,
251 tcp_socket(Socket),
252 tcp_setopt(Socket, reuseaddr),
253 tcp_bind(Socket, Address),
254 tcp_listen(Socket, 64),
255 make_addr_atom('httpd', Address, Queue),
256 Options = [ queue(Queue),
257 tcp_socket(Socket)
258 | Options0
259 ].
260:- if(current_predicate(unix_domain_socket/1)). 261make_socket(Address, _:Options0, Options) :-
262 Address = unix_socket(Path),
263 !,
264 unix_domain_socket(Socket),
265 tcp_bind(Socket, Path),
266 tcp_listen(Socket, 64),
267 make_addr_atom('httpd', Address, Queue),
268 Options = [ queue(Queue),
269 tcp_socket(Socket)
270 | Options0
271 ].
272:- endif. 273
278
279make_addr_atom(Scheme, Address, Atom) :-
280 phrase(address_parts(Address), Parts),
281 atomic_list_concat([Scheme,@|Parts], Atom).
282
283address_parts(Atomic) -->
284 { atomic(Atomic) },
285 !,
286 [Atomic].
287address_parts(Host:Port) -->
288 !,
289 address_parts(Host), [:], address_parts(Port).
290address_parts(ip(A,B,C,D)) -->
291 !,
292 [ A, '.', B, '.', C, '.', D ].
293address_parts(unix_socket(Path)) -->
294 [Path].
295
296
301
302create_server(Goal, Address, Options) :-
303 get_time(StartTime),
304 memberchk(queue(Queue), Options),
305 scheme(Scheme, Options),
306 autoload_https(Scheme),
307 address_port(Address, Port),
308 make_addr_atom(Scheme, Port, Alias),
309 thread_self(Initiator),
310 thread_create(accept_server(Goal, Initiator, Options), _,
311 [ alias(Alias)
312 ]),
313 thread_get_message(server_started),
314 assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
315
316scheme(Scheme, Options) :-
317 option(scheme(Scheme), Options),
318 !.
319scheme(Scheme, Options) :-
320 ( option(ssl(_), Options)
321 ; option(ssl_instance(_), Options)
322 ),
323 !,
324 Scheme = https.
325scheme(http, _).
326
327autoload_https(https) :-
328 \+ clause(accept_hook(_Goal, _Options), _),
329 exists_source(library(http/http_ssl_plugin)),
330 !,
331 use_module(library(http/http_ssl_plugin)).
332autoload_https(_).
333
339
340http_current_server(Goal, Port) :-
341 current_server(Port, Goal, _, _, _, _).
342
343
356
357http_server_property(_:Port, Property) :-
358 integer(Port),
359 !,
360 server_property(Property, Port).
361http_server_property(Port, Property) :-
362 server_property(Property, Port).
363
364server_property(goal(Goal), Port) :-
365 current_server(Port, Goal, _, _, _, _).
366server_property(scheme(Scheme), Port) :-
367 current_server(Port, _, _, _, Scheme, _).
368server_property(start_time(Time), Port) :-
369 current_server(Port, _, _, _, _, Time).
370
371
378
379http_workers(Port, Workers) :-
380 must_be(ground, Port),
381 current_server(Port, _, _, Queue, _, _),
382 !,
383 ( integer(Workers)
384 -> resize_pool(Queue, Workers)
385 ; findall(W, queue_worker(Queue, W), WorkerIDs),
386 length(WorkerIDs, Workers)
387 ).
388http_workers(Port, _) :-
389 existence_error(http_server, Port).
390
391
401
402http_add_worker(Port, Options) :-
403 must_be(ground, Port),
404 current_server(Port, _, _, Queue, _, _),
405 !,
406 queue_options(Queue, QueueOptions),
407 merge_options(Options, QueueOptions, WorkerOptions),
408 atom_concat(Queue, '_', AliasBase),
409 create_workers(1, 1, Queue, AliasBase, WorkerOptions).
410http_add_worker(Port, _) :-
411 existence_error(http_server, Port).
412
413
420
421http_current_worker(Port, ThreadID) :-
422 current_server(Port, _, _, Queue, _, _),
423 queue_worker(Queue, ThreadID).
424
425
430
431accept_server(Goal, Initiator, Options) :-
432 catch(accept_server2(Goal, Initiator, Options), http_stop, true),
433 thread_self(Thread),
434 retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
435 close_server_socket(Options).
436
437accept_server2(Goal, Initiator, Options) :-
438 thread_send_message(Initiator, server_started),
439 repeat,
440 ( catch(accept_server3(Goal, Options), E, true)
441 -> ( var(E)
442 -> fail
443 ; accept_rethrow_error(E)
444 -> throw(E)
445 ; print_message(error, E),
446 fail
447 )
448 ; print_message(error, 449 goal_failed(accept_server3(Goal, Options))),
450 fail
451 ).
452
453accept_server3(Goal, Options) :-
454 accept_hook(Goal, Options),
455 !.
456accept_server3(Goal, Options) :-
457 memberchk(tcp_socket(Socket), Options),
458 memberchk(queue(Queue), Options),
459 debug(http(connection), 'Waiting for connection', []),
460 tcp_accept(Socket, Client, Peer),
461 debug(http(connection), 'New HTTP connection from ~p', [Peer]),
462 thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
463 http_enough_workers(Queue, accept, Peer).
464
465accept_rethrow_error(http_stop).
466accept_rethrow_error('$aborted').
467
468
472
473close_server_socket(Options) :-
474 close_hook(Options),
475 !.
476close_server_socket(Options) :-
477 memberchk(tcp_socket(Socket), Options),
478 !,
479 tcp_close_socket(Socket).
480
481
488
489http_stop_server(Host:Port, Options) :- 490 ground(Host),
491 !,
492 http_stop_server(Port, Options).
493http_stop_server(Port, _Options) :-
494 http_workers(Port, 0), 495 current_server(Port, _, Thread, Queue, _Scheme, _Start),
496 retractall(queue_options(Queue, _)),
497 thread_signal(Thread, throw(http_stop)),
498 catch(connect(localhost:Port), _, true),
499 thread_join(Thread, _),
500 message_queue_destroy(Queue).
501
502connect(Address) :-
503 setup_call_cleanup(
504 tcp_socket(Socket),
505 tcp_connect(Socket, Address),
506 tcp_close_socket(Socket)).
507
513
514http_enough_workers(Queue, _Why, _Peer) :-
515 message_queue_property(Queue, waiting(_0)),
516 !,
517 debug(http(scheduler), '~D waiting for work; ok', [_0]).
518http_enough_workers(Queue, Why, Peer) :-
519 message_queue_property(Queue, size(Size)),
520 ( enough(Size, Why)
521 -> debug(http(scheduler), '~D in queue; ok', [Size])
522 ; current_server(Port, _, _, Queue, _, _),
523 Data = _{ port:Port,
524 reason:Why,
525 peer:Peer,
526 waiting:Size,
527 queue:Queue
528 },
529 debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
530 catch(http:schedule_workers(Data),
531 Error,
532 print_message(error, Error))
533 -> true
534 ; true
535 ).
536
537enough(0, _).
538enough(1, keep_alive). 539
540
566
567
568 571
576
577create_workers(Options) :-
578 option(workers(N), Options, 5),
579 option(queue(Queue), Options),
580 catch(message_queue_create(Queue), _, true),
581 atom_concat(Queue, '_', AliasBase),
582 create_workers(1, N, Queue, AliasBase, Options),
583 assert(queue_options(Queue, Options)).
584
585create_workers(I, N, _, _, _) :-
586 I > N,
587 !.
588create_workers(I, N, Queue, AliasBase, Options) :-
589 gensym(AliasBase, Alias),
590 thread_create(http_worker(Options), Id,
591 [ alias(Alias)
592 | Options
593 ]),
594 assertz(queue_worker(Queue, Id)),
595 I2 is I + 1,
596 create_workers(I2, N, Queue, AliasBase, Options).
597
598
603
604resize_pool(Queue, Size) :-
605 findall(W, queue_worker(Queue, W), Workers),
606 length(Workers, Now),
607 ( Now < Size
608 -> queue_options(Queue, Options),
609 atom_concat(Queue, '_', AliasBase),
610 I0 is Now+1,
611 create_workers(I0, Size, Queue, AliasBase, Options)
612 ; Now == Size
613 -> true
614 ; Now > Size
615 -> Excess is Now - Size,
616 thread_self(Me),
617 forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
618 forall(between(1, Excess, _), thread_get_message(quitted(_)))
619 ).
620
621
629
630http_worker(Options) :-
631 debug(http(scheduler), 'New worker', []),
632 prolog_listen(this_thread_exit, done_worker),
633 option(queue(Queue), Options),
634 option(max_idle_time(MaxIdle), Options, infinite),
635 thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
636 debug(http(worker), 'Waiting for a job ...', []),
637 debug(http(worker), 'Got job ~p', [Message]),
638 ( Message = quit(Sender)
639 -> !,
640 thread_self(Self),
641 thread_detach(Self),
642 ( Sender == idle
643 -> true
644 ; retract(queue_worker(Queue, Self)),
645 thread_send_message(Sender, quitted(Self))
646 )
647 ; open_client(Message, Queue, Goal, In, Out,
648 Options, ClientOptions),
649 ( catch(http_process(Goal, In, Out, ClientOptions),
650 Error, true)
651 -> true
652 ; Error = goal_failed(http_process/4)
653 ),
654 ( var(Error)
655 -> fail
656 ; current_message_level(Error, Level),
657 print_message(Level, Error),
658 memberchk(peer(Peer), ClientOptions),
659 close_connection(Peer, In, Out),
660 fail
661 )
662 ).
663
664get_work(Queue, Message, infinite) :-
665 !,
666 thread_get_message(Queue, Message).
667get_work(Queue, Message, MaxIdle) :-
668 ( thread_get_message(Queue, Message, [timeout(MaxIdle)])
669 -> true
670 ; Message = quit(idle)
671 ).
672
673
679
680open_client(requeue(In, Out, Goal, ClOpts),
681 _, Goal, In, Out, Opts, ClOpts) :-
682 !,
683 memberchk(peer(Peer), ClOpts),
684 option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
685 check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
686open_client(Message, Queue, Goal, In, Out, Opts,
687 [ pool(client(Queue, Goal, In, Out)),
688 timeout(Timeout)
689 | Options
690 ]) :-
691 catch(open_client(Message, Goal, In, Out, Options, Opts),
692 E, report_error(E)),
693 option(timeout(Timeout), Opts, 60),
694 ( debugging(http(connection))
695 -> memberchk(peer(Peer), Options),
696 debug(http(connection), 'Opened connection from ~p', [Peer])
697 ; true
698 ).
699
700
703
704open_client(Message, Goal, In, Out, ClientOptions, Options) :-
705 open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
706 !.
707open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
708 [ peer(Peer),
709 protocol(http)
710 ], _) :-
711 tcp_open_socket(Socket, In, Out).
712
713report_error(E) :-
714 print_message(error, E),
715 fail.
716
717
723
724check_keep_alive_connection(In, TMO, Peer, In, Out) :-
725 stream_property(In, timeout(Old)),
726 set_stream(In, timeout(TMO)),
727 debug(http(keep_alive), 'Waiting for keep-alive ...', []),
728 catch(peek_code(In, Code), E, true),
729 ( var(E), 730 Code \== -1 731 -> set_stream(In, timeout(Old)),
732 debug(http(keep_alive), '\tre-using keep-alive connection', [])
733 ; ( Code == -1
734 -> debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
735 ; debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
736 ),
737 close_connection(Peer, In, Out),
738 fail
739 ).
740
741
747
748done_worker :-
749 thread_self(Self),
750 thread_detach(Self),
751 retract(queue_worker(Queue, Self)),
752 thread_property(Self, status(Status)),
753 !,
754 ( catch(recreate_worker(Status, Queue), _, fail)
755 -> print_message(informational,
756 httpd_restarted_worker(Self))
757 ; done_status_message_level(Status, Level),
758 print_message(Level,
759 httpd_stopped_worker(Self, Status))
760 ).
761done_worker :- 762 thread_self(Self),
763 thread_property(Self, status(Status)),
764 done_status_message_level(Status, Level),
765 print_message(Level,
766 httpd_stopped_worker(Self, Status)).
767
768done_status_message_level(true, silent) :- !.
769done_status_message_level(exception('$aborted'), silent) :- !.
770done_status_message_level(_, informational).
771
772
784
785recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
786 halt(2).
787recreate_worker(exception(Error), Queue) :-
788 recreate_on_error(Error),
789 queue_options(Queue, Options),
790 atom_concat(Queue, '_', AliasBase),
791 create_workers(1, 1, Queue, AliasBase, Options).
792
793recreate_on_error('$aborted').
794recreate_on_error(time_limit_exceeded).
795
802
803:- multifile
804 message_level/2. 805
806message_level(error(io_error(read, _), _), silent).
807message_level(error(socket_error(epipe,_), _), silent).
808message_level(error(http_write_short(_Obj,_Written), _), silent).
809message_level(error(timeout_error(read, _), _), informational).
810message_level(keep_alive_timeout, silent).
811
812current_message_level(Term, Level) :-
813 ( message_level(Term, Level)
814 -> true
815 ; Level = error
816 ).
817
818
823
824http_requeue(Header) :-
825 requeue_header(Header, ClientOptions),
826 memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
827 memberchk(peer(Peer), ClientOptions),
828 http_enough_workers(Queue, keep_alive, Peer),
829 thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
830 !.
831http_requeue(Header) :-
832 debug(http(error), 'Re-queue failed: ~p', [Header]),
833 fail.
834
([], []).
836requeue_header([H|T0], [H|T]) :-
837 requeue_keep(H),
838 !,
839 requeue_header(T0, T).
840requeue_header([_|T0], T) :-
841 requeue_header(T0, T).
842
843requeue_keep(pool(_)).
844requeue_keep(peer(_)).
845requeue_keep(protocol(_)).
846
847
851
852http_process(Goal, In, Out, Options) :-
853 debug(http(server), 'Running server goal ~p on ~p -> ~p',
854 [Goal, In, Out]),
855 option(timeout(TMO), Options, 60),
856 set_stream(In, timeout(TMO)),
857 set_stream(Out, timeout(TMO)),
858 http_wrapper(Goal, In, Out, Connection,
859 [ request(Request)
860 | Options
861 ]),
862 next(Connection, Request).
863
864next(Connection, Request) :-
865 next_(Connection, Request), !.
866next(Connection, Request) :-
867 print_message(warning, goal_failed(next(Connection,Request))).
868
869next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
870 !,
871 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
872 ( catch(call(SwitchGoal, In, Out), E,
873 ( print_message(error, E),
874 fail))
875 -> true
876 ; http_close_connection(Request)
877 ).
878next_(spawned(ThreadId), _) :-
879 !,
880 debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
881next_(Connection, Request) :-
882 downcase_atom(Connection, 'keep-alive'),
883 http_requeue(Request),
884 !.
885next_(_, Request) :-
886 http_close_connection(Request).
887
888
892
893http_close_connection(Request) :-
894 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
895 memberchk(peer(Peer), Request),
896 close_connection(Peer, In, Out).
897
902
903close_connection(Peer, In, Out) :-
904 debug(http(connection), 'Closing connection from ~p', [Peer]),
905 catch(close(In, [force(true)]), _, true),
906 catch(close(Out, [force(true)]), _, true).
907
923
924http_spawn(Goal, Options) :-
925 select_option(pool(Pool), Options, ThreadOptions),
926 !,
927 current_output(CGI),
928 catch(thread_create_in_pool(Pool,
929 wrap_spawned(CGI, Goal), Id,
930 [ detached(true)
931 | ThreadOptions
932 ]),
933 Error,
934 true),
935 ( var(Error)
936 -> http_spawned(Id)
937 ; Error = error(resource_error(threads_in_pool(_)), _)
938 -> throw(http_reply(busy))
939 ; Error = error(existence_error(thread_pool, Pool), _),
940 create_pool(Pool)
941 -> http_spawn(Goal, Options)
942 ; throw(Error)
943 ).
944http_spawn(Goal, Options) :-
945 current_output(CGI),
946 thread_create(wrap_spawned(CGI, Goal), Id,
947 [ detached(true)
948 | Options
949 ]),
950 http_spawned(Id).
951
952wrap_spawned(CGI, Goal) :-
953 set_output(CGI),
954 http_wrap_spawned(Goal, Request, Connection),
955 next(Connection, Request).
956
964
965create_pool(Pool) :-
966 E = error(permission_error(create, thread_pool, Pool), _),
967 catch(http:create_pool(Pool), E, true).
968create_pool(Pool) :-
969 print_message(informational, httpd(created_pool(Pool))),
970 thread_pool_create(Pool, 10, []).
971
972
973 976
977:- meta_predicate
978 thread_repeat_wait(0). 979
984
985thread_repeat_wait(Goal) :-
986 new_rate_mma(5, 1000, State),
987 repeat,
988 update_rate_mma(State, MMA),
989 long(MMA, IsLong),
990 ( IsLong == brief
991 -> call(Goal)
992 ; thread_idle(Goal, IsLong)
993 ).
994
995long(MMA, brief) :-
996 MMA < 0.05,
997 !.
998long(MMA, short) :-
999 MMA < 1,
1000 !.
1001long(_, long).
1002
1014
1015new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
1016 current_prolog_flag(max_tagged_integer, MaxI),
1017 get_time(Base).
1018
1019update_rate_mma(State, MMAr) :-
1020 State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
1021 get_time(Now),
1022 Stamp is round((Now-Base)*Resolution),
1023 ( Stamp > MaxI
1024 -> nb_setarg(1, State, Now),
1025 nb_setarg(2, State, 0)
1026 ; true
1027 ),
1028 Diff is Stamp-Last,
1029 nb_setarg(2, State, Stamp),
1030 MMA is round(((N-1)*MMA0+Diff)/N),
1031 nb_setarg(6, State, MMA),
1032 MMAr is MMA/float(Resolution).
1033
1034
1035 1038
1039:- multifile
1040 prolog:message/3. 1041
1042prolog:message(httpd_started_server(Port, Options)) -->
1043 [ 'Started server at '-[] ],
1044 http_root(Port, Options).
1045prolog:message(httpd_stopped_worker(Self, Status)) -->
1046 [ 'Stopped worker ~p: ~p'-[Self, Status] ].
1047prolog:message(httpd_restarted_worker(Self)) -->
1048 [ 'Replaced aborted worker ~p'-[Self] ].
1049prolog:message(httpd(created_pool(Pool))) -->
1050 [ 'Created thread-pool ~p of size 10'-[Pool], nl,
1051 'Create this pool at startup-time or define the hook ', nl,
1052 'http:create_pool/1 to avoid this message and create a ', nl,
1053 'pool that fits the usage-profile.'
1054 ].
1055
1056http_root(Address, Options) -->
1057 { landing_page(Address, URI, Options) },
1058 [ '~w'-[URI] ].
1059
1060landing_page(Host:Port, URI, Options) :-
1061 !,
1062 must_be(atom, Host),
1063 must_be(integer, Port),
1064 http_server_property(Port, scheme(Scheme)),
1065 ( default_port(Scheme, Port)
1066 -> format(atom(Base), '~w://~w', [Scheme, Host])
1067 ; format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
1068 ),
1069 entry_page(Base, URI, Options).
1070landing_page(unix_socket(Path), URI, _Options) :-
1071 !,
1072 format(string(URI), 'Unix domain socket "~w"', [Path]).
1073landing_page(Port, URI, Options) :-
1074 landing_page(localhost:Port, URI, Options).
1075
1076default_port(http, 80).
1077default_port(https, 443).
1078
1079entry_page(Base, URI, Options) :-
1080 option(entry_page(Entry), Options),
1081 !,
1082 uri_resolve(Entry, Base, URI).
1083entry_page(Base, URI, _) :-
1084 http_absolute_location(root(.), Entry, []),
1085 uri_resolve(Entry, Base, URI)