Source code for Kinesis

"""
Kinesis Binding Module with logging handler and stream object
"""
__author__ = 'Omri Eival'

from logging import StreamHandler
from io import BufferedIOBase, BytesIO
from boto3 import client
from aws_logging_handlers.validation import is_non_empty_string, is_positive_int, empty_str_err, \
    bad_integer_err, ValidationRule
from aws_logging_handlers.tasks import Task, task_worker, STOP_SIGNAL

import logging
import atexit
import signal
import threading
from queue import Queue

MAX_CHUNK_SIZE = 1 * 1024 ** 2  # 1 MB
DEFAULT_CHUNK_SIZE = int(0.5 * 1024 ** 2)  # 0.5 MB
MIN_WORKERS_NUM = 1
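# Note: Kinesis caps the data blob of a single PutRecord call at 1 MB, so each upload is
# hard-limited to MAX_CHUNK_SIZE and the write buffer is rotated once it reaches the
# (smaller) configured chunk_size.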


class KinesisStream(BufferedIOBase):
    """
    Stream interface used by the handler; binds to an AWS Kinesis data stream and uploads log records
    """

    def __init__(self, stream_name: str, partition_key: str, *, chunk_size: int = DEFAULT_CHUNK_SIZE,
                 encoder: str = 'utf-8', workers: int = 1, **boto_session_kwargs):
        """
        :param stream_name: Name of the Kinesis stream
        :type stream_name: str
        :param partition_key: Kinesis partition key used to group data by shards
        :type partition_key: str
        :param chunk_size: the size of a chunk of records used as the rotation threshold (default 524288)
        :type chunk_size: int
        :param encoder: the encoder to be used for log records (default 'utf-8')
        :type encoder: str
        :param workers: the number of background workers that rotate log records (default 1)
        :type workers: int
        :param boto_session_kwargs: additional keyword arguments for the boto3 Kinesis client
        :type boto_session_kwargs: boto3 client keyword arguments
        """
        self._client = client('kinesis', **boto_session_kwargs)
        self.chunk_size = chunk_size
        self.stream_name = stream_name
        self.tasks = Queue()
        self.partition_key = partition_key
        self.encoder = encoder

        try:
            stream_desc = self._client.describe_stream(StreamName=self.stream_name)
            if stream_desc['StreamDescription']['StreamStatus'] != 'ACTIVE':
                raise AssertionError
        except Exception:
            raise ValueError(
                'Kinesis stream %s does not exist, is not active, or the credentials lack permission to access it' % stream_name)

        # Keep the started worker threads so they can be counted and stopped on close
        self.workers = []
        for _ in range(int(max(workers, MIN_WORKERS_NUM) / 2) + 1):
            worker = threading.Thread(target=task_worker, args=(self.tasks,))
            worker.start()
            self.workers.append(worker)

        self._stream = BytesIO()
        self._is_open = True

        BufferedIOBase.__init__(self)

    def add_task(self, task):
        """
        Add a new task to the tasks queue

        :param task: a Task object
        :return:
        """
        self.tasks.put(task)

    def join_tasks(self):
        """
        Join tasks in the queue

        :return:
        """
        self.tasks.join()

    def _rotate_chunk(self, run_async=True):
        """
        Send the accumulated records to the stream and clear the buffer

        :param run_async: indicates whether the rotation should run asynchronously on a worker thread
        :type run_async: bool
        :return:
        """
        assert self._stream, "Stream object not found"
        buffer = self._stream
        self._stream = BytesIO()
        if buffer.tell() > MAX_CHUNK_SIZE:
            # A single put_record call is limited to 1 MB of data, so keep the first
            # MAX_CHUNK_SIZE bytes for this upload and carry the overflow into the new buffer
            buffer.seek(MAX_CHUNK_SIZE)
            self._stream.write(buffer.read())
        buffer.seek(0)

        if run_async:
            self.add_task(Task(self._upload_part, buffer))
        else:
            self._upload_part(buffer)

    def _upload_part(self, buffer):
        try:
            self._client.put_record(
                StreamName=self.stream_name,
                Data=buffer.read(MAX_CHUNK_SIZE).decode(self.encoder),
                PartitionKey=self.partition_key
            )
        except Exception:
            logging.exception("Failed to stream to AWS Kinesis data stream {}".format(self.stream_name))

    def close(self, *args, **kwargs):
        """
        Closes the stream for writing and uploads remaining records to Kinesis

        :param args:
        :param kwargs:
        :return:
        """
        if self._stream.tell() > 0:
            self._rotate_chunk(run_async=False)

        self.join_tasks()

        # Stop the worker threads
        for _ in range(len(self.workers)):
            self.tasks.put(STOP_SIGNAL)

        self._is_open = False

    @property
    def closed(self):
        return not self._is_open

    @property
    def writable(self):
        return True

    @property
    def partition_key(self):
        return self._partition_key

    @partition_key.setter
    def partition_key(self, value):
        if not (value and isinstance(value, str)):
            raise ValueError("partition_key must be a non-empty string")
        self._partition_key = value

    def tell(self, *args, **kwargs):
        """
        Indication of the current size of the stream before rotation

        :param args:
        :param kwargs:
        :return: size of the current stream
        """
        return self._stream.tell()

    def write(self, *args, **kwargs):
        """
        Writes a log record to the stream

        :param args:
        :param kwargs:
        :return: size of the record that was written
        """
        s = args[0]
        self._stream.write(s.encode(self.encoder))
        return len(s)

    def flush(self):
        """
        Flushes the current stream if it exceeds the threshold size

        :return:
        """
        if self._stream.tell() >= self.chunk_size:
            self._rotate_chunk()
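
# Example (illustrative sketch; not part of the original module): the stream object can
# be driven directly, assuming an ACTIVE Kinesis stream named "my-logs" and AWS
# credentials/region available to boto3.
#
#     stream = KinesisStream("my-logs", partition_key="app-1")
#     stream.write("a log line\n")   # buffered in memory
#     stream.flush()                 # uploads only once the buffer reaches chunk_size
#     stream.close()                 # uploads whatever is left and stops the workers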


class KinesisHandler(StreamHandler):
    """
    A logging handler class that streams log records to AWS Kinesis
    """

    def __init__(self, stream_name: str, partition_key: str, *, chunk_size: int = DEFAULT_CHUNK_SIZE,
                 encoder: str = 'utf-8', workers: int = 1, **boto_session_kwargs):
        """
        :param stream_name: Name of the Kinesis stream
        :type stream_name: str
        :param partition_key: Kinesis partition key used to group data by shards
        :type partition_key: str
        :param chunk_size: the size of a chunk of records used as the rotation threshold (default 524288)
        :type chunk_size: int
        :param encoder: the encoder to be used for log records (default 'utf-8')
        :type encoder: str
        :param workers: the number of background workers that rotate log records (default 1)
        :type workers: int
        :param boto_session_kwargs: additional keyword arguments for the boto3 Kinesis client
        :type boto_session_kwargs: boto3 client keyword arguments
        """
        args_validation = (
            ValidationRule(stream_name, is_non_empty_string, empty_str_err('stream_name')),
            ValidationRule(chunk_size, is_positive_int, bad_integer_err('chunk_size')),
            ValidationRule(encoder, is_non_empty_string, empty_str_err('encoder')),
            ValidationRule(workers, is_positive_int, bad_integer_err('workers')),
        )

        for rule in args_validation:
            assert rule[1](rule[0]), rule[2]

        self.stream = KinesisStream(stream_name, partition_key, chunk_size=chunk_size, encoder=encoder,
                                    workers=workers, **boto_session_kwargs)

        # Make sure we gracefully clear the buffers and upload the remaining parts before exiting
        signal.signal(signal.SIGTERM, self._teardown)
        signal.signal(signal.SIGINT, self._teardown)
        signal.signal(signal.SIGQUIT, self._teardown)
        atexit.register(self.close)

        StreamHandler.__init__(self, self.stream)

    def _teardown(self, signum, frame):
        # Signal handlers are called with the signal number and the current stack frame
        self.close()

    def close(self, *args, **kwargs):
        """
        Closes the stream
        """
        self.acquire()
        try:
            if self.stream:
                try:
                    self.flush()
                finally:
                    stream = self.stream
                    self.stream = None
                    if hasattr(stream, "close"):
                        stream.close(*args, **kwargs)
        finally:
            self.release()
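

# Example usage (illustrative sketch; not part of the original module): attaching the
# handler to a standard logger. Assumes an ACTIVE Kinesis stream named "my-logs" and
# AWS credentials/region available to boto3 (e.g. via environment variables); extra
# keyword arguments are forwarded to the boto3 Kinesis client.
#
#     import logging
#
#     logger = logging.getLogger("service")
#     logger.setLevel(logging.INFO)
#     handler = KinesisHandler("my-logs", partition_key="service-1", region_name="us-east-1")
#     handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
#     logger.addHandler(handler)
#
#     logger.info("hello kinesis")
#     # records are buffered and uploaded in chunks; closing the handler (or normal
#     # process exit via atexit) flushes and uploads the remaining buffer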