corosync  2.3.2
main.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2002-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2012 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
72 #include <config.h>
73 
74 #include <pthread.h>
75 #include <assert.h>
76 #include <sys/types.h>
77 #include <sys/file.h>
78 #include <sys/poll.h>
79 #include <sys/uio.h>
80 #include <sys/mman.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <sys/time.h>
84 #include <sys/resource.h>
85 #include <sys/stat.h>
86 #include <netinet/in.h>
87 #include <arpa/inet.h>
88 #include <unistd.h>
89 #include <fcntl.h>
90 #include <stdlib.h>
91 #include <stdio.h>
92 #include <errno.h>
93 #include <signal.h>
94 #include <sched.h>
95 #include <time.h>
96 #include <semaphore.h>
97 
98 #include <qb/qbdefs.h>
99 #include <qb/qblog.h>
100 #include <qb/qbloop.h>
101 #include <qb/qbutil.h>
102 #include <qb/qbipcs.h>
103 
104 #include <corosync/swab.h>
105 #include <corosync/corotypes.h>
106 #include <corosync/corodefs.h>
107 #include <corosync/list.h>
108 #include <corosync/totem/totempg.h>
109 #include <corosync/logsys.h>
110 #include <corosync/icmap.h>
111 
112 #include "quorum.h"
113 #include "totemsrp.h"
114 #include "logconfig.h"
115 #include "totemconfig.h"
116 #include "main.h"
117 #include "sync.h"
118 #include "timer.h"
119 #include "util.h"
120 #include "apidef.h"
121 #include "service.h"
122 #include "schedwrk.h"
123 
/*
 * IPC_LOGSYS_SIZE: a smaller value is selected when
 * HAVE_SMALL_MEMORY_FOOTPRINT is defined.
 *
 * The expansion is parenthesized so that the macro behaves as a single
 * operand wherever it is used (e.g. "x / IPC_LOGSYS_SIZE").
 */
#ifdef HAVE_SMALL_MEMORY_FOOTPRINT
#define IPC_LOGSYS_SIZE (1024*64)
#else
#define IPC_LOGSYS_SIZE (8192*128)
#endif
129 
130 LOGSYS_DECLARE_SYSTEM ("corosync",
132  LOG_DAEMON,
133  LOG_INFO);
134 
135 LOGSYS_DECLARE_SUBSYS ("MAIN");
136 
137 #define SERVER_BACKLOG 5
138 
139 static int sched_priority = 0;
140 
141 static unsigned int service_count = 32;
142 
144 
145 static struct corosync_api_v1 *api = NULL;
146 
147 static int sync_in_process = 1;
148 
149 static qb_loop_t *corosync_poll_handle;
150 
151 struct sched_param global_sched_param;
152 
153 static corosync_timer_handle_t corosync_stats_timer_handle;
154 
155 static const char *corosync_lock_file = LOCALSTATEDIR"/run/corosync.pid";
156 
157 static int ip_version = AF_INET;
158 
159 qb_loop_t *cs_poll_handle_get (void)
160 {
161  return (corosync_poll_handle);
162 }
163 
164 int cs_poll_dispatch_add (qb_loop_t * handle,
165  int fd,
166  int events,
167  void *data,
168 
169  int (*dispatch_fn) (int fd,
170  int revents,
171  void *data))
172 {
173  return qb_loop_poll_add(handle, QB_LOOP_MED, fd, events, data,
174  dispatch_fn);
175 }
176 
177 int cs_poll_dispatch_delete(qb_loop_t * handle, int fd)
178 {
179  return qb_loop_poll_del(handle, fd);
180 }
181 
183 {
184  int i;
185 
186  for (i = 0; i < SERVICES_COUNT_MAX; i++) {
187  if (corosync_service[i] && corosync_service[i]->exec_dump_fn) {
189  }
190  }
191 }
192 
193 static void corosync_blackbox_write_to_file (void)
194 {
195  char fname[PATH_MAX];
196  char time_str[PATH_MAX];
197  struct tm cur_time_tm;
198  time_t cur_time_t;
199 
200  cur_time_t = time(NULL);
201  localtime_r(&cur_time_t, &cur_time_tm);
202 
203  strftime(time_str, PATH_MAX, "%Y-%m-%dT%H:%M:%S", &cur_time_tm);
204  snprintf(fname, PATH_MAX, "%s/fdata-%s-%lld",
205  LOCALSTATEDIR "/lib/corosync",
206  time_str,
207  (long long int)getpid());
208 
209  qb_log_blackbox_write_to_file(fname);
210 
211  unlink(LOCALSTATEDIR "/lib/corosync/fdata");
212  symlink(fname, LOCALSTATEDIR "/lib/corosync/fdata");
213 }
214 
215 static void unlink_all_completed (void)
216 {
217  api->timer_delete (corosync_stats_timer_handle);
218  qb_loop_stop (corosync_poll_handle);
219  icmap_fini();
220 }
221 
223 {
224  corosync_service_unlink_all (api, unlink_all_completed);
225 }
226 
227 static int32_t sig_diag_handler (int num, void *data)
228 {
230  return 0;
231 }
232 
233 static int32_t sig_exit_handler (int num, void *data)
234 {
235  log_printf(LOGSYS_LEVEL_NOTICE, "Node was shut down by a signal");
236  corosync_service_unlink_all (api, unlink_all_completed);
237  return 0;
238 }
239 
/*
 * SIGSEGV handler.
 *
 * Restores the default disposition first, so the raise() at the end is
 * handled by the default action rather than re-entering this handler.
 * In between, the blackbox is written to a file and qb logging is
 * finalized.
 */
static void sigsegv_handler (int num)
{
	(void)num;

	(void)signal (SIGSEGV, SIG_DFL);

	corosync_blackbox_write_to_file ();
	qb_log_fini();

	raise (SIGSEGV);
}
247 
/*
 * SIGABRT handler.
 *
 * Restores the default disposition first, so the raise() at the end is
 * handled by the default action rather than re-entering this handler.
 * In between, the blackbox is written to a file and qb logging is
 * finalized.
 */
static void sigabrt_handler (int num)
{
	(void)num;

	(void)signal (SIGABRT, SIG_DFL);

	corosync_blackbox_write_to_file ();
	qb_log_fini();

	raise (SIGABRT);
}
255 
256 #define LOCALHOST_IP inet_addr("127.0.0.1")
257 
258 static void *corosync_group_handle;
259 
260 static struct totempg_group corosync_group = {
261  .group = "a",
262  .group_len = 1
263 };
264 
/*
 * Intentionally empty: this function performs no locking.
 */
static void serialize_lock (void)
{
	/* nothing to do */
}
268 
/*
 * Intentionally empty: this function performs no unlocking.
 */
static void serialize_unlock (void)
{
	/* nothing to do */
}
272 
273 static void corosync_sync_completed (void)
274 {
276  "Completed service synchronization, ready to provide service.");
277  sync_in_process = 0;
278 
279  cs_ipcs_sync_state_changed(sync_in_process);
281  /*
282  * Inform totem to start using new message queue again
283  */
285 }
286 
287 static int corosync_sync_callbacks_retrieve (
288  int service_id,
289  struct sync_callbacks *callbacks)
290 {
291  if (corosync_service[service_id] == NULL) {
292  return (-1);
293  }
294 
295  if (callbacks == NULL) {
296  return (0);
297  }
298 
299  callbacks->name = corosync_service[service_id]->name;
300 
301  callbacks->sync_init = corosync_service[service_id]->sync_init;
302  callbacks->sync_process = corosync_service[service_id]->sync_process;
303  callbacks->sync_activate = corosync_service[service_id]->sync_activate;
304  callbacks->sync_abort = corosync_service[service_id]->sync_abort;
305  return (0);
306 }
307 
308 static struct memb_ring_id corosync_ring_id;
309 
310 static void member_object_joined (unsigned int nodeid)
311 {
312  char member_ip[ICMAP_KEYNAME_MAXLEN];
313  char member_join_count[ICMAP_KEYNAME_MAXLEN];
314  char member_status[ICMAP_KEYNAME_MAXLEN];
315 
316  snprintf(member_ip, ICMAP_KEYNAME_MAXLEN,
317  "runtime.totem.pg.mrp.srp.members.%u.ip", nodeid);
318  snprintf(member_join_count, ICMAP_KEYNAME_MAXLEN,
319  "runtime.totem.pg.mrp.srp.members.%u.join_count", nodeid);
320  snprintf(member_status, ICMAP_KEYNAME_MAXLEN,
321  "runtime.totem.pg.mrp.srp.members.%u.status", nodeid);
322 
323  if (icmap_get(member_ip, NULL, NULL, NULL) == CS_OK) {
324  icmap_inc(member_join_count);
325  icmap_set_string(member_status, "joined");
326  } else {
327  icmap_set_string(member_ip, (char*)api->totem_ifaces_print (nodeid));
328  icmap_set_uint32(member_join_count, 1);
329  icmap_set_string(member_status, "joined");
330  }
331 
333  "Member joined: %s", api->totem_ifaces_print (nodeid));
334 }
335 
336 static void member_object_left (unsigned int nodeid)
337 {
338  char member_status[ICMAP_KEYNAME_MAXLEN];
339 
340  snprintf(member_status, ICMAP_KEYNAME_MAXLEN,
341  "runtime.totem.pg.mrp.srp.members.%u.status", nodeid);
342  icmap_set_string(member_status, "left");
343 
345  "Member left: %s", api->totem_ifaces_print (nodeid));
346 }
347 
348 static void confchg_fn (
349  enum totem_configuration_type configuration_type,
350  const unsigned int *member_list, size_t member_list_entries,
351  const unsigned int *left_list, size_t left_list_entries,
352  const unsigned int *joined_list, size_t joined_list_entries,
353  const struct memb_ring_id *ring_id)
354 {
355  int i;
356  int abort_activate = 0;
357 
358  if (sync_in_process == 1) {
359  abort_activate = 1;
360  }
361  sync_in_process = 1;
362  cs_ipcs_sync_state_changed(sync_in_process);
363  memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id));
364 
365  for (i = 0; i < left_list_entries; i++) {
366  member_object_left (left_list[i]);
367  }
368  for (i = 0; i < joined_list_entries; i++) {
369  member_object_joined (joined_list[i]);
370  }
371  /*
372  * Call configuration change for all services
373  */
374  for (i = 0; i < service_count; i++) {
375  if (corosync_service[i] && corosync_service[i]->confchg_fn) {
376  corosync_service[i]->confchg_fn (configuration_type,
377  member_list, member_list_entries,
378  left_list, left_list_entries,
379  joined_list, joined_list_entries, ring_id);
380  }
381  }
382 
383  if (abort_activate) {
384  sync_abort ();
385  }
386  if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
387  sync_save_transitional (member_list, member_list_entries, ring_id);
388  }
389  if (configuration_type == TOTEM_CONFIGURATION_REGULAR) {
390  sync_start (member_list, member_list_entries, ring_id);
391  }
392 }
393 
/*
 * Placeholder: privileges are not dropped.
 */
static void priv_drop (void)
{
	/* TODO: we are still not dropping privs */
}
398 
399 static void corosync_tty_detach (void)
400 {
401  int devnull;
402 
403  /*
404  * Disconnect from TTY if this is not a debug run
405  */
406 
407  switch (fork ()) {
408  case -1:
410  break;
411  case 0:
412  /*
413  * child which is disconnected, run this process
414  */
415  break;
416  default:
417  exit (0);
418  break;
419  }
420 
421  /* Create new session */
422  (void)setsid();
423 
424  /*
425  * Map stdin/out/err to /dev/null.
426  */
427  devnull = open("/dev/null", O_RDWR);
428  if (devnull == -1) {
430  }
431 
432  if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
433  || dup2(devnull, 2) < 0) {
435  }
436 }
437 
438 static void corosync_mlockall (void)
439 {
440  int res;
441  struct rlimit rlimit;
442 
443  rlimit.rlim_cur = RLIM_INFINITY;
444  rlimit.rlim_max = RLIM_INFINITY;
445 
446 #ifndef RLIMIT_MEMLOCK
447 #define RLIMIT_MEMLOCK RLIMIT_VMEM
448 #endif
449 
450  setrlimit (RLIMIT_MEMLOCK, &rlimit);
451 
452  res = mlockall (MCL_CURRENT | MCL_FUTURE);
453  if (res == -1) {
455  "Could not lock memory of service to avoid page faults");
456  };
457 }
458 
459 
460 static void corosync_totem_stats_updater (void *data)
461 {
462  totempg_stats_t * stats;
463  uint32_t total_mtt_rx_token;
464  uint32_t total_backlog_calc;
465  uint32_t total_token_holdtime;
466  int t, prev, i;
467  int32_t token_count;
468  char key_name[ICMAP_KEYNAME_MAXLEN];
469 
470  stats = api->totem_get_stats();
471 
472  icmap_set_uint32("runtime.totem.pg.msg_reserved", stats->msg_reserved);
473  icmap_set_uint32("runtime.totem.pg.msg_queue_avail", stats->msg_queue_avail);
474  icmap_set_uint64("runtime.totem.pg.mrp.srp.orf_token_tx", stats->mrp->srp->orf_token_tx);
475  icmap_set_uint64("runtime.totem.pg.mrp.srp.orf_token_rx", stats->mrp->srp->orf_token_rx);
476  icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_merge_detect_tx", stats->mrp->srp->memb_merge_detect_tx);
477  icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_merge_detect_rx", stats->mrp->srp->memb_merge_detect_rx);
478  icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_join_tx", stats->mrp->srp->memb_join_tx);
479  icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_join_rx", stats->mrp->srp->memb_join_rx);
480  icmap_set_uint64("runtime.totem.pg.mrp.srp.mcast_tx", stats->mrp->srp->mcast_tx);
481  icmap_set_uint64("runtime.totem.pg.mrp.srp.mcast_retx", stats->mrp->srp->mcast_retx);
482  icmap_set_uint64("runtime.totem.pg.mrp.srp.mcast_rx", stats->mrp->srp->mcast_rx);
483  icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_commit_token_tx", stats->mrp->srp->memb_commit_token_tx);
484  icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_commit_token_rx", stats->mrp->srp->memb_commit_token_rx);
485  icmap_set_uint64("runtime.totem.pg.mrp.srp.token_hold_cancel_tx", stats->mrp->srp->token_hold_cancel_tx);
486  icmap_set_uint64("runtime.totem.pg.mrp.srp.token_hold_cancel_rx", stats->mrp->srp->token_hold_cancel_rx);
487  icmap_set_uint64("runtime.totem.pg.mrp.srp.operational_entered", stats->mrp->srp->operational_entered);
488  icmap_set_uint64("runtime.totem.pg.mrp.srp.operational_token_lost", stats->mrp->srp->operational_token_lost);
489  icmap_set_uint64("runtime.totem.pg.mrp.srp.gather_entered", stats->mrp->srp->gather_entered);
490  icmap_set_uint64("runtime.totem.pg.mrp.srp.gather_token_lost", stats->mrp->srp->gather_token_lost);
491  icmap_set_uint64("runtime.totem.pg.mrp.srp.commit_entered", stats->mrp->srp->commit_entered);
492  icmap_set_uint64("runtime.totem.pg.mrp.srp.commit_token_lost", stats->mrp->srp->commit_token_lost);
493  icmap_set_uint64("runtime.totem.pg.mrp.srp.recovery_entered", stats->mrp->srp->recovery_entered);
494  icmap_set_uint64("runtime.totem.pg.mrp.srp.recovery_token_lost", stats->mrp->srp->recovery_token_lost);
495  icmap_set_uint64("runtime.totem.pg.mrp.srp.consensus_timeouts", stats->mrp->srp->consensus_timeouts);
496  icmap_set_uint64("runtime.totem.pg.mrp.srp.rx_msg_dropped", stats->mrp->srp->rx_msg_dropped);
497  icmap_set_uint32("runtime.totem.pg.mrp.srp.continuous_gather", stats->mrp->srp->continuous_gather);
498  icmap_set_uint32("runtime.totem.pg.mrp.srp.continuous_sendmsg_failures",
500 
501  icmap_set_uint8("runtime.totem.pg.mrp.srp.firewall_enabled_or_nic_failure",
502  stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ? 1 : 0);
503 
504  if (stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ||
507  "Totem is unable to form a cluster because of an "
508  "operating system or network fault. The most common "
509  "cause of this message is that the local firewall is "
510  "configured improperly.");
511  icmap_set_uint8("runtime.totem.pg.mrp.srp.firewall_enabled_or_nic_failure", 1);
512  } else {
513  icmap_set_uint8("runtime.totem.pg.mrp.srp.firewall_enabled_or_nic_failure", 0);
514  }
515 
516  for (i = 0; i < stats->mrp->srp->rrp->interface_count; i++) {
517  snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.totem.pg.mrp.rrp.%u.faulty", i);
518  icmap_set_uint8(key_name, stats->mrp->srp->rrp->faulty[i]);
519  }
520  total_mtt_rx_token = 0;
521  total_token_holdtime = 0;
522  total_backlog_calc = 0;
523  token_count = 0;
524  t = stats->mrp->srp->latest_token;
525  while (1) {
526  if (t == 0)
527  prev = TOTEM_TOKEN_STATS_MAX - 1;
528  else
529  prev = t - 1;
530  if (prev == stats->mrp->srp->earliest_token)
531  break;
532  /* if tx == 0, then dropped token (not ours) */
533  if (stats->mrp->srp->token[t].tx != 0 ||
534  (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx) > 0 ) {
535  total_mtt_rx_token += (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx);
536  total_token_holdtime += (stats->mrp->srp->token[t].tx - stats->mrp->srp->token[t].rx);
537  total_backlog_calc += stats->mrp->srp->token[t].backlog_calc;
538  token_count++;
539  }
540  t = prev;
541  }
542  if (token_count) {
543  icmap_set_uint32("runtime.totem.pg.mrp.srp.mtt_rx_token", (total_mtt_rx_token / token_count));
544  icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_token_workload", (total_token_holdtime / token_count));
545  icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_backlog_calc", (total_backlog_calc / token_count));
546  }
547 
549 
550  api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL,
551  corosync_totem_stats_updater,
552  &corosync_stats_timer_handle);
553 }
554 
555 static void totem_dynamic_notify(
556  int32_t event,
557  const char *key_name,
558  struct icmap_notify_value new_val,
559  struct icmap_notify_value old_val,
560  void *user_data)
561 {
562  int res;
563  int ring_no;
564  int member_no;
565  struct totem_ip_address member;
566  int add_new_member = 0;
567  int remove_old_member = 0;
568  char tmp_str[ICMAP_KEYNAME_MAXLEN];
569 
570  res = sscanf(key_name, "nodelist.node.%u.ring%u%s", &member_no, &ring_no, tmp_str);
571  if (res != 3)
572  return ;
573 
574  if (strcmp(tmp_str, "_addr") != 0) {
575  return;
576  }
577 
578  if (event == ICMAP_TRACK_ADD && new_val.type == ICMAP_VALUETYPE_STRING) {
579  add_new_member = 1;
580  }
581 
582  if (event == ICMAP_TRACK_DELETE && old_val.type == ICMAP_VALUETYPE_STRING) {
583  remove_old_member = 1;
584  }
585 
586  if (event == ICMAP_TRACK_MODIFY && new_val.type == ICMAP_VALUETYPE_STRING &&
587  old_val.type == ICMAP_VALUETYPE_STRING) {
588  add_new_member = 1;
589  remove_old_member = 1;
590  }
591 
592  if (remove_old_member) {
594  "removing dynamic member %s for ring %u", (char *)old_val.data, ring_no);
595  if (totemip_parse(&member, (char *)old_val.data, ip_version) == 0) {
596  totempg_member_remove (&member, ring_no);
597  }
598  }
599 
600  if (add_new_member) {
602  "adding dynamic member %s for ring %u", (char *)new_val.data, ring_no);
603  if (totemip_parse(&member, (char *)new_val.data, ip_version) == 0) {
604  totempg_member_add (&member, ring_no);
605  }
606  }
607 }
608 
609 static void corosync_totem_dynamic_init (void)
610 {
611  icmap_track_t icmap_track = NULL;
612 
613  icmap_track_add("nodelist.node.",
615  totem_dynamic_notify,
616  NULL,
617  &icmap_track);
618 }
619 
620 static void corosync_totem_stats_init (void)
621 {
622  icmap_set_uint32("runtime.totem.pg.mrp.srp.mtt_rx_token", 0);
623  icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_token_workload", 0);
624  icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_backlog_calc", 0);
625 
626  /* start stats timer */
627  api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL,
628  corosync_totem_stats_updater,
629  &corosync_stats_timer_handle);
630 }
631 
632 
633 static void deliver_fn (
634  unsigned int nodeid,
635  const void *msg,
636  unsigned int msg_len,
637  int endian_conversion_required)
638 {
639  const struct qb_ipc_request_header *header;
640  int32_t service;
641  int32_t fn_id;
642  uint32_t id;
643 
644  header = msg;
645  if (endian_conversion_required) {
646  id = swab32 (header->id);
647  } else {
648  id = header->id;
649  }
650 
651  /*
652  * Call the proper executive handler
653  */
654  service = id >> 16;
655  fn_id = id & 0xffff;
656 
657  if (!corosync_service[service]) {
658  return;
659  }
660  if (fn_id >= corosync_service[service]->exec_engine_count) {
661  log_printf(LOGSYS_LEVEL_WARNING, "discarded unknown message %d for service %d (max id %d)",
662  fn_id, service, corosync_service[service]->exec_engine_count);
663  return;
664  }
665 
666  icmap_fast_inc(service_stats_rx[service][fn_id]);
667 
668  if (endian_conversion_required) {
669  assert(corosync_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL);
671  ((void *)msg);
672  }
673 
675  (msg, nodeid);
676 }
677 
679  const struct iovec *iovec,
680  unsigned int iov_len,
681  unsigned int guarantee)
682 {
683  const struct qb_ipc_request_header *req = iovec->iov_base;
684  int32_t service;
685  int32_t fn_id;
686 
687  service = req->id >> 16;
688  fn_id = req->id & 0xffff;
689 
690  if (corosync_service[service]) {
691  icmap_fast_inc(service_stats_tx[service][fn_id]);
692  }
693 
694  return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee));
695 }
696 
697 static qb_loop_timer_handle recheck_the_q_level_timer;
699 {
700  totempg_check_q_level(corosync_group_handle);
702  qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED, 1*QB_TIME_NS_IN_MSEC,
703  NULL, corosync_recheck_the_q_level, &recheck_the_q_level_timer);
704  }
705 }
706 
709 };
710 
711 
713  unsigned int service,
714  unsigned int id,
715  const void *msg,
716  void *sending_allowed_private_data)
717 {
719  (struct sending_allowed_private_data_struct *)sending_allowed_private_data;
720  struct iovec reserve_iovec;
721  struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;
722  int sending_allowed;
723 
724  reserve_iovec.iov_base = (char *)header;
725  reserve_iovec.iov_len = header->size;
726 
728  corosync_group_handle,
729  &reserve_iovec, 1);
730  if (pd->reserved_msgs == -1) {
731  return -EINVAL;
732  }
733 
734  sending_allowed = QB_FALSE;
735  if (corosync_quorum_is_quorate() == 1 ||
736  corosync_service[service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) {
737  // we are quorate
738  // now check flow control
739  if (corosync_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) {
740  sending_allowed = QB_TRUE;
741  } else if (pd->reserved_msgs && sync_in_process == 0) {
742  sending_allowed = QB_TRUE;
743  } else if (pd->reserved_msgs == 0) {
744  return -ENOBUFS;
745  } else /* (sync_in_process) */ {
746  return -EINPROGRESS;
747  }
748  } else {
749  return -EHOSTUNREACH;
750  }
751 
752  return (sending_allowed);
753 }
754 
755 void corosync_sending_allowed_release (void *sending_allowed_private_data)
756 {
758  (struct sending_allowed_private_data_struct *)sending_allowed_private_data;
759 
760  if (pd->reserved_msgs == -1) {
761  return;
762  }
764 }
765 
767 {
768  int ret = 0;
769 
770  assert (source != NULL);
771  if (source->nodeid == totempg_my_nodeid_get ()) {
772  ret = 1;
773  }
774  return ret;
775 }
776 
778  mar_message_source_t *source,
779  void *conn)
780 {
781  assert ((source != NULL) && (conn != NULL));
782  memset (source, 0, sizeof (mar_message_source_t));
783  source->nodeid = totempg_my_nodeid_get ();
784  source->conn = conn;
785 }
786 
789  qb_loop_timer_handle handle;
790  unsigned long long tv_prev;
791  unsigned long long max_tv_diff;
792 };
793 
794 static void timer_function_scheduler_timeout (void *data)
795 {
796  struct scheduler_pause_timeout_data *timeout_data = (struct scheduler_pause_timeout_data *)data;
797  unsigned long long tv_current;
798  unsigned long long tv_diff;
799 
800  tv_current = qb_util_nano_current_get ();
801 
802  if (timeout_data->tv_prev == 0) {
803  /*
804  * Initial call -> just pretent everything is ok
805  */
806  timeout_data->tv_prev = tv_current;
807  timeout_data->max_tv_diff = 0;
808  }
809 
810  tv_diff = tv_current - timeout_data->tv_prev;
811  timeout_data->tv_prev = tv_current;
812 
813  if (tv_diff > timeout_data->max_tv_diff) {
814  log_printf (LOGSYS_LEVEL_WARNING, "Corosync main process was not scheduled for %0.4f ms "
815  "(threshold is %0.4f ms). Consider token timeout increase.",
816  (float)tv_diff / QB_TIME_NS_IN_MSEC, (float)timeout_data->max_tv_diff / QB_TIME_NS_IN_MSEC);
817  }
818 
819  /*
820  * Set next threshold, because token_timeout can change
821  */
822  timeout_data->max_tv_diff = timeout_data->totem_config->token_timeout * QB_TIME_NS_IN_MSEC * 0.8;
823  qb_loop_timer_add (corosync_poll_handle,
824  QB_LOOP_MED,
825  timeout_data->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 3,
826  timeout_data,
827  timer_function_scheduler_timeout,
828  &timeout_data->handle);
829 }
830 
831 
832 static void corosync_setscheduler (void)
833 {
834 #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) && defined(HAVE_SCHED_SETSCHEDULER)
835  int res;
836 
837  sched_priority = sched_get_priority_max (SCHED_RR);
838  if (sched_priority != -1) {
839  global_sched_param.sched_priority = sched_priority;
840  res = sched_setscheduler (0, SCHED_RR, &global_sched_param);
841  if (res == -1) {
843  "Could not set SCHED_RR at priority %d",
844  global_sched_param.sched_priority);
845 
846  global_sched_param.sched_priority = 0;
847 #ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET
848  qb_log_thread_priority_set (SCHED_OTHER, 0);
849 #endif
850  } else {
851 
852  /*
853  * Turn on SCHED_RR in logsys system
854  */
855 #ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET
856  res = qb_log_thread_priority_set (SCHED_RR, sched_priority);
857 #else
858  res = -1;
859 #endif
860  if (res == -1) {
862  "Could not set logsys thread priority."
863  " Can't continue because of priority inversions.");
865  }
866  }
867  } else {
869  "Could not get maximum scheduler priority");
870  sched_priority = 0;
871  }
872 #else
874  "The Platform is missing process priority setting features. Leaving at default.");
875 #endif
876 }
877 
/*
 * printf-style logging helper.
 *
 * Collects the variadic arguments and hands them to
 * qb_log_from_external_source_va() together with the call-site details.
 * The prototype carries the printf format attribute (format is argument
 * 6, the variadic arguments start at 7), so the compiler checks the
 * arguments against the format string.
 */
static void
_logsys_log_printf(int level, int subsys,
		const char *function_name,
		const char *file_name,
		int file_line,
		const char *format,
		...) __attribute__((format(printf, 6, 7)));

static void
_logsys_log_printf(int level, int subsys,
		const char *function_name,
		const char *file_name,
		int file_line,
		const char *format, ...)
{
	va_list args;

	va_start(args, format);
	qb_log_from_external_source_va(function_name, file_name,
		format, level, file_line,
		subsys, args);
	va_end(args);
}
901 
902 static void fplay_key_change_notify_fn (
903  int32_t event,
904  const char *key_name,
905  struct icmap_notify_value new_val,
906  struct icmap_notify_value old_val,
907  void *user_data)
908 {
909  if (strcmp(key_name, "runtime.blackbox.dump_flight_data") == 0) {
910  fprintf(stderr,"Writetofile\n");
911  corosync_blackbox_write_to_file ();
912  }
913  if (strcmp(key_name, "runtime.blackbox.dump_state") == 0) {
914  fprintf(stderr,"statefump\n");
916  }
917 }
918 
919 static void corosync_fplay_control_init (void)
920 {
921  icmap_track_t track = NULL;
922 
923  icmap_set_string("runtime.blackbox.dump_flight_data", "no");
924  icmap_set_string("runtime.blackbox.dump_state", "no");
925 
926  icmap_track_add("runtime.blackbox.dump_flight_data",
928  fplay_key_change_notify_fn,
929  NULL, &track);
930  icmap_track_add("runtime.blackbox.dump_state",
932  fplay_key_change_notify_fn,
933  NULL, &track);
934 }
935 
936 /*
937  * Set RO flag for keys, which ether doesn't make sense to change by user (statistic)
938  * or which when changed are not reflected by runtime (totem.crypto_cipher, ...).
939  *
940  * Also some RO keys cannot be determined in this stage, so they are set later in
941  * other functions (like nodelist.local_node_pos, ...)
942  */
943 static void set_icmap_ro_keys_flag (void)
944 {
945  /*
946  * Set RO flag for all keys of internal configuration and runtime statistics
947  */
948  icmap_set_ro_access("internal_configuration.", CS_TRUE, CS_TRUE);
949  icmap_set_ro_access("runtime.connections.", CS_TRUE, CS_TRUE);
950  icmap_set_ro_access("runtime.totem.", CS_TRUE, CS_TRUE);
951  icmap_set_ro_access("runtime.services.", CS_TRUE, CS_TRUE);
952 
953  /*
954  * Set RO flag for constrete keys of configuration which can't be changed
955  * during runtime
956  */
957  icmap_set_ro_access("totem.crypto_cipher", CS_FALSE, CS_TRUE);
958  icmap_set_ro_access("totem.crypto_hash", CS_FALSE, CS_TRUE);
959  icmap_set_ro_access("totem.secauth", CS_FALSE, CS_TRUE);
960  icmap_set_ro_access("totem.ip_version", CS_FALSE, CS_TRUE);
961  icmap_set_ro_access("totem.rrp_mode", CS_FALSE, CS_TRUE);
962  icmap_set_ro_access("totem.netmtu", CS_FALSE, CS_TRUE);
963  icmap_set_ro_access("qb.ipc_type", CS_FALSE, CS_TRUE);
964 }
965 
966 static void main_service_ready (void)
967 {
968  int res;
969 
970  /*
971  * This must occur after totempg is initialized because "this_ip" must be set
972  */
974  if (res == -1) {
975  log_printf (LOGSYS_LEVEL_ERROR, "Could not initialize default services");
977  }
978  cs_ipcs_init();
979  corosync_totem_stats_init ();
980  corosync_fplay_control_init ();
981  corosync_totem_dynamic_init ();
982  sync_init (
983  corosync_sync_callbacks_retrieve,
984  corosync_sync_completed);
985 }
986 
987 static enum e_corosync_done corosync_flock (const char *lockfile, pid_t pid)
988 {
989  struct flock lock;
990  enum e_corosync_done err;
991  char pid_s[17];
992  int fd_flag;
993  int lf;
994 
995  err = COROSYNC_DONE_EXIT;
996 
997  lf = open (lockfile, O_WRONLY | O_CREAT, 0640);
998  if (lf == -1) {
999  log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create lock file.");
1000  return (COROSYNC_DONE_AQUIRE_LOCK);
1001  }
1002 
1003 retry_fcntl:
1004  lock.l_type = F_WRLCK;
1005  lock.l_start = 0;
1006  lock.l_whence = SEEK_SET;
1007  lock.l_len = 0;
1008  if (fcntl (lf, F_SETLK, &lock) == -1) {
1009  switch (errno) {
1010  case EINTR:
1011  goto retry_fcntl;
1012  break;
1013  case EAGAIN:
1014  case EACCES:
1015  log_printf (LOGSYS_LEVEL_ERROR, "Another Corosync instance is already running.");
1017  goto error_close;
1018  break;
1019  default:
1020  log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't aquire lock. Error was %s",
1021  strerror(errno));
1023  goto error_close;
1024  break;
1025  }
1026  }
1027 
1028  if (ftruncate (lf, 0) == -1) {
1029  log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't truncate lock file. Error was %s",
1030  strerror (errno));
1032  goto error_close_unlink;
1033  }
1034 
1035  memset (pid_s, 0, sizeof (pid_s));
1036  snprintf (pid_s, sizeof (pid_s) - 1, "%u\n", pid);
1037 
1038 retry_write:
1039  if (write (lf, pid_s, strlen (pid_s)) != strlen (pid_s)) {
1040  if (errno == EINTR) {
1041  goto retry_write;
1042  } else {
1043  log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't write pid to lock file. "
1044  "Error was %s", strerror (errno));
1046  goto error_close_unlink;
1047  }
1048  }
1049 
1050  if ((fd_flag = fcntl (lf, F_GETFD, 0)) == -1) {
1051  log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't get close-on-exec flag from lock file. "
1052  "Error was %s", strerror (errno));
1054  goto error_close_unlink;
1055  }
1056  fd_flag |= FD_CLOEXEC;
1057  if (fcntl (lf, F_SETFD, fd_flag) == -1) {
1058  log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't set close-on-exec flag to lock file. "
1059  "Error was %s", strerror (errno));
1061  goto error_close_unlink;
1062  }
1063 
1064  return (err);
1065 
1066 error_close_unlink:
1067  unlink (lockfile);
1068 error_close:
1069  close (lf);
1070 
1071  return (err);
1072 }
1073 
/*
 * Entry point of the Corosync executive.
 *
 * Visible sequence of work:
 *   1. parse the command line options "fprv" with getopt();
 *   2. optionally request realtime scheduling (-r) and call
 *      corosync_mlockall();
 *   3. install the SIGSEGV/SIGABRT handlers (and ignore SIGPIPE when
 *      MSG_NOSIGNAL is non-zero);
 *   4. initialize icmap, then parse the main, logging and totem
 *      configuration, logging any error string that is returned;
 *   5. unless -f was given, call corosync_tty_detach();
 *   6. create the qb main loop, register the remaining signal handlers,
 *      start the log thread and take the pid lock file;
 *   7. call priv_drop() and schedwrk_init(), then run the main loop;
 *   8. once the loop returns: finalize totempg, destroy the loop and
 *      unlink the pid lock file.
 *
 * argc, argv - forwarded to getopt().
 * envp       - not referenced anywhere in the visible body.
 *
 * Returns EXIT_SUCCESS after printing the version (-v) or after the main
 * loop has ended, and EXIT_FAILURE when an unknown option is given.
 *
 * NOTE(review): this listing carries embedded line numbers and several
 * numbers are skipped (e.g. 1141, 1153, 1168, 1181, 1187, 1209, 1215,
 * 1220, 1222-1227, 1229, 1255, 1272, 1276, 1279, 1284, 1327).  Whatever
 * statements live on those lines are not visible here - presumably the
 * error-exit calls and the heads of the totempg calls; confirm against
 * the real source file before relying on the flow as shown.
 */
1074 int main (int argc, char **argv, char **envp)
1075 {
1076  const char *error_string;
1077  struct totem_config totem_config;
1078  int res, ch;
1079  int background, setprio;
1080  struct stat stat_out;
1081  char corosync_lib_dir[PATH_MAX];
1082  enum e_corosync_done flock_err;
1083  uint64_t totem_config_warnings;
1084  struct scheduler_pause_timeout_data scheduler_pause_timeout_data;
1085 
1086  /* default configuration
1087  */
1088  background = 1;
1089  setprio = 0;
1090 
1091  while ((ch = getopt (argc, argv, "fprv")) != EOF) {
1092 
1093  switch (ch) {
1094  case 'f':
1095  background = 0;
1096  break;
1097  case 'p':
      /* -p is accepted but changes nothing (the usage text says so too) */
1098  break;
1099  case 'r':
1100  setprio = 1;
1101  break;
1102  case 'v':
1103  printf ("Corosync Cluster Engine, version '%s'\n", VERSION);
1104  printf ("Copyright (c) 2006-2009 Red Hat, Inc.\n");
1105  return EXIT_SUCCESS;
1106 
      /* never reached: the return above already left main */
1107  break;
1108  default:
1109  fprintf(stderr, \
1110  "usage:\n"\
1111  " -f : Start application in foreground.\n"\
1112  " -p : Does nothing. \n"\
1113  " -r : Set round robin realtime scheduling \n"\
1114  " -v : Display version and SVN revision of Corosync and exit.\n");
1115  return EXIT_FAILURE;
1116  }
1117  }
1118 
1119  /*
1120  * Set round robin realtime scheduling with priority 99
1121  * Lock all memory to avoid page faults which may interrupt
1122  * application healthchecking
1123  */
1124  if (setprio) {
1125  corosync_setscheduler ();
1126  }
1127 
      /* called unconditionally; only the scheduler change depends on -r */
1128  corosync_mlockall ();
1129 
1130  /*
1131  * Other signals are registered later via qb_loop_signal_add
1132  */
1133  (void)signal (SIGSEGV, sigsegv_handler);
1134  (void)signal (SIGABRT, sigabrt_handler);
1135 #if MSG_NOSIGNAL != 0
1136  (void)signal (SIGPIPE, SIG_IGN);
1137 #endif
1138 
1139  if (icmap_init() != CS_OK) {
1140  log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't initialize configuration component.");
      /*
       * NOTE(review): listing line 1141 is missing.  As shown, a failed
       * icmap_init() is only logged and execution continues; the same
       * pattern repeats in the error branches below - confirm whether
       * an exit call belongs in each of them.
       */
1142  }
1143  set_icmap_ro_keys_flag();
1144 
1145  /*
1146  * Initialize the corosync_api_v1 definition
1147  */
1148  api = apidef_get ();
1149 
      /* a return value of -1 signals failure and error_string is logged */
1150  res = coroparse_configparse(icmap_get_global_map(), &error_string);
1151  if (res == -1) {
1152  log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
1154  }
1155 
1156  res = corosync_log_config_read (&error_string);
1157  if (res == -1) {
1158  /*
1159  * if we are here, we _must_ flush the logsys queue
1160  * and try to inform that we couldn't read the config.
1161  * this is a desperate attempt before certain death
1162  * and there is no guarantee that we can print to stderr
1163  * nor that logsys is sending the messages where we expect.
1164  */
      /* the same message goes to logsys, stderr and syslog */
1165  log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
1166  fprintf(stderr, "%s", error_string);
1167  syslog (LOGSYS_LEVEL_ERROR, "%s", error_string);
1169  }
1170 
1171  log_printf (LOGSYS_LEVEL_NOTICE, "Corosync Cluster Engine ('%s'): started and ready to provide service.", VERSION);
1172  log_printf (LOGSYS_LEVEL_INFO, "Corosync built-in features:" PACKAGE_FEATURES "");
1173 
1174  /*
1175  * Make sure required directory is present
1176  */
      /*
       * NOTE(review): sprintf() does not bound the write into the
       * PATH_MAX-sized buffer; LOCALSTATEDIR is presumably a build-time
       * constant - confirm, and consider snprintf().
       */
1177  sprintf (corosync_lib_dir, "%s/lib/corosync", LOCALSTATEDIR);
1178  res = stat (corosync_lib_dir, &stat_out);
      /* error when stat() fails or the path exists but is not a directory */
1179  if ((res == -1) || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
1180  log_printf (LOGSYS_LEVEL_ERROR, "Required directory not present %s. Please create it.", corosync_lib_dir);
1182  }
1183 
1184  res = totem_config_read (&totem_config, &error_string, &totem_config_warnings);
1185  if (res == -1) {
1186  log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
1188  }
1189 
      /* totem_config_warnings is a bit mask; each set bit gets its own message */
1190  if (totem_config_warnings & TOTEM_CONFIG_WARNING_MEMBERS_IGNORED) {
1191  log_printf (LOGSYS_LEVEL_WARNING, "member section is used together with nodelist. Members ignored.");
1192  }
1193 
1194  if (totem_config_warnings & TOTEM_CONFIG_WARNING_MEMBERS_DEPRECATED) {
1195  log_printf (LOGSYS_LEVEL_WARNING, "member section is deprecated.");
1196  }
1197 
1198  if (totem_config_warnings & TOTEM_CONFIG_WARNING_TOTEM_NODEID_IGNORED) {
1199  log_printf (LOGSYS_LEVEL_WARNING, "nodeid appears both in totem section and nodelist. Nodelist one is used.");
1200  }
1201 
      /* any warning bit at all also triggers the generic migration hint */
1202  if (totem_config_warnings != 0) {
1203  log_printf (LOGSYS_LEVEL_WARNING, "Please migrate config file to nodelist.");
1204  }
1205 
1206  res = totem_config_keyread (&totem_config, &error_string);
1207  if (res == -1) {
1208  log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
1210  }
1211 
1212  res = totem_config_validate (&totem_config, &error_string);
1213  if (res == -1) {
1214  log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
1216  }
1217 
1218  ip_version = totem_config.ip_version;
1219 
      /*
       * Hook totem's logging into logsys: a "TOTEM" subsystem is created
       * and _logsys_log_printf becomes the log callback.
       * NOTE(review): listing lines 1220, 1222-1227 and 1229 are missing,
       * so further totem_logging_configuration assignments may exist.
       */
1221  totem_config.totem_logging_configuration.log_subsys_id = _logsys_subsys_create("TOTEM", "totem");
1228  totem_config.totem_logging_configuration.log_printf = _logsys_log_printf;
1230 
1231  /*
1232  * Now we are fully initialized.
1233  */
1234  if (background) {
1235  corosync_tty_detach ();
1236  }
1237 
1238  corosync_poll_handle = qb_loop_create ();
1239 
      /*
       * Zero the pause-timeout state, point it at the totem config and
       * invoke the timeout function once by hand.
       * NOTE(review): presumably that first call arms the recurring
       * timer - confirm in timer_function_scheduler_timeout().
       */
1240  memset(&scheduler_pause_timeout_data, 0, sizeof(scheduler_pause_timeout_data));
1241  scheduler_pause_timeout_data.totem_config = &totem_config;
1242  timer_function_scheduler_timeout (&scheduler_pause_timeout_data);
1243 
      /*
       * SIGUSR2 is handled at low priority by sig_diag_handler;
       * SIGINT, SIGQUIT and SIGTERM are handled at high priority by
       * sig_exit_handler.
       */
1244  qb_loop_signal_add(corosync_poll_handle, QB_LOOP_LOW,
1245  SIGUSR2, NULL, sig_diag_handler, NULL);
1246  qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH,
1247  SIGINT, NULL, sig_exit_handler, NULL);
1248  qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH,
1249  SIGQUIT, NULL, sig_exit_handler, NULL);
1250  qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH,
1251  SIGTERM, NULL, sig_exit_handler, NULL);
1252 
1253  if (logsys_thread_start() != 0) {
1254  log_printf (LOGSYS_LEVEL_ERROR, "Can't initialize log thread");
1256  }
1257 
      /*
       * corosync_flock() reports success as COROSYNC_DONE_EXIT; any other
       * value is passed on to corosync_exit_error().
       */
1258  if ((flock_err = corosync_flock (corosync_lock_file, getpid ())) != COROSYNC_DONE_EXIT) {
1259  corosync_exit_error (flock_err);
1260  }
1261 
1262  /*
1263  * if totempg_initialize doesn't have root privileges, it cannot
1264  * bind to a specific interface. This only matters if
1265  * there is more than one interface in a system, so
1266  * in this case, only a warning is printed
1267  */
1268  /*
1269  * Join multicast group and setup delivery
1270  * and configuration change functions
1271  */
      /*
       * NOTE(review): the argument lists below have lost their call heads
       * (listing lines 1272, 1276, 1279 and 1284 are missing).  Judging by
       * the comments above they presumably belong to totempg
       * initialization / group-join calls - confirm against the real
       * source; as listed, this section is not valid C.
       */
1273  corosync_poll_handle,
1274  &totem_config);
1275 
1277  main_service_ready);
1278 
1280  &corosync_group_handle,
1281  deliver_fn,
1282  confchg_fn);
1283 
1285  corosync_group_handle,
1286  &corosync_group,
1287  1);
1288 
1289  /*
1290  * Drop root privileges to user 'corosync'
1291  * TODO: Don't really need full root capabilities;
1292  * needed capabilities are:
1293  * CAP_NET_RAW (bindtodevice)
1294  * CAP_SYS_NICE (setscheduler)
1295  * CAP_IPC_LOCK (mlockall)
1296  */
1297  priv_drop ();
1298 
1299  schedwrk_init (
1300  serialize_lock,
1301  serialize_unlock);
1302 
1303  /*
1304  * Start main processing loop
1305  */
1306  qb_loop_run (corosync_poll_handle);
1307 
1308  /*
1309  * Exit was requested
1310  */
1311  totempg_finalize ();
1312 
1313  /*
1314  * free the loop resources
1315  */
1316  qb_loop_destroy (corosync_poll_handle);
1317 
1318  /*
1319  * free up the icmap
1320  */
      /*
       * NOTE(review): no icmap teardown statement is visible after the
       * comment above in this listing - confirm whether one is expected.
       */
1321 
1322  /*
1323  * Remove pid lock file
1324  */
1325  unlink (corosync_lock_file);
1326 
1328 
1329  return EXIT_SUCCESS;
1330 }