CDB two-page subscription test case fails after upgrading to 7.8.3 from 7.3

Hi,

I have a unit test that verifies TwoPhaseSubscriber prepare method call, which works fine using 7.3 ConfD version and which fails when 7.8.3 version is used.

Here is the unit test:

import unittest

import confd
import confd.maagic as maagic
import confd.maapi as maapi

from confd.cdb import TwoPhaseSubscriber


class ConfDTestCase(unittest.TestCase):

    init_data = False


    @classmethod
    def setUpClass(cls):
        cls.confd = ConfD(cls.init_data)
        cls.confd.start()

        # ensure schemas are loaded for tests not using maapi
        maapi.Maapi().load_schemas()

    @classmethod
    def tearDownClass(cls):
        cls.confd.stop()
        cls.confd.cleanup()


    def setUp(self):
        self.iter_obj = SubscriptionIterator()
        # set up subscriber
        self.subscriber = TwoPhaseSubscriber('Unittest subscriber')
        self.subscriber.register('/test-model:root', iter_obj=self.iter_obj,
                                 iter_flags=confd.ITER_WANT_REVERSE | confd.ITER_WANT_SCHEMA_ORDER)
        self.subscriber.start()

    def tearDown(self):
        # destroy subscriber
        self.subscriber.stop()


    def test_prepare(self):
        with maapi.single_write_trans('admin', 'system') as trans:
            resource = maagic.get_root(trans).test_model__root.test_list
            item = resource.create('item2')
            test_item1 = item.list_in_list.create('test_item1')
            _ = item.list_in_list.create('test_item2')

            test_item1.list_2_keys.create('item1_key1', 'item1_key2')

            trans.apply()
        self.assertEqual(
            self.iter_obj.prepare_change[0].obj, 
            {'key-name': 'item2', 'list-in-list': [{'key-name': 'test_item1', 'list-2-keys': [{'key1': 'item1_key1', 'key2': 'item1_key2'}]}, {'key-name': 'test_item2', 'list-2-keys': []}]})




class ConfigChange(object):
    def __init__(self, kp, op, obj):
        self.kp = kp
        self.op = op
        self.obj = obj


class SubscriptionIterator(object):
    def __init__(self):
        self.prepare_change = []

    def _apply(self, kp, op, name):
        with maapi.single_write_trans('admin', 'system') as trans:
            if op == confd.MOP_DELETED:
                obj = None
            else:
                node = maagic.get_node(trans, str(kp)) # <---------- FAILES here with KeyError
                obj = maagic_to_yang(node, False)

            return ConfigChange(kp, op, obj)

    def prepare(self, kp, op, oldv, newv, state):
        name = str(kp[1])

        self.prepare_change.append(self._apply(kp, op, name))

        # no need to recurse resource any deeper
        return confd.ITER_CONTINUE

    def abort_build(self, kp, op, oldv, newv, state):
        return confd.ITER_RECURSE

    def abort(self, kp, op, oldv, newv, state):
        return confd.ITER_CONTINUE

    def iterate(self, kp, op, oldv, newv, state):
        pass

def maagic_to_yang(mobj, include_oper=False):
    '''Convert maagic object to dictionary with YANG names'''
    pass

Here is the stack trace the test fails with


=====================================================================
ERROR [0.092s]: test_prepare (test_subscriber.ConfDTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/shared/confd/tests/test_subscriber.py", line 181, in test_prepare
    trans.apply()
  File "/root/confd/src/confd/pyapi/confd/maapi.py", line 1885, in apply
    self.maapi.apply_trans_flags(self.th, keep_open, flags)
  File "/root/confd/src/confd/pyapi/confd/maapi.py", line 328, in proxy
    return real(self2.msock, *args, **kwargs)
_confd.error.Error: external error (19): application communication failure

Stdout:

Stderr:
DEBUG:test:Starting subscriber
DEBUG:test:Waiting for subscription event
ERROR:test:'{item2} not in /test-model:root/test-list'
ERROR:test:Traceback (most recent call last):
  File "/root/confd/src/confd/pyapi/confd/cdb.py", line 397, in _read_sub_socket
    cdb.diff_iterate(
  File "/shared/confd/tests/test_subscriber.py", line 84, in prepare
    self.prepare_change.append(self._apply(kp, op, name))
  File "/shared/confd/tests/test_subscriber.py", line 76, in _apply
    node = maagic.get_node(trans, str(kp))
  File "/root/confd/src/confd/pyapi/confd/maagic.py", line 2084, in get_node
    return cd(root, path)
  File "/root/confd/src/confd/pyapi/confd/maagic.py", line 1975, in cd
    node = node[child]
  File "/root/confd/src/confd/pyapi/confd/maagic.py", line 1167, in __getitem__
    raise KeyError('%s not in %s' % (keystr, self._path))
KeyError: '{item2} not in /test-model:root/test-list'


ERROR:test:'KeyError' object has no attribute 'message'
ERROR:test:Traceback (most recent call last):
  File "/root/confd/src/confd/pyapi/confd/cdb.py", line 397, in _read_sub_socket
    cdb.diff_iterate(
  File "/shared/confd/tests/test_subscriber.py", line 84, in prepare
    self.prepare_change.append(self._apply(kp, op, name))
  File "/shared/confd/tests/test_subscriber.py", line 76, in _apply
    node = maagic.get_node(trans, path)
  File "/root/confd/src/confd/pyapi/confd/maagic.py", line 2084, in get_node
    return cd(root, path)
  File "/root/confd/src/confd/pyapi/confd/maagic.py", line 1975, in cd
    node = node[child]
  File "/root/confd/src/confd/pyapi/confd/maagic.py", line 1167, in __getitem__
    raise KeyError('%s not in %s' % (keystr, self._path))
KeyError: '{item2} not in /test-model:root/test-list'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/root/confd/src/confd/pyapi/confd/cdb.py", line 211, in run
    if not self._read_sub_socket():
  File "/root/confd/src/confd/pyapi/confd/cdb.py", line 402, in _read_sub_socket
    message = e.message or 'aborted'
AttributeError: 'KeyError' object has no attribute 'message'

So here it complains KeyError has no attribute message, but it is not a root issue.

The issue is that it fails to get the node it receives notification for, which looks obvious, since the data is not yet on the CDB tree, since it is a first phase of the transaction and the data will be commited after commit phase which is second phase, correct?

But why it was working fine with Confd 7.3?

I am not able to track that change in behavior to a record in the CHANGES file, but yes, I agree that the current behavior looks correct. But more importantly, reading the old or the new configuration in the prepare phase should be done via cdb - you can start session towards PRE_COMMIT_RUNNING for the old configuration or towards RUNNING for the new configuration. Not only this is documented and supported, it actually might even have better performance. (You are losing maagic though, or you would have to implement a cdb session-based transaction-like object to be used instead of the MAAPI transaction.)

Thanks for the reply @mvf

I’ve tried starting a session from the prepare call-back and got

  File "/shared/confd/tests/test_subscriber.py", line 28, in _apply
    with _confd.cdb.start_session(self.sock, confd.RUNNING) as session:
_confd.error.EOF: ConfD closed connection

So I’ve changed creating transaction from

    def _apply(self, kp, op, name):
        with maapi.single_write_trans('admin', 'system') as trans:
            if op == confd.MOP_DELETED:
                obj = None
            else:
                node = maagic.get_node(trans, str(kp))
                obj = maagic_to_yang(node, False)

            return ConfigChange(kp, op, obj)

to

    def _apply(self, kp, op, name):
        with _confd.cdb.start_session(self.sock, confd.RUNNING) as session:  <---------- FAILES here with KeyError
            if op == confd.MOP_DELETED:
                obj = None
            else:
                node = session.get_value(str(kp)) #
                obj = maagic_to_yang(node, False)

            return ConfigChange(kp, op, obj)

and it fails with

Traceback (most recent call last):
  File "/shared/confd/subscribers.py", line 45, in _read_sub_socket
    cdb.diff_iterate(self.sock, point, cb.prepare, flags, state)
  File "/shared/confd/tests/test_subscriber.py", line 65, in prepare
    self.prepare_change.append(self._apply(kp, op, name, []))
  File "/shared/confd/tests/test_subscriber.py", line 28, in _apply
    with _confd.cdb.start_session(self.sock, confd.RUNNING) as session:
_confd.error.EOF: ConfD closed connection

The sock instance I’m passing is the one attached from my custom _read_sub_socket implementation of TwoPhaseSubscriber class

def _read_sub_socker(self):
    ...
    try:
                    cb.sock = self.sock
                    cdb.diff_iterate(self.sock, point, cb.prepare, flags, state)
                except Exception as e:
                    self._log.error('Subscriber: diff iterate error', exc_info=e, extra={'point': point})
                    message = str(e) or 'aborted'
    ...

Trying to get more details, but I am not sure I’m using CDB API in correct way. Is it actually allowed to open CDB session in prepare call of a TwoPhaseSubscriber?

You cannot use a SUBSCRIPTION socket (that’s what you are using for stuff like cdb.read_subscription_socket or cdb.diff_iterate) for CDB sessions, for that you need a DATA socket. Moreover, it is generally not a good idea to reuse a socket for two operations concurrently/at the same time - note that cdb.diff_iterate has not completed yet when you are trying to start the session.