Alkalmazásfejlesztés során, fÅ‘leg több szál esetén egy tipikus probléma, amivel találkozunk, az a termelÅ‘-fogyasztó (producer-consumer). Ennek a lényege, hogy egy szálon adatok keletkeznek, amit fel kell dolgozni egy másik szálon. A kihÃvás itt az, hogy a termelÅ‘ sokkal gyorsabban állÃthat elÅ‘ adatokat, mint ahogy azt a feldolgozó szál fel tudná dolgozni. Ezért, hogy együtt tudjanak dolgozni, szükséges egy közös erÅ‘forrás, egy buffer, ami tipikusan egy sor, queue szokott lenni. A termelÅ‘ ebbe Ãr, majd a fogyasztó ebbÅ‘l feldolgozza az adatokat.
Ez a megoldás további kihÃvásokat kreál. Például le kell kezelnünk a túlcsordulás esetét, illetve azt az esetet is, hogy ha a buffer kiürül, valamint még egy értesÃtés is kellene a két szál között, hogy mondjuk a fogyasztó ténylegesen csak akkor fogyasszon, ha van adat a queue-ban.
Ezen problémát a fejezetben tárgyalt megoldásokkal össze tudjuk állÃtani, azonban a .NET kÃnál erre a problémára is egy hatékony megoldást a System.Threading.Channels formájában, ami a .NET Core 2.1 óta része a .NET-nek, illetve korábbi Framework esetén is használható NuGet csomag formában.
A Channel lényegében felfogható egy olyan queue implementációnak, amibe egyszerre több szál Ãrhat és egyszerre több szál olvashatja, valamint értesÃtést is küld, ha van adat a csatornán. Nézzünk egy példát a használatára:
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ChannelsPelda
{
public sealed class ChannelDto
{
public string Message { get; }
public ChannelDto(string message)
{
Message = message;
}
}
public class ChannelProducer
{
private readonly Channel<ChannelDto> _channel;
public ChannelProducer(Channel<ChannelDto> channel)
{
_channel = channel;
}
public async Task Run(CancellationToken cancellationToken)
{
for (int i = 0; i < 100; i++)
{
// adat létrehozása
var data = new ChannelDto($"{Random.Shared.Next()}");
await _channel.Writer.WriteAsync(data, cancellationToken);
}
// jelzés a fogyasztónak, hogy nem lesz több adat
_channel.Writer.Complete();
}
}
public class ChannelConsumer
{
private readonly Channel<ChannelDto> _channel;
public ChannelConsumer(Channel<ChannelDto> channel)
{
_channel = channel;
}
public async Task Run(CancellationToken cancellationToken)
{
// várakozás amÃg nem érkezik adat a csatornára
while (await _channel.Reader.WaitToReadAsync(cancellationToken))
{
// adat kiolvasása
ChannelDto data = await _channel.Reader.ReadAsync(cancellationToken);
// feldolgozás szimulálása
await Task.Delay(1000, cancellationToken);
Console.WriteLine($"message: {data.Message}");
}
}
}
internal class Program
{
private static void Main(string[] args)
{
// Channel<ChannelDto> channel = Channel.CreateBounded<ChannelDto>(10);
Channel<ChannelDto> channel = Channel.CreateUnbounded<ChannelDto>();
// Channel<ChannelDto> channel = Channel.CreateUnboundedPrioritized<ChannelDto>();
var producer = new ChannelProducer(channel);
var consumer = new ChannelConsumer(channel);
using var cts = new CancellationTokenSource();
Task producerTask = producer.Run(cts.Token);
Task consumerTask = consumer.Run(cts.Token);
Console.ReadKey();
cts.Cancel(); // jelzés a producernek és a consumernek, hogy álljanak le
}
}
}
A kódban a ChannelDto az adat, amit megosztunk a fogyasztó és a termelÅ‘ között. A ChannelProducer az adatok előállÃtásáért felelÅ‘s, mÃg a ChannelConsumer azok fogyasztásáért. Mindkét osztály függÅ‘sége egy Channel. A csatorna rendelkezik egy ChannelReader<T> tÃpusú Reader tulajdonsággal, ami az olvasásért felelÅ‘s és egy ChannelWriter<T> tÃpusú Writer tulajdonsággal, ami pedig az Ãrásért.
A ChannelReader<T> osztály WaitToReadAsync metódusával várakozhatunk aszinkron módon az adatok megérkezésére. A csatornát, amit a consumer és producer használ a Channel osztály CreateUnbounded hÃvásával hozzuk létre. Ez egy olyan csatornát hoz létre, aminek nincs méretkorlátja és annyi üzenetet tud tárolni, amennyi memória van a gépünkben. Ha korlátozni szeretnénk a csatornán egyszerre jelenlévÅ‘ üzenetek számát, akkor a CreateBounded metódusával tudunk egy megadott elemszámút létrehozni.
Ha az üzeneteket priorizálni szeretnénk, akkor a CreateUnboundedPrioritized metódussal tudunk csatornát létrehozni. Ha priorizálni szeretnénk, akkor az üzeneteknek implementálniuk kell az IComparable<T> interfészt, vagy a CreateUnboundedPrioritized hÃvásnak átadható UnboundedPrioritizedChannelOptions-ben át kell adnunk egy IComparer<T> implementációt.