How to stop the OCaml garbage collector from collecting my reactive event handler?

I am trying to use the OBus library with Lwt_react. It uses "functional reactive programming" for properties and signals.

The problem (as stated in the React documentation) is that OCaml can garbage-collect your callback while you are still using it. There is a keep function that holds the handler forever, but I do not want that. I do want the handler to be freed eventually, just not while I still need it.

So, I thought I would bind a handler to a switch:

    let keep ~switch handler =
      Lwt_switch.add_hook (Some switch) (fun () ->
        ignore handler;
        Lwt.return ())

But my event handler still gets garbage collected (which makes sense, since the code that turns off the switch is called when the signal arrives, so the signal handler is the only thing keeping the switch alive in the first place).

Here is a simplified (standalone) version of my code:

    (* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

    let finished_event, fire_finished = React.E.create ()

    let setup () =
      let switch = Lwt_switch.create () in
      let finished, waker = Lwt.wait () in
      let handler () = Lwt.wakeup waker () in
      let dont_gc_me = Lwt_react.E.map handler finished_event in
      ignore dont_gc_me;  (* What goes here? *)
      print_endline "Waiting for signal...";
      Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

    let () =
      let finished = Lwt.protected (setup ()) in
      Gc.full_major ();   (* Force GC, to demonstrate problem *)
      fire_finished ();   (* Simulate send *)
      Lwt_main.run finished;
      print_endline "Done"

Without the Gc.full_major line, this usually prints Done. With it, however, the program just hangs at Waiting for signal...

Edit: I split setup (the real code) from the test driver and added the Lwt.protected wrapper, to avoid accidentally masking the problem through Lwt's cancellation.

garbage-collection reactive-programming ocaml ocaml-lwt
4 answers

Here is a snippet taken from my project, fixed to work around this weak-reference problem (thanks!). The first part keeps a global root pointing to your object. The second part limits the lifetime of the signal/event to the scope of an Lwt thread.

Please note that the reactive object is cloned and explicitly stopped, which may not match your expectations.

    open Lwt_react  (* provides E, S and the event/signal types *)

    module Keep : sig
      type t
      val this : 'a -> t
      val release : t -> unit
    end = struct
      (* Doubly linked list of closures, reachable from the global [root] *)
      type t = {mutable prev: t; mutable next: t; mutable keep: (unit -> unit)}

      let rec root = {next = root; prev = root; keep = ignore}

      let release item =
        item.next.prev <- item.prev;
        item.prev.next <- item.next;
        item.prev <- item;
        item.next <- item;
        (* In case user code keeps a reference to item *)
        item.keep <- ignore

      let attach keep =
        let item = {next = root.next; prev = root; keep} in
        root.next.prev <- item;
        root.next <- item;
        item

      let this a = attach (fun () -> ignore a)
    end

    module React_utils : sig
      val with_signal : 'a signal -> ('a signal -> 'b Lwt.t) -> 'b Lwt.t
      val with_event : 'a event -> ('a event -> 'b Lwt.t) -> 'b Lwt.t
    end = struct
      let with_signal s f =
        let clone = S.map (fun x -> x) s in
        let kept = Keep.this clone in
        Lwt.finalize
          (fun () -> f clone)
          (fun () -> S.stop clone; Keep.release kept; Lwt.return_unit)

      let with_event e f =
        let clone = E.map (fun x -> x) e in
        let kept = Keep.this clone in
        Lwt.finalize
          (fun () -> f clone)
          (fun () -> E.stop clone; Keep.release kept; Lwt.return_unit)
    end

Solving your example using this:

    let run () =
      let switch = Lwt_switch.create () in
      let finished, waker = Lwt.wait () in
      let handler () = Lwt.wakeup waker () in
      (* We use [Lwt.async] because we are not interested in knowing
         exactly when the reference will be released *)
      Lwt.async (fun () ->
        React_utils.with_event
          (Lwt_react.E.map handler finished_event)
          (fun _dont_gc_me -> finished));
      print_endline "Waiting for signal...";
      Gc.full_major ();   (* Force GC, to demonstrate problem *)
      fire_finished ();   (* Simulate send *)
      Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

Here is my current (hacky) workaround. Each handler is added to a global hash table and removed again when the switch is turned off:

    let keep =
      let kept = Hashtbl.create 10 in
      let next = ref 0 in
      fun ~switch value ->
        let ticket = !next in
        incr next;
        Hashtbl.add kept ticket value;
        Lwt_switch.add_hook (Some switch) (fun () ->
          Hashtbl.remove kept ticket;
          Lwt.return ())

It is used as follows:

 Lwt_react.E.map handler event |> keep ~switch; 

One easy way to handle this is to keep a reference to your event and call React.E.stop when you no longer need it:

    (* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

    let finished_event, fire_finished = React.E.create ()

    let run () =
      let switch = Lwt_switch.create () in
      let finished, waker = Lwt.wait () in
      let handler () = Lwt.wakeup waker () in
      let ev = Lwt_react.E.map handler finished_event in
      print_endline "Waiting for signal...";
      Gc.full_major ();   (* Force GC, to demonstrate problem *)
      fire_finished ();   (* Simulate send *)
      React.E.stop ev;
      Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

    let () =
      Lwt_main.run (run ());
      print_endline "Done"

Note that if Lwt did not support cancellation, you would observe the same behavior by replacing Lwt.protected (setup ()) with Lwt.bind (setup ()) Lwt.return.

Basically you have:

finished_event --weak--> SETUP --> finished

where SETUP is the cycle between the event and the Lwt thread. Removing Lwt.protected just bypasses the last pointer, so it happens to do what you want.
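
To make the pointer picture concrete, here is a toy model that uses only the standard library's Weak module. It is a sketch under my own assumptions, not React's implementation: the source type and the functions create_source, subscribe, fire, setup_dropping and setup_keeping are hypothetical names invented for this illustration. The source points at its handler only weakly, so a handler that nobody else references can be collected before the event fires.

```ocaml
(* Toy model (hypothetical, not React's code): the source references its
   handler only through a weak pointer, like the --weak--> edge above. *)
type source = { handlers : (unit -> unit) Weak.t }

let create_source () = { handlers = Weak.create 1 }

(* Register a handler; the source alone does not keep it alive. *)
let subscribe src h = Weak.set src.handlers 0 (Some h)

(* Call the handler if it has not been collected. *)
let fire src =
  match Weak.get src.handlers 0 with
  | Some h -> h ()
  | None -> ()

(* Returns only the flag and drops the handler: this mirrors returning
   the Lwt thread and forgetting the event. *)
let[@inline never] setup_dropping src =
  let fired = ref false in
  subscribe src (fun () -> fired := true);
  fired

(* Also returns the handler, so the caller can keep it reachable. *)
let[@inline never] setup_keeping src =
  let fired = ref false in
  let h () = fired := true in
  subscribe src h;
  (fired, h)

let dropped_fired =
  let src = create_source () in
  let fired = setup_dropping src in
  Gc.full_major ();
  fire src;
  !fired

let kept_fired =
  let src = create_source () in
  let fired, h = setup_keeping src in
  Gc.full_major ();
  fire src;
  (* Use [h] after firing so it stays live across the collection. *)
  ignore (Sys.opaque_identity h);
  !fired

let () =
  Printf.printf "handler dropped: fired = %b\n" dropped_fired;
  Printf.printf "handler kept:    fired = %b\n" kept_fired
```

In the second case the flag is set because the caller still holds the handler. In the first case the handler should normally have been collected by the time the event fires, which corresponds to the hang in the question.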

Lwt has only forward pointers (except for cancellation support), and React has only backward pointers (its forward pointers are weak). So, to make this work properly, you need to return the event instead of a thread.
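
As a minimal sketch of that idea, setup could return the mapped event together with the thread, so that the caller holds a strong reference for as long as it needs the handler. Returning a pair and the demo function are my assumptions about how you might restructure the question's code, not something OBus or Lwt_react prescribes.

```ocaml
(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

let finished_event, fire_finished = React.E.create ()

(* Return the event as well as the thread; whoever holds [ev]
   keeps the handler reachable. *)
let setup () =
  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  let ev = Lwt_react.E.map handler finished_event in
  (ev, finished)

(* Hypothetical test driver, following the one in the question. *)
let demo () =
  let ev, finished = setup () in
  Gc.full_major ();        (* [ev] is still used below, so it survives *)
  fire_finished ();        (* Simulate send *)
  Lwt_main.run finished;
  React.E.stop ev          (* Release the handler once it is no longer needed *)

let () =
  demo ();
  print_endline "Done"
```

The caller now decides how long the handler lives: it stays alive while ev is in use and becomes collectable after React.E.stop.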
