hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use quote::quote;
24use serde::de::DeserializeOwned;
25use serde::{Deserialize, Serialize};
26use stageleft::runtime_support::{FreeVariableWithContext, QuoteTokens};
27use stageleft::{QuotedWithContext, q, quote_type};
28use syn::parse_quote;
29use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
30
31use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
32use crate::forward_handle::ForwardRef;
33#[cfg(stageleft_runtime)]
34use crate::forward_handle::{CycleCollection, ForwardHandle};
35use crate::live_collections::boundedness::Unbounded;
36use crate::live_collections::keyed_stream::KeyedStream;
37use crate::live_collections::singleton::Singleton;
38use crate::live_collections::stream::{
39    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
40};
41use crate::location::dynamic::LocationId;
42use crate::location::external_process::{
43    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
44};
45use crate::nondet::NonDet;
46#[cfg(feature = "sim")]
47use crate::sim::SimSender;
48use crate::staging_util::get_this_crate;
49
50pub mod dynamic;
51
52#[expect(missing_docs, reason = "TODO")]
53pub mod external_process;
54pub use external_process::External;
55
56#[expect(missing_docs, reason = "TODO")]
57pub mod process;
58pub use process::Process;
59
60#[expect(missing_docs, reason = "TODO")]
61pub mod cluster;
62pub use cluster::Cluster;
63
64#[expect(missing_docs, reason = "TODO")]
65pub mod member_id;
66pub use member_id::{MemberId, TaglessMemberId};
67#[expect(missing_docs, reason = "TODO")]
68pub mod tick;
69pub use tick::{Atomic, NoTick, Tick};
70
// A membership change for a single member: it either joined or left.
//
// In this file it is the value type of the keyed stream returned by
// `Location::source_cluster_members` and of the membership streams returned by
// `Location::bidi_external_many_bytes` / `Location::bidi_external_many_bincode`.
//
// NOTE(review): these are plain `//` comments rather than rustdoc so that the
// `#[expect(missing_docs)]` below stays fulfilled; promote them to `///` and drop
// the attribute once the wording is confirmed.
#[expect(missing_docs, reason = "TODO")]
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    // Produced by the `bidi_external_many_*` membership streams when the raw flag is `true`.
    Joined,
    // Produced by the `bidi_external_many_*` membership streams when the raw flag is `false`.
    Left,
}
77
// Hint forwarded as `port_hint` when an external input is created (see
// `Location::bind_single_client` and `Location::bidi_external_many_bytes`). The
// bincode variants in this file always pass `NetworkHint::Auto`.
//
// NOTE(review): these are plain `//` comments rather than rustdoc so that the
// `#[expect(missing_docs)]` below stays fulfilled; promote them to `///` and drop
// the attribute once the wording is confirmed.
#[expect(missing_docs, reason = "TODO")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    // No specific transport requested.
    // NOTE(review): presumably lets the deployment pick the transport — confirm.
    Auto,
    // Requests TCP, optionally with a port number.
    // NOTE(review): the meaning of `None` is not visible in this file (presumably
    // "any available port") — confirm.
    TcpPort(Option<u16>),
}
84
85pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
86    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
87}
88
89#[expect(missing_docs, reason = "TODO")]
90#[expect(
91    private_bounds,
92    reason = "only internal Hydro code can define location types"
93)]
94pub trait Location<'a>: dynamic::DynLocation {
    /// The location type returned by [`Location::root`].
    type Root: Location<'a>;
    /// Returns the root location of this location.
    ///
    /// [`LocationSelfId`] expects the ID of the returned location to be either a process
    /// or a cluster.
    fn root(&self) -> Self::Root;

    /// A human-readable name used for debugging and telemetry.
    ///
    /// [`LocationSelfName`] splices the name of the root location type into quoted code.
    fn name() -> String;
100
101    fn try_tick(&self) -> Option<Tick<Self>> {
102        if Self::is_top_level() {
103            let next_id = self.flow_state().borrow_mut().next_clock_id;
104            self.flow_state().borrow_mut().next_clock_id += 1;
105            Some(Tick {
106                id: next_id,
107                l: self.clone(),
108            })
109        } else {
110            None
111        }
112    }
113
114    fn id(&self) -> LocationId {
115        dynamic::DynLocation::id(self)
116    }
117
118    fn tick(&self) -> Tick<Self>
119    where
120        Self: NoTick,
121    {
122        let next_id = self.flow_state().borrow_mut().next_clock_id;
123        self.flow_state().borrow_mut().next_clock_id += 1;
124        Tick {
125            id: next_id,
126            l: self.clone(),
127        }
128    }
129
130    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
131    where
132        Self: Sized + NoTick,
133    {
134        Stream::new(
135            self.clone(),
136            HydroNode::Source {
137                source: HydroSource::Spin(),
138                metadata: self.new_node_metadata(Stream::<
139                    (),
140                    Self,
141                    Unbounded,
142                    TotalOrder,
143                    ExactlyOnce,
144                >::collection_kind()),
145            },
146        )
147    }
148
149    fn source_stream<T, E>(
150        &self,
151        e: impl QuotedWithContext<'a, E, Self>,
152    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
153    where
154        E: FuturesStream<Item = T> + Unpin,
155        Self: Sized + NoTick,
156    {
157        let e = e.splice_untyped_ctx(self);
158
159        Stream::new(
160            self.clone(),
161            HydroNode::Source {
162                source: HydroSource::Stream(e.into()),
163                metadata: self.new_node_metadata(Stream::<
164                    T,
165                    Self,
166                    Unbounded,
167                    TotalOrder,
168                    ExactlyOnce,
169                >::collection_kind()),
170            },
171        )
172    }
173
174    fn source_iter<T, E>(
175        &self,
176        e: impl QuotedWithContext<'a, E, Self>,
177    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
178    where
179        E: IntoIterator<Item = T>,
180        Self: Sized + NoTick,
181    {
182        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
183        // for bounded top-level streams, and this is the only way to generate one
184        let e = e.splice_typed_ctx(self);
185
186        Stream::new(
187            self.clone(),
188            HydroNode::Source {
189                source: HydroSource::Iter(e.into()),
190                metadata: self.new_node_metadata(Stream::<
191                    T,
192                    Self,
193                    Unbounded,
194                    TotalOrder,
195                    ExactlyOnce,
196                >::collection_kind()),
197            },
198        )
199    }
200
201    fn source_cluster_members<C: 'a>(
202        &self,
203        cluster: &Cluster<'a, C>,
204    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
205    where
206        Self: Sized + NoTick,
207    {
208        Stream::new(
209            self.clone(),
210            HydroNode::Source {
211                source: HydroSource::ClusterMembers(cluster.id()),
212                metadata: self.new_node_metadata(Stream::<
213                    (TaglessMemberId, MembershipEvent),
214                    Self,
215                    Unbounded,
216                    TotalOrder,
217                    ExactlyOnce,
218                >::collection_kind()),
219            },
220        )
221        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
222        .into_keyed()
223    }
224
225    fn source_external_bytes<L>(
226        &self,
227        from: &External<L>,
228    ) -> (
229        ExternalBytesPort,
230        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
231    )
232    where
233        Self: Sized + NoTick,
234    {
235        let (port, stream, sink) =
236            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
237
238        sink.complete(self.source_iter(q!([])));
239
240        (port, stream)
241    }
242
243    #[expect(clippy::type_complexity, reason = "stream markers")]
244    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
245        &self,
246        from: &External<L>,
247    ) -> (
248        ExternalBincodeSink<T, NotMany, O, R>,
249        Stream<T, Self, Unbounded, O, R>,
250    )
251    where
252        Self: Sized + NoTick,
253        T: Serialize + DeserializeOwned,
254    {
255        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
256        sink.complete(self.source_iter(q!([])));
257
258        (
259            ExternalBincodeSink {
260                process_id: from.id,
261                port_id: port.port_id,
262                _phantom: PhantomData,
263            },
264            stream.weaken_ordering().weaken_retries(),
265        )
266    }
267
268    #[cfg(feature = "sim")]
269    #[expect(clippy::type_complexity, reason = "stream markers")]
270    /// Sets up a simulated input port on this location, returning a handle to send messages to
271    /// the location as well as a stream of received messages.
272    fn sim_input<T, O: Ordering, R: Retries>(
273        &self,
274    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
275    where
276        Self: Sized + NoTick,
277        T: Serialize + DeserializeOwned,
278    {
279        let external_location: External<'a, ()> = External {
280            id: 0,
281            flow_state: self.flow_state().clone(),
282            _phantom: PhantomData,
283        };
284
285        let (external, stream) = self.source_external_bincode(&external_location);
286
287        (SimSender(external.port_id, PhantomData), stream)
288    }
289
    /// Establishes a server on this location to receive a bidirectional connection from a single
    /// client, identified by the given `External` handle. Returns a port handle for the external
    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
    /// messages.
    ///
    /// Incoming data is decoded with `Codec`; the outgoing stream carries items of type `T` and
    /// is supplied later by completing the returned forward handle.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_deploy::Deployment;
    /// # use futures::{SinkExt, StreamExt};
    /// # tokio_test::block_on(async {
    /// # use bytes::Bytes;
    /// # use hydro_lang::location::NetworkHint;
    /// # use tokio_util::codec::LengthDelimitedCodec;
    /// # let flow = FlowBuilder::new();
    /// let node = flow.process::<()>();
    /// let external = flow.external::<()>();
    /// let (port, incoming, outgoing) =
    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
    ///     let mut resp: Vec<u8> = data.into();
    ///     resp.push(42);
    ///     resp.into() // : Bytes
    /// })));
    ///
    /// # let mut deployment = Deployment::new();
    /// let nodes = flow // ... with_process and with_external
    /// #     .with_process(&node, deployment.Localhost())
    /// #     .with_external(&external, deployment.Localhost())
    /// #     .deploy(&mut deployment);
    ///
    /// deployment.deploy().await.unwrap();
    /// deployment.start().await.unwrap();
    ///
    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
    /// assert_eq!(
    ///     external_out.next().await.unwrap().unwrap(),
    ///     vec![1, 2, 3, 42]
    /// );
    /// # });
    /// # }
    /// ```
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<NotMany>,
        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Reserve the next external port key from the external's flow state. The borrow is
        // confined to this block so it is released before the flow state is borrowed again.
        let next_external_port_id = {
            let mut flow_state = from.flow_state.borrow_mut();
            let id = flow_state.next_external_out;
            flow_state.next_external_out += 1;
            id
        };

        // Placeholder for the outgoing stream; the caller fills it in through `fwd_ref`.
        // `forward_ref` borrows the flow state itself, so it must run before the long-lived
        // borrow taken on the next line.
        let (fwd_ref, to_sink) =
            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
        // This mutable borrow is held until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outgoing half: a root that sends the placeholder stream to the external port,
        // without any serialization function (items are handed over as-is).
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_id: from.id,
            to_key: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: an external input on the same port key whose elements are the
        // codec's decode results.
        let raw_stream: Stream<
            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_id: from.id,
                from_key: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: self.new_node_metadata(Stream::<
                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBytesPort {
                process_id: from.id,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            // NOTE(review): flattening the `Result` items presumably discards decode errors,
            // as the TODO on the same call in `bidi_external_many_bytes` says — confirm.
            raw_stream.flatten_ordered(),
            fwd_ref,
        )
    }
405
    /// Bincode counterpart of [`Location::bind_single_client`]: binds a port for a single
    /// external client, deserializing incoming frames into `InT` and serializing outgoing
    /// `OutT` values with bincode.
    ///
    /// Returns the bidirectional port handle, the stream of decoded incoming values, and a
    /// forward handle through which the outgoing stream is supplied later.
    ///
    /// The external input is framed with [`LengthDelimitedCodec`] and always uses
    /// [`NetworkHint::Auto`].
    ///
    /// Note that the generated serialize and deserialize closures `unwrap()` their results, so
    /// a framing error or a bincode failure panics wherever that generated code runs.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, NotMany>,
        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Reserve the next external port key from the external's flow state. The borrow is
        // confined to this block so it is released before the flow state is borrowed again.
        let next_external_port_id = {
            let mut flow_state = from.flow_state.borrow_mut();
            let id = flow_state.next_external_out;
            flow_state.next_external_out += 1;
            id
        };

        // Placeholder for the outgoing stream; the caller fills it in through `fwd_ref`.
        // `forward_ref` borrows the flow state itself, so it must run before the long-lived
        // borrow taken on the next line.
        let (fwd_ref, to_sink) =
            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
        // This mutable borrow is held until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        let root = get_this_crate();

        // Generated closure that bincode-serializes each outgoing `OutT` value; the type hint
        // pins the closure's argument type.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
            )
        };

        // Outgoing half: a root that sends the placeholder stream to the external port.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_id: from.id,
            to_key: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        let in_t_type = quote_type::<InT>();

        // Generated closure that unwraps the framing result and bincode-deserializes the
        // payload into `InT`.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let b = res.unwrap();
                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
            }
        };

        // Incoming half: an external input on the same port key that yields `InT` values
        // directly, because deserialization happens inside the node.
        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_id: from.id,
                from_key: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(deser_fn.into()),
                metadata: self.new_node_metadata(Stream::<
                    InT,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBincodeBidi {
                process_id: from.id,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream,
            fwd_ref,
        )
    }
488
    /// Binds a port on this location for bidirectional communication with many external
    /// clients, exchanging frames encoded and decoded with `Codec`.
    ///
    /// Returns, in order:
    /// - the port handle for the external process,
    /// - the incoming messages as a keyed stream with `u64` keys
    ///   (NOTE(review): presumably one key per client connection — confirm),
    /// - a keyed stream of [`MembershipEvent`]s with `u64` keys,
    /// - a forward handle through which the keyed outgoing messages are supplied later.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<Many>,
        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Reserve the next external port key from the external's flow state. The borrow is
        // confined to this block so it is released before the flow state is borrowed again.
        let next_external_port_id = {
            let mut flow_state = from.flow_state.borrow_mut();
            let id = flow_state.next_external_out;
            flow_state.next_external_out += 1;
            id
        };

        // Placeholder for the keyed outgoing stream; the caller fills it in through `fwd_ref`.
        // `forward_ref` borrows the flow state itself, so it must run before the long-lived
        // borrow taken on the next line.
        let (fwd_ref, to_sink) =
            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
        // This mutable borrow is held until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outgoing half: the keyed placeholder is turned into `(key, value)` entries and sent
        // to the external port without a serialization function.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_id: from.id,
            to_key: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: an external input whose elements are decode results carrying a
        // `(u64, item)` pair on success.
        let raw_stream: Stream<
            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_id: from.id,
                from_key: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: self.new_node_metadata(Stream::<
                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        // The membership source is referenced purely by identifier, built from the external ID
        // and the port key.
        // NOTE(review): presumably the deploy-time code generation defines a stream under this
        // exact name — confirm.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.id, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        // Raw membership is a keyed stream of booleans; it is mapped to `MembershipEvent` in
        // the returned tuple.
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: self.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBytesPort {
                process_id: from.id,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream
                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
                .into_keyed(),
            // `true` maps to `Joined`, `false` to `Left`.
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
600
    /// Bincode counterpart of [`Location::bidi_external_many_bytes`]: binds a port for
    /// bidirectional communication with many external clients, deserializing incoming
    /// payloads into `InT` and serializing outgoing `OutT` values with bincode.
    ///
    /// Returns, in order:
    /// - the port handle for the external process,
    /// - the incoming values as a keyed stream with `u64` keys
    ///   (NOTE(review): presumably one key per client connection — confirm),
    /// - a keyed stream of [`MembershipEvent`]s with `u64` keys,
    /// - a forward handle through which the keyed outgoing values are supplied later.
    ///
    /// The external input is framed with [`LengthDelimitedCodec`] and always uses
    /// [`NetworkHint::Auto`].
    ///
    /// Note that the generated serialize and deserialize closures `unwrap()` their results, so
    /// a framing error or a bincode failure panics wherever that generated code runs.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, Many>,
        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Reserve the next external port key from the external's flow state. The borrow is
        // confined to this block so it is released before the flow state is borrowed again.
        let next_external_port_id = {
            let mut flow_state = from.flow_state.borrow_mut();
            let id = flow_state.next_external_out;
            flow_state.next_external_out += 1;
            id
        };

        let root = get_this_crate();

        // Placeholder for the keyed outgoing stream; the caller fills it in through `fwd_ref`.
        // `forward_ref` borrows the flow state itself, so it must run before the long-lived
        // borrow taken on the next line.
        let (fwd_ref, to_sink) =
            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
        // This mutable borrow is held until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Generated closure that keeps the `u64` key and bincode-serializes the value; the type
        // hint pins the closure's argument type.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
            )
        };

        // Outgoing half: the keyed placeholder is turned into `(key, value)` entries and sent
        // to the external port.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_id: from.id,
            to_key: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        let in_t_type = quote_type::<InT>();

        // Generated closure that unwraps the framing result, keeps the key, and
        // bincode-deserializes the payload into `InT`.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let (id, b) = res.unwrap();
                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
            }
        };

        // Incoming half: an external input on the same port key, constructed directly as a
        // keyed stream of deserialized values.
        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
            KeyedStream::new(
                self.clone(),
                HydroNode::ExternalInput {
                    from_external_id: from.id,
                    from_key: next_external_port_id,
                    from_many: true,
                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
                    port_hint: NetworkHint::Auto,
                    instantiate_fn: DebugInstantiate::Building,
                    deserialize_fn: Some(deser_fn.into()),
                    metadata: self.new_node_metadata(KeyedStream::<
                        u64,
                        InT,
                        Self,
                        Unbounded,
                        TotalOrder,
                        ExactlyOnce,
                    >::collection_kind()),
                },
            );

        // The membership source is referenced purely by identifier, built from the external ID
        // and the port key.
        // NOTE(review): presumably the deploy-time code generation defines a stream under this
        // exact name — confirm.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.id, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        // Raw membership is a keyed stream of booleans; it is mapped to `MembershipEvent` in
        // the returned tuple.
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: self.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBincodeBidi {
                process_id: from.id,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream,
            // `true` maps to `Joined`, `false` to `Left`.
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
723
724    /// Constructs a [`Singleton`] materialized at this location with the given static value.
725    ///
726    /// # Example
727    /// ```rust
728    /// # #[cfg(feature = "deploy")] {
729    /// # use hydro_lang::prelude::*;
730    /// # use futures::StreamExt;
731    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
732    /// let tick = process.tick();
733    /// let singleton = tick.singleton(q!(5));
734    /// # singleton.all_ticks()
735    /// # }, |mut stream| async move {
736    /// // 5
737    /// # assert_eq!(stream.next().await.unwrap(), 5);
738    /// # }));
739    /// # }
740    /// ```
741    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
742    where
743        T: Clone,
744        Self: Sized,
745    {
746        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
747        // for bounded top-level singletons, and this is the only way to generate one
748
749        let e = e.splice_untyped_ctx(self);
750
751        Singleton::new(
752            self.clone(),
753            HydroNode::SingletonSource {
754                value: e.into(),
755                metadata: self
756                    .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
757            },
758        )
759    }
760
    /// Generates a stream with values emitted at a fixed interval, with
    /// each value being the current time (as a [`tokio::time::Instant`]).
    ///
    /// The stream is built by passing a quoted `tokio::time::interval(interval)`, wrapped in
    /// a `tokio_stream::wrappers::IntervalStream`, to [`Location::source_stream`].
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        // Not read by the body; callers must still supply a `NonDet` value.
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }
782
    /// Generates a stream with values emitted at a fixed interval (with an
    /// initial delay), with each value being the current time
    /// (as a [`tokio::time::Instant`]).
    ///
    /// The stream is built by passing a quoted
    /// `tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)`, wrapped in a
    /// `tokio_stream::wrappers::IntervalStream`, to [`Location::source_stream`].
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        // Not read by the body; callers must still supply a `NonDet` value.
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }
806
807    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
808    where
809        S: CycleCollection<'a, ForwardRef, Location = Self>,
810        Self: NoTick,
811    {
812        let next_id = self.flow_state().borrow_mut().next_cycle_id();
813        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
814
815        (
816            ForwardHandle {
817                completed: false,
818                ident: ident.clone(),
819                expected_location: Location::id(self),
820                _phantom: PhantomData,
821            },
822            S::create_source(ident, self.clone()),
823        )
824    }
825}
826
/// A free variable representing the root location's ID. When spliced into a quoted
/// snippet, this turns into the [`LocationId`] of the root of the location the snippet is
/// spliced for, which must be a process or a cluster.
pub static LOCATION_SELF_ID: LocationSelfId = LocationSelfId { _private: &() };

/// See [`LOCATION_SELF_ID`].
#[derive(Clone, Copy)]
pub struct LocationSelfId<'a> {
    // Private field, so code outside this module cannot construct the struct by literal.
    _private: &'a (),
}
836
837impl<'a, L> FreeVariableWithContext<L> for LocationSelfId<'a>
838where
839    L: Location<'a>,
840{
841    type O = LocationId;
842
843    fn to_tokens(self, ctx: &L) -> QuoteTokens
844    where
845        Self: Sized,
846    {
847        let root = get_this_crate();
848        let location_id = ctx.root().id();
849
850        let variant = match location_id {
851            LocationId::Process(id) => quote! { Process(#id) },
852            LocationId::Cluster(id) => quote! { Cluster(#id) },
853            _other => unreachable!(),
854        };
855
856        QuoteTokens {
857            prelude: None,
858            expr: Some(quote! { #root::location::dynamic::LocationId::#variant }),
859        }
860    }
861}
862
/// A free variable representing the root location's human-readable name. When spliced into
/// a quoted snippet, this turns into a string literal (a `&'static str`) holding the value of
/// [`Location::name`] for the root location type.
pub static LOCATION_SELF_NAME: LocationSelfName = LocationSelfName { _private: &() };

/// See [`LOCATION_SELF_NAME`].
#[derive(Clone, Copy)]
pub struct LocationSelfName<'a> {
    // Private field, so code outside this module cannot construct the struct by literal.
    _private: &'a (),
}
872
873impl<'a, L> FreeVariableWithContext<L> for LocationSelfName<'a>
874where
875    L: Location<'a>,
876{
877    type O = LocationId;
878
879    fn to_tokens(self, _ctx: &L) -> QuoteTokens
880    where
881        Self: Sized,
882    {
883        let location_name = <L::Root as Location>::name();
884
885        QuoteTokens {
886            prelude: None,
887            expr: Some(quote! { #location_name }),
888        }
889    }
890}
891
892#[cfg(feature = "deploy")]
893#[cfg(test)]
894mod tests {
895    use std::collections::HashSet;
896
897    use futures::{SinkExt, StreamExt};
898    use hydro_deploy::Deployment;
899    use stageleft::q;
900    use tokio_util::codec::LengthDelimitedCodec;
901
902    use crate::compile::builder::FlowBuilder;
903    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
904    use crate::location::{Location, NetworkHint};
905    use crate::nondet::nondet;
906
907    #[tokio::test]
908    async fn top_level_singleton_replay_cardinality() {
909        let mut deployment = Deployment::new();
910
911        let flow = FlowBuilder::new();
912        let node = flow.process::<()>();
913        let external = flow.external::<()>();
914
915        let (in_port, input) =
916            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
917        let singleton = node.singleton(q!(123));
918        let tick = node.tick();
919        let out = input
920            .batch(&tick, nondet!(/** test */))
921            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
922            .cross_singleton(
923                singleton
924                    .snapshot(&tick, nondet!(/** test */))
925                    .into_stream()
926                    .count(),
927            )
928            .all_ticks()
929            .send_bincode_external(&external);
930
931        let nodes = flow
932            .with_process(&node, deployment.Localhost())
933            .with_external(&external, deployment.Localhost())
934            .deploy(&mut deployment);
935
936        deployment.deploy().await.unwrap();
937
938        let mut external_in = nodes.connect(in_port).await;
939        let mut external_out = nodes.connect(out).await;
940
941        deployment.start().await.unwrap();
942
943        external_in.send(1).await.unwrap();
944        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
945
946        external_in.send(2).await.unwrap();
947        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
948    }
949
950    #[tokio::test]
951    async fn tick_singleton_replay_cardinality() {
952        let mut deployment = Deployment::new();
953
954        let flow = FlowBuilder::new();
955        let node = flow.process::<()>();
956        let external = flow.external::<()>();
957
958        let (in_port, input) =
959            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
960        let tick = node.tick();
961        let singleton = tick.singleton(q!(123));
962        let out = input
963            .batch(&tick, nondet!(/** test */))
964            .cross_singleton(singleton.clone())
965            .cross_singleton(singleton.into_stream().count())
966            .all_ticks()
967            .send_bincode_external(&external);
968
969        let nodes = flow
970            .with_process(&node, deployment.Localhost())
971            .with_external(&external, deployment.Localhost())
972            .deploy(&mut deployment);
973
974        deployment.deploy().await.unwrap();
975
976        let mut external_in = nodes.connect(in_port).await;
977        let mut external_out = nodes.connect(out).await;
978
979        deployment.start().await.unwrap();
980
981        external_in.send(1).await.unwrap();
982        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
983
984        external_in.send(2).await.unwrap();
985        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
986    }
987
988    #[tokio::test]
989    async fn external_bytes() {
990        let mut deployment = Deployment::new();
991
992        let flow = FlowBuilder::new();
993        let first_node = flow.process::<()>();
994        let external = flow.external::<()>();
995
996        let (in_port, input) = first_node.source_external_bytes(&external);
997        let out = input.send_bincode_external(&external);
998
999        let nodes = flow
1000            .with_process(&first_node, deployment.Localhost())
1001            .with_external(&external, deployment.Localhost())
1002            .deploy(&mut deployment);
1003
1004        deployment.deploy().await.unwrap();
1005
1006        let mut external_in = nodes.connect(in_port).await.1;
1007        let mut external_out = nodes.connect(out).await;
1008
1009        deployment.start().await.unwrap();
1010
1011        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1012
1013        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1014    }
1015
1016    #[tokio::test]
1017    async fn multi_external_source() {
1018        let mut deployment = Deployment::new();
1019
1020        let flow = FlowBuilder::new();
1021        let first_node = flow.process::<()>();
1022        let external = flow.external::<()>();
1023
1024        let (in_port, input, _membership, complete_sink) =
1025            first_node.bidi_external_many_bincode(&external);
1026        let out = input.entries().send_bincode_external(&external);
1027        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
1028
1029        let nodes = flow
1030            .with_process(&first_node, deployment.Localhost())
1031            .with_external(&external, deployment.Localhost())
1032            .deploy(&mut deployment);
1033
1034        deployment.deploy().await.unwrap();
1035
1036        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1037        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1038        let external_out = nodes.connect(out).await;
1039
1040        deployment.start().await.unwrap();
1041
1042        external_in_1.send(123).await.unwrap();
1043        external_in_2.send(456).await.unwrap();
1044
1045        assert_eq!(
1046            external_out.take(2).collect::<HashSet<_>>().await,
1047            vec![(0, 123), (1, 456)].into_iter().collect()
1048        );
1049    }
1050
1051    #[tokio::test]
1052    async fn second_connection_only_multi_source() {
1053        let mut deployment = Deployment::new();
1054
1055        let flow = FlowBuilder::new();
1056        let first_node = flow.process::<()>();
1057        let external = flow.external::<()>();
1058
1059        let (in_port, input, _membership, complete_sink) =
1060            first_node.bidi_external_many_bincode(&external);
1061        let out = input.entries().send_bincode_external(&external);
1062        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
1063
1064        let nodes = flow
1065            .with_process(&first_node, deployment.Localhost())
1066            .with_external(&external, deployment.Localhost())
1067            .deploy(&mut deployment);
1068
1069        deployment.deploy().await.unwrap();
1070
1071        // intentionally skipped to test stream waking logic
1072        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1073        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1074        let mut external_out = nodes.connect(out).await;
1075
1076        deployment.start().await.unwrap();
1077
1078        external_in_2.send(456).await.unwrap();
1079
1080        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1081    }
1082
1083    #[tokio::test]
1084    async fn multi_external_bytes() {
1085        let mut deployment = Deployment::new();
1086
1087        let flow = FlowBuilder::new();
1088        let first_node = flow.process::<()>();
1089        let external = flow.external::<()>();
1090
1091        let (in_port, input, _membership, complete_sink) = first_node
1092            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1093        let out = input.entries().send_bincode_external(&external);
1094        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
1095
1096        let nodes = flow
1097            .with_process(&first_node, deployment.Localhost())
1098            .with_external(&external, deployment.Localhost())
1099            .deploy(&mut deployment);
1100
1101        deployment.deploy().await.unwrap();
1102
1103        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1104        let mut external_in_2 = nodes.connect(in_port).await.1;
1105        let external_out = nodes.connect(out).await;
1106
1107        deployment.start().await.unwrap();
1108
1109        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1110        external_in_2.send(vec![4, 5].into()).await.unwrap();
1111
1112        assert_eq!(
1113            external_out.take(2).collect::<HashSet<_>>().await,
1114            vec![
1115                (0, (&[1u8, 2, 3] as &[u8]).into()),
1116                (1, (&[4u8, 5] as &[u8]).into())
1117            ]
1118            .into_iter()
1119            .collect()
1120        );
1121    }
1122
1123    #[tokio::test]
1124    async fn single_client_external_bytes() {
1125        let mut deployment = Deployment::new();
1126        let flow = FlowBuilder::new();
1127        let first_node = flow.process::<()>();
1128        let external = flow.external::<()>();
1129        let (port, input, complete_sink) = first_node
1130            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1131        complete_sink.complete(input.map(q!(|data| {
1132            let mut resp: Vec<u8> = data.into();
1133            resp.push(42);
1134            resp.into() // : Bytes
1135        })));
1136
1137        let nodes = flow
1138            .with_process(&first_node, deployment.Localhost())
1139            .with_external(&external, deployment.Localhost())
1140            .deploy(&mut deployment);
1141
1142        deployment.deploy().await.unwrap();
1143        deployment.start().await.unwrap();
1144
1145        let (mut external_out, mut external_in) = nodes.connect(port).await;
1146
1147        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1148        assert_eq!(
1149            external_out.next().await.unwrap().unwrap(),
1150            vec![1, 2, 3, 42]
1151        );
1152    }
1153
1154    #[tokio::test]
1155    async fn echo_external_bytes() {
1156        let mut deployment = Deployment::new();
1157
1158        let flow = FlowBuilder::new();
1159        let first_node = flow.process::<()>();
1160        let external = flow.external::<()>();
1161
1162        let (port, input, _membership, complete_sink) = first_node
1163            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1164        complete_sink
1165            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1166
1167        let nodes = flow
1168            .with_process(&first_node, deployment.Localhost())
1169            .with_external(&external, deployment.Localhost())
1170            .deploy(&mut deployment);
1171
1172        deployment.deploy().await.unwrap();
1173
1174        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1175        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1176
1177        deployment.start().await.unwrap();
1178
1179        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1180        external_in_2.send(vec![4, 5].into()).await.unwrap();
1181
1182        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1183        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1184    }
1185
1186    #[tokio::test]
1187    async fn echo_external_bincode() {
1188        let mut deployment = Deployment::new();
1189
1190        let flow = FlowBuilder::new();
1191        let first_node = flow.process::<()>();
1192        let external = flow.external::<()>();
1193
1194        let (port, input, _membership, complete_sink) =
1195            first_node.bidi_external_many_bincode(&external);
1196        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1197
1198        let nodes = flow
1199            .with_process(&first_node, deployment.Localhost())
1200            .with_external(&external, deployment.Localhost())
1201            .deploy(&mut deployment);
1202
1203        deployment.deploy().await.unwrap();
1204
1205        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1206        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1207
1208        deployment.start().await.unwrap();
1209
1210        external_in_1.send("hi".to_string()).await.unwrap();
1211        external_in_2.send("hello".to_string()).await.unwrap();
1212
1213        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1214        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1215    }
1216}