1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
19#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
20
21#include <atomic>
22#include <functional>
23#include <type_traits>
24
25#include <grpcpp/impl/codegen/call.h>
26#include <grpcpp/impl/codegen/call_op_set.h>
27#include <grpcpp/impl/codegen/callback_common.h>
28#include <grpcpp/impl/codegen/config.h>
29#include <grpcpp/impl/codegen/core_codegen_interface.h>
30#include <grpcpp/impl/codegen/message_allocator.h>
31#include <grpcpp/impl/codegen/status.h>
32
33namespace grpc_impl {
34
35// Declare base class of all reactors as internal
36namespace internal {
37
38// Forward declarations
39template <class Request, class Response>
40class CallbackUnaryHandler;
41template <class Request, class Response>
42class CallbackClientStreamingHandler;
43template <class Request, class Response>
44class CallbackServerStreamingHandler;
45template <class Request, class Response>
46class CallbackBidiHandler;
47
48class ServerReactor {
49 public:
50 virtual ~ServerReactor() = default;
51 virtual void OnDone() = 0;
52 virtual void OnCancel() = 0;
53
54 // The following is not API. It is for internal use only and specifies whether
55 // all reactions of this Reactor can be run without an extra executor
56 // scheduling. This should only be used for internally-defined reactors with
57 // trivial reactions.
58 virtual bool InternalInlineable() { return false; }
59
60 private:
61 template <class Request, class Response>
62 friend class CallbackUnaryHandler;
63 template <class Request, class Response>
64 friend class CallbackClientStreamingHandler;
65 template <class Request, class Response>
66 friend class CallbackServerStreamingHandler;
67 template <class Request, class Response>
68 friend class CallbackBidiHandler;
69};
70
71/// The base class of ServerCallbackUnary etc.
72class ServerCallbackCall {
73 public:
74 virtual ~ServerCallbackCall() {}
75
76 // This object is responsible for tracking when it is safe to call OnDone and
77 // OnCancel. OnDone should not be called until the method handler is complete,
78 // Finish has been called, the ServerContext CompletionOp (which tracks
79 // cancellation or successful completion) has completed, and all outstanding
80 // Read/Write actions have seen their reactions. OnCancel should not be called
81 // until after the method handler is done and the RPC has completed with a
82 // cancellation. This is tracked by counting how many of these conditions have
83 // been met and calling OnCancel when none remain unmet.
84
85 // Public versions of MaybeDone: one where we don't know the reactor in
86 // advance (used for the ServerContext CompletionOp), and one for where we
87 // know the inlineability of the OnDone reaction. You should set the inline
88 // flag to true if either the Reactor is InternalInlineable() or if this
89 // callback is already being forced to run dispatched to an executor
90 // (typically because it contains additional work than just the MaybeDone).
91
92 void MaybeDone() {
93 if (GPR_UNLIKELY(Unref() == 1)) {
94 ScheduleOnDone(inline_ondone: reactor()->InternalInlineable());
95 }
96 }
97
98 void MaybeDone(bool inline_ondone) {
99 if (GPR_UNLIKELY(Unref() == 1)) {
100 ScheduleOnDone(inline_ondone);
101 }
102 }
103
104 // Fast version called with known reactor passed in, used from derived
105 // classes, typically in non-cancel case
106 void MaybeCallOnCancel(ServerReactor* reactor) {
107 if (GPR_UNLIKELY(UnblockCancellation())) {
108 CallOnCancel(reactor);
109 }
110 }
111
112 // Slower version called from object that doesn't know the reactor a priori
113 // (such as the ServerContext CompletionOp which is formed before the
114 // reactor). This is used in cancel cases only, so it's ok to be slower and
115 // invoke a virtual function.
116 void MaybeCallOnCancel() {
117 if (GPR_UNLIKELY(UnblockCancellation())) {
118 CallOnCancel(reactor: reactor());
119 }
120 }
121
122 protected:
123 /// Increases the reference count
124 void Ref() { callbacks_outstanding_.fetch_add(i: 1, m: std::memory_order_relaxed); }
125
126 private:
127 virtual ServerReactor* reactor() = 0;
128
129 // CallOnDone performs the work required at completion of the RPC: invoking
130 // the OnDone function and doing all necessary cleanup. This function is only
131 // ever invoked on a fully-Unref'fed ServerCallbackCall.
132 virtual void CallOnDone() = 0;
133
134 // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
135 // to an executor.
136 void ScheduleOnDone(bool inline_ondone);
137
138 // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
139 // it to an executor.
140 void CallOnCancel(ServerReactor* reactor);
141
142 // Implement the cancellation constraint counter. Return true if OnCancel
143 // should be called, false otherwise.
144 bool UnblockCancellation() {
145 return on_cancel_conditions_remaining_.fetch_sub(
146 i: 1, m: std::memory_order_acq_rel) == 1;
147 }
148
149 /// Decreases the reference count and returns the previous value
150 int Unref() {
151 return callbacks_outstanding_.fetch_sub(i: 1, m: std::memory_order_acq_rel);
152 }
153
154 std::atomic_int on_cancel_conditions_remaining_{2};
155 std::atomic_int callbacks_outstanding_{
156 3}; // reserve for start, Finish, and CompletionOp
157};
158
159template <class Request, class Response>
160class DefaultMessageHolder
161 : public ::grpc::experimental::MessageHolder<Request, Response> {
162 public:
163 DefaultMessageHolder() {
164 this->set_request(&request_obj_);
165 this->set_response(&response_obj_);
166 }
167 void Release() override {
168 // the object is allocated in the call arena.
169 this->~DefaultMessageHolder<Request, Response>();
170 }
171
172 private:
173 Request request_obj_;
174 Response response_obj_;
175};
176
177} // namespace internal
178
179// Forward declarations
180class ServerUnaryReactor;
181template <class Request>
182class ServerReadReactor;
183template <class Response>
184class ServerWriteReactor;
185template <class Request, class Response>
186class ServerBidiReactor;
187
188// NOTE: The actual call/stream object classes are provided as API only to
189// support mocking. There are no implementations of these class interfaces in
190// the API.
191class ServerCallbackUnary : public internal::ServerCallbackCall {
192 public:
193 virtual ~ServerCallbackUnary() {}
194 virtual void Finish(::grpc::Status s) = 0;
195 virtual void SendInitialMetadata() = 0;
196
197 protected:
198 // Use a template rather than explicitly specifying ServerUnaryReactor to
199 // delay binding and avoid a circular forward declaration issue
200 template <class Reactor>
201 void BindReactor(Reactor* reactor) {
202 reactor->InternalBindCall(this);
203 }
204};
205
206template <class Request>
207class ServerCallbackReader : public internal::ServerCallbackCall {
208 public:
209 virtual ~ServerCallbackReader() {}
210 virtual void Finish(::grpc::Status s) = 0;
211 virtual void SendInitialMetadata() = 0;
212 virtual void Read(Request* msg) = 0;
213
214 protected:
215 void BindReactor(ServerReadReactor<Request>* reactor) {
216 reactor->InternalBindReader(this);
217 }
218};
219
220template <class Response>
221class ServerCallbackWriter : public internal::ServerCallbackCall {
222 public:
223 virtual ~ServerCallbackWriter() {}
224
225 virtual void Finish(::grpc::Status s) = 0;
226 virtual void SendInitialMetadata() = 0;
227 virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
228 virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
229 ::grpc::Status s) = 0;
230
231 protected:
232 void BindReactor(ServerWriteReactor<Response>* reactor) {
233 reactor->InternalBindWriter(this);
234 }
235};
236
237template <class Request, class Response>
238class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
239 public:
240 virtual ~ServerCallbackReaderWriter() {}
241
242 virtual void Finish(::grpc::Status s) = 0;
243 virtual void SendInitialMetadata() = 0;
244 virtual void Read(Request* msg) = 0;
245 virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
246 virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
247 ::grpc::Status s) = 0;
248
249 protected:
250 void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
251 reactor->InternalBindStream(this);
252 }
253};
254
255// The following classes are the reactor interfaces that are to be implemented
256// by the user, returned as the output parameter of the method handler for a
257// callback method. Note that none of the classes are pure; all reactions have a
258// default empty reaction so that the user class only needs to override those
259// classes that it cares about.
260
261/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
262template <class Request, class Response>
263class ServerBidiReactor : public internal::ServerReactor {
264 public:
265 // NOTE: Initializing stream_ as a constructor initializer rather than a
266 // default initializer because gcc-4.x requires a copy constructor for
267 // default initializing a templated member, which isn't ok for atomic.
268 // TODO(vjpai): Switch to default constructor and default initializer when
269 // gcc-4.x is no longer supported
270 ServerBidiReactor() : stream_(nullptr) {}
271 ~ServerBidiReactor() = default;
272
273 /// Send any initial metadata stored in the RPC context. If not invoked,
274 /// any initial metadata will be passed along with the first Write or the
275 /// Finish (if there are no writes).
276 void StartSendInitialMetadata() {
277 ServerCallbackReaderWriter<Request, Response>* stream =
278 stream_.load(std::memory_order_acquire);
279 if (stream == nullptr) {
280 grpc::internal::MutexLock l(&stream_mu_);
281 stream = stream_.load(std::memory_order_relaxed);
282 if (stream == nullptr) {
283 backlog_.send_initial_metadata_wanted = true;
284 return;
285 }
286 }
287 stream->SendInitialMetadata();
288 }
289
290 /// Initiate a read operation.
291 ///
292 /// \param[out] req Where to eventually store the read message. Valid when
293 /// the library calls OnReadDone
294 void StartRead(Request* req) {
295 ServerCallbackReaderWriter<Request, Response>* stream =
296 stream_.load(std::memory_order_acquire);
297 if (stream == nullptr) {
298 grpc::internal::MutexLock l(&stream_mu_);
299 stream = stream_.load(std::memory_order_relaxed);
300 if (stream == nullptr) {
301 backlog_.read_wanted = req;
302 return;
303 }
304 }
305 stream->Read(req);
306 }
307
308 /// Initiate a write operation.
309 ///
310 /// \param[in] resp The message to be written. The library does not take
311 /// ownership but the caller must ensure that the message is
312 /// not deleted or modified until OnWriteDone is called.
313 void StartWrite(const Response* resp) {
314 StartWrite(resp, ::grpc::WriteOptions());
315 }
316
317 /// Initiate a write operation with specified options.
318 ///
319 /// \param[in] resp The message to be written. The library does not take
320 /// ownership but the caller must ensure that the message is
321 /// not deleted or modified until OnWriteDone is called.
322 /// \param[in] options The WriteOptions to use for writing this message
323 void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
324 ServerCallbackReaderWriter<Request, Response>* stream =
325 stream_.load(std::memory_order_acquire);
326 if (stream == nullptr) {
327 grpc::internal::MutexLock l(&stream_mu_);
328 stream = stream_.load(std::memory_order_relaxed);
329 if (stream == nullptr) {
330 backlog_.write_wanted = resp;
331 backlog_.write_options_wanted = std::move(options);
332 return;
333 }
334 }
335 stream->Write(resp, std::move(options));
336 }
337
338 /// Initiate a write operation with specified options and final RPC Status,
339 /// which also causes any trailing metadata for this RPC to be sent out.
340 /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
341 /// single step. A key difference, though, is that this operation doesn't have
342 /// an OnWriteDone reaction - it is considered complete only when OnDone is
343 /// available. An RPC can either have StartWriteAndFinish or Finish, but not
344 /// both.
345 ///
346 /// \param[in] resp The message to be written. The library does not take
347 /// ownership but the caller must ensure that the message is
348 /// not deleted or modified until OnDone is called.
349 /// \param[in] options The WriteOptions to use for writing this message
350 /// \param[in] s The status outcome of this RPC
351 void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
352 ::grpc::Status s) {
353 ServerCallbackReaderWriter<Request, Response>* stream =
354 stream_.load(std::memory_order_acquire);
355 if (stream == nullptr) {
356 grpc::internal::MutexLock l(&stream_mu_);
357 stream = stream_.load(std::memory_order_relaxed);
358 if (stream == nullptr) {
359 backlog_.write_and_finish_wanted = true;
360 backlog_.write_wanted = resp;
361 backlog_.write_options_wanted = std::move(options);
362 backlog_.status_wanted = std::move(s);
363 return;
364 }
365 }
366 stream->WriteAndFinish(resp, std::move(options), std::move(s));
367 }
368
369 /// Inform system of a planned write operation with specified options, but
370 /// allow the library to schedule the actual write coalesced with the writing
371 /// of trailing metadata (which takes place on a Finish call).
372 ///
373 /// \param[in] resp The message to be written. The library does not take
374 /// ownership but the caller must ensure that the message is
375 /// not deleted or modified until OnWriteDone is called.
376 /// \param[in] options The WriteOptions to use for writing this message
377 void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
378 StartWrite(resp, std::move(options.set_last_message()));
379 }
380
381 /// Indicate that the stream is to be finished and the trailing metadata and
382 /// RPC status are to be sent. Every RPC MUST be finished using either Finish
383 /// or StartWriteAndFinish (but not both), even if the RPC is already
384 /// cancelled.
385 ///
386 /// \param[in] s The status outcome of this RPC
387 void Finish(::grpc::Status s) {
388 ServerCallbackReaderWriter<Request, Response>* stream =
389 stream_.load(std::memory_order_acquire);
390 if (stream == nullptr) {
391 grpc::internal::MutexLock l(&stream_mu_);
392 stream = stream_.load(std::memory_order_relaxed);
393 if (stream == nullptr) {
394 backlog_.finish_wanted = true;
395 backlog_.status_wanted = std::move(s);
396 return;
397 }
398 }
399 stream->Finish(std::move(s));
400 }
401
402 /// Notifies the application that an explicit StartSendInitialMetadata
403 /// operation completed. Not used when the sending of initial metadata
404 /// piggybacks onto the first write.
405 ///
406 /// \param[in] ok Was it successful? If false, no further write-side operation
407 /// will succeed.
408 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
409
410 /// Notifies the application that a StartRead operation completed.
411 ///
412 /// \param[in] ok Was it successful? If false, no further read-side operation
413 /// will succeed.
414 virtual void OnReadDone(bool /*ok*/) {}
415
416 /// Notifies the application that a StartWrite (or StartWriteLast) operation
417 /// completed.
418 ///
419 /// \param[in] ok Was it successful? If false, no further write-side operation
420 /// will succeed.
421 virtual void OnWriteDone(bool /*ok*/) {}
422
423 /// Notifies the application that all operations associated with this RPC
424 /// have completed. This is an override (from the internal base class) but
425 /// still abstract, so derived classes MUST override it to be instantiated.
426 void OnDone() override = 0;
427
428 /// Notifies the application that this RPC has been cancelled. This is an
429 /// override (from the internal base class) but not final, so derived classes
430 /// should override it if they want to take action.
431 void OnCancel() override {}
432
433 private:
434 friend class ServerCallbackReaderWriter<Request, Response>;
435 // May be overridden by internal implementation details. This is not a public
436 // customization point.
437 virtual void InternalBindStream(
438 ServerCallbackReaderWriter<Request, Response>* stream) {
439 // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
440 // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
441 // has stream, then std::get<PreBindBacklog> out of that after the lock.
442 // Do likewise with the remaining InternalBind* functions as well.
443 grpc::internal::ReleasableMutexLock l(&stream_mu_);
444 PreBindBacklog ops(std::move(backlog_));
445 stream_.store(stream, std::memory_order_release);
446 l.Unlock();
447
448 if (ops.send_initial_metadata_wanted) {
449 stream->SendInitialMetadata();
450 }
451 if (ops.read_wanted != nullptr) {
452 stream->Read(ops.read_wanted);
453 }
454 if (ops.write_and_finish_wanted) {
455 stream->WriteAndFinish(ops.write_wanted,
456 std::move(ops.write_options_wanted),
457 std::move(ops.status_wanted));
458 } else {
459 if (ops.write_wanted != nullptr) {
460 stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
461 }
462 if (ops.finish_wanted) {
463 stream->Finish(std::move(ops.status_wanted));
464 }
465 }
466 }
467
468 grpc::internal::Mutex stream_mu_;
469 // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
470 // once C++17 or ABSL is supported since stream and backlog are
471 // mutually exclusive in this class. Do likewise with the
472 // remaining reactor classes and their backlogs as well.
473 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
474 struct PreBindBacklog {
475 bool send_initial_metadata_wanted = false;
476 bool write_and_finish_wanted = false;
477 bool finish_wanted = false;
478 Request* read_wanted = nullptr;
479 const Response* write_wanted = nullptr;
480 ::grpc::WriteOptions write_options_wanted;
481 ::grpc::Status status_wanted;
482 };
483 PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
484};
485
486/// \a ServerReadReactor is the interface for a client-streaming RPC.
487template <class Request>
488class ServerReadReactor : public internal::ServerReactor {
489 public:
490 ServerReadReactor() : reader_(nullptr) {}
491 ~ServerReadReactor() = default;
492
493 /// The following operation initiations are exactly like ServerBidiReactor.
494 void StartSendInitialMetadata() {
495 ServerCallbackReader<Request>* reader =
496 reader_.load(std::memory_order_acquire);
497 if (reader == nullptr) {
498 grpc::internal::MutexLock l(&reader_mu_);
499 reader = reader_.load(std::memory_order_relaxed);
500 if (reader == nullptr) {
501 backlog_.send_initial_metadata_wanted = true;
502 return;
503 }
504 }
505 reader->SendInitialMetadata();
506 }
507 void StartRead(Request* req) {
508 ServerCallbackReader<Request>* reader =
509 reader_.load(std::memory_order_acquire);
510 if (reader == nullptr) {
511 grpc::internal::MutexLock l(&reader_mu_);
512 reader = reader_.load(std::memory_order_relaxed);
513 if (reader == nullptr) {
514 backlog_.read_wanted = req;
515 return;
516 }
517 }
518 reader->Read(req);
519 }
520 void Finish(::grpc::Status s) {
521 ServerCallbackReader<Request>* reader =
522 reader_.load(std::memory_order_acquire);
523 if (reader == nullptr) {
524 grpc::internal::MutexLock l(&reader_mu_);
525 reader = reader_.load(std::memory_order_relaxed);
526 if (reader == nullptr) {
527 backlog_.finish_wanted = true;
528 backlog_.status_wanted = std::move(s);
529 return;
530 }
531 }
532 reader->Finish(std::move(s));
533 }
534
535 /// The following notifications are exactly like ServerBidiReactor.
536 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
537 virtual void OnReadDone(bool /*ok*/) {}
538 void OnDone() override = 0;
539 void OnCancel() override {}
540
541 private:
542 friend class ServerCallbackReader<Request>;
543
544 // May be overridden by internal implementation details. This is not a public
545 // customization point.
546 virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
547 grpc::internal::ReleasableMutexLock l(&reader_mu_);
548 PreBindBacklog ops(std::move(backlog_));
549 reader_.store(reader, std::memory_order_release);
550 l.Unlock();
551
552 if (ops.send_initial_metadata_wanted) {
553 reader->SendInitialMetadata();
554 }
555 if (ops.read_wanted != nullptr) {
556 reader->Read(ops.read_wanted);
557 }
558 if (ops.finish_wanted) {
559 reader->Finish(std::move(ops.status_wanted));
560 }
561 }
562
563 grpc::internal::Mutex reader_mu_;
564 std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
565 struct PreBindBacklog {
566 bool send_initial_metadata_wanted = false;
567 bool finish_wanted = false;
568 Request* read_wanted = nullptr;
569 ::grpc::Status status_wanted;
570 };
571 PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
572};
573
574/// \a ServerWriteReactor is the interface for a server-streaming RPC.
575template <class Response>
576class ServerWriteReactor : public internal::ServerReactor {
577 public:
578 ServerWriteReactor() : writer_(nullptr) {}
579 ~ServerWriteReactor() = default;
580
581 /// The following operation initiations are exactly like ServerBidiReactor.
582 void StartSendInitialMetadata() {
583 ServerCallbackWriter<Response>* writer =
584 writer_.load(std::memory_order_acquire);
585 if (writer == nullptr) {
586 grpc::internal::MutexLock l(&writer_mu_);
587 writer = writer_.load(std::memory_order_relaxed);
588 if (writer == nullptr) {
589 backlog_.send_initial_metadata_wanted = true;
590 return;
591 }
592 }
593 writer->SendInitialMetadata();
594 }
595 void StartWrite(const Response* resp) {
596 StartWrite(resp, ::grpc::WriteOptions());
597 }
598 void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
599 ServerCallbackWriter<Response>* writer =
600 writer_.load(std::memory_order_acquire);
601 if (writer == nullptr) {
602 grpc::internal::MutexLock l(&writer_mu_);
603 writer = writer_.load(std::memory_order_relaxed);
604 if (writer == nullptr) {
605 backlog_.write_wanted = resp;
606 backlog_.write_options_wanted = std::move(options);
607 return;
608 }
609 }
610 writer->Write(resp, std::move(options));
611 }
612 void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
613 ::grpc::Status s) {
614 ServerCallbackWriter<Response>* writer =
615 writer_.load(std::memory_order_acquire);
616 if (writer == nullptr) {
617 grpc::internal::MutexLock l(&writer_mu_);
618 writer = writer_.load(std::memory_order_relaxed);
619 if (writer == nullptr) {
620 backlog_.write_and_finish_wanted = true;
621 backlog_.write_wanted = resp;
622 backlog_.write_options_wanted = std::move(options);
623 backlog_.status_wanted = std::move(s);
624 return;
625 }
626 }
627 writer->WriteAndFinish(resp, std::move(options), std::move(s));
628 }
629 void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
630 StartWrite(resp, std::move(options.set_last_message()));
631 }
632 void Finish(::grpc::Status s) {
633 ServerCallbackWriter<Response>* writer =
634 writer_.load(std::memory_order_acquire);
635 if (writer == nullptr) {
636 grpc::internal::MutexLock l(&writer_mu_);
637 writer = writer_.load(std::memory_order_relaxed);
638 if (writer == nullptr) {
639 backlog_.finish_wanted = true;
640 backlog_.status_wanted = std::move(s);
641 return;
642 }
643 }
644 writer->Finish(std::move(s));
645 }
646
647 /// The following notifications are exactly like ServerBidiReactor.
648 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
649 virtual void OnWriteDone(bool /*ok*/) {}
650 void OnDone() override = 0;
651 void OnCancel() override {}
652
653 private:
654 friend class ServerCallbackWriter<Response>;
655 // May be overridden by internal implementation details. This is not a public
656 // customization point.
657 virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
658 grpc::internal::ReleasableMutexLock l(&writer_mu_);
659 PreBindBacklog ops(std::move(backlog_));
660 writer_.store(writer, std::memory_order_release);
661 l.Unlock();
662
663 if (ops.send_initial_metadata_wanted) {
664 writer->SendInitialMetadata();
665 }
666 if (ops.write_and_finish_wanted) {
667 writer->WriteAndFinish(ops.write_wanted,
668 std::move(ops.write_options_wanted),
669 std::move(ops.status_wanted));
670 } else {
671 if (ops.write_wanted != nullptr) {
672 writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
673 }
674 if (ops.finish_wanted) {
675 writer->Finish(std::move(ops.status_wanted));
676 }
677 }
678 }
679
680 grpc::internal::Mutex writer_mu_;
681 std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
682 struct PreBindBacklog {
683 bool send_initial_metadata_wanted = false;
684 bool write_and_finish_wanted = false;
685 bool finish_wanted = false;
686 const Response* write_wanted = nullptr;
687 ::grpc::WriteOptions write_options_wanted;
688 ::grpc::Status status_wanted;
689 };
690 PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
691};
692
693class ServerUnaryReactor : public internal::ServerReactor {
694 public:
695 ServerUnaryReactor() : call_(nullptr) {}
696 ~ServerUnaryReactor() = default;
697
698 /// StartSendInitialMetadata is exactly like ServerBidiReactor.
699 void StartSendInitialMetadata() {
700 ServerCallbackUnary* call = call_.load(m: std::memory_order_acquire);
701 if (call == nullptr) {
702 grpc::internal::MutexLock l(&call_mu_);
703 call = call_.load(m: std::memory_order_relaxed);
704 if (call == nullptr) {
705 backlog_.send_initial_metadata_wanted = true;
706 return;
707 }
708 }
709 call->SendInitialMetadata();
710 }
711 /// Finish is similar to ServerBidiReactor except for one detail.
712 /// If the status is non-OK, any message will not be sent. Instead,
713 /// the client will only receive the status and any trailing metadata.
714 void Finish(::grpc::Status s) {
715 ServerCallbackUnary* call = call_.load(m: std::memory_order_acquire);
716 if (call == nullptr) {
717 grpc::internal::MutexLock l(&call_mu_);
718 call = call_.load(m: std::memory_order_relaxed);
719 if (call == nullptr) {
720 backlog_.finish_wanted = true;
721 backlog_.status_wanted = std::move(s);
722 return;
723 }
724 }
725 call->Finish(s: std::move(s));
726 }
727
728 /// The following notifications are exactly like ServerBidiReactor.
729 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
730 void OnDone() override = 0;
731 void OnCancel() override {}
732
733 private:
734 friend class ServerCallbackUnary;
735 // May be overridden by internal implementation details. This is not a public
736 // customization point.
737 virtual void InternalBindCall(ServerCallbackUnary* call) {
738 grpc::internal::ReleasableMutexLock l(&call_mu_);
739 PreBindBacklog ops(std::move(backlog_));
740 call_.store(p: call, m: std::memory_order_release);
741 l.Unlock();
742
743 if (ops.send_initial_metadata_wanted) {
744 call->SendInitialMetadata();
745 }
746 if (ops.finish_wanted) {
747 call->Finish(s: std::move(ops.status_wanted));
748 }
749 }
750
751 grpc::internal::Mutex call_mu_;
752 std::atomic<ServerCallbackUnary*> call_{nullptr};
753 struct PreBindBacklog {
754 bool send_initial_metadata_wanted = false;
755 bool finish_wanted = false;
756 ::grpc::Status status_wanted;
757 };
758 PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
759};
760
761namespace internal {
762
763template <class Base>
764class FinishOnlyReactor : public Base {
765 public:
766 explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); }
767 void OnDone() override { this->~FinishOnlyReactor(); }
768};
769
770using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
771template <class Request>
772using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
773template <class Response>
774using UnimplementedWriteReactor =
775 FinishOnlyReactor<ServerWriteReactor<Response>>;
776template <class Request, class Response>
777using UnimplementedBidiReactor =
778 FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
779
780} // namespace internal
781} // namespace grpc_impl
782
783#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
784

source code of include/grpcpp/impl/codegen/server_callback_impl.h