-module(stream). -export([ p_inorder/1, p_list/1, p_preorder/1, p_postorder/1, p_seq/2, p_seq/3, p_tuple/1 ]). -export([ c_ave/1, c_empty/1, c_first/1, c_fold/3, c_last/1, c_list/1, c_range/1, c_reverse/1, c_sum/1, c_tree/1, c_tuple/1 ]). -export([ t_alternate/2, t_drop/2, t_drop_while/2, t_fby/2, t_filter/2, t_map/2, t_merge/2, t_repeat/2, t_scan/3, t_take/2, t_take_while/2, t_zip/2, t_zip_with/3 ]). % A stream is represented by a pair consisting of an % (initial) State and an Unfolding function. When a % stream is generated, new States are created, but % the same Unfolding function is used throughout. % The Unfolding function, given a State, returns one % of three and only three possibilties: % - done : the stream is at its end % - skip,State' : try again with the new state % - next,State',Item : here is the next item in the % stream and a new state representing the residue of % the stream. In Haskell, we'd have % % data Outcome s x = Done | Skip s | Next s x % type Stream x = exists s. (s, s -> Outcome s x) % % to_list (s, u) = % case u s of % Done -> [] % Skip s' -> to_list s' % Next s' x -> x : to_list s' % %if(true). -define(stream(S,U), {S,U}). -define(done, done). -define(skip(S), {skip,S}). -define(next(S,X), {next,S,X}). %else. %define(stream(S,U), [U|S]). %define(done, []). %define(skip(S), {S}). %define(next(S,X), [X|S]). %endif. %------------------------------------------------------------------------ % % Producers % % A producer returns a pair containing an initial state and an unfolder. % The unfolder is a *non-recursive* function that takes a single step; % it may report the end of the stream, a silent state change, or a % state change with an emitted element. % Producers should be in-lined. %------------------------------------------------------------------------ p_list(List) -> ?stream(List, fun ([]) -> ?done ; ([H|T]) -> ?next(T, H) end). p_seq(L, U) when is_integer(L), is_integer(U) -> ?stream(L, fun (I) when I > U -> ?done ; (I) -> ?next(I+1, I) end). p_seq(L, U, D) when is_integer(L), is_integer(U), is_integer(D), D > 0 -> ?stream(L, fun (I) when I > U -> ?done ; (I) -> ?next(I+D, I) end). % Trees here are just an example. % They are binary trees with nothing at the leaves. % We use {X} as an intermediate value in the stack % to report the element X when that's reached. p_preorder(Tree) -> ?stream([Tree], fun ([]) -> ?done ; ([[] |Stack]) -> ?skip(Stack) ; ([{L,X,R}|Stack]) -> ?next([L,R|Stack], X) end). p_inorder(Tree) -> ?stream([Tree], fun ([]) -> ?done ; ([[] |Stack]) -> ?skip(Stack) ; ([{L,X,R}|Stack]) -> ?skip([L,{X},R|Stack]) ; ([{X} |Stack]) -> ?next(Stack, X) end). p_postorder(Tree) -> ?stream([Tree], fun ([]) -> ?done ; ([[] |Stack]) -> ?skip(Stack) ; ([{L,X,R}|Stack]) -> ?skip([L,R,{X}|Stack]) ; ([{X} |Stack]) -> ?next(Stack, X) end). p_tuple(Tuple) -> N = size(Tuple), % tuple_size ?stream(1, fun (I) when I > N -> ?done ; (I) -> ?next(I+1, element(I, Tuple)) end). %------------------------------------------------------------------------ % % Consumers % % A consumer has two stages: a setup stage (which may be inlined) and a % tail recursive loop which calls the unfold function exactly once. The % rest of a stream's code should be inlined into a copy of that loop. % %------------------------------------------------------------------------ c_first(?stream(S, U)) when is_function(U, 1) -> c_first_loop(S, U). c_first_loop(S, U) -> case U(S) of ?skip(S1) -> c_first_loop(S1, U) ; ?next(_,R) -> R end. c_last(?stream(S, U)) when is_function(U, 1) -> c_last_loop(S, U, []). c_last_loop(S, U, X) -> case U(S) of ?done -> hd(X) ; ?skip(S1) -> c_last_loop(S1, U, X) ; ?next(S1,R) -> c_last_loop(S1, U, [R]) end. c_sum(?stream(S, U)) when is_function(U, 1) -> c_sum_loop(S, U, 0). c_sum_loop(S, U, T) -> case U(S) of ?done -> T ; ?skip(S1) -> c_sum_loop(S1, U, T) ; ?next(S1,R) -> c_sum_loop(S1, U, T+R) end. c_ave(?stream(S, U)) when is_function(U, 1) -> c_ave_loop(S, U, 0, 0). c_ave_loop(S, U, T, N) -> case U(S) of ?done -> T/N ; ?skip(S1) -> c_ave_loop(S1, U, T, N) ; ?next(S1,R) -> c_ave_loop(S1, U, T+R, N+1) end. c_empty(?stream(S, U)) when is_function(U, 1) -> c_empty_loop(S, U). c_empty_loop(S, U) -> case U(S) of ?done -> true ; ?skip(S1) -> c_empty_loop(S1, U) ; ?next(_,_) -> false end. c_reverse(?stream(S, U)) when is_function(U, 1) -> c_reverse_loop(S, U, []). c_reverse_loop(S, U, L) -> case U(S) of ?done -> L ; ?skip(S1) -> c_reverse_loop(S1, U, L) ; ?next(S1,R) -> c_reverse_loop(S1, U, [R|L]) end. c_list(Stream) -> lists:reverse(c_reverse(Stream)). c_tuple(Stream) -> list_to_tuple(c_list(Stream)). insert(X, {L,Y,R}) -> if X < Y -> {insert(X, L), Y, R} ; X > Y -> {L, Y, insert(X, R)} ; true -> {L,Y,R} end; insert(X, []) -> {[], X, []}. c_tree(?stream(S, U)) when is_function(U, 1) -> c_tree_loop(S, U, []). c_tree_loop(S, U, T) -> case U(S) of ?done -> T ; ?skip(S1) -> c_tree_loop(S1, U, T) ; ?next(S1,R) -> c_tree_loop(S1, U, insert(R, T)) end. c_fold(F, A, ?stream(S, U)) when is_function(U, 1), is_function(F, 2) -> c_fold_loop(F, A, S, U). c_fold_loop(F, A, S, U) -> case U(S) of ?done -> A ; ?skip(S1) -> c_fold_loop(F, A, S1, U) ; ?next(S1,R) -> c_fold_loop(F, F(A,R), S1, U) end. c_range(?stream(S, U)) when is_function(U, 1) -> {_,_} = c_range_loop(S, U, []). c_range_loop(S, U, AZ) -> case U(S) of ?done -> AZ ; ?skip(S1) -> c_range_loop(S1, U, AZ) ; ?next(S1,R) -> {A,Z} = AZ, c_range_loop(S1, U, if R > Z -> {A,R} ; R < Z -> {R,Z} ; true -> AZ end) end. %------------------------------------------------------------------------ % % Transformers % % A transformer accepts one or more stream arguments and returns a new % stream containing a possibly extended initial state and a new unfold % function. The new unfold function must mention the unfold functions % of its arguments exactly once each. Transformers should be inlined, % and their arguments' unfolders should be inlined into them. The % unfold function must be nonrecursive. % %------------------------------------------------------------------------ t_map(F, ?stream(S, U)) when is_function(F, 1) -> ?stream(S, fun (S0) -> case R = U(S0) of ?next(S1,X) -> ?next(S1, F(X)) ; _ -> R end end). t_filter(P, ?stream(S, U)) when is_function(P, 1) -> ?stream(S, fun (S0) -> case R = U(S0) of ?next(S1,X) -> case P(X) of false -> ?skip(S1) ; true -> R end ; _ -> R end end). t_take_while(P, ?stream(S, U)) when is_function(P, 1) -> ?stream(S, fun (S0) -> case R = U(S0) of ?next(_,X) -> case P(X) of false -> ?done ; true -> R end ; _ -> R end end). t_drop_while(P, ?stream(S, U)) when is_function(P, 1) -> ?stream({false,S}, fun ({B,S0}) -> case U(S0) of ?next(S1,X) -> if B -> ?next({B,S1}, X) ; true -> case B1 = P(X) of true -> ?next({B1,S1}, X) ; false -> ?skip({B1,S1}) end end ; ?skip(S1) -> ?skip({B,S1}) ; ?done -> ?done end end). t_take(N, ?stream(S, U)) when is_integer(N), N >= 0 -> ?stream({N,S}, fun ({I,S0}) -> if I =< 0 -> ?done ; true -> case U(S0) of ?next(S1,X) -> ?next({I-1,S1},X) ; ?skip(S1) -> ?skip({I,S1}) ; ?done -> ?done end end end). t_drop(N, ?stream(S, U)) when is_integer(N), N >= 0 -> ?stream({N,S}, fun ({I,S0}) -> case U(S0) of ?done -> ?done ; ?skip(S1) -> ?skip({I,S1}) ; ?next(S1,X) -> if 0 < I -> ?skip({I-1,S1}) ; true -> ?next({0,S1}, X) end end end). t_scan(F, A, ?stream(S, U)) when is_function(F, 2) -> ?stream({S,A}, fun ({S0,A0}) -> case U(S0) of ?done -> ?done ; ?skip(S1) -> ?skip({S1,A0}) ; ?next(S1,X) -> A1 = F(A0,X), ?next({S1,A1}, A1) end end). % t_repeat(F, S) % repeats each element X of S exactly F(X) times. t_repeat(F, ?stream(U, S)) when is_function(F, 1) -> ?stream({S}, fun ({S0}) -> case U(S0) of ?done -> ?done ; ?skip(S1) -> ?skip({S1}) ; ?next(S1,X) -> N = F(S1), if N > 0 -> ?next({S1,N-1,X}, X) ; true -> ?skip({S1}) end end ; ({S0,N,X}) when N > 0 -> ?next({S0,N-1,X}, X) ; ({S0,_,_}) -> ?skip({S0}) end). % t_fby, t_alternate, t_zip, and t_zip_with % can be extended to any fixed number of streams. t_fby(?stream(LS, UL), ?stream(RS, UR)) -> ?stream({left,LS,RS}, fun ({left,SL,SR}) -> case UL(SL) of ?done -> ?skip({right,SL,SR}) ; ?skip(S1) -> ?skip({left,S1,SR}) ; ?next(S1,X) -> ?next({left,S1,SR}, X) end ; ({right,SL,SR}) -> case UR(SR) of ?done -> ?done ; ?skip(S1) -> ?skip({right,SL,S1}) ; ?next(S1,X) -> ?next({right,SL,S1}, X) end end). t_alternate(?stream(LS, UL), ?stream(RS, UR)) -> ?stream({left,LS,RS}, fun ({left,SL,SR}) -> case UL(SL) of ?done -> ?done ; ?skip(S1) -> ?skip({left,S1,SR}) ; ?next(S1,XL) -> ?next({right,S1,SR}, XL) end ; ({right,SL,SR}) -> case UR(SR) of ?done -> ?done ; ?skip(S1) -> ?skip({right,SL,S1}) ; ?next(S1,XR) -> ?next({left,SL,S1}, XR) end end). t_zip(?stream(LS, UL), ?stream(RS, UR)) -> ?stream({LS,RS}, fun ({SL,SR}) -> case UL(SL) of ?done -> ?done ; ?skip(S1) -> ?skip({S1,SR}) ; ?next(S1,XL) -> ?skip({S1,SR,XL}) end ; ({SL,SR,XL}) -> case UR(SR) of ?done -> ?done ; ?skip(S1) -> ?skip({SL,S1,XL}) ; ?next(S1,XR) -> ?next({SL,S1}, {XL,XR}) end end). t_zip_with(F, ?stream(LS,UL), ?stream(RS,UR)) when is_function(F, 2) -> ?stream({LS,RS}, fun ({SL,SR}) -> case UL(SL) of ?done -> ?done ; ?skip(S1) -> ?skip({S1,SR}) ; ?next(S1,XL) -> ?skip({S1,SR,XL}) end ; ({SL,SR,XL}) -> case UR(SR) of ?done -> ?done ; ?skip(S1) -> ?skip({SL,S1,XL}) ; ?next(S1,XR) -> ?next({SL,S1}, F(XL,XR)) end end). % It might be the left stream's turn to be looked at. % {left,SL,} % The right stream might be % - known to be empty [] % - known to be non-empty {SR,XR} % - not yet examined [SR] % It might be the right stream's turn to be looked at. % {right,,SR} % The left stream might be empty, non-empty, or unexamined. t_merge(?stream(LS, UL), ?stream(RS, UR)) -> ?stream({LS,RS}, fun ({left,SL,RR}) -> case UL(SL) of ?done -> case RR of [] -> ?done ; {SR,XR} -> ?next({right,[],SR}, XR) ; [SR] -> ?skip({right,[],SR}) end ; ?skip(S1) -> ?skip({left,S1,RR}) ; ?next(S1,XL) -> case RR of [] -> ?next({left,S1,[]}, XL) ; {SR,XR} -> if XR < XL -> ?next({right,{S1,XL},SR}, XR) ; true -> ?next({left,S1,{SR,XR}}, XL) end ; [SR] -> ?skip({right,{S1,XL},SR}) end end ; ({right,LL,SR}) -> case UR(SR) of ?done -> case LL of [] -> ?done ; {SL,XL} -> ?next({left,SL,[]}, XL) ; [SL] -> ?skip({left,SL,[]}) end ; ?skip(S1) -> ?skip({right,LL,S1}) ; ?next(S1,XR) -> case LL of [] -> ?next({right,[],S1}, XR) ; {SL,XL} -> if XR < XL -> ?next({right,{S1,XL},SR}, XR) ; true -> ?next({left,SL,{S1,XR}}, XL) end ; {SL} -> ?skip({left,SL,{S1,XR}}) end end end). % Example: t() -> [{1,a},{2,b},{3,c},{4,d},{5,e}] % t() -> % L = [1,a,b,2,c,3,d,4,5,e], % S1 = t_filter(fun (X) -> is_integer(X) end, p_list(L)), % S2 = t_filter(fun (X) -> is_atom(X) end, p_list(L)), % c_list(t_zip(S1, S2)).