Source code for pyfixmsg.reference

"""
This module defines a python representation of a Fix Specification.
The typical use is to instantiate FixSpec with a specification XML file sourced from Quickfix.
Most of the content of this module is opaque to the casual user, but is useful to customise
the spec according to one's requirements.

This module uses xml parsing logic intensively, so we recommend having lxml (lxml.de) installed
to speed it up. It will work with the python-shipped xml module as well, although will be slower.

Note::
   this module doesn't (yet) support hops as part of a message header (in FIX4.4 onwards)

"""

try:
    from lxml.etree import Comment, parse
except ImportError:
    from xml.etree.ElementTree import Comment, parse  # pylint: disable=C0411

# From FIX50 onwards the spec files no longer define the header & trailer. Tags that end up
# on messages still need an ordering, so the fallback lists below are used instead. The Hops
# component is not supported, for simplicity's sake for now.
HEADER_TAGS = [
    8, 9, 35, 1128, 1156, 1129, 49, 56, 115, 128,
    90, 91, 34, 50, 142, 57, 143, 116, 144, 129, 145,
    43, 97, 52, 122, 212, 213, 347, 369, 370, 1128, 1129,
]
TRAILER_TAGS = [93, 89, 10]
ENCODED_DATA_TAGS = [349, 351, 353, 355, 357, 359, 361, 363, 365]

# Header tag number -> position in HEADER_TAGS.
# NOTE(review): 1128 and 1129 appear twice in HEADER_TAGS; the later occurrence wins here, so
# they map to positions 30 and 31 rather than 3 and 5 - confirm whether that is intended.
HEADER_SORT_MAP = dict(zip(HEADER_TAGS, range(len(HEADER_TAGS))))
# Trailer tags are pinned to the very end, with the checksum (10) last of all.
HEADER_SORT_MAP[10] = int(10e9)
HEADER_SORT_MAP[89] = int(10e9 - 1)
HEADER_SORT_MAP[93] = int(10e9 - 2)


class FixTag(object):
    """
    Fix tag representation. A fix tag has name, tag (number), type and valid values (enum)
    """

    def __init__(self, name, tag, tagtype=None, values=tuple()):
        """
        :param name: Tag name
        :type name: ``str``
        :param tag: Tag number
        :type tag: ``int``
        :param tagtype: Type as in quickfix's XML reference documents
        :type tagtype: ``str``
        :param values: The valid enum values
        :type values: ``tuple((str, str))`` with the first element of each tuple being the value
          of the enum, the second the name of the value.
        """
        self.name = name
        self.tag = tag
        self.type = tagtype
        # Stored as a tuple so add_enum_value can extend it whatever iterable was passed in.
        self._values = tuple(values)
        # Lookup caches, built on first use by enum_by_name / enum_by_value and reset
        # whenever a value is deleted.
        self._val_by_name = {}
        self._val_by_val = {}

    def add_enum_value(self, name, value):
        """
        Add a value to the tag's enum values.

        :param name: name of the new enum value
        :param value: the enum value itself
        :raises KeyError: if ``name`` is already used by this tag's enum
        """
        if any(known_name == name for _, known_name in self._values):
            raise KeyError("Name {} is already known in tag {}'s enum".format(name, self.tag))
        self._values = self._values + ((value, name),)
        # Only keep the caches in sync if they were already built; empty caches are
        # rebuilt from _values on the next lookup anyway.
        if self._val_by_name:
            self._val_by_name[name] = value
        if self._val_by_val:
            self._val_by_val[value] = name

    def del_enum_value(self, name=None, value=None):
        """
        Delete a value from the tag's enum values. Specify name or value using keyword
        arguments. If specifying both, they must both match the known name and value
        otherwise ValueError is raised.

        :param name: name of the enum value to delete
        :param value: enum value to delete
        :raises TypeError: if neither ``name`` nor ``value`` is given
        :raises KeyError: if the given name (or value, when no name is given) is unknown
        :raises ValueError: if both are given but do not belong together
        """
        if name is None and value is None:
            raise TypeError("either name or value is required")
        if name is not None:
            # Check against _values rather than the lookup caches: the caches are empty
            # until the first lookup and again after every deletion.
            known = {known_name: known_value for known_value, known_name in self._values}
            if name not in known:
                raise KeyError("{} is not known as a name for tag {}".format(name, self.tag))
            if value is not None and known[name] != value:
                raise ValueError("The known value {} for enum name "
                                 "{} is different to you gave: {} for tag {}".format(known[name],
                                                                                     name,
                                                                                     value,
                                                                                     self.tag))
            self._values = tuple(pair for pair in self._values if pair[1] != name)
        else:
            if value not in set(pair[0] for pair in self._values):
                raise KeyError("{} is not known as a value for tag {}".format(value, self.tag))
            self._values = tuple(pair for pair in self._values if pair[0] != value)
        # Drop the caches; they are rebuilt from _values on the next lookup.
        self._val_by_name = {}
        self._val_by_val = {}

    def enum_by_name(self, name):
        """
        Retrieve an enum value by name

        :raises KeyError: if ``name`` is not a known enum name for this tag
        """
        if not self._val_by_name:
            self._val_by_name = {known_name: known_value
                                 for known_value, known_name in self._values}
        return self._val_by_name[name]

    def enum_by_value(self, value):
        """
        Retrieve an enum name by value

        :raises KeyError: if ``value`` is not a known enum value for this tag
        """
        if not self._val_by_val:
            self._val_by_val = {known_value: known_name
                                for known_value, known_name in self._values}
        return self._val_by_val[value]
class TagsReference(object):
    """
    Container for tags, indexed both by name and by tag number
    """

    def __init__(self, tags, eager=False):
        """
        :param tags: set of FixTag objects
        :param eager: whether to create the mapping by name and tag immediately or
          upon the first use of them
        """
        self.tags = tags
        self._by_name = {}
        self._by_tag = {}
        if not eager:
            return
        # The indexes are built as a side effect of the first lookup. The key looked up
        # is irrelevant, so the expected miss is ignored.
        for lookup in (self.by_name, self.by_tag):
            try:
                lookup(None)
            except KeyError:
                pass

    def add_tag(self, tag, name):
        """Add a tag to the list of valid tags"""
        new_tag = FixTag(name=name, tag=tag)
        self.tags.add(new_tag)
        # Indexes that have not been built yet will pick the new tag up when they are.
        if self._by_name:
            self._by_name[name] = new_tag
        if self._by_tag:
            self._by_tag[tag] = new_tag

    def by_name(self, name):
        """Retrieve a tag by name"""
        if not self._by_name:
            self._by_name = dict((known.name, known) for known in self.tags)
        return self._by_name[name]

    def by_tag(self, tag):
        """Retrieve a tag by number"""
        if not self._by_tag:
            self._by_tag = dict((known.tag, known) for known in self.tags)
        return self._by_tag[tag]
class FixSpec(object):
    """
    A python-friendly representation of a FIX spec.
    This class is built from an XML file sourced from Quickfix (http://www.quickfixengine.org/).

    It contains the Message Types supported by the specification, as a map (FixSpec.msg_types)
    of message type value ('D', '6', '8', etc..) to MessageType class, and all the fields
    supported in the spec as a TagsReference instance (FixSpec.tags) which can be accessed by
    tag name or number.
    """

    def __init__(self, xml_file, eager=False):
        """
        :param xml_file: path to a quickfix specification xml file
        :type xml_file: ``str``
        :param eager: whether to eagerly populate tags maps for speedy lookup or only on
          first access
        :type eager: ``bool``
        """
        self.source = xml_file
        self.tree = parse(xml_file).getroot()
        self.version = "FIX{}.{}".format(self.tree.get('major'), self.tree.get('minor'))
        self._eager = eager
        self.tags = None
        self._populate_tags()

        by_type = {}
        for element in self.tree.findall('messages/message'):
            message = MessageType(element, self)
            by_type[message.msgtype] = message
        self.msg_types = by_type
        # Message types must be reachable from both the decoded and the raw value of
        # tag 35, so every entry is registered again under its ascii-encoded key.
        # Iterate over a snapshot because the dict is modified inside the loop.
        for key, message in list(self.msg_types.items()):
            self.msg_types[key.encode('ascii')] = message

        find_tag = self.tags.by_name
        self.header_tags = [find_tag(field.get('name'))
                            for field in self.tree.findall('header/field')]
        self.trailer_tags = [find_tag(field.get('name'))
                             for field in self.tree.findall('trailer/field')]
        # The parsed XML is dropped once everything has been extracted from it.
        self.tree = None

    def _populate_tags(self):
        """populate the TagsReference from the xml file"""
        collected = set()
        for field in self.tree.findall('fields/field'):
            # (enum value, description) pairs, in document order
            values = tuple((entry.get('enum'), entry.get('description'))
                           for entry in field.findall('value'))
            collected.add(FixTag(field.get('name'),
                                 int(field.get('number')),
                                 field.get('type'),
                                 values))
        self.tags = TagsReference(collected, self._eager)
def _extract_composition(element, spec):
    """
    Parse XML spec to extract the composition of a nested structure (Component, Group or MsgType)

    :param element: the xml element whose children describe the structure
    :type element: ``etree.Element``
    :param spec: the (partially populated) specification, used to resolve tag names
    :type spec: ``FixSpec``
    :returns: one ``(item, required)`` tuple per child, in document order, where ``item`` is a
      ``FixTag``, ``Component`` or ``Group`` and ``required`` is ``True`` when the child has
      ``required="Y"``
    :raises ValueError: if a child element is not a field, component, group (or lxml comment)
    """
    returned = []
    for elem in list(element):
        if elem.tag == "field":
            returned.append((spec.tags.by_name(elem.get('name')),
                             elem.get('required') == "Y"))
        elif elem.tag == 'component':
            returned.append((Component(elem, spec), elem.get('required') == "Y"))
        elif elem.tag == 'group':
            returned.append((Group.from_element(elem, spec), elem.get('required') == "Y"))
        elif (parse.__module__ == 'lxml.etree') and (elem.tag == Comment):
            # With lxml, XML comments show up as child elements; they carry no spec data.
            pass
        else:
            raise ValueError("Could not process element '{}'".format(elem.tag))
    return returned


def _get_groups(composition):
    """
    Recursively extract groups from a composition

    Yields the groups found directly in ``composition`` and those nested inside its
    components. Groups nested inside other groups are not yielded.
    """
    for item, _ in composition:
        if isinstance(item, Group):
            yield item
        elif isinstance(item, Component):
            for group in _get_groups(item.composition):
                yield group


class Group(object):
    """
    Representation of the specification of a Repeating Group.
    """

    def __init__(self, count_tag, composition, spec):
        """
        :param count_tag: A FixTag object representing a repeating group.
          Must correspond to the tag name for its count tag in the spec.
        :type count_tag: ``FixTag``
        :param composition: the ``(item, required)`` tuples making up the group, as returned
          by ``_extract_composition``
        :type composition: ``list``
        :param spec: the (partially populated) specification, containing at least the
          components and tags.
        :type spec: ``FixSpec``
        """
        self.composition = composition
        self.count_tag = count_tag
        self.name = count_tag.name
        # tag numbers of the fields sitting directly in the group
        self.tags = set(t[0].tag for t in self.composition if isinstance(t[0], FixTag))
        self.groups = {group.count_tag.tag: group for group in _get_groups(self.composition)}
        self._sorting_key = None
        self._spec = spec

    @property
    def sorting_key(self):
        """Map of tag number to serialisation position, computed on first access."""
        if not self._sorting_key:
            self._sorting_key = _extract_sorting_key(self.composition, self._spec)
        return self._sorting_key

    @classmethod
    def from_element(cls, element, spec):
        """
        Build the group from an lxml etree element

        :param element: the xml element representing the group
        :type element: ``etree.Element``
        :param spec: a :py:class:`FixSpec` describing the element
        :type spec: ``FixSpec``
        """
        return cls(count_tag=spec.tags.by_name(element.get('name')),
                   composition=_extract_composition(element, spec),
                   spec=spec)

    def add_group(self, count_tag, composition, insert_at=None):
        """
        Add a synthetic group to this group.
        You may need to add it to the relevant message type as well!

        :param count_tag: the count tag of the new group
        :type count_tag: ``FixTag``
        :param composition: the ``(item, required)`` tuples making up the new group
        :param insert_at: sorting position for the group's count tag. When omitted the tag
          is not added to the sorting key and will sort by tag number after the sorted tags.
        """
        group = Group(count_tag=count_tag, composition=composition, spec=self._spec)
        self.groups[count_tag.tag] = group
        # Compare against None so that a position of 0 is honoured too.
        if insert_at is not None:
            self.sorting_key[count_tag.tag] = insert_at


class Component(object):
    """
    Representation of the specification of a Component
    """

    def __init__(self, element, spec):
        """
        :param element: the xml element representing a component
        :type element: ``etree.Element``
        :param spec: the (partially populated) specification, containing at least the
          groups and tags
        :raises IndexError: if the spec has no definition for the referenced component
        """
        self.name = element.get('name')
        # ``element`` is only a reference by name; the actual content lives in the
        # top-level <components> section of the spec.
        elem = spec.tree.findall("components/component[@name='{}']".format(self.name))[0]
        self.composition = _extract_composition(elem, spec)
        self._sorting_key = None
        self._spec = spec

    @property
    def sorting_key(self):
        """Map of tag number to serialisation position, computed on first access."""
        if not self._sorting_key:
            self._sorting_key = _extract_sorting_key(self.composition, self._spec)
        return self._sorting_key


class MessageType(object):
    """
    Message Type representation. Contains the valid tags, their order,
    valid repeating groups, components etc.
    """

    def __init__(self, element, spec):
        """
        :param element: the xml element representing a message type
        :type element: ``etree.Element``
        :param spec: the (partially populated) specification, containing at least the tags.
        :type spec: ``FixSpec``
        """
        assert element.tag == "message"
        self.msgtype = element.get('msgtype')
        self.name = element.get('name')
        self.composition = _extract_composition(element, spec)
        self.groups = {group.count_tag.tag: group for group in _get_groups(self.composition)}
        self._sorting_key = None
        self._spec = spec

    @property
    def sorting_key(self):
        """Map of tag number to serialisation position, computed on first access."""
        if not self._sorting_key:
            self._sorting_key = _extract_sorting_key(self.composition, self._spec)
        return self._sorting_key

    def add_group(self, count_tag, composition, insert_at=None):
        """
        Add a synthetic group to this msg type.

        :param count_tag: the count tag of the new group
        :type count_tag: ``FixTag``
        :param composition: the ``(item, required)`` tuples making up the new group
        :param insert_at: sorting position for the group's count tag. When omitted the tag
          is not added to the sorting key and will sort by tag number after the sorted tags.
        """
        group = Group(count_tag=count_tag, composition=composition, spec=self._spec)
        self.groups[count_tag.tag] = group
        # Compare against None so that a position of 0 is honoured too.
        if insert_at is not None:
            self.sorting_key[count_tag.tag] = insert_at


def _assign_sorting_positions(definition, sorting_key, position):
    """
    Give every field and group count tag of ``definition`` the next free position, walking
    into components depth-first. Returns the first position that is still unused.
    """
    for item, _ in definition:
        if isinstance(item, FixTag):
            sorting_key[item.tag] = position
            position += 1
        elif isinstance(item, Component):
            # A component is expanded in place: its fields take the next positions and
            # whatever follows the component continues after them.
            position = _assign_sorting_positions(item.composition, sorting_key, position)
        elif isinstance(item, Group):
            sorting_key[item.count_tag.tag] = position
            position += 1
    return position


def _extract_sorting_key(definition, spec, sorting_key=None, index=0):
    """
    Retrieve the sorting key for an object. The sorting key is used to serialise tags
    in the order they appear in the spec.
    It is unclear whether that's required for the root of the message (aside from header
    and tail) but it is essential in repeating groups. This takes the safe approach of
    enforcing it at all levels.

    :param definition: the ``(item, required)`` tuples of the structure to order
    :param spec: the specification, providing ``header_tags`` and ``trailer_tags``
    :type spec: ``FixSpec``
    :param sorting_key: an existing map to extend. When ``None`` a new map is created and
      seeded with the header and trailer tags.
    :param index: only used when ``sorting_key`` is given: the tags of ``definition`` are
      positioned from ``index + 1`` onwards
    :returns: map of tag number to position; only the count tag of a group is positioned,
      not the group's content
    """
    if sorting_key is None:
        sorting_key = {35: 0, 10: int(10e9)}
        # Trailer tags sit at the very end, the last one (normally 10) on the highest
        # position. Integer positions, like every other entry of the map.
        trailer_tags = [item.tag for item in spec.trailer_tags] or TRAILER_TAGS
        for offset, tag in enumerate(trailer_tags[::-1]):
            sorting_key[tag] = int(10e9) - offset
        header_tags = [item.tag for item in spec.header_tags] or HEADER_TAGS
        for position, tag in enumerate(header_tags):
            sorting_key[tag] = position
        # body tags follow directly after the header
        start_index = len(header_tags)
    else:
        start_index = index + 1
    _assign_sorting_positions(definition, sorting_key, start_index)
    return sorting_key