Source code for S3

"""
S3 Binding Module with logging handler and stream object
"""
__author__ = 'Omri Eival'


import atexit
import signal
import threading
import queue
import gzip
import codecs

from logging import StreamHandler
from io import BufferedIOBase, BytesIO
from boto3 import Session
from datetime import datetime
from aws_logging_handlers.validation import is_non_empty_string, is_positive_int, empty_str_err, bad_integer_err, ValidationRule
from aws_logging_handlers.tasks import Task, task_worker, STOP_SIGNAL

DEFAULT_CHUNK_SIZE = 5 * 1024 ** 2  # 5 MB
DEFAULT_ROTATION_TIME_SECS = 12 * 60 * 60  # 12 hours
MAX_FILE_SIZE_BYTES = 100 * 1024 ** 2  # 100 MB
MIN_WORKERS_NUM = 1


class StreamObject:
    """
    Class representation of the AWS s3 object along with all the needed metadata to stream to s3
    """

    def __init__(self, s3_resource, bucket_name, filename, buffer_queue):
        self.object = s3_resource.Object(bucket_name, filename)
        self.uploader = self.object.initiate_multipart_upload()
        self.bucket = bucket_name
        try:
            # head_object returns object metadata; ContentLength holds the existing object size in bytes
            total_bytes = s3_resource.meta.client.head_object(Bucket=self.bucket, Key=filename).get('ContentLength', 0)
        except Exception:
            total_bytes = 0

        self.buffer = BytesIO()
        self.chunk_count = 0
        self.byte_count = total_bytes
        self.parts = []
        self.tasks = buffer_queue

    def add_task(self, task):
        """
        Add a task to the tasks queue
        :param task: Task object
        :return:
        """
        self.tasks.put(task)

    def join_tasks(self):
        """
        Join all the tasks
        :return:
        """
        self.tasks.join()

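
# A minimal sketch (an assumption for illustration, not part of the module) of how the stream
# class below wires a StreamObject together: a boto3 S3 resource, a target key, and a shared
# queue consumed by task_worker threads. The bucket and key names are placeholders.
def _example_stream_object():  # hypothetical helper, never called by the library
    s3 = Session().resource('s3')
    tasks = queue.Queue()
    # Constructing the object starts a multipart upload for the given key
    obj = StreamObject(s3, "my-example-bucket", "logs/app_1500000000", tasks)
    obj.add_task(Task(print, "runs on a background task_worker thread"))
    obj.join_tasks()  # blocks until a worker thread has drained the queue
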

class S3Stream(BufferedIOBase):
    """
    Stream interface used by the handler
    """

    _stream_buffer_queue = queue.Queue()
    _rotation_queue = queue.Queue()

    def __init__(self, bucket: str, key: str, *, chunk_size: int = DEFAULT_CHUNK_SIZE,
                 max_file_log_time: int = DEFAULT_ROTATION_TIME_SECS,
                 max_file_size_bytes: int = MAX_FILE_SIZE_BYTES, encoder: str = 'utf-8',
                 workers: int = 1, compress: bool = False, **boto_session_kwargs):
        """
        :param bucket: name of the S3 bucket
        :type bucket: str
        :param key: S3 key path
        :type key: str
        :param chunk_size: size of a multipart upload chunk (default 5MB)
        :type chunk_size: int
        :param max_file_log_time: maximum period in seconds before the log file is rotated (default 12 hours)
        :type max_file_log_time: int
        :param max_file_size_bytes: maximum file size in bytes before the file is rotated (default 100MB)
        :type max_file_size_bytes: int
        :param encoder: the encoder to be used for log records (default 'utf-8')
        :type encoder: str
        :param workers: the number of background workers that rotate log records (default 1)
        :type workers: int
        :param compress: flag indicating whether to gzip-compress the file content
        :type compress: bool
        :param boto_session_kwargs: additional keyword arguments for the boto3 S3 resource
        :type boto_session_kwargs: boto3 resource keyword arguments
        """
        self._session = Session()
        self.s3 = self._session.resource('s3', **boto_session_kwargs)
        self.start_time = int(datetime.utcnow().strftime('%s'))
        self.key = key
        self.chunk_size = chunk_size
        self.max_file_log_time = max_file_log_time
        self.max_file_size_bytes = max_file_size_bytes
        self.current_file_name = "{}_{}".format(key, int(datetime.utcnow().strftime('%s')))
        if compress:
            self.current_file_name = "{}.gz".format(self.current_file_name)
        self.encoder = encoder

        self.bucket = bucket
        self._current_object = self._get_stream_object(self.current_file_name)

        # Keep the Thread objects (Thread.start() returns None) so the shutdown logic in close()
        # can count them when queueing stop signals
        self.workers = [threading.Thread(target=task_worker, args=(self._rotation_queue,))
                        for _ in range(int(max(workers, MIN_WORKERS_NUM) / 2) + 1)]
        self._stream_bg_workers = [threading.Thread(target=task_worker, args=(self._stream_buffer_queue,))
                                   for _ in range(max(int(max(workers, MIN_WORKERS_NUM) / 2), 1))]
        for worker in self.workers + self._stream_bg_workers:
            worker.start()

        self._is_open = True
        self.compress = compress

        BufferedIOBase.__init__(self)

    @property
    def bucket(self):
        return self._bucket

    @bucket.setter
    def bucket(self, val):
        if not val:
            raise ValueError("Bucket name is invalid")
        try:
            self.s3.meta.client.head_bucket(Bucket=val)
        except Exception:
            raise ValueError('Bucket %s does not exist, or insufficient permissions' % val)

        self._bucket = self.s3.Bucket(val)

    @property
    def key(self):
        return self._key

    @key.setter
    def key(self, val):
        if not val:
            raise ValueError("Given key is invalid")

        self._key = val.strip('/')

    @property
    def encoder(self):
        return self._encoder

    @encoder.setter
    def encoder(self, val):
        _ = codecs.getencoder(val)
        self._encoder = val

    def get_filename(self):
        """
        returns a log file name
        :return: name of the log file in s3
        """
        filename = "{}_{}".format(self.key, self.start_time)
        if not self.compress:
            return filename
        return "{}.gz".format(filename)

    def _add_task(self, task):
        self._rotation_queue.put(task)

    def _join_tasks(self):
        self._rotation_queue.join()

    def _get_stream_object(self, filename):
        try:
            return StreamObject(self.s3, self.bucket.name, filename, self._stream_buffer_queue)
        except Exception:
            raise RuntimeError('Failed to open new S3 stream object')

    def _rotate_chunk(self, run_async=True):
        assert self._current_object, "Stream object not found"
        part_num = self._current_object.chunk_count + 1
        part = self._current_object.uploader.Part(part_num)
        buffer = self._current_object.buffer
        self._current_object.buffer = BytesIO()
        buffer.seek(0)

        if run_async:
            self._current_object.add_task(Task(self._upload_part, self._current_object, part, part_num, buffer))
        else:
            self._upload_part(self._current_object, part, part_num, buffer)

        self._current_object.chunk_count += 1

    @staticmethod
    def _upload_part(s3_object, part, part_num, buffer):
        upload = part.upload(Body=buffer)
        s3_object.parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})

    def _rotate_file(self):
        if self._current_object.buffer.tell() > 0:
            self._rotate_chunk()

        temp_object = self._current_object
        self._add_task(Task(self._close_stream, stream_object=temp_object))
        self.start_time = int(datetime.utcnow().strftime('%s'))
        new_filename = self.get_filename()
        self._current_object = self._get_stream_object(new_filename)

    @staticmethod
    def _close_stream(stream_object, callback=None, *args, **kwargs):
        stream_object.join_tasks()
        if stream_object.chunk_count > 0:
            stream_object.uploader.complete(MultipartUpload={'Parts': stream_object.parts})
        else:
            stream_object.uploader.abort()

        if callback and callable(callback):
            callback(*args, **kwargs)

    def close(self, *args, **kwargs):
        """
        close the stream for writing and upload the remaining log records in the stream
        :param args:
        :param kwargs:
        :return:
        """
        if self._current_object.buffer.tell() > 0:
            self._rotate_chunk(run_async=False)

        self._current_object.join_tasks()
        self._join_tasks()
        self._close_stream(self._current_object)

        # Stop the worker threads
        for _ in range(len(self.workers)):
            self._rotation_queue.put(STOP_SIGNAL)

        for _ in range(len(self._stream_bg_workers)):
            self._stream_buffer_queue.put(STOP_SIGNAL)

        self._is_open = False

    @property
    def closed(self):
        return not self._is_open

    @property
    def writable(self, *args, **kwargs):
        return True

    def tell(self, *args, **kwargs):
        """
        indication of the current size of the stream before rotation
        :param args:
        :param kwargs:
        :return: size of the current stream
        """
        return self._current_object.byte_count

    def write(self, *args, **kwargs):
        """
        writes a log record to the stream
        :param args:
        :param kwargs:
        :return: size of the record that was written
        """
        s = gzip.compress(args[0].encode(self.encoder)) if self.compress else args[0].encode(self.encoder)
        self._current_object.buffer.write(s)
        self._current_object.byte_count = self._current_object.byte_count + len(s)
        return len(s)

    def flush(self, *args, **kwargs):
        """
        flushes the current stream if it exceeds the threshold size
        :return:
        """
        if self._current_object.buffer.tell() > self.chunk_size:
            self._rotate_chunk()

        if (self.max_file_size_bytes and self._current_object.byte_count > self.max_file_size_bytes) or (
                self.max_file_log_time and
                int(datetime.utcnow().strftime('%s')) - self.start_time > self.max_file_log_time):
            self._rotate_file()
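

# A minimal sketch (an assumption, not part of the library) of driving S3Stream directly,
# without the logging handler; the bucket name and key are placeholders.
def _example_stream_usage():  # hypothetical helper for illustration only
    stream = S3Stream("my-example-bucket", "logs/raw", compress=False)
    stream.write("a raw line of text\n")  # buffered in memory, returns the number of bytes written
    stream.flush()                        # rotates a chunk/file only once a threshold is exceeded
    stream.close()                        # uploads the remaining buffer and completes the multipart upload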


class S3Handler(StreamHandler):
    """
    A Logging handler class that streams log records to S3 by chunks
    """

    def __init__(self, key: str, bucket: str, *, chunk_size: int = DEFAULT_CHUNK_SIZE,
                 time_rotation: int = DEFAULT_ROTATION_TIME_SECS, max_file_size_bytes: int = MAX_FILE_SIZE_BYTES,
                 encoder: str = 'utf-8', max_threads: int = 1, compress: bool = False, **boto_session_kwargs):
        """
        :param key: The path of the S3 object
        :type key: str
        :param bucket: The id of the S3 bucket
        :type bucket: str
        :param chunk_size: size of a chunk in the multipart upload in bytes (default 5MB)
        :type chunk_size: int
        :param time_rotation: Interval in seconds to rotate the file by (default 12 hours)
        :type time_rotation: int
        :param max_file_size_bytes: maximum file size in bytes before rotation (default 100MB)
        :type max_file_size_bytes: int
        :param encoder: default utf-8
        :type encoder: str
        :param max_threads: the number of threads that a stream handler would run for file and chunk rotation tasks,
            only useful when emitting a large number of records
        :type max_threads: int
        :param compress: indicating whether to save a compressed, gz-suffixed file
        :type compress: bool
        """

        args_validation = (
            ValidationRule(time_rotation, is_positive_int, bad_integer_err('time_rotation')),
            ValidationRule(max_file_size_bytes, is_positive_int, bad_integer_err('max_file_size_bytes')),
            ValidationRule(encoder, is_non_empty_string, empty_str_err('encoder')),
            ValidationRule(max_threads, is_positive_int, bad_integer_err('thread_count')),
        )

        for rule in args_validation:
            assert rule.func(rule.arg), rule.message

        self.bucket = bucket
        self.stream = S3Stream(self.bucket, key, chunk_size=chunk_size, max_file_log_time=time_rotation,
                               max_file_size_bytes=max_file_size_bytes, encoder=encoder,
                               workers=max_threads, compress=compress, **boto_session_kwargs)

        # Make sure we gracefully clear the buffers and upload the missing parts before exiting
        signal.signal(signal.SIGTERM, self._teardown)
        signal.signal(signal.SIGINT, self._teardown)
        signal.signal(signal.SIGQUIT, self._teardown)
        atexit.register(self.close)

        StreamHandler.__init__(self, self.stream)

    def _teardown(self, signum: int, frame):
        # signal handlers are invoked with (signum, frame)
        return self.close()

    def close(self, *args, **kwargs):
        """
        Closes the stream
        """
        self.acquire()
        try:
            if self.stream:
                try:
                    self.flush()
                finally:
                    stream = self.stream
                    self.stream = None
                    if hasattr(stream, "close"):
                        stream.close(*args, **kwargs)
        finally:
            self.release()
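

# Usage sketch (an assumption, not part of the library): attach S3Handler to a standard logger.
# The bucket name and key prefix are placeholders; real AWS credentials and an existing bucket
# are required for the upload to succeed.
if __name__ == "__main__":
    import logging

    logger = logging.getLogger("s3_example")
    logger.setLevel(logging.INFO)

    handler = S3Handler("logs/app", "my-example-bucket", compress=False)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)

    logger.info("records are buffered in memory and uploaded to S3 in multipart chunks")

    # logging.shutdown() (also registered via atexit) flushes and closes the handler,
    # which completes or aborts the multipart upload.
    logging.shutdown()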