47 #include <sys/types.h>
48 #include <sys/socket.h>
55 #include <qb/qbdefs.h>
56 #include <qb/qbipcc.h>
69 #define MAP_ANONYMOUS MAP_ANON
75 #define CPG_MEMORY_MAP_UMASK 077
78 qb_ipcc_connection_t *
c;
87 static void cpg_inst_free (
void *inst);
93 qb_ipcc_connection_t *
conn;
106 coroipcc_msg_send_reply_receive (
107 qb_ipcc_connection_t *c,
108 const struct iovec *iov,
109 unsigned int iov_len,
113 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
119 list_del (&cpg_iteration_instance->
list);
123 static void cpg_inst_free (
void *inst)
126 qb_ipcc_disconnect(cpg_inst->
c);
138 iter_next = iter->
next;
142 cpg_iteration_instance_finalize (cpg_iteration_instance);
144 hdb_handle_destroy (&cpg_handle_t_db, handle);
177 struct cpg_inst *cpg_inst;
181 goto error_no_destroy;
184 error =
hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db,
sizeof (
struct cpg_inst), handle));
185 if (error !=
CS_OK) {
186 goto error_no_destroy;
189 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (
void *)&cpg_inst));
190 if (error !=
CS_OK) {
195 if (cpg_inst->
c == NULL) {
197 goto error_put_destroy;
200 if (model_data != NULL) {
218 hdb_handle_put (&cpg_handle_t_db, *handle);
223 hdb_handle_put (&cpg_handle_t_db, *handle);
225 hdb_handle_destroy (&cpg_handle_t_db, *handle);
233 struct cpg_inst *cpg_inst;
239 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
240 if (error !=
CS_OK) {
248 hdb_handle_put (&cpg_handle_t_db, handle);
257 req_lib_cpg_finalize.header.size =
sizeof (
struct req_lib_cpg_finalize);
260 iov.iov_base = (
void *)&req_lib_cpg_finalize;
261 iov.iov_len =
sizeof (
struct req_lib_cpg_finalize);
263 error = coroipcc_msg_send_reply_receive (cpg_inst->
c,
266 &res_lib_cpg_finalize,
267 sizeof (
struct res_lib_cpg_finalize));
269 cpg_inst_finalize (cpg_inst, handle);
270 hdb_handle_put (&cpg_handle_t_db, handle);
280 struct cpg_inst *cpg_inst;
282 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
283 if (error !=
CS_OK) {
289 hdb_handle_put (&cpg_handle_t_db, handle);
299 struct cpg_inst *cpg_inst;
301 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
302 if (error !=
CS_OK) {
308 hdb_handle_put (&cpg_handle_t_db, handle);
318 struct cpg_inst *cpg_inst;
320 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
321 if (error !=
CS_OK) {
327 hdb_handle_put (&cpg_handle_t_db, handle);
339 struct cpg_inst *cpg_inst;
343 struct cpg_inst cpg_inst_copy;
344 struct qb_ipc_response_header *dispatch_data;
357 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
358 if (error !=
CS_OK) {
370 dispatch_data = (
struct qb_ipc_response_header *)dispatch_buf;
372 errno_res = qb_ipcc_event_recv (
396 if (error !=
CS_OK) {
405 memcpy (&cpg_inst_copy, cpg_inst,
sizeof (
struct cpg_inst));
411 switch (dispatch_data->id) {
419 marshall_from_mar_cpg_name_t (
421 &res_cpg_deliver_callback->group_name);
425 res_cpg_deliver_callback->nodeid,
426 res_cpg_deliver_callback->pid,
427 &res_cpg_deliver_callback->message,
428 res_cpg_deliver_callback->msglen);
438 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
439 marshall_from_mar_cpg_address_t (&member_list[i],
442 left_list_start = res_cpg_confchg_callback->
member_list +
443 res_cpg_confchg_callback->member_list_entries;
444 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
445 marshall_from_mar_cpg_address_t (&left_list[i],
446 &left_list_start[i]);
448 joined_list_start = res_cpg_confchg_callback->
member_list +
449 res_cpg_confchg_callback->member_list_entries +
450 res_cpg_confchg_callback->left_list_entries;
451 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
452 marshall_from_mar_cpg_address_t (&joined_list[i],
453 &joined_list_start[i]);
455 marshall_from_mar_cpg_name_t (
457 &res_cpg_confchg_callback->group_name);
462 res_cpg_confchg_callback->member_list_entries,
464 res_cpg_confchg_callback->left_list_entries,
466 res_cpg_confchg_callback->joined_list_entries);
476 marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
477 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
478 totem_member_list[i] = res_cpg_totem_confchg_callback->
member_list[i];
483 res_cpg_totem_confchg_callback->member_list_entries,
512 hdb_handle_put (&cpg_handle_t_db, handle);
521 struct cpg_inst *cpg_inst;
530 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
531 if (error !=
CS_OK) {
536 req_lib_cpg_join.header.size =
sizeof (
struct req_lib_cpg_join);
538 req_lib_cpg_join.pid = getpid();
539 req_lib_cpg_join.flags = 0;
547 marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
550 iov[0].iov_base = (
void *)&req_lib_cpg_join;
551 iov[0].iov_len =
sizeof (
struct req_lib_cpg_join);
554 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, iov, 1,
557 if (error !=
CS_OK) {
562 error = response.header.error;
565 hdb_handle_put (&cpg_handle_t_db, handle);
575 struct cpg_inst *cpg_inst;
584 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
585 if (error !=
CS_OK) {
589 req_lib_cpg_leave.header.size =
sizeof (
struct req_lib_cpg_leave);
591 req_lib_cpg_leave.pid = getpid();
592 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
595 iov[0].iov_base = (
void *)&req_lib_cpg_leave;
596 iov[0].iov_len =
sizeof (
struct req_lib_cpg_leave);
599 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, iov, 1,
600 &res_lib_cpg_leave, sizeof (
struct res_lib_cpg_leave));
602 if (error !=
CS_OK) {
605 }
while (res_lib_cpg_leave.header.error ==
CS_ERR_BUSY);
607 error = res_lib_cpg_leave.header.error;
610 hdb_handle_put (&cpg_handle_t_db, handle);
619 int *member_list_entries)
622 struct cpg_inst *cpg_inst;
631 if (member_list == NULL) {
634 if (member_list_entries == NULL) {
638 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
639 if (error !=
CS_OK) {
643 req_lib_cpg_membership_get.header.size =
sizeof (
struct req_lib_cpg_membership_get);
646 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
649 iov.iov_base = (
void *)&req_lib_cpg_membership_get;
650 iov.iov_len =
sizeof (
struct req_lib_cpg_membership_get);
652 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, &iov, 1,
653 &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
655 if (error !=
CS_OK) {
659 error = res_lib_cpg_membership_get.header.error;
664 *member_list_entries = res_lib_cpg_membership_get.member_count;
666 for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
667 marshall_from_mar_cpg_address_t (&member_list[i],
673 hdb_handle_put (&cpg_handle_t_db, handle);
680 unsigned int *local_nodeid)
683 struct cpg_inst *cpg_inst;
688 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
689 if (error !=
CS_OK) {
693 req_lib_cpg_local_get.header.size =
sizeof (
struct qb_ipc_request_header);
696 iov.iov_base = (
void *)&req_lib_cpg_local_get;
697 iov.iov_len =
sizeof (
struct req_lib_cpg_local_get);
699 error = coroipcc_msg_send_reply_receive (cpg_inst->
c, &iov, 1,
700 &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
702 if (error !=
CS_OK) {
706 error = res_lib_cpg_local_get.header.error;
708 *local_nodeid = res_lib_cpg_local_get.local_nodeid;
711 hdb_handle_put (&cpg_handle_t_db, handle);
721 struct cpg_inst *cpg_inst;
723 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
724 if (error !=
CS_OK) {
730 hdb_handle_put (&cpg_handle_t_db, handle);
736 memory_map (
char *path,
const char *file,
void **buf,
size_t bytes)
745 long int sysconf_page_size;
748 snprintf (path, PATH_MAX,
"/dev/shm/%s", file);
752 (void)umask(old_umask);
757 (void)umask(old_umask);
763 res = ftruncate (fd, bytes);
765 goto error_close_unlink;
767 sysconf_page_size = sysconf(_SC_PAGESIZE);
768 if (sysconf_page_size <= 0) {
769 goto error_close_unlink;
771 page_size = sysconf_page_size;
772 buffer = malloc (page_size);
773 if (buffer == NULL) {
774 goto error_close_unlink;
776 memset (buffer, 0, page_size);
777 for (i = 0; i < (bytes / page_size); i++) {
779 written = write (fd, buffer, page_size);
780 if (written == -1 && errno == EINTR) {
783 if (written != page_size) {
785 goto error_close_unlink;
790 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
793 if (addr == MAP_FAILED) {
794 goto error_close_unlink;
797 madvise(addr, bytes, MADV_NOSYNC);
822 struct qb_ipc_response_header res_coroipcs_zc_alloc;
827 struct cpg_inst *cpg_inst;
829 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
830 if (error !=
CS_OK) {
834 map_size = size +
sizeof (
struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
835 assert(memory_map (path,
"corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
839 munmap (buf, map_size);
845 req_coroipcc_zc_alloc.map_size = map_size;
846 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
848 iovec.iov_base = (
void *)&req_coroipcc_zc_alloc;
851 error = coroipcc_msg_send_reply_receive (
855 &res_coroipcs_zc_alloc,
856 sizeof (
struct qb_ipc_response_header));
858 hdr = (
struct coroipcs_zc_header *)buf;
860 *buffer = ((
char *)buf) +
sizeof (
struct coroipcs_zc_header);
862 hdb_handle_put (&cpg_handle_t_db, handle);
873 struct cpg_inst *cpg_inst;
875 struct qb_ipc_response_header res_coroipcs_zc_free;
879 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
880 if (error !=
CS_OK) {
886 req_coroipcc_zc_free.map_size = header->
map_size;
889 iovec.iov_base = (
void *)&req_coroipcc_zc_free;
892 error = coroipcc_msg_send_reply_receive (
896 &res_coroipcs_zc_free,
897 sizeof (
struct qb_ipc_response_header));
899 munmap ((
void *)header, header->
map_size);
901 hdb_handle_put (&cpg_handle_t_db, handle);
913 struct cpg_inst *cpg_inst;
920 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
921 if (error !=
CS_OK) {
924 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)(((
char *)msg) -
sizeof (
struct req_lib_cpg_mcast));
925 req_lib_cpg_mcast->header.size =
sizeof (
struct req_lib_cpg_mcast) +
929 req_lib_cpg_mcast->guarantee =
guarantee;
930 req_lib_cpg_mcast->msglen = msg_len;
938 iovec.iov_base = (
void *)&req_coroipcc_zc_execute;
941 error = coroipcc_msg_send_reply_receive (
946 sizeof(res_lib_cpg_mcast));
948 if (error !=
CS_OK) {
952 error = res_lib_cpg_mcast.header.error;
955 hdb_handle_put (&cpg_handle_t_db, handle);
963 const struct iovec *iovec,
964 unsigned int iov_len)
968 struct cpg_inst *cpg_inst;
969 struct iovec iov[64];
973 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
974 if (error !=
CS_OK) {
978 for (i = 0; i < iov_len; i++ ) {
979 msg_len += iovec[i].iov_len;
982 req_lib_cpg_mcast.header.size =
sizeof (
struct req_lib_cpg_mcast) +
987 req_lib_cpg_mcast.msglen = msg_len;
989 iov[0].iov_base = (
void *)&req_lib_cpg_mcast;
990 iov[0].iov_len =
sizeof (
struct req_lib_cpg_mcast);
991 memcpy (&iov[1], iovec, iov_len *
sizeof (
struct iovec));
993 qb_ipcc_fc_enable_max_set(cpg_inst->
c, 2);
995 qb_ipcc_fc_enable_max_set(cpg_inst->
c, 1);
997 hdb_handle_put (&cpg_handle_t_db, handle);
1010 struct cpg_inst *cpg_inst;
1018 if (cpg_iteration_handle == NULL) {
1033 error =
hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (
void *)&cpg_inst));
1034 if (error !=
CS_OK) {
1038 error =
hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1040 if (error !=
CS_OK) {
1041 goto error_put_cpg_db;
1044 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1045 (
void *)&cpg_iteration_instance));
1046 if (error !=
CS_OK) {
1050 cpg_iteration_instance->
conn = cpg_inst->
c;
1052 list_init (&cpg_iteration_instance->
list);
1054 req_lib_cpg_iterationinitialize.header.size =
sizeof (
struct req_lib_cpg_iterationinitialize);
1056 req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1058 marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1061 iov.iov_base = (
void *)&req_lib_cpg_iterationinitialize;
1062 iov.iov_len =
sizeof (
struct req_lib_cpg_iterationinitialize);
1064 error = coroipcc_msg_send_reply_receive (cpg_inst->
c,
1067 &res_lib_cpg_iterationinitialize,
1068 sizeof (
struct res_lib_cpg_iterationinitialize));
1070 if (error !=
CS_OK) {
1071 goto error_put_destroy;
1075 res_lib_cpg_iterationinitialize.iteration_handle;
1080 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1081 hdb_handle_put (&cpg_handle_t_db, handle);
1083 return (res_lib_cpg_iterationinitialize.header.error);
1086 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1088 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1090 hdb_handle_put (&cpg_handle_t_db, handle);
1104 if (description == NULL) {
1108 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1109 (
void *)&cpg_iteration_instance));
1110 if (error !=
CS_OK) {
1114 req_lib_cpg_iterationnext.header.size =
sizeof (
struct req_lib_cpg_iterationnext);
1119 &req_lib_cpg_iterationnext,
1120 req_lib_cpg_iterationnext.header.size));
1121 if (error !=
CS_OK) {
1126 &res_lib_cpg_iterationnext,
1127 sizeof(
struct res_lib_cpg_iterationnext), -1));
1128 if (error !=
CS_OK) {
1132 marshall_from_mar_cpg_iteration_description_t(
1134 &res_lib_cpg_iterationnext.description);
1136 error = res_lib_cpg_iterationnext.header.error;
1139 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1154 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1155 (
void *)&cpg_iteration_instance));
1156 if (error !=
CS_OK) {
1160 req_lib_cpg_iterationfinalize.header.size =
sizeof (
struct req_lib_cpg_iterationfinalize);
1164 iov.iov_base = (
void *)&req_lib_cpg_iterationfinalize;
1165 iov.iov_len =
sizeof (
struct req_lib_cpg_iterationfinalize);
1167 error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->
conn,
1170 &res_lib_cpg_iterationfinalize,
1171 sizeof (
struct req_lib_cpg_iterationfinalize));
1173 if (error !=
CS_OK) {
1177 cpg_iteration_instance_finalize (cpg_iteration_instance);
1180 return (res_lib_cpg_iterationfinalize.header.error);
1183 hdb_handle_put (&cpg_iteration_handle_t_db, handle);