6 #ifndef CPPIPC_SERVER_COMM_CLIENT_HPP 7 #define CPPIPC_SERVER_COMM_CLIENT_HPP 11 #include <core/parallel/atomic.hpp> 13 #include <boost/thread/thread.hpp> 14 #include <boost/thread/locks.hpp> 15 #include <boost/thread/lock_types.hpp> 16 #include <core/system/nanosockets/socket_errors.hpp> 17 #include <core/system/nanosockets/async_request_socket.hpp> 18 #include <core/system/nanosockets/subscribe_socket.hpp> 19 #include <core/system/cppipc/common/message_types.hpp> 20 #include <core/system/cppipc/common/status_types.hpp> 21 #include <core/system/cppipc/client/issue.hpp> 22 #include <core/system/cppipc/common/ipc_deserializer.hpp> 23 #include <core/system/cppipc/client/console_cancel_handler.hpp> 24 #include <core/system/exceptions/error_types.hpp> 31 std::atomic<size_t>& get_running_command();
// Accessor returning a reference to a std::atomic<size_t> whose storage is
// defined elsewhere. In this header, call() loads it and compares it against
// get_running_command() to decide whether the command that just finished is
// the one that was flagged as cancelled.
32 std::atomic<size_t>& get_cancelled_command();
45 template <
typename RetType,
bool is_proxied_
object>
49 RetType ret = RetType();
// Counter used to number outgoing calls: call() takes m_command_id.inc() and
// attaches the result to the message as its "command_id" property.
205 turi::atomic<size_t> m_command_id;
// Lookup table whose key is the raw bytes of a member-function pointer with
// typeid(MemFn).name() appended; the registration code stores the function's
// string name as the value.
212 std::map<std::string, std::string> memfn_pointer_to_string;
// List of (prefix string, callback) pairs; each callback takes a std::string.
// NOTE(review): presumably a callback fires when a status message starts with
// its prefix -- confirm against the implementation.
215 std::vector<std::pair<std::string,
216 std::function<void(std::string)> > > prefix_to_status_callback;
// NOTE(review): presumably guards prefix_to_status_callback -- confirm.
218 boost::mutex status_callback_lock;
// NOTE(review): presumably guards object_ref_count -- confirm.
220 boost::mutex ref_count_lock;
// size_t -> size_t map. Given incr_ref_count/decr_ref_count/get_ref_count all
// take an object_id, this is presumably object id -> reference count (confirm).
222 std::map<size_t, size_t> object_ref_count;
233 nanosockets::zmq_msg_vector& ret,
// Declaration only; the definition is not in this listing.
// NOTE(review): judging by the name and the server_alive_watch_pid member,
// presumably checks whether the watched server process still exists -- confirm.
240 void poll_server_pid_is_running();
// Takes an address string by value and returns a string.
// NOTE(review): presumably turns a generic (e.g. wildcard) auxiliary address
// into a concrete one -- confirm in the implementation.
248 std::string convert_generic_address_to_specific(std::string aux_addr);
// Raw pointer to the ping thread object. Creation, join and deletion are not
// visible in this listing (see stop_ping_thread()).
259 boost::thread* ping_thread;
// Mutex / condition-variable pair.
// NOTE(review): presumably used to pace or wake the ping thread -- confirm.
264 boost::mutex ping_mutex;
265 boost::condition_variable ping_cond;
// Flag, initially false.
// NOTE(review): `volatile` provides neither atomicity nor inter-thread
// ordering in C++. If this is read and written from different threads without
// ping_mutex held, std::atomic<bool> is the correct type.
270 volatile bool ping_thread_done =
false;
// Flag, initially true. The same `volatile` caveat as ping_thread_done applies.
276 volatile bool server_alive =
true;
// Flag, initially false.
281 bool socket_closed =
false;
// Running count of ping failures, initially 0. The same `volatile` caveat applies.
286 volatile size_t ping_failure_count = 0;
// In-class default of 10. Note that the constructor parameter of the same name
// defaults to (size_t)(-1), so the value actually in effect depends on what
// the constructor assigns.
288 size_t num_tolerable_ping_failures = 10;
// Declaration only. Receives one message as a const std::string reference.
// NOTE(review): presumably the handler invoked by the subscribe socket for
// each published message -- confirm.
293 void subscribe_callback(
const std::string& msg);
// A steady_clock time point.
// NOTE(review): presumably marks the last object synchronization with the
// server -- confirm.
299 std::chrono::steady_clock::time_point object_sync_point;
// Stored copies of the optional alternate addresses; the constructor takes
// parameters of the same names, defaulting to "".
304 std::string alternate_control_address;
309 std::string alternate_publish_address;
// Flag, initially false. NOTE(review): presumably set once the client has
// been started -- confirm.
314 bool started =
false;
319 std::string endpoint_name;
// Gates CTRL-C support in call(): when true, call() installs the console
// cancel handler around the server round trip. call() clears it if installing
// or removing the handler fails. Initially false.
324 bool cancel_handling_enabled =
false;
// PID set through set_server_alive_watch_pid(); 0 by default.
// NOTE(review): presumably 0 means "no process is being watched" -- confirm.
329 int32_t server_alive_watch_pid = 0;
364 size_t num_tolerable_ping_failures = (
size_t)(-1),
365 std::string alternate_control_address=
"",
366 std::string alternate_publish_address=
"",
367 const std::string public_key =
"",
368 const std::string secret_key =
"",
369 const std::string server_public_key =
"",
370 bool ops_interruptible =
false);
// Setter taking a process id; presumably stores it in server_alive_watch_pid
// (definition not visible here).
385 void set_server_alive_watch_pid(int32_t pid);
// Initialization entry point. ops_interruptible defaults to false.
// NOTE(review): presumably feeds cancel_handling_enabled -- confirm.
390 void init(
bool ops_interruptible =
false);
// Takes an object type name and returns a size_t.
// NOTE(review): presumably the id of the newly created remote object -- confirm.
417 size_t make_object(std::string object_type_name);
// Takes a string and returns a string.
// NOTE(review): presumably echoes the argument via the server -- confirm.
425 std::string ping(std::string);
// Reference-count operations on an object id; each returns a size_t.
// NOTE(review): presumably the resulting (or current) count stored in
// object_ref_count -- confirm the exact return convention.
437 size_t incr_ref_count(
size_t object_id);
439 size_t decr_ref_count(
size_t object_id);
441 size_t get_ref_count(
size_t object_id);
// Raw pointer to the status-callback thread; null until a thread is created.
// NOTE(review): the file already uses C++11 features (std::function,
// std::chrono, in-class initializers), so `nullptr` would be preferable to NULL.
447 boost::thread* status_callback_thread = NULL;
// Condition variable and buffer of pending status strings.
// NOTE(review): presumably a producer/consumer queue drained by
// status_callback_thread_function() -- confirm which mutex protects it.
452 boost::condition_variable status_buffer_cond;
453 std::vector<std::string> status_buffer;
// Flag, initially false. NOTE(review): presumably tells the callback thread to
// exit -- confirm that it is only accessed under a lock.
454 bool status_callback_thread_done =
false;
// Thread body and its lifecycle controls; definitions are not in this listing.
460 void status_callback_thread_function();
466 void stop_status_callback_thread();
471 void start_status_callback_thread();
// Registers a callback (taking a std::string) under a watch prefix.
// NOTE(review): presumably appends to prefix_to_status_callback -- confirm.
504 void add_status_watch(std::string watch_prefix,
505 std::function<
void(std::string)> callback);
// Removes the watch for one prefix / removes all watches (definitions are not
// visible here).
514 void remove_status_watch(std::string watch_prefix);
523 void clear_status_watch();
// Declaration only. NOTE(review): presumably signals and joins ping_thread --
// confirm.
528 void stop_ping_thread();
// Takes a list of object ids and returns an int.
// NOTE(review): presumably asks the server to delete those objects, with the
// int being a status code -- confirm the meaning of the return value.
535 int send_deletion_list(
const std::vector<size_t>& object_ids);
// Fragment of a member-function registration template. The signature line and
// the closing braces are missing from this listing; `f` and `function_string`
// are its parameters.
543 template <
typename MemFn>
// Build a lookup key from the raw bytes of the member-function pointer...
547 std::string string_f(reinterpret_cast<const char*>(&f),
sizeof(MemFn));
// ...with the type's typeid name appended, so that pointers of different
// member-function types with identical bytes get different keys.
548 string_f = string_f +
typeid(MemFn).name();
// Only the first registration for a given key is kept; later ones do not
// overwrite it.
549 if (memfn_pointer_to_string.count(string_f) == 0) {
550 memfn_pointer_to_string[string_f] = function_string;
// Fills in a call_message for invoking member function `f` on the remote
// object `objectid`. Only part of the body is visible in this listing.
555 template <
typename MemFn>
556 void prepare_call_message_structure(
size_t objectid, MemFn f,
call_message& msg) {
// Rebuild the lookup key: the raw bytes of the member-function pointer
// followed by typeid(MemFn).name().
560 std::string string_f(reinterpret_cast<char*>(&f),
sizeof(MemFn));
561 string_f = string_f +
typeid(MemFn).name();
// Branch taken when the function was never registered. The body of this branch
// is missing from the listing.
562 if (memfn_pointer_to_string.count(string_f) == 0) {
// Record the target object on the message.
565 msg.objectid = objectid;
// Calls member function `f` on the remote object `objectid` with `args` and
// returns the result, whose type is
// detail::member_function_return_type<MemFn>::type.
// Many lines of the body are missing from this listing. The visible steps are:
// build the message, tag it with a command id, optionally install a CTRL-C
// handler, perform the round trip, handle cancellation, turn a failed reply
// into an exception, and hand the reply to a detail helper's exec().
// Throws std::ios_base::failure, std::out_of_range or ipcexception depending
// on reply.status.
584 template <
typename MemFn,
typename... Args>
585 typename detail::member_function_return_type<MemFn>::type
586 call(
size_t objectid, MemFn f,
const Args&... args) {
590 typedef typename detail::member_function_return_type<MemFn>::type return_type;
592 prepare_call_message_structure(objectid, f, msg);
// If the archive offset is odd, append one space byte so the serialized length
// is even. NOTE(review): the reason for the padding is not visible here.
606 if (oarc.off & 1) oarc.
write(
" ", 1);
// Allocate a command id and attach it to the message as a string property.
612 size_t command_id = m_command_id.inc();
613 auto ret = msg.
properties.insert(std::make_pair<std::string, std::string>(
614 "command_id", std::to_string(command_id)));
617 auto &r = get_running_command();
// Install the console cancel handler. If that fails, CTRL-C support is switched
// off for this client.
622 if(cancel_handling_enabled &&
623 !console_cancel_handler::get_instance().set_handler()) {
625 "thus will not respond to CTRL-C.\n";
626 cancel_handling_enabled =
false;
// Perform the request/reply round trip; retcode == 0 is treated as success below.
631 int retcode = internal_call(msg, reply);
// Remove the handler again; a failure here also disables CTRL-C support.
634 if(cancel_handling_enabled) {
635 if(!console_cancel_handler::get_instance().unset_handler()) {
637 "Could not reset signal handler after server operation. Disabling CTRL-C support.\n";
638 cancel_handling_enabled =
false;
// If the command that just ran is the one flagged as cancelled, look for a
// "cancel" property in the reply and raise the cancel through the handler
// (the intermediate lines are missing from this listing).
643 if(cancel_handling_enabled) {
645 size_t running_command = get_running_command().load();
646 if(running_command && running_command == get_cancelled_command().load()) {
648 auto ret = reply.
properties.find(std::string(
"cancel"));
656 console_cancel_handler::get_instance().raise_cancel();
// No command is running any more.
662 get_running_command().store(0);
664 bool success = (retcode == 0);
665 std::string custommsg;
// The reply body is used as the error message text.
667 custommsg = std::string(reply.
body, reply.
bodylen);
// Map reply.status to an exception type. The case labels are missing from this
// listing. The #ifdef selects between the two std::ios_base::failure
// constructor forms.
672 switch(reply.status) {
674 #ifdef COMPILER_HAS_IOS_BASE_FAILURE_WITH_ERROR_CODE 675 throw(std::ios_base::failure(custommsg, std::error_code()));
677 throw(std::ios_base::failure(custommsg));
680 throw std::out_of_range(custommsg);
686 throw ipcexception(reply.status, retcode, custommsg);
// Point the deserializer at this client before the reply is decoded.
689 detail::set_deserializer_to_client(
this);
691 std::is_convertible<return_type, ipc_object_base*>::value>::exec(*
this, reply);
boost::mutex status_buffer_mutex
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
std::map< std::string, std::string > properties
The status of the call.
detail::member_function_return_type< MemFn >::type call(size_t objectid, MemFn f, const Args &... args)
std::string function_name
the object to call
void clear()
Empties the message, freeing all contents.
std::string delete_object(std::string s3_url, std::string proxy="")
void register_function(MemFn f, std::string function_string)
#define ASSERT_TRUE(cond)
The function requested did not exist.
std::map< std::string, std::string > properties
the function to call on the object
size_t bodylen
The serialized arguments of the call. May point into bodybuf.
void write(const char *c, std::streamsize s)
size_t bodylen
The serialized contents of the reply. May point into bodybuf.
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
void issue(turi::oarchive &msg, MemFn fn, const Args &... args)