Yazılım Geliştirme: Koroya dayalı tüketici üreticisinin iş akışı | Haberler Online

Adanali

Active member
Yazılım Geliştirme: Koroya dayalı tüketici üreticisinin iş akışı | Haberler Online


  1. Yazılım Geliştirme: Tüketici Üreticisinin Koirutin'e Dayalı İş Akışı

Koro, eşzamansız kod yazmak için sezgisel ve yapılandırılmış bir yol sunar. Asenkron prosedürel stil operasyonları yazmanıza izin verirler. Asenkron programlamayı basitleştirmek için C ++ 20'ye tanıtılan bir işlevdir.










Rainer Grimm yıllardır yazılım mimarı, ekip ve eğitim müdürü olarak çalıştı. C ++ programlama dilleri, Python ve Haskell hakkında makaleler yazmayı seviyor, ancak uzman konferanslarla konuşmayı da seviyor. Modern C ++ blogunda, C ++ tutkusuyla yoğun bir şekilde ilgileniyor.







Bu tek prodüktör-ko-evi iş akışının anlaşılması kolay olmasa bile, choroutine deneyleri için iyi bir başlangıç noktasıdır.









Gibi mevcut mekanizmalar std::async, std::packaged_task veya olaylar (std::condition_variable & std::mutex) Bir iletişim kanalı oluşturarak etkinliğin sonucunda iki veya daha fazla iş parçacığını senkronize edin. Bu iletişim kanalının iki ucu var:

  • std::promiseOrtak durumun sonucu veya istisnasını kim yazar
  • std::future (((std::shared_future) – Etkinliğin (veya istisnanın) sonucunu bekleyen bir alıcı son.
Bu mevcut mekanizmanın aksine, koro, işletim sistemini senkronize etmek için doğrudan ipliklere veya diğer ilkellere bağlı değildir. Aksine, koroutin kontrol nesnesine ve etraflarında inşa edilen koşullarda makinenin mantığına dayanan saf bir yazılım soyutlamasıdır.

Koro istiflemeden – bu, vergi nesnesinin kazıkta oluşturulması gerektiği anlamına gelir. Tesadüfen, bibliyotik bir zarftır promise_type (((std::coroutine_handle<promise_type>), ama gerçekte hiçbir şey std::promise ortak noktası var.



. promise_type Bir koroutinin durum mekanizmasındaki önceden tanımlanmış geçiş durumlarını tanımlayan bir arayüzdür (bir adaptasyon noktası).

Koro çok yönlüdür ve eşzamansız bir haber akışını yönetmek için gereken farklı senaryolarda kullanılabilir. Yaygın bir örnek, sokete dayalı iletişimdir.

Bugün koroyu başka bir örnek kullanarak açıklamaya çalışacağım: tek üretici için tüketici iş akışı.

uygulama


Önce koro için sonuç türünü tanımlamalıyız:



class[[nodiscard]] AudioDataResult final&#13;
{&#13;
public:&#13;
class promise_type;&#13;
using handle_type = std::coroutine_handle<promise_type>;&#13;
&#13;
class promise_type&#13;
{&#13;
...&#13;
};&#13;
};


Bu içeride bir sargı: promise_type Tip. Çevredeki sınıfı öznitelikle dekore ediyoruz [[nodiscard]]Sonuç türü koronun kontrolü nesnesi olduğundan: askıya alma/kurtarmasını yönetmek için istemci koduna geri dönelim.

@Ankunkt Sınıfın yok edicisi, kaynakları (dinamik bellek) RAII yolunda düzenler, böylece choroutine koşullarını yönetmek için gerekli değilse, getiri türü titizlikle göz ardı edilebilir.



~AudioDataResult() { if(handle_) { handle_.destroy(); } }


Sonuç türü, kontrol nesnesinin çoğaltılmasını önlemek için yer değiştirme -solo: kopya işlemleri yasaktır.



// Make the result type move-only, &#13;
//due to exclusive ownership over the handle&#13;
&#13;
AudioDataResult(const AudioDataResult& ) = delete;&#13;
AudioDataResult& operator= (constAudioDataResult& ) = delete;&#13;
&#13;
AudioDataResult(AudioDataResult&& other) noexcept:&#13;
handle_(std::exchange(other.handle_, nullptr))&#13;
{}&#13;
&#13;
AudioDataResult& operator = (AudioDataResult&& other) noexcept&#13;
{&#13;
using namespace std;&#13;
AudioDataResult tmp =std::move(other);&#13;
swap(*this, tmp);&#13;
return *this;&#13;
}


Şimdi Promme_Type arayüzünün kendisini tanımlıyoruz:



// Predefined interface that has to be specify &#13;
//in order to implement&#13;
// coroutine's state-machine transitions&#13;
class promise_type&#13;
{&#13;
&#13;
public:&#13;
using value_type = std::vector<int>;&#13;
AudioDataResult get_return_object()&#13;
{&#13;
return AudioDataResult{ handle_type::from_promise(*this) };&#13;
}&#13;
std::suspend_never initial_suspend() noexcept { return{}; }&#13;
std::suspend_always final_suspend() noexcept { return{}; }&#13;
&#13;
void return_void() {}&#13;
void unhandled_exception()&#13;
{&#13;
std::rethrow_exception(std::current_exception());&#13;
}&#13;
&#13;
// Generates the value and suspend the "producer"&#13;
template <typename Data>&#13;
requires std::convertible_to<std::decay_t<Data>, value_type>&#13;
std::suspend_always yield_value(Data&& value)&#13;
{&#13;
data_ = std::forward<Data>(value);&#13;
data_ready_.store(true, std::memory_order::relaxed);&#13;
return {};&#13;
}&#13;
&#13;
private:&#13;
value_type data_;&#13;
std::atomic<bool> data_ready_;&#13;
};//promise_type interface


. promise_type Koroutinin gerekli altyapısını tanımlar. Ayrıca, promise_type Bir jeneratör olarak hareket etmek isteyen tüm koro için – “üretici” – değerleri vermek yield_value-Modi genişletildi (co_yield ≡ co_await promise_.yield_value). Veriler tüketilirse, karşılık gelen SRAPPE yöntemine sahip olmalıyız resume() Üreticiye devam etmeyi sağlayın.



void resume() { if( not handle_.done()) { handle_.resume();} }


Şimdi koroyu tüketici gereksinimlerini karşılayacak şekilde genişletmeliyiz: verilerin hazır olmasıyla senkronize edilmelidir. Başka bir deyişle, veriler üretici tarafından mevcut olduğu gibi rapor edilene kadar tüketici kesintiye uğrar. Bunu yapmak için arayüzlemeliyiz Awaiter alet:



class promise_type&#13;
{&#13;
// Awaiter interface: for consumer waiting on data being ready&#13;
struct AudioDataAwaiter&#13;
{&#13;
&#13;
explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}&#13;
&#13;
bool await_ready() const&#13;
{&#13;
return promise_.data_ready_.load(std::memory_order::relaxed);&#13;
}&#13;
&#13;
void await_suspend(handle_type) const&#13;
{&#13;
while( not promise_.data_ready_.exchange(false))&#13;
{&#13;
std::this_thread::yield();&#13;
}&#13;
}&#13;
&#13;
// move assignment at client invocation side:&#13;
// const auto data = co_await audioDataResult;&#13;
// This requires that coroutine's result type provides&#13;
// the co_await unary operator&#13;
&#13;
value_type&& await_resume() const&#13;
{&#13;
return std::move(promise_.data_);&#13;
}&#13;
&#13;
private:&#13;
promise_type& promise_;&#13;
&#13;
};//Awaiter interface&#13;
&#13;
};//promise_type


Eyalet makinesinde await_ready() İlk geçiş durumu: Verilerin iradesi için doğrulanmıştır. Veriler hazır değilse, bir sonraki irade await_suspend() isminde. Burada aslında ilgili bayrağın ayarlanmamasını bekliyoruz. Sonunda olur await_resume() Adı: “Hareket ettiriyoruz” değerini promise_typeKoşulsuz bir referans rvalue tarzında dönüştürmek. Müşteri çağrısının yanında, bu, döndürülen değer için atama operatörünün – data – denir.



const auto data = co_await audioDataResult;&#13;



Bunun çalışması için, sonuç türü bir kenarın operatörü olmalıdır. co_await Bizimkini sağlayın Awaiter-arayüz.



class AudioDataResult&#13;
{&#13;
auto operator co_await() noexcept&#13;
{&#13;
return promise_type::AudioDataAwaiter{handle_.promise()};&#13;
}&#13;
};


: https://godbolt.org/z/mvyfbep8r

Aşağıdaki program producerConsumer.cpp Örnek 1'in basitleştirilmiş bir versiyonunu gösterir:



// producerConsumer.cpp&#13;
&#13;
#include <algorithm>&#13;
#include <atomic>&#13;
#include <chrono>&#13;
#include <coroutine>&#13;
#include <functional>&#13;
#include <iostream>&#13;
#include <iterator>&#13;
#include <memory>&#13;
#include <source_location>&#13;
#include <thread>&#13;
#include <utility>&#13;
#include <vector>&#13;
&#13;
&#13;
void funcName(const std::source_location location = std::source_location::current()) {&#13;
std::cout << location.function_name() << 'n';&#13;
}&#13;
&#13;
&#13;
template <typename Container>&#13;
void printContainer(const Container& container)&#13;
{&#13;
typedef typename Container::value_type value_type;&#13;
auto first = std::cbegin(container);&#13;
auto last = std::cend(container);&#13;
&#13;
std::cout << " [";&#13;
std::copy(first, std::prev(last), std::eek:stream_iterator<value_type>(std::cout, ", "));&#13;
std::cout << *std::prev(last) << "]n";&#13;
}&#13;
&#13;
&#13;
&#13;
&#13;
class [[nodiscard]] AudioDataResult final&#13;
{&#13;
public:&#13;
class promise_type;&#13;
using handle_type = std::coroutine_handle<promise_type>;&#13;
&#13;
// Predefined interface that has to be specify in order to implement&#13;
// coroutine's state-machine transitions&#13;
class promise_type &#13;
{&#13;
&#13;
public:&#13;
&#13;
using value_type = std::vector<int>;&#13;
&#13;
AudioDataResult get_return_object() &#13;
{&#13;
return AudioDataResult{handle_type::from_promise(*this)};&#13;
}&#13;
std::suspend_never initial_suspend() noexcept { return {}; }&#13;
std::suspend_always final_suspend() noexcept { return {}; }&#13;
void return_void() {}&#13;
void unhandled_exception() &#13;
{&#13;
std::rethrow_exception(std::current_exception());&#13;
}&#13;
&#13;
// Generates the value and suspend the "producer"&#13;
template <typename Data>&#13;
requires std::convertible_to<std::decay_t<Data>, value_type>&#13;
std::suspend_always yield_value(Data&& value) &#13;
{&#13;
data_ = std::forward<Data>(value);&#13;
data_ready_.store(true);&#13;
return {};&#13;
}&#13;
&#13;
// Awaiter interface: for consumer waiting on data being ready&#13;
struct AudioDataAwaiter &#13;
{&#13;
explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}&#13;
&#13;
bool await_ready() const { return promise_.data_ready_.load();}&#13;
&#13;
void await_suspend(handle_type) const&#13;
{&#13;
while(not promise_.data_ready_.exchange(false)) {&#13;
std::this_thread::yield(); &#13;
}&#13;
}&#13;
// move assignment at client invocation side: const auto data = co_await audioDataResult;&#13;
// This requires that coroutine's result type provides the co_await unary operator&#13;
value_type&& await_resume() const &#13;
{&#13;
return std::move(promise_.data_);&#13;
}&#13;
&#13;
private: &#13;
promise_type& promise_;&#13;
};//Awaiter interface&#13;
&#13;
&#13;
private:&#13;
value_type data_;&#13;
std::atomic<bool> data_ready_;&#13;
}; //promise_type interface&#13;
&#13;
&#13;
auto operator co_await() noexcept &#13;
{&#13;
return promise_type::AudioDataAwaiter{handle_.promise()};&#13;
}&#13;
&#13;
// Make the result type move-only, due to ownership over the handle&#13;
AudioDataResult(const AudioDataResult&) = delete;&#13;
AudioDataResult& operator=(const AudioDataResult&) = delete;&#13;
&#13;
AudioDataResult(AudioDataResult&& other) noexcept: handle_(std::exchange(other.handle_, nullptr)){}&#13;
AudioDataResult& operator=(AudioDataResult&& other) noexcept &#13;
{&#13;
using namespace std;&#13;
AudioDataResult tmp = std::move(other);&#13;
swap(*this, tmp);&#13;
return *this;&#13;
}&#13;
&#13;
// d-tor: RAII&#13;
~AudioDataResult() { if (handle_) {funcName(); handle_.destroy();}}&#13;
&#13;
// For resuming the producer - at the point when the data are consumed&#13;
void resume() {if (not handle_.done()) { funcName(); handle_.resume();}}&#13;
&#13;
private:&#13;
AudioDataResult(handle_type handle) noexcept : handle_(handle) {}&#13;
&#13;
private:&#13;
handle_type handle_;&#13;
};&#13;
&#13;
&#13;
using data_type = std::vector<int>;&#13;
AudioDataResult producer(const data_type& data) &#13;
{&#13;
for (std::size_t i = 0; i < 5; ++i) {&#13;
funcName();&#13;
co_yield data;&#13;
}&#13;
co_yield data_type{}; // exit criteria&#13;
}&#13;
&#13;
AudioDataResult consumer(AudioDataResult& audioDataResult) &#13;
{&#13;
while(true)&#13;
{&#13;
funcName();&#13;
const auto data = co_await audioDataResult;&#13;
if (data.empty()) {std::cout << "No data - exit!n"; break;}&#13;
std::cout << "Data received:";&#13;
printContainer(data);&#13;
audioDataResult.resume(); // resume producer&#13;
}&#13;
}&#13;
&#13;
int main() &#13;
{&#13;
{&#13;
const data_type data = {1, 2, 3, 4};&#13;
auto audioDataProducer = producer(data);&#13;
std::thread t ([&]{auto audioRecorded = consumer(audioDataProducer);});&#13;
t.join();&#13;
}&#13;
&#13;
std::cout << "bye-bye!n";&#13;
}


Son olarak, program programın baskısıdır:









Diğer seçenek de kullanımı promise_type::await_transform()İçindeki değeri bekle promise_type-İnsman üretici tarafından kullanıldığı için arşivlenmiştir.



class promise_type&#13;
{&#13;
auto await_transform(handle_type other)&#13;
{&#13;
// Awaiter interface: remained the same&#13;
struct AudioDataAwaiter&#13;
{&#13;
explicit AudioDataAwaiter(promise_type& promise)noexcept: promise_(promise) {}&#13;
...&#13;
};&#13;
&#13;
return AudioDataAwaiter{other.promise()};&#13;
}&#13;
};


Bu şekilde artık bir kenarın operatörü olmak zorunda değiliz co_await Sonuç türünü belirtin, ancak bir dönüştürme operatörü (açık),



explicit operator handle_type() const {return handle_;}


Böylece tüketicinin bulunduğu noktaya teslim edebiliriz co_await Aramada dahili olarak arayın await_transform() Tercüme edilmiştir.



const auto data = co_await static_cast<AudioDataResult::handle_type>(audioDataResult);&#13;



Bunu şu şekilde açıklayabiliriz: me.handle_.promise().await_transform(other.handle_)

: https://godbolt.org/z/57zsk9ren

çözüm


Bu basit örnekte, üretici cezalandırılmadan durdurulur, çünkü iyileşmeden sonra tam olarak aynı – önceden bilinen – sunar. Muhtemelen gerçek bir senaryoda durum böyle değildir: üreticinin kendisi muhtemelen bir tür arabulucu olacaktır – tüketiciye iade edilecek asenkron verilerin alıcısı. Bu nedenle, veriler tutuklandığında veri kayıplarından kaçınmak ve tüketicinin devam etmesini beklemek, üreticinin varış oranı ile tüketicinin tüketim oranı arasındaki farkları telafi etmek için ürün üzerinde bir kuyruk mantığı uygulanmalıdır.

Sırada ne var?


C ++ 20'de tre-vie tanımlanabilir veya default Rica etmek. Bu, altı karşılaştırma operatörünün de mevcut olduğu anlamına gelir: ==, !=, <, <=, > VE >=. Eşitlik operatörünü de kullanabilirsiniz (==) Tanımlayın veya default Rica etmek.

Kısa Noel Tatmanım


Blogum küçük bir Noel tatili yapıyor. Bir sonraki makaleyi 8 Ocak'ta yayınlayacağım. Tüm okuyuculara iyi vakit geçiriyorum.


(RME)
 
Üst