101http://rxwiki.wikidot.com/101samples
昨天,隔壁大娘教了我五个字母,今天一兴奋,决定和机翻配合用下。不要问为什么全漏了,我倒是要会呀!!放一个鬼在这。
Func<Action<
int
>, Action<
int
[]>> iter =
new
Func<Action<
int
>,Action<
int
[]>>
(x =>
new
Action<
int
[]>(y => {
foreach
(
int
i
in
y) x(i); }));
var OutputArray = iter(x => Console.WriteLine(x));
int
[] data = { 1, 2, 3, 4, 5 };
OutputArray(data);
Why Rx?
通常,unity的网络操作需要使用到WWW和协程(Coroutine),但是,使用协程(Coroutine)对于异步操作而言不是一个好的办法,原因如下:
- 协程不能返回任何值,因为它的返回类型必须是IEnumerator。
- 协程不能够捕捉异常,因为yield return 声明不支持try-catch结构。
Introduction
下面的代码,通过UniRx的实现了双击操作:
var clickStream = Observable.EveryUpdate()
.Where(_ => Input.GetMouseButtonDown(0));
clickStream.Buffer(clickStream.Throttle(TimeSpan.FromMilliseconds(250)))
.Where(xs => xs.Count >= 2)
.Subscribe(xs => Debug.Log("DoubleClick Detected! Count:" + xs.Count));
这个例子演示了以下的特征:
- 游戏循环作为事件流
- 事件流可以组合
- 合并自身事件流
- 基于时间操作处理很简单
Network operations
对异步网络操作使用ObservableWWW ,它的Get/Post 函数返回可预见可观测:
ObservableWWW.Get("http://google.co.jp/") .Subscribe( x => Debug.Log(x.Substring(0, 100)), // onSuccess ex => Debug.LogException(ex)); // onError
Rx可以组合并可以撤销,你也能通过LINQ表达式查询:
// composing asynchronous sequence with LINQ query expressions var query = from google in ObservableWWW.Get("http://google.com/") from bing in ObservableWWW.Get("http://bing.com/") from unknown in ObservableWWW.Get(google + bing) select new { google, bing, unknown }; var cancel = query.Subscribe(x => Debug.Log(x)); // Call Dispose is cancel. cancel.Dispose();
对所有的并行请求使用Observable.WhenAll :
// Observable.WhenAll is for parallel asynchronous operation // (It's like Observable.Zip but specialized for single async operations like Task.WhenAll) var parallel = Observable.WhenAll( ObservableWWW.Get("http://google.com/"), ObservableWWW.Get("http://bing.com/"), ObservableWWW.Get("http://unity3d.com/")); parallel.Subscribe(xs => { Debug.Log(xs[0].Substring(0, 100)); // google Debug.Log(xs[1].Substring(0, 100)); // bing Debug.Log(xs[2].Substring(0, 100)); // unity });
获得进度信息:
// notifier for progress use ScheudledNotifier or new Progress<float>(/* action */) var progressNotifier = new ScheduledNotifier<float>(); progressNotifier.Subscribe(x => Debug.Log(x)); // write www.progress // pass notifier to WWW.Get/Post ObservableWWW.Get("http://google.com/", progress: progressNotifier).Subscribe();
错误处理:
// If WWW has .error, ObservableWWW throws WWWErrorException to onError pipeline. // WWWErrorException has RawErrorMessage, HasResponse, StatusCode, ResponseHeaders ObservableWWW.Get("http://www.google.com/404") .CatchIgnore((WWWErrorException ex) => { Debug.Log(ex.RawErrorMessage); if (ex.HasResponse) { Debug.Log(ex.StatusCode); } foreach (var item in ex.ResponseHeaders) { Debug.Log(item.Key + ":" + item.Value); } }) .Subscribe();
Using with IEnumerators (Coroutines)
IEnumerator (协程)是unity原始的异步工具。UniRx 集成了协程并可观测,你能在协程里写异步代码,并使用UniRx编排,这是控制异步流的最佳办法。
// two coroutines IEnumerator AsyncA() { Debug.Log("a start"); yield return new WaitForSeconds(1); Debug.Log("a end"); } IEnumerator AsyncB() { Debug.Log("b start"); yield return new WaitForEndOfFrame(); Debug.Log("b end"); } // main code // Observable.FromCoroutine converts IEnumerator to Observable<Unit>. // You can also use the shorthand, AsyncA().ToObservable() // after AsyncA completes, run AsyncB as a continuous routine. // UniRx expands SelectMany(IEnumerator) as SelectMany(IEnumerator.ToObservable()) var cancel = Observable.FromCoroutine(AsyncA) .SelectMany(AsyncB) .Subscribe(); // you can stop a coroutine by calling your subscription's Dispose. cancel.Dispose();
假如在Unity5.3,你能使用ToYieldInstruction 对协程观测。
IEnumerator TestNewCustomYieldInstruction() { // wait Rx Observable. yield return Observable.Timer(TimeSpan.FromSeconds(1)).ToYieldInstruction(); // you can change the scheduler(this is ignore Time.scale) yield return Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.MainThreadIgnoreTimeScale).ToYieldInstruction(); // get return value from ObservableYieldInstruction var o = ObservableWWW.Get("http://unity3d.com/").ToYieldInstruction(throwOnError: false); yield return o; if (o.HasError) { Debug.Log(o.Error.ToString()); } if (o.HasResult) { Debug.Log(o.Result); } // other sample(wait until transform.position.y >= 100) yield return this.transform.ObserveEveryValueChanged(x => x.position).FirstOrDefault(p => p.y >= 100).ToYieldInstruction(); }
通常,我们需要协程返回一个值就必须使用回调,Observable.FromCoroutine 能转换协程通过可撤销的IObservable[T] 替代
// public method public static IObservable<string> GetWWW(string url) { // convert coroutine to IObservable return Observable.FromCoroutine<string>((observer, cancellationToken) => GetWWWCore(url, observer, cancellationToken)); } // IObserver is a callback publisher // Note: IObserver's basic scheme is "OnNext* (OnError | Oncompleted)?" static IEnumerator GetWWWCore(string url, IObserver<string> observer, CancellationToken cancellationToken) { var www = new UnityEngine.WWW(url); while (!www.isDone && !cancellationToken.IsCancellationRequested) { yield return null; } if (cancellationToken.IsCancellationRequested) yield break; if (www.error != null) { observer.OnError(new Exception(www.error)); } else { observer.OnNext(www.text); observer.OnCompleted(); // IObserver needs OnCompleted after OnNext! } }
这里有一些更多的例子,接下来是一个多OnNext模式。
public static IObservable<float> ToObservable(this UnityEngine.AsyncOperation asyncOperation) { if (asyncOperation == null) throw new ArgumentNullException("asyncOperation"); return Observable.FromCoroutine<float>((observer, cancellationToken) => RunAsyncOperation(asyncOperation, observer, cancellationToken)); } static IEnumerator RunAsyncOperation(UnityEngine.AsyncOperation asyncOperation, IObserver<float> observer, CancellationToken cancellationToken) { while (!asyncOperation.isDone && !cancellationToken.IsCancellationRequested) { observer.OnNext(asyncOperation.progress); yield return null; } if (!cancellationToken.IsCancellationRequested) { observer.OnNext(asyncOperation.progress); // push 100% observer.OnCompleted(); } } // usecase Application.LoadLevelAsync("testscene") .ToObservable() .Do(x => Debug.Log(x)) // output progress .Last() // last sequence is load completed .Subscribe();
Using for MultiThreading
// Observable.Start is start factory methods on specified scheduler // default is on ThreadPool var heavyMethod = Observable.Start(() => { // heavy method... System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); return 10; }); var heavyMethod2 = Observable.Start(() => { // heavy method... System.Threading.Thread.Sleep(TimeSpan.FromSeconds(3)); return 10; }); // Join and await two other thread values Observable.WhenAll(heavyMethod, heavyMethod2) .ObserveOnMainThread() // return to main thread .Subscribe(xs => { // Unity can't touch GameObject from other thread // but use ObserveOnMainThread, you can touch GameObject naturally. (GameObject.Find("myGuiText")).guiText.text = xs[0] + ":" + xs[1]; });
DefaultScheduler
UniRx默认的基于时间操作使用Scheduler.MainThread作为他们的调度程序,这意味着大多数操作(除了Observable.Start)在单线程工作,所以ObserverOn 不需要并且线程安全措施可以忽略。这对于标准的RxNet实施并不相同,但是更合适Unity环境。
Scheduler.MainThread运行受时间缩放的影响,如果你想忽略时间缩放,使用Scheduler.MainThreadIgnoreTimeScale代替。
MonoBehaviour triggers
UniRx通过UniRx.Triggers能处理MonoBehavirour事件:
using UniRx; using UniRx.Triggers; // need UniRx.Triggers namespace public class MyComponent : MonoBehaviour { void Start() { // Get the plain object var cube = GameObject.CreatePrimitive(PrimitiveType.Cube); // Add ObservableXxxTrigger for handle MonoBehaviour's event as Observable cube.AddComponent<ObservableUpdateTrigger>() .UpdateAsObservable() .SampleFrame(30) .Subscribe(x => Debug.Log("cube"), () => Debug.Log("destroy")); // destroy after 3 second:) GameObject.Destroy(cube, 3f); } }
支持的触发器有:ObservableAnimatorTrigger, ObservableCollision2DTrigger, ObservableCollisionTrigger, ObservableDestroyTrigger, ObservableEnableTrigger, ObservableFixedUpdateTrigger, ObservableUpdateTrigger, ObservableLastUpdateTrigger, ObservableMouseTrigger, ObservableTrigger2DTrigger, ObservableTriggerTrigger, ObservableVisibleTrigger, ObservableTransformChangedTrigger, ObservableRectTransformTrigger, ObservableCanvasGroupChangedTrigger, ObservableStateMachineTrigger, ObservableEventTrigger.
这些通过在组件/游戏物体上扩展方法也可以跟容易的处理。这些方法自动注入ObservableTrigger 除了ObservableEventTrigger 和ObservableStateMachineTrigger
using UniRx; using UniRx.Triggers; // need UniRx.Triggers namespace for extend gameObejct public class DragAndDropOnce : MonoBehaviour { void Start() { // All events can subscribe by ***AsObservable this.OnMouseDownAsObservable() .SelectMany(_ => this.UpdateAsObservable()) .TakeUntil(this.OnMouseUpAsObservable()) .Select(_ => Input.mousePosition) .Subscribe(x => Debug.Log(x)); } }
Creating custom triggers
处理Unity事件转到Observable是最好的方法,如果UniRx提供的标准触发器还不够,你能创建自定义触发器,为了证明,这里有一个uGUI长按触发器:
public class ObservableLongPointerDownTrigger : ObservableTriggerBase, IPointerDownHandler, IPointerUpHandler { public float IntervalSecond = 1f; Subject<Unit> onLongPointerDown; float? raiseTime; void Update() { if (raiseTime != null && raiseTime <= Time.realtimeSinceStartup) { if (onLongPointerDown != null) onLongPointerDown.OnNext(Unit.Default); raiseTime = null; } } void IPointerDownHandler.OnPointerDown(PointerEventData eventData) { raiseTime = Time.realtimeSinceStartup + IntervalSecond; } void IPointerUpHandler.OnPointerUp(PointerEventData eventData) { raiseTime = null; } public IObservable<Unit> OnLongPointerDownAsObservable() { return onLongPointerDown ?? (onLongPointerDown = new Subject<Unit>()); } protected override void RaiseOnCompletedOnDestroy() { if (onLongPointerDown != null) { onLongPointerDown.OnCompleted(); } } }
它能像标准触发器一样容易使用
var trigger = button.AddComponent<ObservableLongPointerDownTrigger>(); trigger.OnLongPointerDownAsObservable().Subscribe();
Observable Lifecycle Management
什么时候呼叫OnCompleted ?当使用UniRx时考虑订阅的生命周期管理是非常重要的。ObservableTriggers呼叫OnCompleted时,附加的游戏对象被破坏,其它静态产生方法(Observable.Timer, Observable.EveryUpdate, 等等)不会自动停止,并且它们的订阅应该手动管理。
Rx提供一些辅助方法,比如IDisposable.AddTo允许你一次销毁若干订阅。
// CompositeDisposable is similar with List<IDisposable>, manage multiple IDisposable CompositeDisposable disposables = new CompositeDisposable(); // field void Start() { Observable.EveryUpdate().Subscribe(x => Debug.Log(x)).AddTo(disposables); } void OnTriggerEnter(Collider other) { // .Clear() => Dispose is called for all inner disposables, and the list is cleared. // .Dispose() => Dispose is called for all inner disposables, and Dispose is called immediately after additional Adds. disposables.Clear(); }
如果你想在游戏对象销毁时自动销毁,使用AddTo(游戏对象/组件)
void Start() { Observable.IntervalFrame(30).Subscribe(x => Debug.Log(x)).AddTo(this); }
AddTo促进自动销毁,如果你需要在管线里特殊处理OnCompleted,使用TakeWhile, TakeUntil, TakeUntilDestroy 和TakeUntilDisable 替代:
Observable.IntervalFrame(30).TakeUntilDisable(this) .Subscribe(x => Debug.Log(x), () => Debug.Log("completed!"));
如果你处理事件,Repeat是一个重要但是危险的方法,可能会导致一个无限循环,所以小心处理:
using UniRx; using UniRx.Triggers; public class DangerousDragAndDrop : MonoBehaviour { void Start() { this.gameObject.OnMouseDownAsObservable() .SelectMany(_ => this.gameObject.UpdateAsObservable()) .TakeUntil(this.gameObject.OnMouseUpAsObservable()) .Select(_ => Input.mousePosition) .Repeat() // dangerous!!! Repeat cause infinite repeat subscribe at GameObject was destroyed.(If in UnityEditor, Editor is freezed) .Subscribe(x => Debug.Log(x)); } }
UniRx提供额外的安全重复方法,RepeatSafe:如果连续的“OnComplete”触发重复停止。RepeatUntilDestroy(gameObject/component), RepeatUntilDisable(gameObject/component)当目标游戏对象被销毁时允许停止。
this.gameObject.OnMouseDownAsObservable() .SelectMany(_ => this.gameObject.UpdateAsObservable()) .TakeUntil(this.gameObject.OnMouseUpAsObservable()) .Select(_ => Input.mousePosition) .RepeatUntilDestroy(this) // safety way .Subscribe(x => Debug.Log(x));
如果订阅在订阅中,不分离事件的话。UniRx保证观察热点有持久的异常,这是怎么一回事?
button.OnClickAsObservable().Subscribe(_ => { // If throws error in inner subscribe, but doesn't detached OnClick event. ObservableWWW.Get("htttp://error/").Subscribe(x => { Debug.Log(x); }); });
这种行为有时是有用的,如用户事件处理。
所有类的实例提供了一个observeeveryvaluechanged方法,观察每一帧值的改变。
// watch position change this.transform.ObserveEveryValueChanged(x => x.position).Subscribe(x => Debug.Log(x));
这是非常有用的。如果观察的目标是一个游戏对象,当目标被摧毁它将停止观察,呼叫onCompleted。如果观察的目标是一个普通的C #对象,OnCompleted被垃圾回收。
Converting Unity callbacks to IObservables
对于异步操作使用主题(或异步主题)
public class LogCallback { public string Condition; public string StackTrace; public UnityEngine.LogType LogType; } public static class LogHelper { static Subject<LogCallback> subject; public static IObservable<LogCallback> LogCallbackAsObservable() { if (subject == null) { subject = new Subject<LogCallback>(); // Publish to Subject in callback UnityEngine.Application.RegisterLogCallback((condition, stackTrace, type) => { subject.OnNext(new LogCallback { Condition = condition, StackTrace = stackTrace, LogType = type }); }); } return subject.AsObservable(); } } // method is separatable and composable LogHelper.LogCallbackAsObservable() .Where(x => x.LogType == LogType.Warning) .Subscribe(); LogHelper.LogCallbackAsObservable() .Where(x => x.LogType == LogType.Error) .Subscribe();
在Unity5,Application.RegisterLogCallback 移除 in favor of Application.logMessageReceived,所以我们简单的使用 Observable.FromEvent.
public static IObservable<LogCallback> LogCallbackAsObservable() { return Observable.FromEvent<Application.LogCallback, LogCallback>( h => (condition, stackTrace, type) => h(new LogCallback { Condition = condition, StackTrace = stackTrace, LogType = type }), h => Application.logMessageReceived += h, h => Application.logMessageReceived -= h); }
Stream Logger
// using UniRx.Diagnostics; // logger is threadsafe, define per class with name. static readonly Logger logger = new Logger("Sample11"); // call once at applicationinit public static void ApplicationInitialize() { // Log as Stream, UniRx.Diagnostics.ObservableLogger.Listener is IObservable<LogEntry> // You can subscribe and output to any place. ObservableLogger.Listener.LogToUnityDebug(); // for example, filter only Exception and upload to web. // (make custom sink(IObserver<EventEntry>) is better to use) ObservableLogger.Listener .Where(x => x.LogType == LogType.Exception) .Subscribe(x => { // ObservableWWW.Post("", null).Subscribe(); }); } // Debug is write only DebugBuild. logger.Debug("Debug Message"); // or other logging methods logger.Log("Message"); logger.Exception(new Exception("test exception"));
Unity-specific Extra Gems
// Unity's singleton UiThread Queue Scheduler Scheduler.MainThreadScheduler ObserveOnMainThread()/SubscribeOnMainThread() // Global StartCoroutine runner MainThreadDispatcher.StartCoroutine(enumerator) // convert Coroutine to IObservable Observable.FromCoroutine((observer, token) => enumerator(observer, token)); // convert IObservable to Coroutine yield return Observable.Range(1, 10).ToYieldInstruction(); // after Unity 5.3, before can use StartAsCoroutine() // Lifetime hooks Observable.EveryApplicationPause(); Observable.EveryApplicationFocus(); Observable.OnceApplicationQuit();
Framecount-based time operators
UniRx提供一些基于帧数的时间操作:
Method |
---|
EveryUpdate |
EveryFixedUpdate |
EveryEndOfFrame |
EveryGameObjectUpdate |
EveryLateUpdate |
ObserveOnMainThread |
NextFrame |
IntervalFrame |
TimerFrame |
DelayFrame |
SampleFrame |
ThrottleFrame |
ThrottleFirstFrame |
TimeoutFrame |
DelayFrameSubscription |
例如:延迟调用一次:
Observable.TimerFrame(100).Subscribe(_ => Debug.Log("after 100 frame"));
每个方法执行的顺序是
EveryGameObjectUpdate(in MainThreadDispatcher's Execution Order) ->
EveryUpdate ->
EveryLateUpdate ->
EveryEndOfFrame
如果Caller在MainThreadDispatcher.Update前被呼叫,EveryGameObjectUpdate 调用同一帧(我建议MainThreadDispatcher 第一个被呼叫(ScriptExecutionOrder makes -32000)
EveryLateUpdate, EveryEndOfFrame 调用同一帧。
EveryUpdate, 调用下一帧。.
MicroCoroutine
Microcoroutine内存高效并快速协程的worker,这个实现基于Unity 10000次轮询。避免管理-非管理开销,所以快10倍。基于帧时间操作和ObserveEveryValueChanged时MicroCoroutine 自动使用。
如果你想使用MicroCoroutine 代替标准的unity协程,使用MainThreadDispatcher.StartUpdateMicroCoroutine 或 Observable.FromMicroCoroutine.
int counter; IEnumerator Worker() { while(true) { counter++; yield return null; } } void Start() { for(var i = 0; i < 10000; i++) { // fast, memory efficient MainThreadDispatcher.StartUpdateMicroCoroutine(Worker()); // slow... // StartCoroutine(Worker()); } }
MicroCoroutine的限制,仅支持yield return null并且更新时间在开始方法里被确定(StartUpdateMicroCoroutine, StartFixedUpdateMicroCoroutine, StartEndOfFrameMicroCoroutine).如果你结合其他的IObservable,你能像结束一样检测完成属性。
IEnumerator MicroCoroutineWithToYieldInstruction() { var www = ObservableWWW.Get("http://aaa").ToYieldInstruction(); while (!(www.HasResult || www.IsCanceled || www.HasError)) { yield return null; } if (www.HasResult) { UnityEngine.Debug.Log(www.Result); } }
UniRx 还是不敢引入项目里面,很多同事不适应。开头那个差点没看懂
最近由 views63 修改于:2016-11-05 15:34:21写成这样会简洁一点
Func iter = x => y => { foreach (int i in y) x(i); };
评论里Func 那些会被吞掉?
@views63:初学中,还没有碰到吞掉的情况,如果是公司的项目,稳定肯定是第一位的。不过自己确实很喜欢链式的写法,干净整洁,可能因为不是程序出生的缘故,特别怕复杂。
@kuaile:评论里是被吞掉了 我那边写着是你文章开头那个函数签名,现在看到的只有 func,Action> 了。我们项目里面都规定不能用 var ,扩展方法等都限制使用。基本上停留在C# 2.0 的语法
@views63:这个练习恰恰相反,整个游戏建立在一条系统链上,系统链上的每一个系统都需要单一职责,短小精炼,其外的代码全以扩展的方式给出。整个框架也没有两个类,看起很多类,但每个都是局部类,还是很颠覆的——类不要过大。
自己又天性好奇,总想尝试新的东西,探索的乐趣就在其中。
果然会吞掉Func iter = x => y => { foreach (int i in y) x(i); }; 确实简洁很多。
最近由 kuaile 修改于:2016-11-06 07:57:12@kuaile:是说评论里面的代码被吞掉吧?记得是为了安全考虑
@eastecho:是的,评论会吞掉。