""" The ``collections`` module contains types and functions that define
various collections and algorithms.
"""
import struct
import zlib
from collections import defaultdict
from operator import itemgetter
GZIP_HEADER = "\x1F\x8B\x08\x00\x00\x00\x00\x00\x02\xFF".encode("latin1")
MAX_INT = int("FFFFFFFF", 16)
[docs]def first_item_adapter(adaptee):
"""Adapts ``defaultdict(list)``.__getitem__ accessor to
return the first item from the list.
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = first_item_adapter(d)
Return a first item from the list.
>>> a['a']
1
"""
return ItemAdapter(adaptee, 0)
[docs]def last_item_adapter(adaptee):
"""Adapts ``defaultdict(list)``.__getitem__ accessor to
return the last item from the list.
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = last_item_adapter(d)
Return a last item from the list.
>>> a['a']
3
"""
return ItemAdapter(adaptee, -1)
[docs]class ItemAdapter(object):
"""Adapts ``defaultdict(list)``.__getitem__ accessor to
return item at ``index`` from the list. If ``key`` is not
found return None.
"""
__slots__ = ("adaptee", "index")
def __init__(self, adaptee, index):
"""``adaptee`` must be defaultdict(list).
>>> d = defaultdict(list)
>>> a = ItemAdapter(d, 0)
Otherwise raise ``TypeError``.
>>> ItemAdapter(None, 0) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TypeError: ...
"""
if adaptee is None or not isinstance(adaptee, dict):
raise TypeError("first argument must be defaultdict(list)")
self.adaptee = adaptee
self.index = index
def __getitem__(self, key):
"""
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = ItemAdapter(d, 0)
Return a first item from the list.
>>> a['a']
1
>>> a = ItemAdapter(d, -1)
Return a last item from the list.
>>> a['a']
3
If ``key`` not found return ``None``.
>>> a['x']
"""
items = self.adaptee[key]
if items:
return items[self.index]
return None
[docs] def get(self, key, default=None):
"""Return the value for key if key is in the adaptee,
else default. If default is not given, it defaults
to None, so that this method never raises a KeyError.
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = ItemAdapter(d, 0)
>>> a.get('a')
1
>>> a.get('b', 100)
100
"""
if key in self.adaptee:
items = self.adaptee[key]
if items:
return items[self.index]
return default
[docs]class attrdict(dict): # noqa: N801
"""A dictionary with attribute-style access. Maps attribute
access to dictionary.
>>> d = attrdict(a=1, b=2)
>>> sorted(d.items())
[('a', 1), ('b', 2)]
>>> d.a
1
>>> d.c = 3
>>> d.c
3
>>> d.d # doctest: +ELLIPSIS
Traceback (most recent call last):
...
AttributeError: ...
"""
__slots__ = ()
def __setattr__(self, key, value):
return super(attrdict, self).__setitem__(key, value)
def __getattr__(self, name):
try:
return super(attrdict, self).__getitem__(name)
except KeyError:
raise AttributeError(name)
[docs]class defaultattrdict(defaultdict): # noqa: N801
"""A dictionary with attribute-style access. Maps attribute
access to dictionary.
>>> d = defaultattrdict(str, a=1, b=2)
>>> d.a
1
>>> d.c = 3
>>> d.c
3
>>> d.d
''
"""
__slots__ = ()
def __setattr__(self, key, value):
return super(defaultattrdict, self).__setitem__(key, value)
def __getattr__(self, name):
return super(defaultattrdict, self).__getitem__(name)
[docs]def distinct(seq):
"""Returns generator for unique items in ``seq`` with preserved
order.
>>> list(distinct('1234512345'))
['1', '2', '3', '4', '5']
If the order is not important consider using ``set`` which is
approximately eight times faster on large sequences.
>>> sorted(list(set('1234512345')))
['1', '2', '3', '4', '5']
"""
unique = {}
for item in seq:
if item not in unique:
unique[item] = None
yield item
[docs]def gzip_iterator(items, compress_level=6):
"""Iterates over ``items`` and returns generator of gzipped items.
``items`` - a list of bytes
>>> items = ['Hello World'.encode('latin1')]
>>> result = list(gzip_iterator(items))
>>> assert 3 == len(result)
>>> assert GZIP_HEADER == result[0]
"""
size = 0
crc = 0
gzip = zlib.compressobj(
compress_level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0
)
yield GZIP_HEADER
for item in items:
size += len(item)
crc = zlib.crc32(item, crc) & MAX_INT
chunk = gzip.compress(item)
if chunk: # pragma: nocover
yield chunk
yield gzip.flush()
yield struct.pack("<2L", crc, size & MAX_INT)
[docs]def map_keys(function, dictionary):
"""Apply `function` to every key of `dictionary` and return
a dictionary of the results.
>>> d = {'1': 1, '2': 2}
>>> sorted_items(map_keys(lambda key: 'k' + key, d))
[('k1', 1), ('k2', 2)]
"""
return dict([(function(key), value) for key, value in dictionary.items()])
[docs]def map_values(function, dictionary):
"""Apply `function` to every value of `dictionary` and return
a dictionary of the results.
>>> d = {'1': 1, '2': 2}
>>> sorted_items(map_values(lambda value: 2 * value, d))
[('1', 2), ('2', 4)]
"""
return dict([(key, function(value)) for key, value in dictionary.items()])
[docs]def list_values(keys, dictionary):
"""Returns `dictionary` values orderd by `keys`.
>>> d = {'1': 1, '2': 2}
>>> list_values(['1', '2', '3'], d)
[1, 2, None]
"""
return [key in dictionary and dictionary[key] or None for key in keys]
[docs]def sorted_items(dictionary):
"""Returns `dictionary` items sorted by key.
>>> d = {'1': 1, '2': 2}
>>> sorted_items(d)
[('1', 1), ('2', 2)]
"""
return sorted(dictionary.items(), key=itemgetter(1))