1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use quote::quote;
24use serde::de::DeserializeOwned;
25use serde::{Deserialize, Serialize};
26use stageleft::runtime_support::{FreeVariableWithContext, QuoteTokens};
27use stageleft::{QuotedWithContext, q, quote_type};
28use syn::parse_quote;
29use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
30
31use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
32use crate::forward_handle::ForwardRef;
33#[cfg(stageleft_runtime)]
34use crate::forward_handle::{CycleCollection, ForwardHandle};
35use crate::live_collections::boundedness::Unbounded;
36use crate::live_collections::keyed_stream::KeyedStream;
37use crate::live_collections::singleton::Singleton;
38use crate::live_collections::stream::{
39 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
40};
41use crate::location::dynamic::LocationId;
42use crate::location::external_process::{
43 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
44};
45use crate::nondet::NonDet;
46#[cfg(feature = "sim")]
47use crate::sim::SimSender;
48use crate::staging_util::get_this_crate;
49
50pub mod dynamic;
51
52#[expect(missing_docs, reason = "TODO")]
53pub mod external_process;
54pub use external_process::External;
55
56#[expect(missing_docs, reason = "TODO")]
57pub mod process;
58pub use process::Process;
59
60#[expect(missing_docs, reason = "TODO")]
61pub mod cluster;
62pub use cluster::Cluster;
63
64#[expect(missing_docs, reason = "TODO")]
65pub mod member_id;
66pub use member_id::{MemberId, TaglessMemberId};
67#[expect(missing_docs, reason = "TODO")]
68pub mod tick;
69pub use tick::{Atomic, NoTick, Tick};
70
71#[expect(missing_docs, reason = "TODO")]
72#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
73pub enum MembershipEvent {
74 Joined,
75 Left,
76}
77
/// A hint describing how the network port of an external connection should be chosen.
///
/// Within this module the hint is only recorded on the `ExternalInput` node built by
/// [`Location::bind_single_client`] and [`Location::bidi_external_many_bytes`]; how it is
/// honoured is decided by code outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No preference is expressed.
    Auto,
    /// Request a TCP port, optionally with a specific port number.
    ///
    /// NOTE(review): `None` presumably means "any free TCP port" — confirm against the
    /// deployment backend that consumes this hint.
    TcpPort(Option<u16>),
}
84
85pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
86 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
87}
88
89#[expect(missing_docs, reason = "TODO")]
90#[expect(
91 private_bounds,
92 reason = "only internal Hydro code can define location types"
93)]
94pub trait Location<'a>: dynamic::DynLocation {
/// The location type returned by [`Location::root`].
type Root: Location<'a>;
/// Returns the root location associated with this location.
///
/// NOTE(review): presumably the top-level (non-tick) location that `self` is nested in —
/// confirm against the implementors.
fn root(&self) -> Self::Root;

/// Returns a name for this kind of location. It takes no `self`, so the result is the same
/// for every value of the implementing type.
fn name() -> String;
100
101 fn try_tick(&self) -> Option<Tick<Self>> {
102 if Self::is_top_level() {
103 let next_id = self.flow_state().borrow_mut().next_clock_id;
104 self.flow_state().borrow_mut().next_clock_id += 1;
105 Some(Tick {
106 id: next_id,
107 l: self.clone(),
108 })
109 } else {
110 None
111 }
112 }
113
114 fn id(&self) -> LocationId {
115 dynamic::DynLocation::id(self)
116 }
117
118 fn tick(&self) -> Tick<Self>
119 where
120 Self: NoTick,
121 {
122 let next_id = self.flow_state().borrow_mut().next_clock_id;
123 self.flow_state().borrow_mut().next_clock_id += 1;
124 Tick {
125 id: next_id,
126 l: self.clone(),
127 }
128 }
129
130 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
131 where
132 Self: Sized + NoTick,
133 {
134 Stream::new(
135 self.clone(),
136 HydroNode::Source {
137 source: HydroSource::Spin(),
138 metadata: self.new_node_metadata(Stream::<
139 (),
140 Self,
141 Unbounded,
142 TotalOrder,
143 ExactlyOnce,
144 >::collection_kind()),
145 },
146 )
147 }
148
149 fn source_stream<T, E>(
150 &self,
151 e: impl QuotedWithContext<'a, E, Self>,
152 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
153 where
154 E: FuturesStream<Item = T> + Unpin,
155 Self: Sized + NoTick,
156 {
157 let e = e.splice_untyped_ctx(self);
158
159 Stream::new(
160 self.clone(),
161 HydroNode::Source {
162 source: HydroSource::Stream(e.into()),
163 metadata: self.new_node_metadata(Stream::<
164 T,
165 Self,
166 Unbounded,
167 TotalOrder,
168 ExactlyOnce,
169 >::collection_kind()),
170 },
171 )
172 }
173
174 fn source_iter<T, E>(
175 &self,
176 e: impl QuotedWithContext<'a, E, Self>,
177 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
178 where
179 E: IntoIterator<Item = T>,
180 Self: Sized + NoTick,
181 {
182 let e = e.splice_typed_ctx(self);
185
186 Stream::new(
187 self.clone(),
188 HydroNode::Source {
189 source: HydroSource::Iter(e.into()),
190 metadata: self.new_node_metadata(Stream::<
191 T,
192 Self,
193 Unbounded,
194 TotalOrder,
195 ExactlyOnce,
196 >::collection_kind()),
197 },
198 )
199 }
200
201 fn source_cluster_members<C: 'a>(
202 &self,
203 cluster: &Cluster<'a, C>,
204 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
205 where
206 Self: Sized + NoTick,
207 {
208 Stream::new(
209 self.clone(),
210 HydroNode::Source {
211 source: HydroSource::ClusterMembers(cluster.id()),
212 metadata: self.new_node_metadata(Stream::<
213 (TaglessMemberId, MembershipEvent),
214 Self,
215 Unbounded,
216 TotalOrder,
217 ExactlyOnce,
218 >::collection_kind()),
219 },
220 )
221 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
222 .into_keyed()
223 }
224
225 fn source_external_bytes<L>(
226 &self,
227 from: &External<L>,
228 ) -> (
229 ExternalBytesPort,
230 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
231 )
232 where
233 Self: Sized + NoTick,
234 {
235 let (port, stream, sink) =
236 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
237
238 sink.complete(self.source_iter(q!([])));
239
240 (port, stream)
241 }
242
243 #[expect(clippy::type_complexity, reason = "stream markers")]
244 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
245 &self,
246 from: &External<L>,
247 ) -> (
248 ExternalBincodeSink<T, NotMany, O, R>,
249 Stream<T, Self, Unbounded, O, R>,
250 )
251 where
252 Self: Sized + NoTick,
253 T: Serialize + DeserializeOwned,
254 {
255 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
256 sink.complete(self.source_iter(q!([])));
257
258 (
259 ExternalBincodeSink {
260 process_id: from.id,
261 port_id: port.port_id,
262 _phantom: PhantomData,
263 },
264 stream.weaken_ordering().weaken_retries(),
265 )
266 }
267
/// Creates an input stream for simulation together with the [`SimSender`] that feeds it.
///
/// An [`External`] with id `0` sharing this location's flow state is constructed on the spot,
/// and the input is registered against it with [`Location::source_external_bincode`].
#[cfg(feature = "sim")]
#[expect(clippy::type_complexity, reason = "stream markers")]
fn sim_input<T, O: Ordering, R: Retries>(
    &self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where
    Self: Sized + NoTick,
    T: Serialize + DeserializeOwned,
{
    let sim_external: External<'a, ()> = External {
        id: 0,
        flow_state: self.flow_state().clone(),
        _phantom: PhantomData,
    };

    let (sink, stream) = self.source_external_bincode(&sim_external);
    let sender = SimSender(sink.port_id, PhantomData);

    (sender, stream)
}
289
290 #[expect(clippy::type_complexity, reason = "stream markers")]
335 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
336 &self,
337 from: &External<L>,
338 port_hint: NetworkHint,
339 ) -> (
340 ExternalBytesPort<NotMany>,
341 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
342 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
343 )
344 where
345 Self: Sized + NoTick,
346 {
347 let next_external_port_id = {
348 let mut flow_state = from.flow_state.borrow_mut();
349 let id = flow_state.next_external_out;
350 flow_state.next_external_out += 1;
351 id
352 };
353
354 let (fwd_ref, to_sink) =
355 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
356 let mut flow_state_borrow = self.flow_state().borrow_mut();
357
358 flow_state_borrow.push_root(HydroRoot::SendExternal {
359 to_external_id: from.id,
360 to_key: next_external_port_id,
361 to_many: false,
362 unpaired: false,
363 serialize_fn: None,
364 instantiate_fn: DebugInstantiate::Building,
365 input: Box::new(to_sink.ir_node.into_inner()),
366 op_metadata: HydroIrOpMetadata::new(),
367 });
368
369 let raw_stream: Stream<
370 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
371 Self,
372 Unbounded,
373 TotalOrder,
374 ExactlyOnce,
375 > = Stream::new(
376 self.clone(),
377 HydroNode::ExternalInput {
378 from_external_id: from.id,
379 from_key: next_external_port_id,
380 from_many: false,
381 codec_type: quote_type::<Codec>().into(),
382 port_hint,
383 instantiate_fn: DebugInstantiate::Building,
384 deserialize_fn: None,
385 metadata: self.new_node_metadata(Stream::<
386 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
387 Self,
388 Unbounded,
389 TotalOrder,
390 ExactlyOnce,
391 >::collection_kind()),
392 },
393 );
394
395 (
396 ExternalBytesPort {
397 process_id: from.id,
398 port_id: next_external_port_id,
399 _phantom: PhantomData,
400 },
401 raw_stream.flatten_ordered(),
402 fwd_ref,
403 )
404 }
405
406 #[expect(clippy::type_complexity, reason = "stream markers")]
407 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
408 &self,
409 from: &External<L>,
410 ) -> (
411 ExternalBincodeBidi<InT, OutT, NotMany>,
412 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
413 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
414 )
415 where
416 Self: Sized + NoTick,
417 {
418 let next_external_port_id = {
419 let mut flow_state = from.flow_state.borrow_mut();
420 let id = flow_state.next_external_out;
421 flow_state.next_external_out += 1;
422 id
423 };
424
425 let (fwd_ref, to_sink) =
426 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
427 let mut flow_state_borrow = self.flow_state().borrow_mut();
428
429 let root = get_this_crate();
430
431 let out_t_type = quote_type::<OutT>();
432 let ser_fn: syn::Expr = syn::parse_quote! {
433 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
434 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
435 )
436 };
437
438 flow_state_borrow.push_root(HydroRoot::SendExternal {
439 to_external_id: from.id,
440 to_key: next_external_port_id,
441 to_many: false,
442 unpaired: false,
443 serialize_fn: Some(ser_fn.into()),
444 instantiate_fn: DebugInstantiate::Building,
445 input: Box::new(to_sink.ir_node.into_inner()),
446 op_metadata: HydroIrOpMetadata::new(),
447 });
448
449 let in_t_type = quote_type::<InT>();
450
451 let deser_fn: syn::Expr = syn::parse_quote! {
452 |res| {
453 let b = res.unwrap();
454 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
455 }
456 };
457
458 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
459 self.clone(),
460 HydroNode::ExternalInput {
461 from_external_id: from.id,
462 from_key: next_external_port_id,
463 from_many: false,
464 codec_type: quote_type::<LengthDelimitedCodec>().into(),
465 port_hint: NetworkHint::Auto,
466 instantiate_fn: DebugInstantiate::Building,
467 deserialize_fn: Some(deser_fn.into()),
468 metadata: self.new_node_metadata(Stream::<
469 InT,
470 Self,
471 Unbounded,
472 TotalOrder,
473 ExactlyOnce,
474 >::collection_kind()),
475 },
476 );
477
478 (
479 ExternalBincodeBidi {
480 process_id: from.id,
481 port_id: next_external_port_id,
482 _phantom: PhantomData,
483 },
484 raw_stream,
485 fwd_ref,
486 )
487 }
488
489 #[expect(clippy::type_complexity, reason = "stream markers")]
490 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
491 &self,
492 from: &External<L>,
493 port_hint: NetworkHint,
494 ) -> (
495 ExternalBytesPort<Many>,
496 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
497 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
498 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
499 )
500 where
501 Self: Sized + NoTick,
502 {
503 let next_external_port_id = {
504 let mut flow_state = from.flow_state.borrow_mut();
505 let id = flow_state.next_external_out;
506 flow_state.next_external_out += 1;
507 id
508 };
509
510 let (fwd_ref, to_sink) =
511 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
512 let mut flow_state_borrow = self.flow_state().borrow_mut();
513
514 flow_state_borrow.push_root(HydroRoot::SendExternal {
515 to_external_id: from.id,
516 to_key: next_external_port_id,
517 to_many: true,
518 unpaired: false,
519 serialize_fn: None,
520 instantiate_fn: DebugInstantiate::Building,
521 input: Box::new(to_sink.entries().ir_node.into_inner()),
522 op_metadata: HydroIrOpMetadata::new(),
523 });
524
525 let raw_stream: Stream<
526 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
527 Self,
528 Unbounded,
529 TotalOrder,
530 ExactlyOnce,
531 > = Stream::new(
532 self.clone(),
533 HydroNode::ExternalInput {
534 from_external_id: from.id,
535 from_key: next_external_port_id,
536 from_many: true,
537 codec_type: quote_type::<Codec>().into(),
538 port_hint,
539 instantiate_fn: DebugInstantiate::Building,
540 deserialize_fn: None,
541 metadata: self.new_node_metadata(Stream::<
542 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
543 Self,
544 Unbounded,
545 TotalOrder,
546 ExactlyOnce,
547 >::collection_kind()),
548 },
549 );
550
551 let membership_stream_ident = syn::Ident::new(
552 &format!(
553 "__hydro_deploy_many_{}_{}_membership",
554 from.id, next_external_port_id
555 ),
556 Span::call_site(),
557 );
558 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
559 let raw_membership_stream: KeyedStream<
560 u64,
561 bool,
562 Self,
563 Unbounded,
564 TotalOrder,
565 ExactlyOnce,
566 > = KeyedStream::new(
567 self.clone(),
568 HydroNode::Source {
569 source: HydroSource::Stream(membership_stream_expr.into()),
570 metadata: self.new_node_metadata(KeyedStream::<
571 u64,
572 bool,
573 Self,
574 Unbounded,
575 TotalOrder,
576 ExactlyOnce,
577 >::collection_kind()),
578 },
579 );
580
581 (
582 ExternalBytesPort {
583 process_id: from.id,
584 port_id: next_external_port_id,
585 _phantom: PhantomData,
586 },
587 raw_stream
588 .flatten_ordered() .into_keyed(),
590 raw_membership_stream.map(q!(|join| {
591 if join {
592 MembershipEvent::Joined
593 } else {
594 MembershipEvent::Left
595 }
596 })),
597 fwd_ref,
598 )
599 }
600
601 #[expect(clippy::type_complexity, reason = "stream markers")]
602 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
603 &self,
604 from: &External<L>,
605 ) -> (
606 ExternalBincodeBidi<InT, OutT, Many>,
607 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
608 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
609 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
610 )
611 where
612 Self: Sized + NoTick,
613 {
614 let next_external_port_id = {
615 let mut flow_state = from.flow_state.borrow_mut();
616 let id = flow_state.next_external_out;
617 flow_state.next_external_out += 1;
618 id
619 };
620
621 let root = get_this_crate();
622
623 let (fwd_ref, to_sink) =
624 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
625 let mut flow_state_borrow = self.flow_state().borrow_mut();
626
627 let out_t_type = quote_type::<OutT>();
628 let ser_fn: syn::Expr = syn::parse_quote! {
629 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
630 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
631 )
632 };
633
634 flow_state_borrow.push_root(HydroRoot::SendExternal {
635 to_external_id: from.id,
636 to_key: next_external_port_id,
637 to_many: true,
638 unpaired: false,
639 serialize_fn: Some(ser_fn.into()),
640 instantiate_fn: DebugInstantiate::Building,
641 input: Box::new(to_sink.entries().ir_node.into_inner()),
642 op_metadata: HydroIrOpMetadata::new(),
643 });
644
645 let in_t_type = quote_type::<InT>();
646
647 let deser_fn: syn::Expr = syn::parse_quote! {
648 |res| {
649 let (id, b) = res.unwrap();
650 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
651 }
652 };
653
654 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
655 KeyedStream::new(
656 self.clone(),
657 HydroNode::ExternalInput {
658 from_external_id: from.id,
659 from_key: next_external_port_id,
660 from_many: true,
661 codec_type: quote_type::<LengthDelimitedCodec>().into(),
662 port_hint: NetworkHint::Auto,
663 instantiate_fn: DebugInstantiate::Building,
664 deserialize_fn: Some(deser_fn.into()),
665 metadata: self.new_node_metadata(KeyedStream::<
666 u64,
667 InT,
668 Self,
669 Unbounded,
670 TotalOrder,
671 ExactlyOnce,
672 >::collection_kind()),
673 },
674 );
675
676 let membership_stream_ident = syn::Ident::new(
677 &format!(
678 "__hydro_deploy_many_{}_{}_membership",
679 from.id, next_external_port_id
680 ),
681 Span::call_site(),
682 );
683 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
684 let raw_membership_stream: KeyedStream<
685 u64,
686 bool,
687 Self,
688 Unbounded,
689 TotalOrder,
690 ExactlyOnce,
691 > = KeyedStream::new(
692 self.clone(),
693 HydroNode::Source {
694 source: HydroSource::Stream(membership_stream_expr.into()),
695 metadata: self.new_node_metadata(KeyedStream::<
696 u64,
697 bool,
698 Self,
699 Unbounded,
700 TotalOrder,
701 ExactlyOnce,
702 >::collection_kind()),
703 },
704 );
705
706 (
707 ExternalBincodeBidi {
708 process_id: from.id,
709 port_id: next_external_port_id,
710 _phantom: PhantomData,
711 },
712 raw_stream,
713 raw_membership_stream.map(q!(|join| {
714 if join {
715 MembershipEvent::Joined
716 } else {
717 MembershipEvent::Left
718 }
719 })),
720 fwd_ref,
721 )
722 }
723
724 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
742 where
743 T: Clone,
744 Self: Sized,
745 {
746 let e = e.splice_untyped_ctx(self);
750
751 Singleton::new(
752 self.clone(),
753 HydroNode::SingletonSource {
754 value: e.into(),
755 metadata: self
756 .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
757 },
758 )
759 }
760
761 fn source_interval(
771 &self,
772 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
773 _nondet: NonDet,
774 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
775 where
776 Self: Sized + NoTick,
777 {
778 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
779 tokio::time::interval(interval)
780 )))
781 }
782
783 fn source_interval_delayed(
794 &self,
795 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
796 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
797 _nondet: NonDet,
798 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
799 where
800 Self: Sized + NoTick,
801 {
802 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
803 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
804 )))
805 }
806
807 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
808 where
809 S: CycleCollection<'a, ForwardRef, Location = Self>,
810 Self: NoTick,
811 {
812 let next_id = self.flow_state().borrow_mut().next_cycle_id();
813 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
814
815 (
816 ForwardHandle {
817 completed: false,
818 ident: ident.clone(),
819 expected_location: Location::id(self),
820 _phantom: PhantomData,
821 },
822 S::create_source(ident, self.clone()),
823 )
824 }
825}
826
/// A free variable that, when spliced into quoted code, expands to the
/// `LocationId` of the root of the location the code is staged for.
pub static LOCATION_SELF_ID: LocationSelfId<'static> = LocationSelfId { _private: &() };

/// The type of [`LOCATION_SELF_ID`].
///
/// Its only field is private, so values cannot be constructed outside this module.
#[derive(Clone, Copy)]
pub struct LocationSelfId<'a> {
    _private: &'a (),
}
836
837impl<'a, L> FreeVariableWithContext<L> for LocationSelfId<'a>
838where
839 L: Location<'a>,
840{
841 type O = LocationId;
842
843 fn to_tokens(self, ctx: &L) -> QuoteTokens
844 where
845 Self: Sized,
846 {
847 let root = get_this_crate();
848 let location_id = ctx.root().id();
849
850 let variant = match location_id {
851 LocationId::Process(id) => quote! { Process(#id) },
852 LocationId::Cluster(id) => quote! { Cluster(#id) },
853 _other => unreachable!(),
854 };
855
856 QuoteTokens {
857 prelude: None,
858 expr: Some(quote! { #root::location::dynamic::LocationId::#variant }),
859 }
860 }
861}
862
/// A free variable that, when spliced into quoted code, expands to the name of the root
/// location type of the location the code is staged for.
pub static LOCATION_SELF_NAME: LocationSelfName<'static> = LocationSelfName { _private: &() };

/// The type of [`LOCATION_SELF_NAME`].
///
/// Its only field is private, so values cannot be constructed outside this module.
#[derive(Clone, Copy)]
pub struct LocationSelfName<'a> {
    _private: &'a (),
}
872
873impl<'a, L> FreeVariableWithContext<L> for LocationSelfName<'a>
874where
875 L: Location<'a>,
876{
877 type O = LocationId;
878
879 fn to_tokens(self, _ctx: &L) -> QuoteTokens
880 where
881 Self: Sized,
882 {
883 let location_name = <L::Root as Location>::name();
884
885 QuoteTokens {
886 prelude: None,
887 expr: Some(quote! { #location_name }),
888 }
889 }
890}
891
892#[cfg(feature = "deploy")]
893#[cfg(test)]
894mod tests {
895 use std::collections::HashSet;
896
897 use futures::{SinkExt, StreamExt};
898 use hydro_deploy::Deployment;
899 use stageleft::q;
900 use tokio_util::codec::LengthDelimitedCodec;
901
902 use crate::compile::builder::FlowBuilder;
903 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
904 use crate::location::{Location, NetworkHint};
905 use crate::nondet::nondet;
906
907 #[tokio::test]
908 async fn top_level_singleton_replay_cardinality() {
909 let mut deployment = Deployment::new();
910
911 let flow = FlowBuilder::new();
912 let node = flow.process::<()>();
913 let external = flow.external::<()>();
914
915 let (in_port, input) =
916 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
917 let singleton = node.singleton(q!(123));
918 let tick = node.tick();
919 let out = input
920 .batch(&tick, nondet!())
921 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
922 .cross_singleton(
923 singleton
924 .snapshot(&tick, nondet!())
925 .into_stream()
926 .count(),
927 )
928 .all_ticks()
929 .send_bincode_external(&external);
930
931 let nodes = flow
932 .with_process(&node, deployment.Localhost())
933 .with_external(&external, deployment.Localhost())
934 .deploy(&mut deployment);
935
936 deployment.deploy().await.unwrap();
937
938 let mut external_in = nodes.connect(in_port).await;
939 let mut external_out = nodes.connect(out).await;
940
941 deployment.start().await.unwrap();
942
943 external_in.send(1).await.unwrap();
944 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
945
946 external_in.send(2).await.unwrap();
947 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
948 }
949
950 #[tokio::test]
951 async fn tick_singleton_replay_cardinality() {
952 let mut deployment = Deployment::new();
953
954 let flow = FlowBuilder::new();
955 let node = flow.process::<()>();
956 let external = flow.external::<()>();
957
958 let (in_port, input) =
959 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
960 let tick = node.tick();
961 let singleton = tick.singleton(q!(123));
962 let out = input
963 .batch(&tick, nondet!())
964 .cross_singleton(singleton.clone())
965 .cross_singleton(singleton.into_stream().count())
966 .all_ticks()
967 .send_bincode_external(&external);
968
969 let nodes = flow
970 .with_process(&node, deployment.Localhost())
971 .with_external(&external, deployment.Localhost())
972 .deploy(&mut deployment);
973
974 deployment.deploy().await.unwrap();
975
976 let mut external_in = nodes.connect(in_port).await;
977 let mut external_out = nodes.connect(out).await;
978
979 deployment.start().await.unwrap();
980
981 external_in.send(1).await.unwrap();
982 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
983
984 external_in.send(2).await.unwrap();
985 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
986 }
987
988 #[tokio::test]
989 async fn external_bytes() {
990 let mut deployment = Deployment::new();
991
992 let flow = FlowBuilder::new();
993 let first_node = flow.process::<()>();
994 let external = flow.external::<()>();
995
996 let (in_port, input) = first_node.source_external_bytes(&external);
997 let out = input.send_bincode_external(&external);
998
999 let nodes = flow
1000 .with_process(&first_node, deployment.Localhost())
1001 .with_external(&external, deployment.Localhost())
1002 .deploy(&mut deployment);
1003
1004 deployment.deploy().await.unwrap();
1005
1006 let mut external_in = nodes.connect(in_port).await.1;
1007 let mut external_out = nodes.connect(out).await;
1008
1009 deployment.start().await.unwrap();
1010
1011 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1012
1013 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1014 }
1015
1016 #[tokio::test]
1017 async fn multi_external_source() {
1018 let mut deployment = Deployment::new();
1019
1020 let flow = FlowBuilder::new();
1021 let first_node = flow.process::<()>();
1022 let external = flow.external::<()>();
1023
1024 let (in_port, input, _membership, complete_sink) =
1025 first_node.bidi_external_many_bincode(&external);
1026 let out = input.entries().send_bincode_external(&external);
1027 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
1028
1029 let nodes = flow
1030 .with_process(&first_node, deployment.Localhost())
1031 .with_external(&external, deployment.Localhost())
1032 .deploy(&mut deployment);
1033
1034 deployment.deploy().await.unwrap();
1035
1036 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1037 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1038 let external_out = nodes.connect(out).await;
1039
1040 deployment.start().await.unwrap();
1041
1042 external_in_1.send(123).await.unwrap();
1043 external_in_2.send(456).await.unwrap();
1044
1045 assert_eq!(
1046 external_out.take(2).collect::<HashSet<_>>().await,
1047 vec![(0, 123), (1, 456)].into_iter().collect()
1048 );
1049 }
1050
1051 #[tokio::test]
1052 async fn second_connection_only_multi_source() {
1053 let mut deployment = Deployment::new();
1054
1055 let flow = FlowBuilder::new();
1056 let first_node = flow.process::<()>();
1057 let external = flow.external::<()>();
1058
1059 let (in_port, input, _membership, complete_sink) =
1060 first_node.bidi_external_many_bincode(&external);
1061 let out = input.entries().send_bincode_external(&external);
1062 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
1063
1064 let nodes = flow
1065 .with_process(&first_node, deployment.Localhost())
1066 .with_external(&external, deployment.Localhost())
1067 .deploy(&mut deployment);
1068
1069 deployment.deploy().await.unwrap();
1070
1071 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1073 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1074 let mut external_out = nodes.connect(out).await;
1075
1076 deployment.start().await.unwrap();
1077
1078 external_in_2.send(456).await.unwrap();
1079
1080 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1081 }
1082
1083 #[tokio::test]
1084 async fn multi_external_bytes() {
1085 let mut deployment = Deployment::new();
1086
1087 let flow = FlowBuilder::new();
1088 let first_node = flow.process::<()>();
1089 let external = flow.external::<()>();
1090
1091 let (in_port, input, _membership, complete_sink) = first_node
1092 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1093 let out = input.entries().send_bincode_external(&external);
1094 complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
1095
1096 let nodes = flow
1097 .with_process(&first_node, deployment.Localhost())
1098 .with_external(&external, deployment.Localhost())
1099 .deploy(&mut deployment);
1100
1101 deployment.deploy().await.unwrap();
1102
1103 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1104 let mut external_in_2 = nodes.connect(in_port).await.1;
1105 let external_out = nodes.connect(out).await;
1106
1107 deployment.start().await.unwrap();
1108
1109 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1110 external_in_2.send(vec![4, 5].into()).await.unwrap();
1111
1112 assert_eq!(
1113 external_out.take(2).collect::<HashSet<_>>().await,
1114 vec![
1115 (0, (&[1u8, 2, 3] as &[u8]).into()),
1116 (1, (&[4u8, 5] as &[u8]).into())
1117 ]
1118 .into_iter()
1119 .collect()
1120 );
1121 }
1122
1123 #[tokio::test]
1124 async fn single_client_external_bytes() {
1125 let mut deployment = Deployment::new();
1126 let flow = FlowBuilder::new();
1127 let first_node = flow.process::<()>();
1128 let external = flow.external::<()>();
1129 let (port, input, complete_sink) = first_node
1130 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1131 complete_sink.complete(input.map(q!(|data| {
1132 let mut resp: Vec<u8> = data.into();
1133 resp.push(42);
1134 resp.into() })));
1136
1137 let nodes = flow
1138 .with_process(&first_node, deployment.Localhost())
1139 .with_external(&external, deployment.Localhost())
1140 .deploy(&mut deployment);
1141
1142 deployment.deploy().await.unwrap();
1143 deployment.start().await.unwrap();
1144
1145 let (mut external_out, mut external_in) = nodes.connect(port).await;
1146
1147 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1148 assert_eq!(
1149 external_out.next().await.unwrap().unwrap(),
1150 vec![1, 2, 3, 42]
1151 );
1152 }
1153
1154 #[tokio::test]
1155 async fn echo_external_bytes() {
1156 let mut deployment = Deployment::new();
1157
1158 let flow = FlowBuilder::new();
1159 let first_node = flow.process::<()>();
1160 let external = flow.external::<()>();
1161
1162 let (port, input, _membership, complete_sink) = first_node
1163 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1164 complete_sink
1165 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1166
1167 let nodes = flow
1168 .with_process(&first_node, deployment.Localhost())
1169 .with_external(&external, deployment.Localhost())
1170 .deploy(&mut deployment);
1171
1172 deployment.deploy().await.unwrap();
1173
1174 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1175 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1176
1177 deployment.start().await.unwrap();
1178
1179 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1180 external_in_2.send(vec![4, 5].into()).await.unwrap();
1181
1182 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1183 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1184 }
1185
1186 #[tokio::test]
1187 async fn echo_external_bincode() {
1188 let mut deployment = Deployment::new();
1189
1190 let flow = FlowBuilder::new();
1191 let first_node = flow.process::<()>();
1192 let external = flow.external::<()>();
1193
1194 let (port, input, _membership, complete_sink) =
1195 first_node.bidi_external_many_bincode(&external);
1196 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1197
1198 let nodes = flow
1199 .with_process(&first_node, deployment.Localhost())
1200 .with_external(&external, deployment.Localhost())
1201 .deploy(&mut deployment);
1202
1203 deployment.deploy().await.unwrap();
1204
1205 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1206 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1207
1208 deployment.start().await.unwrap();
1209
1210 external_in_1.send("hi".to_string()).await.unwrap();
1211 external_in_2.send("hello".to_string()).await.unwrap();
1212
1213 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1214 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1215 }
1216}