Unity_Rx

Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as multicast delegates or events (which are usually multicast delegates). Multicast delegates are not ideal however as they exhibit the following less desirable features:

- In C#, events have a curious interface. Some find the += and -= operators an unnatural way to register a callback
- Events are difficult to compose
- Events don't offer the ability to be easily queried over time
- Events are a common cause of accidental memory leaks
- Events do not have a standard pattern for signaling completion
- Events provide almost no help for concurrency or multithreaded applications, e.g. to raise an event on a separate thread requires you to do all of the plumbing

Rx looks to solve these problems. Here I will introduce you to the building blocks and some basic types that make up Rx.

(example)

Observable

View(Mono)

(position)

protected override Vector3 CalculatevPos ()
{
    // calculates on each update
    return this.transform.position;
}

protected override IObservable<Vector3> GetvPosObservable ()
{
    return base.GetvPosObservable ();
}

(overrides)

return PositionAsObservable.Select(p=>CalculatevPos());

//only invokes if position has changed

//p==Vector3 (pos) ???

// or p == p<T> class (property Class for viewmodel )

// Viewmodel collection = ModelCollection<T> class

return PositionAsObservable.Sample(TimeSpan.FromSeconds(1.0f)).Select(p=>CalculatevPos());

//only invokes every x seconds

(Update)

this.UpdateAsObservable().Subscribe(_=>{ Debug.Log("THIS IS CALLED EVERY FRAME"); });

this.UpdateAsObservable().Where(_=>Input.GetMouseButtonDown(0)).Subscribe(_=>{ Debug.Log("THIS IS CALLED when the mouse button is down"); });

this.UpdateAsObservable().Where(_=>Input.GetMouseButtonDown(0)).Throttle(TimeSpan.FromSeconds(1)).Subscribe(_=>{ Debug.Log("This is only called once per second. This keeps many clicks from firing this too quickly"); });

// Note: Throttle fires only after one second with no further clicks; ThrottleFirst fires on the first click and then ignores clicks for one second.

(uGUI)

Button poorButton; // Our button.

poorButton.AsClickObservable()
.Where(_ => YOUR FILTER )
.Throttle(_ => PREVENT SPAMMING )
.DelayFrame( FRAMES TO DELAY )
.SkipWhile( SKIP FIRST TAPS IF CONDITION IS NOT MET )
.Subscribe(_ =>
{
USE LIKE SIMPLE OBSERVABLE
});

//uGUI events as observables. A button as an event stream in your view.

(sprites)

var sprites = GetComponentsInChildren<SpriteRenderer>().ToList (); // type argument assumed to be SpriteRenderer

if(value){
Observable.Interval(TimeSpan.FromMilliseconds(100))
.Subscribe(l =>
{
if (Character.isInvulnerable)
sprites.ForEach(s => s.enabled = l % 2 == 0); // l is the interval tick count, so the sprites toggle every tick
}).DisposeWhenChanged(Character._isInvulnerableProperty);
}
else
{
sprites.ForEach(s => s.enabled = true);
}

//Flash sprites on and off rapidly.

(statemachine)

{StateMachineProperty}Property.Subscribe(_=>{ {StateMachineProperty}Property.LastState });

//Bind to state machines last state

Timer

Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(...);

l=>{ myElement.JumpLock = false; }

// l is a long: Observable.Timer and Observable.Interval both return IObservable<long>

Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Subscribe(_ => {
ExecutedamagePerSecondTick();
}).DisposeWhenChanged(Player.healthStateProperty);

//Once per second, execute command to damage player

if (state is Wave)
{
// Wait for the allotted amount of time
Observable.Interval(TimeSpan.FromSeconds(wavesFPSGame.SpawnWaitSeconds))
.Where(_ => wavesFPSGame.WavesState is Wave)
.Take(wavesFPSGame.KillsToNextWave)
.Subscribe(l => {
SpawnEnemy();
});
}

//Time-based functionality in the controller, without coroutines, to trigger waves.

(Transform)

??? What is the difference between

Subscribe

Bind

(def)

LINQ

// allows you to query data at rest

Rx

// Rx offers the ability to query data in motion

public

interface

IObservable<out T>{ ... }

IObservable<T>

IObservable<T> is one of the two new core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft is so confident that this interface will be of use to you that it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a streaming sequence of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

.NET already has the concept of Streams with the type and sub types of System.IO.Stream. The System.IO.Stream implementations are commonly used to stream data (generally bytes) to or from an I/O device like a file, network or block of memory. System.IO.Stream implementations can have both the ability to read and write, and sometimes the ability to seek (i.e. fast forward through a stream or move backwards). When I refer to an instance of IObservable<T> as a stream, it does not exhibit the seek or write functionality that streams do. This is a fundamental difference preventing Rx being built on top of the System.IO.Stream paradigm. Rx does however have the concept of forward streaming (push), disposing (closing) and completing (eof). Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding. These features are also not an appropriate fit for the existing System.IO.Stream types.

Some others refer to instances of IObservable<T> as Observable Collections, which I find hard to understand. While the observable part makes sense to me, I do not find them like collections at all. You generally cannot sort, insert or remove items from an IObservable<T> instance like I would expect you can with a collection. Collections generally have some sort of backing store like an internal array. The values from an IObservable<T> source are not usually pre-materialized as you would expect from a normal collection. There is also a type in WPF/Silverlight called an ObservableCollection<T> that does exhibit collection-like behavior, and is very well suited to this description. In fact IObservable<T> integrates very well with ObservableCollection<T> instances. So to save on any confusion we will refer to instances of IObservable<T> as sequences. While instances of IEnumerable<T> are also sequences, we will adopt the convention that they are sequences of data at rest, and IObservable<T> instances are sequences of data in motion.
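To make the "sequence of data in motion" idea concrete, here is a minimal sketch of a hand-written source and consumer. It assumes only the plain System.IObservable<T> and System.IObserver<T> interfaces from the .NET 4.0+ BCL (no UniRx operators); FixedSequence and RecordingObserver are hypothetical names made up for this note, and real Rx code would normally use the library's factory methods rather than implementing the interface by hand.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source: pushes a fixed set of values to each subscriber,
// then signals completion.
public sealed class FixedSequence : IObservable<int>
{
    private readonly int[] _values;

    public FixedSequence(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
            observer.OnNext(value);   // data in motion: pushed, not pulled
        observer.OnCompleted();       // the "eof" of the sequence
        return new EmptyDisposable(); // everything was already sent, nothing to cancel
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical consumer: records every notification in arrival order.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}
```

Subscribing a RecordingObserver to new FixedSequence(1, 2, 3) leaves its Log holding OnNext(1), OnNext(2), OnNext(3), OnCompleted, in that order. Note there is no way to seek, insert or sort: the consumer only reacts to what is pushed.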

// 'write/publisher' interface

// streaming sequence of object T

// not the same as System.IO.Stream (no seek or write), but has forward streaming (push), disposing (closing) and completing (eof)

// Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding

// IEnumerable<T> instances are also sequences; by convention they are sequences of data at rest, while IObservable<T> instances are sequences of data in motion

IDisposable

Subscribe(IObserver<T> observer);

// The returned IDisposable is the only link to the subscription; if it is not captured it is lost, the subscription can't be disposed explicitly, and that risks a memory leak

//built in methods gc???
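A small sketch of why the returned IDisposable matters. This assumes only the BCL interfaces; ManualSource and CountingObserver are hypothetical helpers invented for illustration, not UniRx types. The source keeps a reference to every observer until the matching subscription is disposed, which is how a forgotten subscription can keep objects alive.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical long-lived source that holds on to its observers.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public int ObserverCount { get { return _observers.Count; } }

    // Push a value to everyone currently subscribed.
    public void Push(T value)
    {
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Disposing removes the observer, so the source no longer references it.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Hypothetical observer that only counts the values it receives.
public sealed class CountingObserver<T> : IObserver<T>
{
    public int Count;

    public void OnNext(T value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

If the subscription is captured and disposed after the first Push, a second Push no longer reaches the observer and ObserverCount drops back to 0. If the IDisposable is thrown away, the source keeps the observer in its list for as long as the source itself lives.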

IObserver<in T>{ ... }

IObserver

Rx has an implicit contract that must be followed. An implementation of IObserver<T> may have zero or more calls to OnNext(T) followed optionally by a call to either OnError(Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by an OnError(Exception), or an OnCompleted(). This protocol does not however demand that an OnNext(T), OnError(Exception) or OnCompleted() ever be called. This enables the concept of empty and infinite sequences. We will look into this more later.

Interestingly, while you will be exposed to the IObservable<T> interface frequently if you work with Rx, in general you will not need to be concerned with IObserver<T>. This is due to Rx providing anonymous implementations via methods like Subscribe.
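The contract above (zero or more OnNext, then at most one OnError or OnCompleted, then nothing) can be sketched as a small wrapper that drops anything arriving after termination. This is an illustrative sketch on the BCL interfaces only; GuardedObserver and ListObserver are hypothetical names, and this is not how any particular Rx library enforces the rule internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: forwards notifications until the sequence terminates,
// then silently ignores everything else.
public sealed class GuardedObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private bool _terminated;

    public GuardedObserver(IObserver<T> inner) { _inner = inner; }

    public void OnNext(T value)
    {
        if (!_terminated) _inner.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_terminated) return;
        _terminated = true;       // OnError ends the sequence
        _inner.OnError(error);
    }

    public void OnCompleted()
    {
        if (_terminated) return;
        _terminated = true;       // OnCompleted ends the sequence
        _inner.OnCompleted();
    }
}

// Hypothetical observer that records what it was told, in order.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error"); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

Feeding the guard OnNext(1), OnCompleted(), OnNext(2), OnError(...) leaves the inner log with just next:1 and completed. A guard that never receives any call at all is equally valid under the contract, which is what makes empty and infinite sequences possible.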

// 'reader/consumer' interface

void

OnNext(T value);

//Provides the observer with new data.

OnError(Exception error);

//Notifies the observer that the provider has experienced an error condition.

OnCompleted();

//Notifies the observer that the provider has finished sending push-based notifications.