38fa914eafa11fef98eb3ec976dea024d30fe75a
[project/omcproxy.git] / src / groups.c
1 /*
2 * Author: Steven Barth <steven at midlink.org>
3 *
4 * Copyright 2015 Deutsche Telekom AG
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 */
19
20 #include <string.h>
21 #include <stdlib.h>
22 #include <errno.h>
23 #include "groups.h"
24
25
26 // Group comparator for AVL-tree
27 static int compare_groups(const void *k1, const void *k2, __unused void *ptr)
28 {
29 return memcmp(k1, k2, sizeof(struct in6_addr));
30 }
31
32 // Remove a source-definition for a group
33 static void querier_remove_source(struct group *group, struct group_source *source)
34 {
35 --group->source_count;
36 list_del(&source->head);
37 free(source);
38 }
39
40 // Clear all sources of a certain group
41 static void querier_clear_sources(struct group *group)
42 {
43 struct group_source *s, *n;
44 list_for_each_entry_safe(s, n, &group->sources, head)
45 querier_remove_source(group, s);
46 }
47
48 // Remove a group and all associated sources from the group state
49 static void querier_remove_group(struct groups *groups, struct group *group, omgp_time_t now)
50 {
51 querier_clear_sources(group);
52 group->exclude_until = 0;
53
54 if (groups->cb_update)
55 groups->cb_update(groups, group, now);
56
57 avl_delete(&groups->groups, &group->node);
58 free(group);
59 }
60
61 // Expire a group and / or its associated sources depending on the current time
62 static omgp_time_t expire_group(struct groups *groups, struct group *group,
63 omgp_time_t now, omgp_time_t next_event)
64 {
65 struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
66 omgp_time_t llqi = now + cfg->last_listener_query_interval;
67 omgp_time_t llqt = now + (cfg->last_listener_query_interval * cfg->last_listener_query_count);
68
69 // Handle group and source-specific query retransmission
70 struct list_head suppressed = LIST_HEAD_INIT(suppressed);
71 struct list_head unsuppressed = LIST_HEAD_INIT(unsuppressed);
72 struct group_source *s, *s2;
73
74 if (group->next_source_transmit > 0 && group->next_source_transmit <= now) {
75 group->next_source_transmit = 0;
76
77 list_for_each_entry_safe(s, s2, &group->sources, head) {
78 if (s->retransmit > 0) {
79 list_move_tail(&s->head, (s->include_until > llqt) ? &suppressed : &unsuppressed);
80 --s->retransmit;
81 }
82
83 if (s->retransmit > 0)
84 group->next_source_transmit = llqi;
85 }
86 }
87
88 if (group->next_source_transmit > 0 && group->next_source_transmit < next_event)
89 next_event = group->next_source_transmit;
90
91 // Handle group-specific query retransmission
92 if (group->retransmit > 0 && group->next_generic_transmit <= now) {
93 group->next_generic_transmit = 0;
94
95 if (groups->cb_query)
96 groups->cb_query(groups, &group->addr, NULL, group->exclude_until > llqt);
97
98 --group->retransmit;
99
100 if (group->retransmit > 0)
101 group->next_generic_transmit = llqi;
102
103 // Skip suppresed source-specific query (RFC 3810 7.6.3.2)
104 list_splice_init(&suppressed, &group->sources);
105 }
106
107 if (group->next_generic_transmit > 0 && group->next_generic_transmit < next_event)
108 next_event = group->next_generic_transmit;
109
110 if (!list_empty(&suppressed)) {
111 if (groups->cb_query)
112 groups->cb_query(groups, &group->addr, &suppressed, true);
113
114 list_splice(&suppressed, &group->sources);
115 }
116
117 if (!list_empty(&unsuppressed)) {
118 if (groups->cb_query)
119 groups->cb_query(groups, &group->addr, &unsuppressed, false);
120
121 list_splice(&unsuppressed, &group->sources);
122 }
123
124 // Handle source and group expiry
125 bool changed = false;
126 if (group->exclude_until > 0) {
127 if (group_is_included(group, now)) {
128 // Leaving exclude mode
129 group->exclude_until = 0;
130 changed = true;
131 } else if (group->exclude_until < next_event) {
132 next_event = group->exclude_until;
133 }
134 }
135
136 list_for_each_entry_safe(s, s2, &group->sources, head) {
137 if (s->include_until > 0) {
138 if (!source_is_included(s, now)) {
139 s->include_until = 0;
140 changed = true;
141 } else if (s->include_until < next_event) {
142 next_event = s->include_until;
143 }
144 }
145
146 if (group->exclude_until == 0 && s->include_until == 0)
147 querier_remove_source(group, s);
148 }
149
150 if (group->exclude_until == 0 && group->source_count == 0)
151 querier_remove_group(groups, group, now);
152 else if (changed && groups->cb_update)
153 groups->cb_update(groups, group, now);
154
155 return next_event;
156 }
157
158 // Rearm the global groups-timer if the next event is before timer expiration
159 static void rearm_timer(struct groups *groups, int msecs)
160 {
161 int remain = uloop_timeout_remaining(&groups->timer);
162 if (remain < 0 || remain >= msecs)
163 uloop_timeout_set(&groups->timer, msecs);
164 }
165
166 // Expire all groups of a group-state (called by timer as callback)
167 static void expire_groups(struct uloop_timeout *t)
168 {
169 struct groups *groups = container_of(t, struct groups, timer);
170 omgp_time_t now = omgp_time();
171 omgp_time_t next_event = now + 3600 * OMGP_TIME_PER_SECOND;
172
173 struct group *group, *n;
174 avl_for_each_element_safe(&groups->groups, group, node, n)
175 next_event = expire_group(groups, group, now, next_event);
176
177 rearm_timer(groups, (next_event > now) ? next_event - now : 0);
178 }
179
180 // Initialize a group-state
181 void groups_init(struct groups *groups)
182 {
183 avl_init(&groups->groups, compare_groups, false, NULL);
184 groups->timer.cb = expire_groups;
185
186 groups_update_config(groups, false, OMGP_TIME_PER_SECOND / 10,
187 125 * OMGP_TIME_PER_SECOND, 2);
188 groups_update_config(groups, true, OMGP_TIME_PER_SECOND / 10,
189 125 * OMGP_TIME_PER_SECOND, 2);
190 }
191
192 // Cleanup a group-state
193 void groups_deinit(struct groups *groups)
194 {
195 omgp_time_t now = omgp_time();
196 struct group *group, *safe;
197 avl_for_each_element_safe(&groups->groups, group, node, safe)
198 querier_remove_group(groups, group, now);
199 uloop_timeout_cancel(&groups->timer);
200 }
201
202 // Get group-object for a given group, create if requested
203 static struct group* groups_get_group(struct groups *groups,
204 const struct in6_addr *addr, bool *created)
205 {
206 struct group *group = avl_find_element(&groups->groups, addr, group, node);
207 if (!group && created && (group = calloc(1, sizeof(*group)))) {
208 group->addr = *addr;
209 group->node.key = &group->addr;
210 avl_insert(&groups->groups, &group->node);
211
212 INIT_LIST_HEAD(&group->sources);
213 *created = true;
214 } else if (created) {
215 *created = false;
216 }
217 return group;
218 }
219
220 // Get source-object for a given source, create if requested
221 static struct group_source* groups_get_source(struct groups *groups,
222 struct group *group, const struct in6_addr *addr, bool *created)
223 {
224 struct group_source *c, *source = NULL;
225 group_for_each_source(c, group)
226 if (IN6_ARE_ADDR_EQUAL(&c->addr, addr))
227 source = c;
228
229 if (!source && created && group->source_count < groups->source_limit &&
230 (source = calloc(1, sizeof(*source)))) {
231 source->addr = *addr;
232 list_add_tail(&source->head, &group->sources);
233 ++group->source_count;
234 *created = true;
235 } else if (created) {
236 *created = false;
237 }
238
239 return source;
240 }
241
242 // Update the IGMP/MLD timers of a group-state
243 void groups_update_config(struct groups *groups, bool v6,
244 omgp_time_t query_response_interval, omgp_time_t query_interval, int robustness)
245 {
246 struct groups_config *cfg = v6 ? &groups->cfg_v6 : &groups->cfg_v4;
247 cfg->query_response_interval = query_response_interval;
248 cfg->query_interval = query_interval;
249 cfg->robustness = robustness;
250 cfg->last_listener_query_count = cfg->robustness;
251 cfg->last_listener_query_interval = 1 * OMGP_TIME_PER_SECOND;
252 }
253
254 // Update timers for a given group (called when receiving queries from other queriers)
255 void groups_update_timers(struct groups *groups,
256 const struct in6_addr *groupaddr,
257 const struct in6_addr *addrs, size_t len)
258 {
259 char addrbuf[INET6_ADDRSTRLEN];
260 inet_ntop(AF_INET6, groupaddr, addrbuf, sizeof(addrbuf));
261 struct group *group = groups_get_group(groups, groupaddr, NULL);
262 if (!group) {
263 L_WARN("%s: failed to update timer: no such group %s", __FUNCTION__, addrbuf);
264 return;
265 }
266
267 struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
268 omgp_time_t now = omgp_time();
269 omgp_time_t llqt = now + (cfg->last_listener_query_count * cfg->last_listener_query_interval);
270
271 if (len == 0) {
272 if (group->exclude_until > llqt)
273 group->exclude_until = llqt;
274 } else {
275 for (size_t i = 0; i < len; ++i) {
276 struct group_source *source = groups_get_source(groups, group, &addrs[i], NULL);
277 if (!source) {
278 L_WARN("%s: failed to update timer: unknown sources for group %s", __FUNCTION__, addrbuf);
279 continue;
280 }
281
282 if (source->include_until > llqt)
283 source->include_until = llqt;
284 }
285 }
286
287 rearm_timer(groups, llqt - now);
288 }
289
290 // Update state of a given group (on reception of node's IGMP/MLD packets)
291 void groups_update_state(struct groups *groups,
292 const struct in6_addr *groupaddr,
293 const struct in6_addr *addrs, size_t len,
294 enum groups_update update)
295 {
296 bool created = false, changed = false;
297 char addrbuf[INET6_ADDRSTRLEN];
298 inet_ntop(AF_INET6, groupaddr, addrbuf, sizeof(addrbuf));
299 L_DEBUG("%s: %s (+%d sources) => %d", __FUNCTION__, addrbuf, (int)len, update);
300
301 struct group *group = groups_get_group(groups, groupaddr, &created);
302 if (!group) {
303 L_ERR("querier_state: failed to allocate group for %s", addrbuf);
304 return;
305 }
306
307 if (created)
308 changed = true;
309
310 omgp_time_t now = omgp_time();
311 omgp_time_t next_event = OMGP_TIME_MAX;
312 struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
313
314 // Backwards compatibility modes
315 if (group->compat_v2_until > now || group->compat_v1_until > now) {
316 if (update == UPDATE_BLOCK)
317 return;
318
319 if (group->compat_v1_until > now && (update == UPDATE_DONE || update == UPDATE_TO_IN))
320 return;
321
322 if (update == UPDATE_TO_EX)
323 len = 0;
324 }
325
326 if (update == UPDATE_REPORT || update == UPDATE_REPORT_V1 || update == UPDATE_DONE) {
327 omgp_time_t compat_until = now + cfg->query_response_interval +
328 (cfg->robustness * cfg->query_interval);
329
330 if (update == UPDATE_REPORT_V1)
331 group->compat_v1_until = compat_until;
332 else if (update == UPDATE_REPORT)
333 group->compat_v2_until = compat_until;
334
335 update = (update == UPDATE_DONE) ? UPDATE_TO_IN : UPDATE_IS_EXCLUDE;
336 len = 0;
337 }
338
339 bool include = group->exclude_until <= now;
340 bool is_include = update == UPDATE_IS_INCLUDE || update == UPDATE_TO_IN || update == UPDATE_ALLOW;
341
342 int llqc = cfg->last_listener_query_count;
343 omgp_time_t mali = now + (cfg->robustness * cfg->query_interval) + cfg->query_response_interval;
344 omgp_time_t llqt = now + (cfg->last_listener_query_interval * llqc);
345
346 // RFC 3810 7.4
347 struct list_head saved = LIST_HEAD_INIT(saved);
348 struct list_head queried = LIST_HEAD_INIT(queried);
349 for (size_t i = 0; i < len; ++i) {
350 bool *create = (include && update == UPDATE_BLOCK) ? NULL : &created;
351 struct group_source *source = groups_get_source(groups, group, &addrs[i], create);
352
353 if (include && update == UPDATE_BLOCK) {
354 if (source)
355 list_move_tail(&source->head, &queried);
356 } else {
357 bool query = false;
358 if (!source) {
359 groups_update_state(groups, groupaddr, NULL, 0, false);
360 L_WARN("querier: failed to allocate source for %s, fallback to ASM", addrbuf);
361 return;
362 }
363
364 if (created)
365 changed = true;
366 else if (include && update == UPDATE_TO_EX)
367 query = true;
368
369 if (source->include_until <= now && update == UPDATE_SET_IN) {
370 source->include_until = mali;
371 changed = true;
372 } else if (source->include_until > now && update == UPDATE_SET_EX) {
373 source->include_until = now;
374 changed = true;
375 }
376
377 if (!include && (update == UPDATE_BLOCK || update == UPDATE_TO_EX) &&
378 (created || source->include_until > now))
379 query = true;
380
381 if ((is_include || (!include && created))) {
382 if (source->include_until <= now)
383 changed = true;
384
385 source->include_until = (is_include || update == UPDATE_IS_EXCLUDE)
386 ? mali : group->exclude_until;
387
388 if (next_event > mali)
389 next_event = mali;
390 }
391
392 if (query)
393 list_move_tail(&source->head, &queried);
394 else if (update == UPDATE_IS_EXCLUDE || update == UPDATE_TO_EX ||
395 update == UPDATE_SET_EX || update == UPDATE_SET_IN)
396 list_move_tail(&source->head, &saved);
397 }
398 }
399
400 if (update == UPDATE_IS_EXCLUDE || update == UPDATE_TO_EX || update == UPDATE_SET_EX) {
401 if (include || !list_empty(&group->sources))
402 changed = true;
403
404 querier_clear_sources(group);
405 list_splice(&saved, &group->sources);
406 group->exclude_until = mali;
407
408 if (next_event > mali)
409 next_event = mali;
410 }
411
412 if (update == UPDATE_SET_IN) {
413 if (!include || !list_empty(&group->sources)) {
414 changed = true;
415 next_event = now;
416 }
417
418 querier_clear_sources(group);
419 list_splice(&saved, &group->sources);
420 group->exclude_until = now;
421 }
422
423 // Prepare queries
424 if (update == UPDATE_TO_IN) {
425 struct group_source *source, *n;
426 list_for_each_entry_safe(source, n, &group->sources, head) {
427 if (source->include_until <= now)
428 continue;
429
430 size_t i;
431 for (i = 0; i < len && !IN6_ARE_ADDR_EQUAL(&source->addr, &addrs[i]); ++i);
432 if (i == len)
433 list_move_tail(&source->head, &queried);
434 }
435 }
436
437 if (!list_empty(&queried)) {
438 struct group_source *source;
439 list_for_each_entry(source, &queried, head) {
440 if (source->include_until > llqt)
441 source->include_until = llqt;
442
443 group->next_source_transmit = now;
444 source->retransmit = llqc;
445 }
446
447 next_event = now;
448 list_splice(&queried, &group->sources);
449 }
450
451 if (!include && update == UPDATE_TO_IN) {
452 if (group->exclude_until > llqt)
453 group->exclude_until = llqt;
454
455 group->next_generic_transmit = now;
456 group->retransmit = llqc;
457 next_event = now;
458 }
459
460 if (changed && groups->cb_update)
461 groups->cb_update(groups, group, now);
462
463 if (group_is_included(group, now) && group->source_count == 0)
464 next_event = now;
465
466 if (next_event < OMGP_TIME_MAX)
467 rearm_timer(groups, next_event - now);
468
469 if (changed)
470 L_DEBUG("%s: %s => %s (+%d sources)", __FUNCTION__, addrbuf,
471 (group_is_included(group, now)) ? "included" : "excluded",
472 (int)group->source_count);
473
474 }
475
// Read-only lookup of the group-object for `addr`; never creates a group.
// Returns NULL if no such group exists.
const struct group* groups_get(struct groups *groups, const struct in6_addr *addr)
{
	const struct group *found = groups_get_group(groups, addr, NULL);

	return found;
}
481
482 // Test if a group (and source) is requested in the current group state
483 // (i.e. for deciding if it should be routed / forwarded)
484 bool groups_includes_group(struct groups *groups, const struct in6_addr *addr,
485 const struct in6_addr *src, omgp_time_t time)
486 {
487 struct group *group = groups_get_group(groups, addr, NULL);
488 if (group) {
489 if (!src && (!group_is_included(group, time) || group->source_count > 0))
490 return true;
491
492 struct group_source *source = groups_get_source(groups, group, src, NULL);
493 if ((!group_is_included(group, time) && (!source || source_is_included(source, time))) ||
494 (group_is_included(group, time) && source && source_is_included(source, time)))
495 return true;
496 }
497 return false;
498 }