一千萬個為什麽

搜索

IObservable <double> .Average應該如何工作?


更新

看起來像Jon Skeet是對的(大驚喜!),問題在於我假設 Average 擴展提供連續平均值(它沒有)。

對於我所追求的行為,我編寫了一個簡單的 ContinuousAverage 擴展方法,我在這裏實現的方法是為了其他可能需要類似內容的人的利益:

public static class ObservableExtensions {
    private class ContinuousAverager {
            private double _mean;
            private long _count;

        public ContinuousAverager() {
            _mean = 0.0;
            _count = 0L;
        }

       //undecided whether this method needs to be made thread-safe or not
       //seems that ought to be the responsibility of the IObservable (?)
        public double Add(double value) {
            double delta = value - _mean;
            _mean += (delta/(double)(++_count));
            return _mean;
        }
    }

    public static IObservable ContinousAverage(this IObservable source) {
        var averager = new ContinuousAverager();

        return source.Select(x => averager.Add(x));
    }
}

我正在考慮繼續為其他明顯的候選人做類似的事情 - 所以, ContinuousCountContinuousSumContinuousMinContinuousMax ...也許是 ContinuousVarianceContinuousStandardDeviation ?有什麽想法嗎?


原始問題

我在這裏和那裏稍微使用 Rx Extensions ,感受到我已經有了基本的想法。

現在這裏有些奇怪的事情:我的印象是如果我寫這個:

var ticks = Observable.FromEvent(MarketDataProvider, "MarketTick");

var bids = ticks
    .Where(e => e.EventArgs.Quote.HasBid)
    .Select(e => e.EventArgs.Quote.Bid);

var bidsSubscription = bids.Subscribe(
    b => Console.WriteLine("Bid: {0}", b)
);

var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

I would get two IObservable objects (bids and avgOfBids); one would basically be a stream of all the market bids from my MarketDataProvider, the other would be a stream of the average of these bids.

所以這樣的事情:

Bid    Avg Bid
1      1
2      1.5
1      1.33
2      1.5

It seems that my avgOfBids object isn't doing anything. What am I missing? I think I've probably misunderstood what Average is actually supposed to do. (This also seems to be the case for all of the aggregate-like extension methods on IObservable -- e.g., Max, Count, etc.)

最佳答案

另一種做ContinousAverage的方法是使用.Scan():

bids.Scan(new { sum = .0, count = 0 }, 
          (agg, x) => new { sum = agg.sum + x, count = agg.count + 1 })
    .Select(agg => agg.sum/agg.count)

轉載註明原文: IObservable <double> .Average應該如何工作?

猜你喜歡