corosync  2.3.2
exec/cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2006-2012 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Christine Caulfield (ccaulfie@redhat.com)
7  * Author: Jan Friesse (jfriesse@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #ifdef HAVE_ALLOCA_H
39 #include <alloca.h>
40 #endif
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <sys/un.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
46 #include <sys/uio.h>
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <errno.h>
52 #include <time.h>
53 #include <assert.h>
54 #include <unistd.h>
55 #include <netinet/in.h>
56 #include <arpa/inet.h>
57 #include <sys/mman.h>
58 #include <qb/qbmap.h>
59 
60 #include <corosync/corotypes.h>
61 #include <qb/qbipc_common.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/list.h>
64 #include <corosync/logsys.h>
65 #include <corosync/coroapi.h>
66 
67 #include <corosync/cpg.h>
68 #include <corosync/ipc_cpg.h>
69 
70 #ifndef MAP_ANONYMOUS
71 #define MAP_ANONYMOUS MAP_ANON
72 #endif
73 
74 #include "service.h"
75 
76 LOGSYS_DECLARE_SUBSYS ("CPG");
77 
78 #define GROUP_HASH_SIZE 32
79 
87 };
88 
89 struct zcb_mapped {
90  struct list_head list;
91  void *addr;
92  size_t size;
93 };
94 /*
95  * state exec deliver
96  * match group name, pid -> if matched deliver for YES:
97  * XXX indicates impossible state
98  *
99  * join leave mcast
100  * UNJOINED XXX XXX NO
101  * LEAVE_STARTED XXX YES(unjoined_enter) YES
102  * JOIN_STARTED YES(join_started_enter) XXX NO
103  * JOIN_COMPLETED XXX NO YES
104  *
105  * join_started_enter
106  * set JOIN_COMPLETED
107  * add entry to process_info list
108  * unjoined_enter
109  * set UNJOINED
110  * delete entry from process_info list
111  *
112  *
113  * library accept join error codes
114  * UNJOINED YES(CS_OK) set JOIN_STARTED
115  * LEAVE_STARTED NO(CS_ERR_BUSY)
116  * JOIN_STARTED NO(CS_ERR_EXIST)
117  * JOIN_COMPLETED NO(CS_ERR_EXIST)
118  *
119  * library accept leave error codes
120  * UNJOINED NO(CS_ERR_NOT_EXIST)
121  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
122  * JOIN_STARTED NO(CS_ERR_BUSY)
123  * JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED
124  *
125  * library accept mcast
126  * UNJOINED NO(CS_ERR_NOT_EXIST)
127  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
128  * JOIN_STARTED YES(CS_OK)
129  * JOIN_COMPLETED YES(CS_OK)
130  */
131 enum cpd_state {
136 };
137 
141 };
142 
147 };
148 static enum cpg_downlist_state_e downlist_state;
149 static struct list_head downlist_messages_head;
150 static struct list_head joinlist_messages_head;
151 
152 struct cpg_pd {
153  void *conn;
155  uint32_t pid;
157  unsigned int flags;
159  struct list_head list;
162 };
163 
166  struct list_head list;
167  struct list_head items_list_head; /* List of process_info */
169 };
170 
171 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
172 
173 DECLARE_LIST_INIT(cpg_pd_list_head);
174 
175 static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
176 
177 static unsigned int my_member_list_entries;
178 
179 static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
180 
181 static unsigned int my_old_member_list_entries = 0;
182 
183 static struct corosync_api_v1 *api = NULL;
184 
185 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
186 
187 static mar_cpg_ring_id_t last_sync_ring_id;
188 
189 struct process_info {
190  unsigned int nodeid;
191  uint32_t pid;
193  struct list_head list; /* on the group_info members list */
194 };
195 DECLARE_LIST_INIT(process_info_list_head);
196 
198  uint32_t pid;
200 };
201 
202 /*
203  * Service Interfaces required by service_message_handler struct
204  */
205 static char *cpg_exec_init_fn (struct corosync_api_v1 *);
206 
207 static int cpg_lib_init_fn (void *conn);
208 
209 static int cpg_lib_exit_fn (void *conn);
210 
211 static void message_handler_req_exec_cpg_procjoin (
212  const void *message,
213  unsigned int nodeid);
214 
215 static void message_handler_req_exec_cpg_procleave (
216  const void *message,
217  unsigned int nodeid);
218 
219 static void message_handler_req_exec_cpg_joinlist (
220  const void *message,
221  unsigned int nodeid);
222 
223 static void message_handler_req_exec_cpg_mcast (
224  const void *message,
225  unsigned int nodeid);
226 
227 static void message_handler_req_exec_cpg_downlist_old (
228  const void *message,
229  unsigned int nodeid);
230 
231 static void message_handler_req_exec_cpg_downlist (
232  const void *message,
233  unsigned int nodeid);
234 
235 static void exec_cpg_procjoin_endian_convert (void *msg);
236 
237 static void exec_cpg_joinlist_endian_convert (void *msg);
238 
239 static void exec_cpg_mcast_endian_convert (void *msg);
240 
241 static void exec_cpg_downlist_endian_convert_old (void *msg);
242 
243 static void exec_cpg_downlist_endian_convert (void *msg);
244 
245 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
246 
247 static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
248 
249 static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
250 
251 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
252 
253 static void message_handler_req_lib_cpg_membership (void *conn,
254  const void *message);
255 
256 static void message_handler_req_lib_cpg_local_get (void *conn,
257  const void *message);
258 
259 static void message_handler_req_lib_cpg_iteration_initialize (
260  void *conn,
261  const void *message);
262 
263 static void message_handler_req_lib_cpg_iteration_next (
264  void *conn,
265  const void *message);
266 
267 static void message_handler_req_lib_cpg_iteration_finalize (
268  void *conn,
269  const void *message);
270 
271 static void message_handler_req_lib_cpg_zc_alloc (
272  void *conn,
273  const void *message);
274 
275 static void message_handler_req_lib_cpg_zc_free (
276  void *conn,
277  const void *message);
278 
279 static void message_handler_req_lib_cpg_zc_execute (
280  void *conn,
281  const void *message);
282 
283 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
284 
285 static int cpg_exec_send_downlist(void);
286 
287 static int cpg_exec_send_joinlist(void);
288 
289 static void downlist_messages_delete (void);
290 
291 static void downlist_master_choose_and_send (void);
292 
293 static void joinlist_inform_clients (void);
294 
295 static void joinlist_messages_delete (void);
296 
297 static void cpg_sync_init (
298  const unsigned int *trans_list,
299  size_t trans_list_entries,
300  const unsigned int *member_list,
301  size_t member_list_entries,
302  const struct memb_ring_id *ring_id);
303 
304 static int cpg_sync_process (void);
305 
306 static void cpg_sync_activate (void);
307 
308 static void cpg_sync_abort (void);
309 
310 static void do_proc_join(
311  const mar_cpg_name_t *name,
312  uint32_t pid,
313  unsigned int nodeid,
314  int reason);
315 
316 static int notify_lib_totem_membership (
317  void *conn,
318  int member_list_entries,
319  const unsigned int *member_list);
320 
321 static inline int zcb_all_free (
322  struct cpg_pd *cpd);
323 
324 static char *cpg_print_group_name (
325  const mar_cpg_name_t *group);
326 
327 /*
328  * Library Handler Definition
329  */
330 static struct corosync_lib_handler cpg_lib_engine[] =
331 {
332  { /* 0 - MESSAGE_REQ_CPG_JOIN */
333  .lib_handler_fn = message_handler_req_lib_cpg_join,
334  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
335  },
336  { /* 1 - MESSAGE_REQ_CPG_LEAVE */
337  .lib_handler_fn = message_handler_req_lib_cpg_leave,
338  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
339  },
340  { /* 2 - MESSAGE_REQ_CPG_MCAST */
341  .lib_handler_fn = message_handler_req_lib_cpg_mcast,
342  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
343  },
344  { /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
345  .lib_handler_fn = message_handler_req_lib_cpg_membership,
346  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
347  },
348  { /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
349  .lib_handler_fn = message_handler_req_lib_cpg_local_get,
350  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
351  },
352  { /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
353  .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
354  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
355  },
356  { /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
357  .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
358  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
359  },
360  { /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
361  .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
362  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
363  },
364  { /* 8 - MESSAGE_REQ_CPG_FINALIZE */
365  .lib_handler_fn = message_handler_req_lib_cpg_finalize,
366  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
367  },
368  { /* 9 */
369  .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
370  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
371  },
372  { /* 10 */
373  .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
374  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
375  },
376  { /* 11 */
377  .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
378  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
379  },
380 
381 
382 };
383 
384 static struct corosync_exec_handler cpg_exec_engine[] =
385 {
386  { /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
387  .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
388  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
389  },
390  { /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
391  .exec_handler_fn = message_handler_req_exec_cpg_procleave,
392  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
393  },
394  { /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
395  .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
396  .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
397  },
398  { /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
399  .exec_handler_fn = message_handler_req_exec_cpg_mcast,
400  .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
401  },
402  { /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
403  .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
404  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
405  },
406  { /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
407  .exec_handler_fn = message_handler_req_exec_cpg_downlist,
408  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
409  },
410 };
411 
413  .name = "corosync cluster closed process group service v1.01",
414  .id = CPG_SERVICE,
415  .priority = 1,
416  .private_data_size = sizeof (struct cpg_pd),
417  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
418  .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
419  .lib_init_fn = cpg_lib_init_fn,
420  .lib_exit_fn = cpg_lib_exit_fn,
421  .lib_engine = cpg_lib_engine,
422  .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
423  .exec_init_fn = cpg_exec_init_fn,
424  .exec_dump_fn = NULL,
425  .exec_engine = cpg_exec_engine,
426  .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
427  .sync_init = cpg_sync_init,
428  .sync_process = cpg_sync_process,
429  .sync_activate = cpg_sync_activate,
430  .sync_abort = cpg_sync_abort
431 };
432 
434 {
435  return (&cpg_service_engine);
436 }
437 
439  struct qb_ipc_request_header header __attribute__((aligned(8)));
440  mar_cpg_name_t group_name __attribute__((aligned(8)));
441  mar_uint32_t pid __attribute__((aligned(8)));
442  mar_uint32_t reason __attribute__((aligned(8)));
443 };
444 
446  struct qb_ipc_request_header header __attribute__((aligned(8)));
447  mar_cpg_name_t group_name __attribute__((aligned(8)));
448  mar_uint32_t msglen __attribute__((aligned(8)));
449  mar_uint32_t pid __attribute__((aligned(8)));
450  mar_message_source_t source __attribute__((aligned(8)));
451  mar_uint8_t message[] __attribute__((aligned(8)));
452 };
453 
455  struct qb_ipc_request_header header __attribute__((aligned(8)));
456  mar_uint32_t left_nodes __attribute__((aligned(8)));
457  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
458 };
459 
461  struct qb_ipc_request_header header __attribute__((aligned(8)));
462  /* merge decisions */
463  mar_uint32_t old_members __attribute__((aligned(8)));
464  /* downlist below */
465  mar_uint32_t left_nodes __attribute__((aligned(8)));
466  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
467 };
468 
469 struct downlist_msg {
471  mar_uint32_t old_members __attribute__((aligned(8)));
472  mar_uint32_t left_nodes __attribute__((aligned(8)));
473  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
474  struct list_head list;
475 };
476 
477 struct joinlist_msg {
479  uint32_t pid;
481  struct list_head list;
482 };
483 
484 static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
485 
486 /*
487  * Function print group name. It's not reentrant
488  */
489 static char *cpg_print_group_name(const mar_cpg_name_t *group)
490 {
491  static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
492  int dest_pos = 0;
493  char c;
494  int i;
495 
496  for (i = 0; i < group->length; i++) {
497  c = group->value[i];
498 
499  if (c >= ' ' && c < 0x7f && c != '\\') {
500  res[dest_pos++] = c;
501  } else {
502  if (c == '\\') {
503  res[dest_pos++] = '\\';
504  res[dest_pos++] = '\\';
505  } else {
506  snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
507  dest_pos += 4;
508  }
509  }
510  }
511  res[dest_pos] = 0;
512 
513  return (res);
514 }
515 
516 static void cpg_sync_init (
517  const unsigned int *trans_list,
518  size_t trans_list_entries,
519  const unsigned int *member_list,
520  size_t member_list_entries,
521  const struct memb_ring_id *ring_id)
522 {
523  int entries;
524  int i, j;
525  int found;
526 
527  my_sync_state = CPGSYNC_DOWNLIST;
528 
529  memcpy (my_member_list, member_list, member_list_entries *
530  sizeof (unsigned int));
531  my_member_list_entries = member_list_entries;
532 
533  last_sync_ring_id.nodeid = ring_id->rep.nodeid;
534  last_sync_ring_id.seq = ring_id->seq;
535 
536  downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES;
537 
538  entries = 0;
539  /*
540  * Determine list of nodeids for downlist message
541  */
542  for (i = 0; i < my_old_member_list_entries; i++) {
543  found = 0;
544  for (j = 0; j < trans_list_entries; j++) {
545  if (my_old_member_list[i] == trans_list[j]) {
546  found = 1;
547  break;
548  }
549  }
550  if (found == 0) {
551  g_req_exec_cpg_downlist.nodeids[entries++] =
552  my_old_member_list[i];
553  }
554  }
555  g_req_exec_cpg_downlist.left_nodes = entries;
556 }
557 
558 static int cpg_sync_process (void)
559 {
560  int res = -1;
561 
562  if (my_sync_state == CPGSYNC_DOWNLIST) {
563  res = cpg_exec_send_downlist();
564  if (res == -1) {
565  return (-1);
566  }
567  my_sync_state = CPGSYNC_JOINLIST;
568  }
569  if (my_sync_state == CPGSYNC_JOINLIST) {
570  res = cpg_exec_send_joinlist();
571  }
572  return (res);
573 }
574 
575 static void cpg_sync_activate (void)
576 {
577  memcpy (my_old_member_list, my_member_list,
578  my_member_list_entries * sizeof (unsigned int));
579  my_old_member_list_entries = my_member_list_entries;
580 
581  if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
582  downlist_master_choose_and_send ();
583  }
584 
585  joinlist_inform_clients ();
586 
587  downlist_messages_delete ();
588  downlist_state = CPG_DOWNLIST_NONE;
589  joinlist_messages_delete ();
590 
591  notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
592 }
593 
594 static void cpg_sync_abort (void)
595 {
596  downlist_state = CPG_DOWNLIST_NONE;
597  downlist_messages_delete ();
598  joinlist_messages_delete ();
599 }
600 
/*
 * Send a totem configuration change callback carrying the given member list
 * and the ring id recorded in last_sync_ring_id.
 *
 * conn                - connection to notify, or NULL to notify every
 *                       connection on cpg_pd_list_head
 * member_list_entries - number of node ids in member_list
 * member_list         - node ids to report
 *
 * Returns CS_OK, or CS_ERR_LIBRARY when the message buffer is NULL.
 *
 * NOTE(review): two lines of this function (the declaration of `res` and
 * one statement after the header.size assignment) are missing from this
 * listing.
 */
601 static int notify_lib_totem_membership (
602  void *conn,
603  int member_list_entries,
604  const unsigned int *member_list)
605 {
606  struct list_head *iter;
607  char *buf;
608  int size;
610 
 /* fixed callback header followed by one node id per member */
611  size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
612  sizeof(mar_uint32_t) * (member_list_entries);
 /* message is built on the stack, so it needs no free */
613  buf = alloca(size);
 /* NOTE(review): alloca usually does not signal failure by returning NULL, so this check is probably never true -- confirm */
614  if (!buf)
615  return CS_ERR_LIBRARY;
616 
617  res = (struct res_lib_cpg_totem_confchg_callback *)buf;
618  res->member_list_entries = member_list_entries;
619  res->header.size = size;
621  res->header.error = CS_OK;
622 
623  memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
624  memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
625 
 /* NULL means broadcast: the same buffer goes to every known connection */
626  if (conn == NULL) {
627  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
628  struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
629  api->ipc_dispatch_send (cpg_pd->conn, buf, size);
630  }
631  } else {
632  api->ipc_dispatch_send (conn, buf, size);
633  }
634 
635  return CS_OK;
636 }
637 
/*
 * Build a configuration change callback for one group and send it.
 *
 * The message holds three address lists stored back to back after
 * res->member_list: the current members of the group (entries of
 * process_info_list_head with a matching group name, minus everything in
 * left_list), then left_list, then joined_list.
 *
 * group_name          - group the change applies to
 * conn                - connection to notify; NULL walks cpg_pd_list_head and
 *                       considers every connection with a matching group name
 * joined_list_entries - number of entries in joined_list (asserted to be at
 *                       most 1 on the conn == NULL path)
 * joined_list         - processes that joined
 * left_list_entries   - number of entries in left_list
 * left_list           - processes that left
 * id                  - value stored in the response header id
 *
 * Returns CS_OK, or CS_ERR_LIBRARY when the message buffer is NULL.
 *
 * NOTE(review): several lines of this function are missing from this
 * listing (inside the conn == NULL branch and in the final loop), so the
 * conditions guarding some of the visible statements cannot be read here.
 */
638 static int notify_lib_joinlist(
639  const mar_cpg_name_t *group_name,
640  void *conn,
641  int joined_list_entries,
642  mar_cpg_address_t *joined_list,
643  int left_list_entries,
644  mar_cpg_address_t *left_list,
645  int id)
646 {
647  int size;
648  char *buf;
649  struct list_head *iter;
650  int count;
651  struct res_lib_cpg_confchg_callback *res;
652  mar_cpg_address_t *retgi;
653 
654  count = 0;
655 
 /* first pass: count group members that are not in left_list, to size the message */
656  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
657  struct process_info *pi = list_entry (iter, struct process_info, list);
658  if (mar_name_compare (&pi->group, group_name) == 0) {
659  int i;
660  int founded = 0;
661 
662  for (i = 0; i < left_list_entries; i++) {
663  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
664  founded++;
665  }
666  }
667 
668  if (!founded)
669  count++;
670  }
671  }
672 
 /* header plus room for the member, left and joined lists */
673  size = sizeof(struct res_lib_cpg_confchg_callback) +
674  sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
 /* message is built on the stack, so it needs no free */
675  buf = alloca(size);
 /* NOTE(review): alloca usually does not signal failure by returning NULL -- confirm this check can trigger */
676  if (!buf)
677  return CS_ERR_LIBRARY;
678 
679  res = (struct res_lib_cpg_confchg_callback *)buf;
680  res->joined_list_entries = joined_list_entries;
681  res->left_list_entries = left_list_entries;
682  res->member_list_entries = count;
 /* retgi is the write cursor for all three lists */
683  retgi = res->member_list;
684  res->header.size = size;
685  res->header.id = id;
686  res->header.error = CS_OK;
687  memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
688 
 /* second pass: same filter as the counting pass, now writing the members */
689  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
690  struct process_info *pi=list_entry (iter, struct process_info, list);
691 
692  if (mar_name_compare (&pi->group, group_name) == 0) {
693  int i;
694  int founded = 0;
695 
696  for (i = 0;i < left_list_entries; i++) {
697  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
698  founded++;
699  }
700  }
701 
702  if (!founded) {
703  retgi->nodeid = pi->nodeid;
704  retgi->pid = pi->pid;
705  retgi++;
706  }
707  }
708  }
709 
 /* the left list follows directly after the members */
710  if (left_list_entries) {
711  memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
712  retgi += left_list_entries;
713  }
714 
 /* the joined list comes last */
715  if (joined_list_entries) {
716  memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
717  retgi += joined_list_entries;
718  }
719 
720  if (conn) {
721  api->ipc_dispatch_send (conn, buf, size);
722  } else {
 /* no explicit connection: look at every connection that uses this group name */
723  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
724  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
725  if (mar_name_compare (&cpd->group_name, group_name) == 0) {
726  assert (joined_list_entries <= 1);
727  if (joined_list_entries) {
 /* the joiner is this connection's own process on the local node */
728  if (joined_list[0].pid == cpd->pid &&
729  joined_list[0].nodeid == api->totem_nodeid_get()) {
731  }
732  }
733  if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
735 
736  api->ipc_dispatch_send (cpd->conn, buf, size);
737  }
738  if (left_list_entries) {
 /* this connection's own process left voluntarily: forget its pid and group */
739  if (left_list[0].pid == cpd->pid &&
740  left_list[0].nodeid == api->totem_nodeid_get() &&
741  left_list[0].reason == CONFCHG_CPG_REASON_LEAVE) {
742 
743  cpd->pid = 0;
744  memset (&cpd->group_name, 0, sizeof(cpd->group_name));
746  }
747  }
748  }
749  }
750  }
751 
752 
753  /*
754  * Walk all connections and send the totem membership to each one that has not received it yet
755  */
756  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
757  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
758 
760  cpd->initial_totem_conf_sent = 1;
761 
762  notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
763  }
764  }
765 
766  return CS_OK;
767 }
768 
/*
 * Write one debug log line describing a downlist message.
 *
 * msg - text placed in front of the description
 * dl  - downlist message whose old_members and left_nodes counters are logged
 *
 * NOTE(review): the argument feeding the "sender %s" conversion is on a
 * line missing from this listing.
 */
769 static void downlist_log(const char *msg, struct downlist_msg* dl)
770 {
771  log_printf (LOG_DEBUG,
772  "%s: sender %s; members(old:%d left:%d)",
773  msg,
775  dl->old_members,
776  dl->left_nodes);
777 }
778 
779 static struct downlist_msg* downlist_master_choose (void)
780 {
781  struct downlist_msg *cmp;
782  struct downlist_msg *best = NULL;
783  struct list_head *iter;
784  uint32_t cmp_members;
785  uint32_t best_members;
786  uint32_t i;
787  int ignore_msg;
788 
789  for (iter = downlist_messages_head.next;
790  iter != &downlist_messages_head;
791  iter = iter->next) {
792 
793  cmp = list_entry(iter, struct downlist_msg, list);
794  downlist_log("comparing", cmp);
795 
796  ignore_msg = 0;
797  for (i = 0; i < cmp->left_nodes; i++) {
798  if (cmp->nodeids[i] == api->totem_nodeid_get()) {
799  log_printf (LOG_DEBUG, "Ignoring this entry because I'm in the left list\n");
800 
801  ignore_msg = 1;
802  break;
803  }
804  }
805 
806  if (ignore_msg) {
807  continue ;
808  }
809 
810  if (best == NULL) {
811  best = cmp;
812  continue;
813  }
814 
815  best_members = best->old_members - best->left_nodes;
816  cmp_members = cmp->old_members - cmp->left_nodes;
817 
818  if (cmp_members > best_members) {
819  best = cmp;
820  } else if (cmp_members == best_members) {
821  if (cmp->old_members > best->old_members) {
822  best = cmp;
823  } else if (cmp->old_members == best->old_members) {
824  if (cmp->sender_nodeid < best->sender_nodeid) {
825  best = cmp;
826  }
827  }
828  }
829  }
830 
831  assert (best != NULL);
832 
833  return best;
834 }
835 
/*
 * Apply the chosen downlist: remove every process that lived on a departed
 * node and send one configuration change per affected group.
 *
 * Sets downlist_state to CPG_DOWNLIST_APPLYING, asks
 * downlist_master_choose() for the message to apply, then
 *   1. walks process_info_list_head, moving every entry whose node id is in
 *      the message's nodeids[] into a per-group left list (kept in a map
 *      keyed by the group name) and freeing the entry;
 *   2. iterates the map and calls notify_lib_joinlist() once per group.
 *
 * NOTE(review): two lines are missing from this listing: one member of
 * struct confchg_data (the left_list array used below) and the final
 * argument line of the notify_lib_joinlist() call.
 */
836 static void downlist_master_choose_and_send (void)
837 {
838  struct downlist_msg *stored_msg;
839  struct list_head *iter;
840  struct process_info *left_pi;
841  qb_map_t *group_map;
842  struct cpg_name cpg_group;
843  mar_cpg_name_t group;
 /* per-group accumulator stored as the map value */
844  struct confchg_data{
845  struct cpg_name cpg_group;
847  int left_list_entries;
848  struct list_head list;
849  } *pcd;
850  qb_map_iter_t *miter;
851  int i, size;
852 
853  downlist_state = CPG_DOWNLIST_APPLYING;
854 
855  stored_msg = downlist_master_choose ();
 /* NOTE(review): downlist_master_choose() asserts a non-NULL result, so this branch is only reachable when assertions are compiled out */
856  if (!stored_msg) {
857  log_printf (LOGSYS_LEVEL_DEBUG, "NO chosen downlist");
858  return;
859  }
860  downlist_log("chosen downlist", stored_msg);
861 
862  group_map = qb_skiplist_create();
863 
864  /*
865  * only the cpg groups included in left nodes should receive
866  * confchg event, so we will collect these cpg groups and
867  * relative left_lists here.
868  */
 /* iter is advanced at the top of the body because the current entry may be freed below */
869  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
870  struct process_info *pi = list_entry(iter, struct process_info, list);
871  iter = iter->next;
872 
873  left_pi = NULL;
874  for (i = 0; i < stored_msg->left_nodes; i++) {
875 
876  if (pi->nodeid == stored_msg->nodeids[i]) {
877  left_pi = pi;
878  break;
879  }
880  }
881 
882  if (left_pi) {
883  marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
 /* terminate the name so it can serve as the map key */
 /* NOTE(review): writes one byte past the name; verify value[] has room when length is at its maximum */
884  cpg_group.value[cpg_group.length] = 0;
885 
886  pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
887  if (pcd == NULL) {
 /* NOTE(review): the calloc result is used without a NULL check */
888  pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
889  memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
 /* the key points into pcd itself, so it lives as long as the value */
890  qb_map_put(group_map, pcd->cpg_group.value, pcd);
891  }
892  size = pcd->left_list_entries;
893  pcd->left_list[size].nodeid = left_pi->nodeid;
894  pcd->left_list[size].pid = left_pi->pid;
895  pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
896  pcd->left_list_entries++;
897  list_del (&left_pi->list);
898  free (left_pi);
899  }
900  }
901 
902  /* send only one confchg event per cpg group */
903  miter = qb_map_iter_create(group_map);
904  while (qb_map_iter_next(miter, (void **)&pcd)) {
905  marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
906 
907  log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
908  for (i=0; i<pcd->left_list_entries; i++) {
909  log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
910  i, cpg_print_group_name(&group),
911  (char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
912  pcd->left_list[i].pid);
913  }
914 
915  /* send confchg event */
916  notify_lib_joinlist(&group, NULL,
917  0, NULL,
918  pcd->left_list_entries,
919  pcd->left_list,
921 
 /* the accumulator was allocated in the collection loop above */
922  free(pcd);
923  }
924  qb_map_iter_free(miter);
925  qb_map_destroy(group_map);
926 }
927 
/*
 * Process every stored joinlist message: log it and, unless it was sent by
 * the local node, hand its group name, pid and sender node id to
 * do_proc_join().
 *
 * NOTE(review): the last argument line of the do_proc_join() call is
 * missing from this listing.
 */
928 static void joinlist_inform_clients (void)
929 {
930  struct joinlist_msg *stored_msg;
931  struct list_head *iter;
932  unsigned int i;
933 
 /* i only numbers the log lines */
934  i = 0;
935  for (iter = joinlist_messages_head.next;
936  iter != &joinlist_messages_head;
937  iter = iter->next) {
938 
939  stored_msg = list_entry(iter, struct joinlist_msg, list);
940 
941  log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
942  i++, cpg_print_group_name(&stored_msg->group_name),
943  (char*)api->totem_ifaces_print(stored_msg->sender_nodeid),
944  stored_msg->pid);
945 
946  /* Ignore our own messages */
947  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
948  continue ;
949  }
950 
951  do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid,
953  }
954 }
955 
956 static void downlist_messages_delete (void)
957 {
958  struct downlist_msg *stored_msg;
959  struct list_head *iter, *iter_next;
960 
961  for (iter = downlist_messages_head.next;
962  iter != &downlist_messages_head;
963  iter = iter_next) {
964 
965  iter_next = iter->next;
966 
967  stored_msg = list_entry(iter, struct downlist_msg, list);
968  list_del (&stored_msg->list);
969  free (stored_msg);
970  }
971 }
972 
973 static void joinlist_messages_delete (void)
974 {
975  struct joinlist_msg *stored_msg;
976  struct list_head *iter, *iter_next;
977 
978  for (iter = joinlist_messages_head.next;
979  iter != &joinlist_messages_head;
980  iter = iter_next) {
981 
982  iter_next = iter->next;
983 
984  stored_msg = list_entry(iter, struct joinlist_msg, list);
985  list_del (&stored_msg->list);
986  free (stored_msg);
987  }
988  list_init (&joinlist_messages_head);
989 }
990 
991 static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
992 {
993  list_init (&downlist_messages_head);
994  list_init (&joinlist_messages_head);
995  api = corosync_api;
996  return (NULL);
997 }
998 
999 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
1000 {
1001  struct list_head *iter, *iter_next;
1002  struct process_info *pi;
1003 
1004  for (iter = cpg_iteration_instance->items_list_head.next;
1005  iter != &cpg_iteration_instance->items_list_head;
1006  iter = iter_next) {
1007 
1008  iter_next = iter->next;
1009 
1010  pi = list_entry (iter, struct process_info, list);
1011  list_del (&pi->list);
1012  free (pi);
1013  }
1014 
1015  list_del (&cpg_iteration_instance->list);
1016  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
1017 }
1018 
1019 static void cpg_pd_finalize (struct cpg_pd *cpd)
1020 {
1021  struct list_head *iter, *iter_next;
1022  struct cpg_iteration_instance *cpii;
1023 
1024  zcb_all_free(cpd);
1025  for (iter = cpd->iteration_instance_list_head.next;
1026  iter != &cpd->iteration_instance_list_head;
1027  iter = iter_next) {
1028 
1029  iter_next = iter->next;
1030 
1031  cpii = list_entry (iter, struct cpg_iteration_instance, list);
1032 
1033  cpg_iteration_instance_finalize (cpii);
1034  }
1035 
1036  list_del (&cpd->list);
1037 }
1038 
/*
 * Library exit callback for a connection.
 *
 * If the connection still has a group name set and has not already started
 * leaving, a message is sent through cpg_node_joinleave_send(). The
 * connection's private data is then finalized and the connection reference
 * count is decremented.
 *
 * conn - connection being closed
 * Always returns 0.
 *
 * NOTE(review): the remaining arguments of the cpg_node_joinleave_send()
 * call are on a line missing from this listing.
 */
1039 static int cpg_lib_exit_fn (void *conn)
1040 {
1041  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1042 
1043  log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
1044 
 /* a non-empty group name means this connection has a group set; skip if a leave is already in progress */
1045  if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
1046  cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
1048  }
1049 
1050  cpg_pd_finalize (cpd);
1051 
1052  api->ipc_refcnt_dec (conn);
1053  return (0);
1054 }
1055 
/*
 * Multicast a join/leave message for one process to the cluster.
 *
 * pid        - process id placed in the message
 * group_name - group name copied into the message
 * fn         - NOTE(review): not used in any visible line; presumably the
 *              message id stored in the header on a line missing from this
 *              listing -- confirm
 * reason     - reason code placed in the message
 *
 * Returns the result of api->totem_mcast().
 *
 * NOTE(review): the declaration of req_exec_cpg_procjoin and one statement
 * after the header.size assignment are missing from this listing.
 */
1056 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
1057 {
1059  struct iovec req_exec_cpg_iovec;
1060  int result;
1061 
1062  memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
1063  req_exec_cpg_procjoin.pid = pid;
1064  req_exec_cpg_procjoin.reason = reason;
1065 
1066  req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
1068 
 /* the whole message is sent as a single iovec element */
1069  req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
1070  req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
1071 
1072  result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
1073 
1074  return (result);
1075 }
1076 
1077 /* Can byteswap join & leave messages */
/*
 * Byte-swaps the pid, group name and reason fields of the message in place.
 *
 * msg - message buffer to convert
 *
 * NOTE(review): the line declaring req_exec_cpg_procjoin (presumably
 * initialized from msg) is missing from this listing.
 */
1078 static void exec_cpg_procjoin_endian_convert (void *msg)
1079 {
1081 
1082  req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
1083  swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1084  req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
1085 }
1086 
1087 static void exec_cpg_joinlist_endian_convert (void *msg_v)
1088 {
1089  char *msg = msg_v;
1090  struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
1091  struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));
1092 
1093  swab_mar_int32_t (&res->size);
1094 
1095  while ((const char*)jle < msg + res->size) {
1096  jle->pid = swab32(jle->pid);
1097  swab_mar_cpg_name_t (&jle->group_name);
1098  jle++;
1099  }
1100 }
1101 
/*
 * Endian conversion hook for the old downlist message format.
 * Intentionally does nothing: the message is left untouched.
 */
static void exec_cpg_downlist_endian_convert_old (void *msg)
{
	(void)msg;
}
1105 
/*
 * Byte-swaps a downlist message in place: the left_nodes and old_members
 * counters and the first left_nodes entries of nodeids[].
 *
 * msg - message buffer to convert
 *
 * NOTE(review): the line declaring req_exec_cpg_downlist (presumably
 * initialized from msg) is missing from this listing.
 */
1106 static void exec_cpg_downlist_endian_convert (void *msg)
1107 {
1109  unsigned int i;
1110 
 /* left_nodes has to be converted before it can be used as the loop bound below */
1111  req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
1112  req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
1113 
 /* NOTE(review): left_nodes is not checked against the capacity of nodeids[] here -- confirm it is validated elsewhere */
1114  for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1115  req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
1116  }
1117 }
1118 
1119 
1120 static void exec_cpg_mcast_endian_convert (void *msg)
1121 {
1122  struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
1123 
1124  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1125  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1126  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1127  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1128  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1129 }
1130 
1131 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
1132  struct list_head *iter;
1133 
1134  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1135  struct process_info *pi = list_entry (iter, struct process_info, list);
1136  iter = iter->next;
1137 
1138  if (pi->pid == pid && pi->nodeid == nodeid &&
1139  mar_name_compare (&pi->group, group_name) == 0) {
1140  return pi;
1141  }
1142  }
1143 
1144  return NULL;
1145 }
1146 
/*
 * Record that process `pid' on node `nodeid' joined group `name'.
 *
 * Nothing happens when process_info_find() already knows this
 * (name, pid, nodeid) triple or when the allocation fails (a warning is
 * logged in the latter case).  Otherwise a new process_info is inserted
 * into process_info_list_head, ordered by nodeid and then pid, and
 * notify_lib_joinlist() is called with one joined entry carrying `reason'.
 *
 * NOTE(review): embedded line 1195 is missing from this listing, so the
 * trailing arguments of the notify_lib_joinlist() call are not visible.
 */
1147 static void do_proc_join(
1148  const mar_cpg_name_t *name,
1149  uint32_t pid,
1150  unsigned int nodeid,
1151  int reason)
1152 {
1153  struct process_info *pi;
1154  struct process_info *pi_entry;
1155  mar_cpg_address_t notify_info;
1156  struct list_head *list;
1157  struct list_head *list_to_add = NULL;
1158 
/* Already known -> nothing to do */
1159  if (process_info_find (name, pid, nodeid) != NULL) {
1160  return ;
1161  }
1162  pi = malloc (sizeof (struct process_info));
1163  if (!pi) {
1164  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
1165  return;
1166  }
1167  pi->nodeid = nodeid;
1168  pi->pid = pid;
1169  memcpy(&pi->group, name, sizeof(*name));
1170  list_init(&pi->list);
1171 
1172  /*
1173  * Insert new process in sorted order so synchronization works properly
1174  */
/* list_to_add tracks the last entry that does not sort after the new one */
1175  list_to_add = &process_info_list_head;
1176  for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
1177 
1178  pi_entry = list_entry(list, struct process_info, list);
1179  if (pi_entry->nodeid > pi->nodeid ||
1180  (pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
1181 
1182  break;
1183  }
1184  list_to_add = list;
1185  }
1186  list_add (&pi->list, list_to_add);
1187 
1188  notify_info.pid = pi->pid;
1189  notify_info.nodeid = nodeid;
1190  notify_info.reason = reason;
1191 
1192  notify_lib_joinlist(&pi->group, NULL,
1193  1, &notify_info,
1194  0, NULL,
1196 }
1197 
1198 static void message_handler_req_exec_cpg_downlist_old (
1199  const void *message,
1200  unsigned int nodeid)
1201 {
1202  log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node %d",
1203  nodeid);
1204 }
1205 
1206 static void message_handler_req_exec_cpg_downlist(
1207  const void *message,
1208  unsigned int nodeid)
1209 {
1210  const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1211  int i;
1212  struct list_head *iter;
1213  struct downlist_msg *stored_msg;
1214  int found;
1215 
1216  if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
1217  log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d",
1218  req_exec_cpg_downlist->left_nodes, downlist_state);
1219  return;
1220  }
1221 
1222  stored_msg = malloc (sizeof (struct downlist_msg));
1223  stored_msg->sender_nodeid = nodeid;
1224  stored_msg->old_members = req_exec_cpg_downlist->old_members;
1225  stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
1226  memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
1227  req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t));
1228  list_init (&stored_msg->list);
1229  list_add (&stored_msg->list, &downlist_messages_head);
1230 
1231  for (i = 0; i < my_member_list_entries; i++) {
1232  found = 0;
1233  for (iter = downlist_messages_head.next;
1234  iter != &downlist_messages_head;
1235  iter = iter->next) {
1236 
1237  stored_msg = list_entry(iter, struct downlist_msg, list);
1238  if (my_member_list[i] == stored_msg->sender_nodeid) {
1239  found = 1;
1240  }
1241  }
1242  if (!found) {
1243  return;
1244  }
1245  }
1246 
1247  downlist_master_choose_and_send ();
1248 }
1249 
1250 
/*
 * Handle a procjoin message from node `nodeid': log it at debug level and
 * pass group name, pid and nodeid on to do_proc_join().
 *
 * NOTE(review): embedded line 1264 is missing from this listing, so the
 * last argument(s) of the do_proc_join() call are not visible.
 */
1251 static void message_handler_req_exec_cpg_procjoin (
1252  const void *message,
1253  unsigned int nodeid)
1254 {
1255  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1256 
1257  log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d (%s) for pid %u",
1258  nodeid,
1259  api->totem_ifaces_print(nodeid),
1260  (unsigned int)req_exec_cpg_procjoin->pid);
1261 
1262  do_proc_join (&req_exec_cpg_procjoin->group_name,
1263  req_exec_cpg_procjoin->pid, nodeid,
1265 }
1266 
/*
 * Handle a procleave message from node `nodeid'.
 *
 * notify_lib_joinlist() is called with one left entry built from the
 * message (pid, nodeid, reason), then every process_info entry matching
 * the message's pid, this nodeid and the group name is removed from
 * process_info_list_head and freed.
 *
 * NOTE(review): embedded line 1285 is missing from this listing, so the
 * trailing arguments of the notify_lib_joinlist() call are not visible.
 */
1267 static void message_handler_req_exec_cpg_procleave (
1268  const void *message,
1269  unsigned int nodeid)
1270 {
1271  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1272  struct process_info *pi;
1273  struct list_head *iter;
1274  mar_cpg_address_t notify_info;
1275 
1276  log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node %d", nodeid);
1277 
1278  notify_info.pid = req_exec_cpg_procjoin->pid;
1279  notify_info.nodeid = nodeid;
1280  notify_info.reason = req_exec_cpg_procjoin->reason;
1281 
1282  notify_lib_joinlist(&req_exec_cpg_procjoin->group_name, NULL,
1283  0, NULL,
1284  1, &notify_info,
1286 
1287  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1288  pi = list_entry(iter, struct process_info, list);
/* Advance before a possible free() of the current entry */
1289  iter = iter->next;
1290 
1291  if (pi->pid == req_exec_cpg_procjoin->pid && pi->nodeid == nodeid &&
1292  mar_name_compare (&pi->group, &req_exec_cpg_procjoin->group_name)==0) {
1293  list_del (&pi->list);
1294  free (pi);
1295  }
1296  }
1297 }
1298 
1299 
1300 /* Got a proclist from another node */
1301 static void message_handler_req_exec_cpg_joinlist (
1302  const void *message_v,
1303  unsigned int nodeid)
1304 {
1305  const char *message = message_v;
1306  const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
1307  const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
1308  struct joinlist_msg *stored_msg;
1309 
1310  log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node %x",
1311  nodeid);
1312 
1313  while ((const char*)jle < message + res->size) {
1314  stored_msg = malloc (sizeof (struct joinlist_msg));
1315  memset(stored_msg, 0, sizeof (struct joinlist_msg));
1316  stored_msg->sender_nodeid = nodeid;
1317  stored_msg->pid = jle->pid;
1318  memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
1319  list_init (&stored_msg->list);
1320  list_add (&stored_msg->list, &joinlist_messages_head);
1321  jle++;
1322  }
1323 }
1324 
/*
 * Handle a multicast message from node `nodeid'.
 *
 * A res_lib_cpg_mcast header is filled from the message and, together with
 * the payload that follows the req_exec_cpg_mcast structure, sent with
 * api->ipc_dispatch_iov_send() to matching entries of cpg_pd_list_head.
 * Delivery requires that process_info_list_head holds an entry for this
 * nodeid and group; if it does not, a warning is logged and the handler
 * returns without delivering to anyone else.
 *
 * NOTE(review): embedded lines 1330, 1337 and 1355 are missing from this
 * listing: the declaration of res_lib_cpg_mcast, one header assignment and
 * the first half of the `if' condition selecting the cpd are not visible.
 */
1325 static void message_handler_req_exec_cpg_mcast (
1326  const void *message,
1327  unsigned int nodeid)
1328 {
1329  const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
1331  int msglen = req_exec_cpg_mcast->msglen;
1332  struct list_head *iter, *pi_iter;
1333  struct cpg_pd *cpd;
1334  struct iovec iovec[2];
1335  int known_node = 0;
1336 
1338  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1339  res_lib_cpg_mcast.msglen = msglen;
1340  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1341  res_lib_cpg_mcast.nodeid = nodeid;
1342 
1343  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1344  sizeof(mar_cpg_name_t));
/* iovec[0]: response header, iovec[1]: payload following the exec header */
1345  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1346  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1347 
1348  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1349  iovec[1].iov_len = msglen;
1350 
1351  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
1352  cpd = list_entry(iter, struct cpg_pd, list);
1353  iter = iter->next;
1354 
1356  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1357 
/* known_node is sticky: the lookup runs only until it succeeds once */
1358  if (!known_node) {
1359  /* Try to find, if we know the node */
1360  for (pi_iter = process_info_list_head.next;
1361  pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
1362 
1363  struct process_info *pi = list_entry (pi_iter, struct process_info, list);
1364 
1365  if (pi->nodeid == nodeid &&
1366  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1367  known_node = 1;
1368  break;
1369  }
1370  }
1371  }
1372 
1373  if (!known_node) {
1374  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1375  return ;
1376  }
1377 
1378  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1379  }
1380  }
1381 }
1382 
1383 
1384 static int cpg_exec_send_downlist(void)
1385 {
1386  struct iovec iov;
1387 
1388  g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
1389  g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
1390 
1391  g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1392 
1393  iov.iov_base = (void *)&g_req_exec_cpg_downlist;
1394  iov.iov_len = g_req_exec_cpg_downlist.header.size;
1395 
1396  return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
1397 }
1398 
/*
 * Multicast the list of locally joined processes.
 *
 * Entries of process_info_list_head whose nodeid equals
 * api->totem_nodeid_get() are counted; when there are none, 0 is returned
 * without sending anything.  Otherwise a buffer holding a
 * qb_ipc_response_header followed by one join_list_entry (group name, pid)
 * per local process is built and sent with TOTEM_AGREED.
 *
 * Returns 0 when there is nothing to send, -1 when the buffer pointer is
 * NULL, otherwise the result of api->totem_mcast().
 *
 * NOTE(review): embedded line 1439 is missing from this listing; presumably
 * it sets res->id -- confirm against the real source.
 */
1399 static int cpg_exec_send_joinlist(void)
1400 {
1401  int count = 0;
1402  struct list_head *iter;
1403  struct qb_ipc_response_header *res;
1404  char *buf;
1405  struct join_list_entry *jle;
1406  struct iovec req_exec_cpg_iovec;
1407 
/* First pass: count local processes to size the buffer */
1408  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1409  struct process_info *pi = list_entry (iter, struct process_info, list);
1410 
1411  if (pi->nodeid == api->totem_nodeid_get ()) {
1412  count++;
1413  }
1414  }
1415 
1416  /* Nothing to send */
1417  if (!count)
1418  return 0;
1419 
/* The buffer comes from alloca(), so it is never free()d in this function */
1420  buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
1421  if (!buf) {
1422  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
1423  return -1;
1424  }
1425 
1426  jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
1427  res = (struct qb_ipc_response_header *)buf;
1428 
/* Second pass: fill one entry per local process */
1429  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1430  struct process_info *pi = list_entry (iter, struct process_info, list);
1431 
1432  if (pi->nodeid == api->totem_nodeid_get ()) {
1433  memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
1434  jle->pid = pi->pid;
1435  jle++;
1436  }
1437  }
1438 
1440  res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;
1441 
1442  req_exec_cpg_iovec.iov_base = buf;
1443  req_exec_cpg_iovec.iov_len = res->size;
1444 
1445  return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
1446 }
1447 
1448 static int cpg_lib_init_fn (void *conn)
1449 {
1450  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1451  memset (cpd, 0, sizeof(struct cpg_pd));
1452  cpd->conn = conn;
1453  list_add (&cpd->list, &cpg_pd_list_head);
1454 
1455  list_init (&cpd->iteration_instance_list_head);
1456  list_init (&cpd->zcb_mapped_list_head);
1457 
1458  api->ipc_refcnt_inc (conn);
1459  log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);
1460  return (0);
1461 }
1462 
1463 /* Join message from the library */
/*
 * Handle a join request from a library client and send the response.
 *
 * The error placed in the response is:
 *  - CS_ERR_EXIST when an entry of cpg_pd_list_head already has the same
 *    pid and group name,
 *  - CS_ERR_TRY_AGAIN when process_info_list_head still holds a local entry
 *    with the same pid and group name,
 *  - CS_ERR_NAME_TOO_LONG when the group name length exceeds
 *    CPG_MAX_NAME_LENGTH,
 *  - otherwise chosen by the switch on cpd->cpd_state; in
 *    CPD_STATE_UNJOINED pid, flags and group name are stored in the cpd and
 *    cpg_node_joinleave_send() is called.
 *
 * NOTE(review): embedded lines 1468, 1508, 1516, 1518, 1521, 1524 and 1531
 * are missing from this listing: the declaration of res_lib_cpg_join, the
 * case labels other than CPD_STATE_UNJOINED, the trailing arguments of
 * cpg_node_joinleave_send() and one header assignment are not visible.
 */
1464 static void message_handler_req_lib_cpg_join (void *conn, const void *message)
1465 {
1466  const struct req_lib_cpg_join *req_lib_cpg_join = message;
1467  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1469  cs_error_t error = CS_OK;
1470  struct list_head *iter;
1471 
1472  /* Test, if we don't have same pid and group name joined */
1473  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
1474  struct cpg_pd *cpd_item = list_entry (iter, struct cpg_pd, list);
1475 
1476  if (cpd_item->pid == req_lib_cpg_join->pid &&
1477  mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {
1478 
1479  /* We have same pid and group name joined -> return error */
1480  error = CS_ERR_EXIST;
1481  goto response_send;
1482  }
1483  }
1484 
1485  /*
1486  * Same check must be done in process info list, because there may be not yet delivered
1487  * leave of client.
1488  */
1489  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1490  struct process_info *pi = list_entry (iter, struct process_info, list);
1491 
1492  if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
1493  mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
1494  /* We have same pid and group name joined -> return error */
1495  error = CS_ERR_TRY_AGAIN;
1496  goto response_send;
1497  }
1498  }
1499 
1500  if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
1501  error = CS_ERR_NAME_TOO_LONG;
1502  goto response_send;
1503  }
1504 
1505  switch (cpd->cpd_state) {
1506  case CPD_STATE_UNJOINED:
1507  error = CS_OK;
1509  cpd->pid = req_lib_cpg_join->pid;
1510  cpd->flags = req_lib_cpg_join->flags;
1511  memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
1512  sizeof (cpd->group_name));
1513 
1514  cpg_node_joinleave_send (req_lib_cpg_join->pid,
1515  &req_lib_cpg_join->group_name,
1517  break;
1519  error = CS_ERR_BUSY;
1520  break;
1522  error = CS_ERR_EXIST;
1523  break;
1525  error = CS_ERR_EXIST;
1526  break;
1527  }
1528 
1529 response_send:
1530  res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
1532  res_lib_cpg_join.header.error = error;
1533  api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
1534 }
1535 
1536 /* Leave message from the library */
/*
 * Handle a leave request from a library client.
 *
 * The error put into the response depends on cpd->cpd_state; in one of the
 * states cpg_node_joinleave_send() is called with the pid and group name
 * taken from the request.
 *
 * NOTE(review): embedded lines 1539, 1550, 1553, 1556, 1558, 1561, 1562,
 * 1568 and 1570 are missing from this listing: the declaration of
 * res_lib_cpg_leave, every case label except CPD_STATE_UNJOINED, the
 * trailing arguments of cpg_node_joinleave_send() and the statement that
 * sends the response are not visible.
 */
1537 static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
1538 {
1540  cs_error_t error = CS_OK;
1541  struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
1542  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1543 
1544  log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
1545 
1546  switch (cpd->cpd_state) {
1547  case CPD_STATE_UNJOINED:
1548  error = CS_ERR_NOT_EXIST;
1549  break;
1551  error = CS_ERR_NOT_EXIST;
1552  break;
1554  error = CS_ERR_BUSY;
1555  break;
1557  error = CS_OK;
1559  cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1560  &req_lib_cpg_leave->group_name,
1563  break;
1564  }
1565 
1566  /* send return */
1567  res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
1569  res_lib_cpg_leave.header.error = error;
1571 }
1572 
1573 /* Finalize message from library */
/*
 * Handle a finalize request from a library client: the connection's cpd is
 * unlinked from its list and its list node re-initialised, then a response
 * with error CS_OK is prepared.
 *
 * NOTE(review): embedded lines 1579, 1592 and 1595 are missing from this
 * listing: the declaration of res_lib_cpg_finalize, one header assignment
 * and the start of the call that sends the response are not visible.
 */
1574 static void message_handler_req_lib_cpg_finalize (
1575  void *conn,
1576  const void *message)
1577 {
1578  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1580  cs_error_t error = CS_OK;
1581 
1582  log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
1583 
1584  /*
1585  * We will just remove cpd from list. After this call, connection will be
1586  * closed on lib side, and cpg_lib_exit_fn will be called
1587  */
1588  list_del (&cpd->list);
1589  list_init (&cpd->list);
1590 
1591  res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
1593  res_lib_cpg_finalize.header.error = error;
1594 
1596  sizeof (res_lib_cpg_finalize));
1597 }
1598 
/*
 * Map the file at `path' into memory as a shared read/write mapping.
 *
 * The file is opened and its name removed right away, then it is resized
 * to `bytes' and mapped.  The descriptor is closed before returning.
 *
 * path  - file to map; unlink() is called on it whether or not the
 *         function succeeds
 * bytes - size the file is truncated to and length of the mapping
 * buf   - receives the mapping address on success, untouched on failure
 *
 * Returns 0 on success, -1 on failure.  On failure no mapping and no
 * descriptor is left behind.
 */
static int
memory_map (
	const char *path,
	size_t bytes,
	void **buf)
{
	int32_t fd;
	void *addr;
	int32_t res;

	fd = open (path, O_RDWR, 0600);

	/*
	 * The name is removed exactly once, here, even when open failed.
	 * It must not be unlinked again later: by then the path could name
	 * a different file created by somebody else.
	 */
	unlink (path);

	if (fd == -1) {
		return (-1);
	}

	res = ftruncate (fd, bytes);
	if (res == -1) {
		goto error_close;
	}

	addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
		MAP_SHARED, fd, 0);

	if (addr == MAP_FAILED) {
		goto error_close;
	}
#ifdef MADV_NOSYNC
	madvise(addr, bytes, MADV_NOSYNC);
#endif

	res = close (fd);
	if (res) {
		/* The caller never learns the address, so release the mapping */
		munmap (addr, bytes);
		return (-1);
	}
	*buf = addr;
	return (0);

error_close:
	close (fd);
	return (-1);
}
1644 
1645 static inline int zcb_alloc (
1646  struct cpg_pd *cpd,
1647  const char *path_to_file,
1648  size_t size,
1649  void **addr)
1650 {
1651  struct zcb_mapped *zcb_mapped;
1652  unsigned int res;
1653 
1654  zcb_mapped = malloc (sizeof (struct zcb_mapped));
1655  if (zcb_mapped == NULL) {
1656  return (-1);
1657  }
1658 
1659  res = memory_map (
1660  path_to_file,
1661  size,
1662  addr);
1663  if (res == -1) {
1664  free (zcb_mapped);
1665  return (-1);
1666  }
1667 
1668  list_init (&zcb_mapped->list);
1669  zcb_mapped->addr = *addr;
1670  zcb_mapped->size = size;
1671  list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);
1672  return (0);
1673 }
1674 
1675 
1676 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
1677 {
1678  unsigned int res;
1679 
1680  res = munmap (zcb_mapped->addr, zcb_mapped->size);
1681  list_del (&zcb_mapped->list);
1682  free (zcb_mapped);
1683  return (res);
1684 }
1685 
1686 static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
1687 {
1688  struct list_head *list;
1689  struct zcb_mapped *zcb_mapped;
1690  unsigned int res = 0;
1691 
1692  for (list = cpd->zcb_mapped_list_head.next;
1693  list != &cpd->zcb_mapped_list_head; list = list->next) {
1694 
1695  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1696 
1697  if (zcb_mapped->addr == addr) {
1698  res = zcb_free (zcb_mapped);
1699  break;
1700  }
1701 
1702  }
1703  return (res);
1704 }
1705 
1706 static inline int zcb_all_free (
1707  struct cpg_pd *cpd)
1708 {
1709  struct list_head *list;
1710  struct zcb_mapped *zcb_mapped;
1711 
1712  for (list = cpd->zcb_mapped_list_head.next;
1713  list != &cpd->zcb_mapped_list_head;) {
1714 
1715  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1716 
1717  list = list->next;
1718 
1719  zcb_free (zcb_mapped);
1720  }
1721  return (0);
1722 }
1723 
/*
 * Overlay used to carry a server-side pointer in a 64 bit integer field.
 */
union u {
	uint64_t server_addr;
	void *server_ptr;
};

/*
 * Convert a server-side pointer into its 64 bit wire representation.
 *
 * server_ptr - pointer to convert
 *
 * Returns the content of the union's server_addr member after server_ptr
 * was stored in it.
 */
static uint64_t void2serveraddr (void *server_ptr)
{
	union u u;

	/*
	 * Clear all 64 bits first: where a pointer is narrower than
	 * uint64_t, storing server_ptr alone would leave the remaining
	 * bytes of server_addr indeterminate.
	 */
	u.server_addr = 0;
	u.server_ptr = server_ptr;
	return (u.server_addr);
}
1736 
/*
 * Convert a 64 bit server address back into a pointer by reading the
 * server_ptr member of union u.
 *
 * NOTE(review): embedded line 1741 is missing from this listing; presumably
 * it stores server_addr into the union before server_ptr is read --
 * confirm against the real source.
 */
1737 static void *serveraddr2void (uint64_t server_addr)
1738 {
1739  union u u;
1740 
1742  return (u.server_ptr);
1743 };
1744 
/*
 * Handle a zero-copy buffer allocation request.
 *
 * The file named in the request is mapped with zcb_alloc() (the call is
 * asserted to succeed), the server-side address of the mapping is written
 * into the coroipcs_zc_header at the start of the mapping, and a bare
 * qb_ipc_response_header is sent back.
 *
 * NOTE(review): embedded line 1749 is missing from this listing; presumably
 * it declares `hdr' from `message' -- confirm.
 * NOTE(review): res_header.error is not assigned in the visible lines.
 */
1745 static void message_handler_req_lib_cpg_zc_alloc (
1746  void *conn,
1747  const void *message)
1748 {
1750  struct qb_ipc_response_header res_header;
1751  void *addr = NULL;
1752  struct coroipcs_zc_header *zc_header;
1753  unsigned int res;
1754  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1755 
1756  log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);
1757 
1758  res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1759  &addr);
1760  assert(res == 0);
1761 
1762  zc_header = (struct coroipcs_zc_header *)addr;
1763  zc_header->server_address = void2serveraddr(addr);
1764 
1765  res_header.size = sizeof (struct qb_ipc_response_header);
1766  res_header.id = 0;
1767  api->ipc_response_send (conn,
1768  &res_header,
1769  res_header.size);
1770 }
1771 
/*
 * Handle a zero-copy buffer free request: the server address from the
 * request is converted to a pointer, the matching mapping of this
 * connection is released with zcb_by_addr_free() (its result is ignored)
 * and a bare qb_ipc_response_header is sent back.
 *
 * NOTE(review): embedded line 1776 is missing from this listing; presumably
 * it declares `hdr' from `message' -- confirm.
 * NOTE(review): res_header.error is not assigned in the visible lines.
 */
1772 static void message_handler_req_lib_cpg_zc_free (
1773  void *conn,
1774  const void *message)
1775 {
1777  struct qb_ipc_response_header res_header;
1778  void *addr = NULL;
1779  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1780 
1781  log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
1782 
1783  addr = serveraddr2void (hdr->server_address);
1784 
1785  zcb_by_addr_free (cpd, addr);
1786 
1787  res_header.size = sizeof (struct qb_ipc_response_header);
1788  res_header.id = 0;
1789  api->ipc_response_send (
1790  conn, &res_header,
1791  res_header.size);
1792 }
1793 
1794 /* Mcast message from the library */
/*
 * Handle a multicast request from a library client.
 *
 * An error code is derived from cpd->cpd_state.  When it is CS_OK, a
 * req_exec_cpg_mcast header (size, id, pid, msglen, source, group name) is
 * built and sent together with the client's payload through
 * api->totem_mcast() with TOTEM_AGREED; the send is asserted to succeed.
 * Otherwise an error is logged.
 *
 * NOTE(review): embedded lines 1813, 1816, 1819 and 1827 are missing from
 * this listing: the case labels other than CPD_STATE_UNJOINED and the
 * second argument of SERVICE_ID_MAKE() are not visible.
 */
1795 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
1796 {
1797  const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
1798  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1799  mar_cpg_name_t group_name = cpd->group_name;
1800 
1801  struct iovec req_exec_cpg_iovec[2];
1802  struct req_exec_cpg_mcast req_exec_cpg_mcast;
1803  int msglen = req_lib_cpg_mcast->msglen;
1804  int result;
1805  cs_error_t error = CS_ERR_NOT_EXIST;
1806 
1807  log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);
1808 
1809  switch (cpd->cpd_state) {
1810  case CPD_STATE_UNJOINED:
1811  error = CS_ERR_NOT_EXIST;
1812  break;
1814  error = CS_ERR_NOT_EXIST;
1815  break;
1817  error = CS_OK;
1818  break;
1820  error = CS_OK;
1821  break;
1822  }
1823 
1824  if (error == CS_OK) {
1825  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1826  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1828  req_exec_cpg_mcast.pid = cpd->pid;
1829  req_exec_cpg_mcast.msglen = msglen;
1830  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1831  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1832  sizeof(mar_cpg_name_t));
1833 
/* Element 0: exec header built above, element 1: the client's payload */
1834  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1835  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1836  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1837  req_exec_cpg_iovec[1].iov_len = msglen;
1838 
1839  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
1840  assert(result == 0);
1841  } else {
1842  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
1843  conn, group_name.value, cpd->cpd_state, error);
1844  }
1845 }
1846 
/*
 * Handle a zero-copy multicast request.
 *
 * The req_lib_cpg_mcast request is located inside the mapped buffer,
 * directly after the coroipcs_zc_header at the server address given in the
 * request.  An error code is derived from cpd->cpd_state; when it is CS_OK
 * the message is multicast with TOTEM_AGREED and the response error is
 * CS_OK or, when api->totem_mcast() fails, CS_ERR_TRY_AGAIN.  Otherwise the
 * state-derived error is returned.  A res_lib_cpg_mcast response is always
 * sent.
 *
 * NOTE(review): embedded lines 1851, 1853, 1870, 1873, 1876, 1882 and 1886
 * are missing from this listing: the declarations of `hdr' and
 * res_lib_cpg_mcast, the case labels other than CPD_STATE_UNJOINED, one
 * header assignment and the second argument of SERVICE_ID_MAKE() are not
 * visible.
 */
1847 static void message_handler_req_lib_cpg_zc_execute (
1848  void *conn,
1849  const void *message)
1850 {
1852  struct qb_ipc_request_header *header;
1854  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1855  struct iovec req_exec_cpg_iovec[2];
1856  struct req_exec_cpg_mcast req_exec_cpg_mcast;
1857  struct req_lib_cpg_mcast *req_lib_cpg_mcast;
1858  int result;
1859  cs_error_t error = CS_ERR_NOT_EXIST;
1860 
1861  log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);
1862 
1863  header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
1864  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header;
1865 
1866  switch (cpd->cpd_state) {
1867  case CPD_STATE_UNJOINED:
1868  error = CS_ERR_NOT_EXIST;
1869  break;
1871  error = CS_ERR_NOT_EXIST;
1872  break;
1874  error = CS_OK;
1875  break;
1877  error = CS_OK;
1878  break;
1879  }
1880 
1881  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
1883  if (error == CS_OK) {
1884  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
1885  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1887  req_exec_cpg_mcast.pid = cpd->pid;
1888  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1889  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1890  memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
1891  sizeof(mar_cpg_name_t));
1892 
/* The payload starts right after the req_lib_cpg_mcast structure */
1893  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1894  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1895  req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
1896  req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
1897 
1898  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
1899  if (result == 0) {
1900  res_lib_cpg_mcast.header.error = CS_OK;
1901  } else {
1902  res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
1903  }
1904  } else {
1905  res_lib_cpg_mcast.header.error = error;
1906  }
1907 
1908  api->ipc_response_send (conn, &res_lib_cpg_mcast,
1909  sizeof (res_lib_cpg_mcast));
1910 
1911 }
1912 
/*
 * Handle a membership query: every entry of process_info_list_head whose
 * group equals the requested group name is copied (nodeid, pid) into
 * res_lib_cpg_membership_get.member_list, and member_count is set to the
 * number of copied entries.  The response error is CS_OK.
 *
 * NOTE(review): embedded lines 1916, 1918, 1922 and 1939 are missing from
 * this listing: the declarations of req_lib_cpg_membership_get and
 * res_lib_cpg_membership_get, one header assignment and the start of the
 * call that sends the response are not visible.
 * NOTE(review): no bound check on member_count against the capacity of
 * member_list is visible in this loop -- confirm the array size.
 */
1913 static void message_handler_req_lib_cpg_membership (void *conn,
1914  const void *message)
1915 {
1917  (struct req_lib_cpg_membership_get *)message;
1919  struct list_head *iter;
1920  int member_count = 0;
1921 
1923  res_lib_cpg_membership_get.header.error = CS_OK;
1924  res_lib_cpg_membership_get.header.size =
1925  sizeof (struct res_lib_cpg_membership_get);
1926 
1927  for (iter = process_info_list_head.next;
1928  iter != &process_info_list_head; iter = iter->next) {
1929 
1930  struct process_info *pi = list_entry (iter, struct process_info, list);
1931  if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
1932  res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
1933  res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
1934  member_count += 1;
1935  }
1936  }
1937  res_lib_cpg_membership_get.member_count = member_count;
1938 
1940  sizeof (res_lib_cpg_membership_get));
1941 }
1942 
/*
 * Handle a local-node query: the response carries error CS_OK and the
 * value of api->totem_nodeid_get() in local_nodeid.
 *
 * NOTE(review): embedded lines 1946, 1949 and 1953 are missing from this
 * listing: the declaration of res_lib_cpg_local_get, one header assignment
 * and the start of the call that sends the response are not visible.
 */
1943 static void message_handler_req_lib_cpg_local_get (void *conn,
1944  const void *message)
1945 {
1947 
1948  res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
1950  res_lib_cpg_local_get.header.error = CS_OK;
1951  res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
1952 
1954  sizeof (res_lib_cpg_local_get));
1955 }
1956 
/*
 * Handle an iteration-initialize request.
 *
 * A cpg_iteration_instance is created in cpg_iteration_handle_t_db and
 * filled with a private copy of process_info_list_head:
 *  - CPG_ITERATION_NAME_ONLY: one entry per distinct group name, with pid
 *    and nodeid set to 0,
 *  - CPG_ITERATION_ONE_GROUP: only entries of the requested group,
 *  - any other type: every entry.
 * Copies are inserted next to an existing copy of the same group.  The
 * instance is linked into cpd->iteration_instance_list_head and its
 * current_pointer set to the list head.
 *
 * The response carries the error (CS_OK, CS_ERR_NO_MEMORY or
 * CS_ERR_BAD_HANDLE) and the iteration handle.  On error after creation
 * the handle is destroyed.
 *
 * NOTE(review): embedded lines 1961, 1964, 2093, 2094, 2098 and 2099 are
 * missing from this listing: the declarations of
 * req_lib_cpg_iterationinitialize and res_lib_cpg_iterationinitialize, two
 * header assignments and the call that sends the response are not visible.
 * NOTE(review): on the error_put_destroy path the copies already added to
 * items_list_head are not freed in the visible lines -- check whether
 * hdb_handle_destroy() takes care of them.
 */
1957 static void message_handler_req_lib_cpg_iteration_initialize (
1958  void *conn,
1959  const void *message)
1960 {
1962  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1963  hdb_handle_t cpg_iteration_handle = 0;
1965  struct list_head *iter, *iter2;
1966  struct cpg_iteration_instance *cpg_iteration_instance;
1967  cs_error_t error = CS_OK;
1968  int res;
1969 
1970  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");
1971 
1972  /* Because between calling this function and *next can be some operations which will
1973  * change list, we must do full copy.
1974  */
1975 
1976  /*
1977  * Create new iteration instance
1978  */
1979  res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
1980  &cpg_iteration_handle);
1981 
1982  if (res != 0) {
1983  error = CS_ERR_NO_MEMORY;
1984  goto response_send;
1985  }
1986 
1987  res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
1988 
1989  if (res != 0) {
1990  error = CS_ERR_BAD_HANDLE;
1991  goto error_destroy;
1992  }
1993 
1994  list_init (&cpg_iteration_instance->items_list_head);
1995  cpg_iteration_instance->handle = cpg_iteration_handle;
1996 
1997  /*
1998  * Create copy of process_info list "grouped by" group name
1999  */
2000  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
2001  struct process_info *pi = list_entry (iter, struct process_info, list);
2002  struct process_info *new_pi;
2003 
2004  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2005  /*
2006  * Try to find processed group name in our list new list
2007  */
2008  int found = 0;
2009 
2010  for (iter2 = cpg_iteration_instance->items_list_head.next;
2011  iter2 != &cpg_iteration_instance->items_list_head;
2012  iter2 = iter2->next) {
2013  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2014 
2015  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2016  found = 1;
2017  break;
2018  }
2019  }
2020 
2021  if (found) {
2022  /*
2023  * We have this name in list -> don't add
2024  */
2025  continue ;
2026  }
2027  } else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
2028  /*
2029  * Test pi group name with request
2030  */
2031  if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2032  /*
2033  * Not same -> don't add
2034  */
2035  continue ;
2036  }
2037 
2038  new_pi = malloc (sizeof (struct process_info));
2039  if (!new_pi) {
2040  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
2041 
2042  error = CS_ERR_NO_MEMORY;
2043 
2044  goto error_put_destroy;
2045  }
2046 
2047  memcpy (new_pi, pi, sizeof (struct process_info));
2048  list_init (&new_pi->list);
2049 
2050  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2051  /*
2052  * pid and nodeid -> undefined
2053  */
2054  new_pi->pid = new_pi->nodeid = 0;
2055  }
2056 
2057  /*
2058  * We will return list "grouped" by "group name", so try to find right place to add
2059  */
2060  for (iter2 = cpg_iteration_instance->items_list_head.next;
2061  iter2 != &cpg_iteration_instance->items_list_head;
2062  iter2 = iter2->next) {
2063  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2064 
2065  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2066  break;
2067  }
2068  }
2069 
/* iter2 is either the first copy of the same group or the list head */
2070  list_add (&new_pi->list, iter2);
2071  }
2072 
2073  /*
2074  * Now we have a full "grouped by" copy of process_info list
2075  */
2076 
2077  /*
2078  * Add instance to current cpd list
2079  */
2080  list_init (&cpg_iteration_instance->list);
2081  list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head);
2082 
2083  cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
2084 
/* The success path falls through the labels below; destroy runs only on error */
2085 error_put_destroy:
2086  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2087 error_destroy:
2088  if (error != CS_OK) {
2089  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2090  }
2091 
2092 response_send:
2095  res_lib_cpg_iterationinitialize.header.error = error;
2096  res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2097 
2100 }
2101 
/*
 * Handle an iteration-next request.
 *
 * The iteration instance is looked up by the handle from the request
 * (CS_ERR_LIBRARY when the lookup fails).  current_pointer is advanced by
 * one element; when it reaches items_list_head again the error is
 * CS_ERR_NO_SECTIONS, otherwise nodeid, pid and group of the current
 * process_info are copied into the response description.
 *
 * NOTE(review): embedded lines 2107, 2147, 2148 and 2151 are missing from
 * this listing: the declaration of res_lib_cpg_iterationnext, two header
 * assignments and the start of the call that sends the response are not
 * visible.
 */
2102 static void message_handler_req_lib_cpg_iteration_next (
2103  void *conn,
2104  const void *message)
2105 {
2106  const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
2108  struct cpg_iteration_instance *cpg_iteration_instance;
2109  cs_error_t error = CS_OK;
2110  int res;
2111  struct process_info *pi;
2112 
2113  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");
2114 
2115  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2116  req_lib_cpg_iterationnext->iteration_handle,
2117  (void *)&cpg_iteration_instance);
2118 
2119  if (res != 0) {
2120  error = CS_ERR_LIBRARY;
2121  goto error_exit;
2122  }
2123 
2124  assert (cpg_iteration_instance);
2125 
2126  cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
2127 
/* Back at the list head: there are no further items */
2128  if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
2129  error = CS_ERR_NO_SECTIONS;
2130  goto error_put;
2131  }
2132 
2133  pi = list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);
2134 
2135  /*
2136  * Copy iteration data
2137  */
2138  res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
2139  res_lib_cpg_iterationnext.description.pid = pi->pid;
2140  memcpy (&res_lib_cpg_iterationnext.description.group,
2141  &pi->group,
2142  sizeof (mar_cpg_name_t));
2143 
2144 error_put:
2145  hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2146 error_exit:
2149  res_lib_cpg_iterationnext.header.error = error;
2150 
2152  sizeof (res_lib_cpg_iterationnext));
2153 }
2154 
/*
 * Handle an iteration-finalize request.
 *
 * The iteration instance is looked up by the handle from the request
 * (CS_ERR_LIBRARY when the lookup fails), passed to
 * cpg_iteration_instance_finalize() and the handle reference is put back.
 *
 * NOTE(review): embedded lines 2159, 2160, 2182, 2183, 2186 and 2187 are
 * missing from this listing: the declarations of
 * req_lib_cpg_iterationfinalize and res_lib_cpg_iterationfinalize, two
 * header assignments and the call that sends the response are not visible.
 * NOTE(review): cpg_iteration_instance->handle is read after
 * cpg_iteration_instance_finalize() ran -- confirm that the helper leaves
 * the instance memory valid.
 */
2155 static void message_handler_req_lib_cpg_iteration_finalize (
2156  void *conn,
2157  const void *message)
2158 {
2161  struct cpg_iteration_instance *cpg_iteration_instance;
2162  cs_error_t error = CS_OK;
2163  int res;
2164 
2165  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
2166 
2167  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2168  req_lib_cpg_iterationfinalize->iteration_handle,
2169  (void *)&cpg_iteration_instance);
2170 
2171  if (res != 0) {
2172  error = CS_ERR_LIBRARY;
2173  goto error_exit;
2174  }
2175 
2176  assert (cpg_iteration_instance);
2177 
2178  cpg_iteration_instance_finalize (cpg_iteration_instance);
2179  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2180 
2181 error_exit:
2184  res_lib_cpg_iterationfinalize.header.error = error;
2185 
2188 }