"""This codec implements a simpler repeating group logic where the first tag is seen as a marker
for repetition in a repeating group (rather than relying on the order of the tags) """
import re
import warnings
from collections import deque
import six
from pyfixmsg import RepeatingGroup
from pyfixmsg.util import int_or_str
from pyfixmsg.reference import HEADER_TAGS, HEADER_SORT_MAP, ENCODED_DATA_TAGS
#: Standard separator for the StringFIX codec between tag value pairs.
#: However, the codec supports an arbitrary separator, as using | or ; or "<SOH>" is
#: common.
SEPARATOR = '\1'

#: Character separating a tag from its value inside one tag/value pair.
DELIMITER = '='

#: Regex template used to tokenise a message; ``{d}`` is replaced by the delimiter and ``{s}`` by the
#: separator before compilation. Each match captures ``(tag, value)``.
FIX_REGEX_STRING = r'([^{s}{d}]*)[{d}](.*?){s}(?!\w+{s})'

#: Tokeniser pre-compiled for the default delimiter and separator.
FIX_REGEX = re.compile(FIX_REGEX_STRING.format(s=SEPARATOR, d=DELIMITER), flags=re.DOTALL)

# Precision markers; not referenced in this part of the module.
MICROSECONDS, MILLISECONDS = 0, 1

# String forms of the reference tag lists, for membership tests on tags that are still text.
HEADER_TAGS_SET = set(map(str, HEADER_TAGS))
ENCODED_TAG_SET = set(map(str, ENCODED_DATA_TAGS))
class Codec(object):
    """
    FIX codec. Initialise with a :py:class:`~pyfixmsg.reference.FixSpec` to support
    repeating groups.

    This class is used to transform the serialised FIX message into an instance of ``fragment_class``,
    default ``dict``.
    Tags are assumed to be all of type ``int``, repeating groups are lists of ``fragment_class``.
    Values can be either bytes or unicode or a mix thereof, depending on the constructor arguments.
    """

    def __init__(self, spec=None, no_groups=False, fragment_class=dict, decode_as=None, decode_all_as_347=False):
        """
        :param spec: the :py:class:`~pyfixmsg.reference.FixSpec` instance to use to parse messages.
          If spec is not defined repeating groups will not be parsed correctly, and the logic to handle encoded
          tags will not be functional.
        :param no_groups: set to ``True`` to ignore repeating groups
        :param fragment_class: Which dict-like object to return when parsing messages. Also sets the type of
          members of repeating groups
        :param decode_as: what encoding to decode all tags. Defaults to None, which returns the raw byte strings.
          Setting to a non-None value makes both non-numerical tags and values to be unicode,
          using this value for decode.
        :param decode_all_as_347: whether to trust tag 347 to decode all other tags or only the Encoded* ones.
          If set to False, use 347 normally for Encoded* tags, respect ``decode_as`` for all other tags. If 347 is
          not present on the message, the values are left encoded.
        """
        self.encoding = decode_as
        self.decode_all_as_347 = decode_all_as_347
        self.spec = spec
        # Without a spec there are no group definitions to rely on, so groups are always disabled.
        self._no_groups = True if spec is None else no_groups
        self._frg_class = fragment_class

    def parse(self, buff, delimiter=DELIMITER, separator=SEPARATOR):
        """
        Parse a FIX message. The FIX message is expected to be a bytestring and the output
        is a dictionary-like object which type is determined by the ``fragment_class`` constructor argument
        and which keys are ``int`` and values ``unicode``. Note that if there is a non-int tag in the message,
        this will be stored as a key in the original format (i.e. bytestring).

        A buffer holding fewer than four tag/value pairs (including an empty one) is parsed as a flat
        message instead of raising ``IndexError``.

        :param buff: Buffer to parse
        :type buff: ``bytestr`` or ``unicode``
        :param delimiter: A character that separates key and values inside the FIX message. Generally '='.
          Note the type: because of the way the buffer is tokenised, this needs to be unicode
          (or ``str`` in python 2.7*).
        :type delimiter: ``unicode``
        :param separator: A character that separates key+value pairs inside the FIX message. Generally the SOH
          character. See type observations above.
        :type separator: ``unicode``
        :return: an instance of ``fragment_class`` holding the parsed tags
        :raises ValueError: if ``buff`` is neither text nor bytes
        """

        def pushback_generator(iterator):
            """
            Generator which allows to push back a previously picked item,
            for example::

                gen = pushback_generator(range(10))
                print(next(gen))
                print(next(gen))
                v = next(gen)
                print(v)
                gen.send(v)
                print(next(gen))  # prints v again

            :param iterator: the iterable to wrap
            :return: a generator yielding the items of ``iterator``
            """
            for value in iterator:
                back = yield value
                if back is not None:
                    # The first yield is the return value of send(); the second one re-delivers the
                    # pushed-back item to the next regular next() call.
                    yield back
                    yield back

        assert not (delimiter.isalnum() or separator.isalnum()), \
            'delimiter and separator must not be alphanumeric'
        encoding, encoding_347 = self.encoding, None
        msg_type = None
        pattern = FIX_REGEX_STRING.format(d=re.escape(delimiter), s=re.escape(separator))
        if isinstance(buff, six.text_type):
            input_in_unicode = True
            custom_r = re.compile(six.ensure_text(pattern, encoding='ascii'), re.DOTALL)
            if self.encoding is not None:
                encoding = None  # No need to decode
                warnings.warn('Processing a unicode message and ignore the argument "decode_as={}"'.format(self.encoding))
            if self.decode_all_as_347:
                warnings.warn('Processing a unicode message and ignore the argument "decode_all_as_347={}"'.format(self.decode_all_as_347))
        elif isinstance(buff, bytes):
            input_in_unicode = False
            custom_r = re.compile(six.ensure_binary(pattern, encoding='ascii'), re.DOTALL)
        else:
            raise ValueError('Unsupported type of input: {}'.format(type(buff)))

        tagvals = custom_r.findall(buff)

        if not self._no_groups and self.spec is not None:
            # Tag 35 is looked for among the first four pairs only. Slicing (rather than indexing) keeps
            # short or empty messages from raising IndexError. No early exit: should 35 be repeated in
            # that window, the last occurrence wins.
            for raw_tag, raw_val in tagvals[:4]:
                if raw_tag in (b'35', u'35'):
                    msg_type = self.spec.msg_types.get(raw_val)

        if not input_in_unicode:
            # Look for the MessageEncoding tag (347); stop at the first tag that is not a header tag.
            for tag, val in tagvals:
                if int_or_str(tag) == 347:
                    encoding_347 = six.ensure_str(val)
                    break
                if six.ensure_str(tag) not in HEADER_TAGS_SET:  # already entered the message body
                    break

        if self.decode_all_as_347 and encoding_347:
            tagvals = ((int_or_str(raw_tag, encoding_347), six.ensure_text(raw_val, encoding_347))
                       for raw_tag, raw_val in tagvals)
        elif encoding:
            tagvals = ((int_or_str(raw_tag, encoding),
                        six.ensure_text(raw_val, (encoding_347
                                                  if encoding_347 and raw_tag.decode() in ENCODED_TAG_SET
                                                  else encoding)))
                       for raw_tag, raw_val in tagvals)
        elif not input_in_unicode and six.PY3:
            tagvals = ((int_or_str(raw_tag, 'ascii'),
                        six.ensure_text(raw_val, (encoding_347
                                                  if encoding_347 and raw_tag.decode() in ENCODED_TAG_SET
                                                  else 'UTF-8')))
                       for raw_tag, raw_val in tagvals)
        elif input_in_unicode and six.PY2:
            tagvals = ((int_or_str(six.ensure_binary(raw_tag), 'ascii'),
                        six.ensure_binary(raw_val, (encoding_347
                                                    if encoding_347 and raw_tag.encode() in ENCODED_TAG_SET
                                                    else 'UTF-8')))
                       for raw_tag, raw_val in tagvals)
        else:
            tagvals = ((int_or_str(raw_tag), raw_val) for raw_tag, raw_val in tagvals)

        if self._no_groups or self.spec is None or msg_type is None:
            # no groups can be found without a spec, so no point looking up the msg type.
            return self._frg_class(tagvals)

        msg = self._frg_class()
        groups = msg_type.groups
        tagvals = pushback_generator(tagvals)
        for tag, value in tagvals:
            if tag not in groups:
                msg[tag] = value
            elif value in (b'0', u'0'):
                msg[tag] = RepeatingGroup.create_repeating_group(tag)
            else:
                contents, last_tagval = self._process_group(tag, tagvals,
                                                            msg_type=msg_type,
                                                            group=groups[tag])
                msg[tag] = contents
                if last_tagval:
                    # The group parser read one pair too far: hand it back to the main loop.
                    tagvals.send(last_tagval)
        return msg

    def _process_group(self, identifying_tag, enumerator, msg_type, group):
        """
        Recursively process a repeating group.

        The first tag read after the count tag marks the start of every member: each time it is
        seen again a new member is started.

        :param identifying_tag: the count tag that introduced the group
        :param enumerator: iterator of ``(tag, value)`` pairs, positioned right after the count tag
        :param msg_type: message type definition, passed along unchanged to nested groups
        :param group: definition of this group; its ``tags`` and ``groups`` attributes are used
        :return: ``(rep_group, leftover)`` where ``leftover`` is the first ``(tag, value)`` pair that does
          not belong to the group, or ``None`` if the end of the message was reached
        """
        rep_group = RepeatingGroup()
        rep_group.number_tag = identifying_tag
        member = self._frg_class()
        first_tag = None
        inner_groups = group.groups
        valid_tags = group.tags
        # Pair handed back by a nested group; it is dispatched through the same branches as a fresh
        # pair, so it may start a new member, extend the current one, open a sibling nested group or
        # close this group.
        pending = None
        while True:
            if pending is not None:
                tag, value = pending
                pending = None
            else:
                try:
                    tag, value = next(enumerator)
                except StopIteration:
                    break
            if first_tag is None:
                # handle first tag: we expect all the members of the group to start with this tag
                first_tag = tag
                rep_group.first_tag = tag
                member[tag] = value
            elif first_tag == tag:
                # we start a new member: store the current one and begin an empty one with this tag
                rep_group.append(member)
                member = self._frg_class()
                member[tag] = value
            elif tag in valid_tags:
                # tag is a plain member of the group, we just add it
                member[tag] = value
            elif tag in inner_groups:
                if value in (b'0', u'0'):
                    # empty nested group: nothing to consume, same handling as at the top level
                    member[tag] = RepeatingGroup.create_repeating_group(tag)
                else:
                    # tag is starting a new sub group, we recurse
                    contents, pending = self._process_group(tag, enumerator, msg_type, inner_groups[tag])
                    member[tag] = contents
            else:
                # we're out of the group.
                rep_group.append(member)
                return rep_group, (tag, value)
        # we reached the end of the message, so complete, no further tags to pass on
        rep_group.append(member)
        return rep_group, None

    @staticmethod
    def _fallback_sort_key(tag):
        """
        Sorting key for a tag that is absent from the sorting map in use.

        Numeric tags keep the historical ``int(1e9 + tag)`` key. Tags that cannot be added to a number
        (text or bytes tags) get a constant key of ``int(2e9)`` instead of raising ``TypeError``; as the
        sort is stable they keep their relative order.
        NOTE(review): this presumes the trailer (tag 10) has a sorting key above 2e9 - confirm against
        the reference module.
        """
        try:
            return int(1e9 + tag)
        except TypeError:
            return int(2e9)

    def _unmap(self, msg):
        """
        Create a tag,value sequence from a FixMessage (dict-type interface).

        This will leverage the spec to order the tags the same way they are defined in the spec.
        If tags are present on the message that are not in the spec they are added in order at the end
        of the message before the tail (tag 10).

        :param msg: the message to flatten
        :return: an ordered sequence of ``(tag, value)`` pairs; a repeating group contributes its count
          tag with the number of members, followed by the sorted pairs of each member
        :raises KeyError: if a spec is set and the message has no tag 35 or an unknown message type
        """
        fallback = self._fallback_sort_key

        def sort_values(fragment, spec):
            """ Sort {tag:value} map into an iterable, expanding repeating groups recursively """
            tvals = sorted(fragment.items(),
                           key=lambda tv: spec.sorting_key.get(tv[0], fallback(tv[0])))
            # using a deque for this already-sorted data structure yields a ~10% speed improvement on serialisation
            expanded = deque()
            for tag, val in tvals:
                if isinstance(val, list):  # Repeating groups are also lists, so we only need one type here
                    downspec = spec.groups[tag]
                    expanded.append((tag, len(val)))
                    for member in val:
                        expanded.extend(sort_values(member, downspec))
                else:
                    expanded.append((tag, val))
            return expanded

        if self.spec is None:
            # No spec, let's just get reasonable header order, and 10 at the end.
            return sorted(msg.items(), key=lambda tv: HEADER_SORT_MAP.get(tv[0], fallback(tv[0])))
        return sort_values(msg, self.spec.msg_types[msg[35]])

    def serialise(self, msg, separator=SEPARATOR, delimiter=DELIMITER, encoding=None):
        """
        Serialise a message into a bytestring.

        :param msg: the message to serialise
        :type msg: ``dict``-like interface
        :param delimiter: as in ``parse()``
        :param separator: as in ``parse()``
        :param encoding: encoding used for text values; when ``None`` the codec's ``decode_as`` value is
          used, and ``UTF-8`` if that is ``None`` too
        :type encoding: ``str``
        :return: the serialised message
        :rtype: ``bytes``
        """
        if encoding is not None:
            text_encoding = encoding
        elif self.encoding is not None:
            text_encoding = self.encoding
        else:
            text_encoding = 'UTF-8'
        raw_delimiter = delimiter.encode('ascii')
        raw_separator = separator.encode('ascii')

        output = []
        for tag, value in self._unmap(msg):
            if isinstance(tag, bytes):
                output.append(tag)
            elif isinstance(tag, six.text_type):
                output.append(tag.encode('ascii'))
            else:
                # int tags, and anything else that has a string representation
                output.append(str(tag).encode('ascii'))
            output.append(raw_delimiter)
            if isinstance(value, int):
                output.append(str(value).encode('UTF-8'))
            elif isinstance(value, bytes):
                output.append(value)
            else:
                if not isinstance(value, six.text_type):
                    # six.ensure_text only accepts text or bytes: other value types raise TypeError here
                    value = six.ensure_text(value)
                output.append(value.encode(text_encoding))
            output.append(raw_separator)
        return b''.join(output)