Commit

issue #3753: introduce usage of minimum_object_size_for_transit property from sls cloud region config. Expand exception logging.
SilinPavel committed Oct 24, 2024
1 parent 1051912 commit 915320f
Showing 10 changed files with 105 additions and 268 deletions.
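The commit keys the new behaviour off a per-region SLS property. A minimal sketch of the assumed shape of region.storage_lifecycle_service_properties that the adapter reads below (values are illustrative, not taken from this commit):

# Hypothetical example of the SLS section of a cloud region config.
# storage_skip_archiving_tag already existed; minimum_object_size_for_transit is the new key.
region_sls_properties = {
    "storage_skip_archiving_tag": "skip_archiving",   # assumed example value
    "minimum_object_size_for_transit": 256 * 1024,    # bytes; files at or below this size stay in place
}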
@@ -28,8 +28,8 @@ def __init__(self, cloud_operations, testcase):
for file in storage.files:
self.watched_files_by_storages[storage.storage][file.path if file.path.startswith("/") else "/" + file.path] = file

def prepare_bucket_if_needed(self, region, storage_container):
self.cloud_operations.prepare_bucket_if_needed(region, storage_container)
def prepare_bucket_if_needed(self, region, storage_container, object_size_for_transit=None):
self.cloud_operations.prepare_bucket_if_needed(region, storage_container, object_size_for_transit)

def list_objects_by_prefix(self, region, storage_container, classes_to_list=None,
list_versions=False, convert_paths=True):

This file was deleted.

This file was deleted.

This file was deleted.

17 changes: 16 additions & 1 deletion storage-lifecycle-service/sls/app/cloud_storage_adapter.py
@@ -20,6 +20,7 @@
# If the metadata with such key is present on the storage,
# StorageLifecycleArchivingSynchronizer will skip such storage
SKIP_ARCHIVING_TAG = "storage_skip_archiving_tag"
MINIMUM_OBJECT_SIZE_FOR_TRANSIT = "minimum_object_size_for_transit"


def _verify_s3_sls_properties(sls_properties, logger):
@@ -114,11 +115,25 @@ def should_be_skipped(self, storage):
return False
return tag_to_skip_storage in metadata

def get_minimum_object_size_for_transit(self, storage):
region = self.fetch_region(storage.region_id)
return self._get_minimum_object_size_for_transit(region)

def _get_minimum_object_size_for_transit(self, region):
value = region.storage_lifecycle_service_properties.get(MINIMUM_OBJECT_SIZE_FOR_TRANSIT) if region else None
if isinstance(value, str) and value.isnumeric() or isinstance(value, int):
return int(value)
else:
self.logger.log("Bad value for {}: {}, should be non-nagative int. "
"Returning None".format(MINIMUM_OBJECT_SIZE_FOR_TRANSIT, value))
return None

def prepare_bucket_if_needed(self, storage):
region = self.fetch_region(storage.region_id)
storage_cloud_identifier, storage_path_prefix = self._parse_storage_path(storage)
storage_container = CloudPipelineStorageContainer(storage_cloud_identifier, storage_path_prefix, storage)
self.cloud_operations[storage.storage_type].prepare_bucket_if_needed(region, storage_container)
object_size_for_transit = self._get_minimum_object_size_for_transit(region)
self.cloud_operations[storage.storage_type].prepare_bucket_if_needed(region, storage_container, object_size_for_transit)

def list_objects_by_prefix(self, storage, prefix, classes_to_list):
region = self.fetch_region(storage.region_id)
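A standalone sketch of the parsing rule the new _get_minimum_object_size_for_transit helper applies; the plain dict here stands in for region.storage_lifecycle_service_properties:

# Sketch only: mirrors the value handling added above, outside the adapter class.
MINIMUM_OBJECT_SIZE_FOR_TRANSIT = "minimum_object_size_for_transit"

def parse_minimum_object_size_for_transit(sls_properties):
    value = sls_properties.get(MINIMUM_OBJECT_SIZE_FOR_TRANSIT) if sls_properties else None
    # `and` binds tighter than `or`: accepts ints and digit-only strings, rejects everything else
    if isinstance(value, str) and value.isnumeric() or isinstance(value, int):
        return int(value)
    return None  # caller falls back to the 128 KB default from cloud_utils

print(parse_minimum_object_size_for_transit({MINIMUM_OBJECT_SIZE_FOR_TRANSIT: "131072"}))  # 131072
print(parse_minimum_object_size_for_transit({MINIMUM_OBJECT_SIZE_FOR_TRANSIT: 0}))         # 0
print(parse_minimum_object_size_for_transit({MINIMUM_OBJECT_SIZE_FOR_TRANSIT: "10kb"}))    # None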
@@ -155,7 +155,7 @@ def _process_files(self, storage, folder, file_listing, rule_subject_files, rule
.format(rule.datastorage_id, rule.rule_id, folder, transition_method))

if transition_method == METHOD_ONE_BY_ONE:
resulted_action_items = self._build_action_items_for_files(folder, rule_subject_files, rule)
resulted_action_items = self._build_action_items_for_files(storage, folder, rule_subject_files, rule)
else:
resulted_action_items = self._build_action_items_for_folder(storage, folder, file_listing, rule_subject_files, rule)

@@ -164,7 +164,7 @@

self._apply_action_items(storage, rule, resulted_action_items)

def _build_action_items_for_files(self, folder, rule_subject_files, rule):
def _build_action_items_for_files(self, storage, folder, rule_subject_files, rule):
result = StorageLifecycleRuleActionItems().with_folder(folder)\
.with_mode(ACTIONS_MODE_FILES).with_rule_id(rule.rule_id)

@@ -177,12 +177,12 @@ def _build_action_items_for_files(self, folder, rule_subject_files, rule):
date_of_action = self._calculate_transition_date(file, transition)
today = datetime.datetime.now(datetime.timezone.utc).date()

storage_class_file_filter = cloud_utils.get_storage_class_specific_file_filter(transition.storage_class)
storage_object_size_for_transit_filter = self.get_minimum_object_size_for_transit_filter(storage)
if date_utils.is_date_after_that(date_of_action, today):
should_be_transferred = \
file.storage_class != transition.storage_class and \
(transition_date is None or date_utils.is_date_after_that(transition_date, date_of_action)) and \
storage_class_file_filter(file)
storage_object_size_for_transit_filter(file)
if not should_be_transferred:
continue
transition_class = transition.storage_class
@@ -221,7 +221,8 @@ def _build_action_items_for_folder(self, storage, path, files_listing, subject_f
for execution in rule_executions:
transition = next(filter(lambda t: t.storage_class == execution.storage_class, effective_transitions), None)
updated_execution = self._check_rule_execution_progress(
rule.datastorage_id, transition, files_by_dest_storage_class, execution
storage.id, transition, files_by_dest_storage_class, execution,
self.get_minimum_object_size_for_transit_filter(storage)
)
if updated_execution:
execution.status = updated_execution.status
@@ -249,7 +250,10 @@ def _build_action_items_for_folder(self, storage, path, files_listing, subject_f
rule.datastorage_id, rule.rule_id, path, transition_class))
continue

trn_subject_files = self._filter_files_to_transit(files_by_dest_storage_class, transition, transition_date)
trn_subject_files = self._filter_files_to_transit(
files_by_dest_storage_class, transition, transition_date,
self.get_minimum_object_size_for_transit_filter(storage)
)
if not trn_subject_files:
self.logger.log(
"Storage: {}. Rule: {}. Path: '{}'. Transition: {}. No files to transit.".format(
@@ -279,15 +283,15 @@ def _build_action_items_for_folder(self, storage, path, files_listing, subject_f
continue
return result

def _filter_files_to_transit(self, files_by_dest_storage_class, transition, transition_date):
def _filter_files_to_transit(self, files_by_dest_storage_class, transition, transition_date,
storage_object_size_for_transit_filter):
trn_subject_files = files_by_dest_storage_class.get(transition.storage_class, None)
if transition.transition_date is not None:
trn_subject_files = [
trn_file for trn_file in trn_subject_files if trn_file.creation_date.date() < transition_date
]

storage_class_file_filter = cloud_utils.get_storage_class_specific_file_filter(transition.storage_class)
trn_subject_files = [trn_file for trn_file in trn_subject_files if storage_class_file_filter(trn_file)]
trn_subject_files = [trn_file for trn_file in trn_subject_files if storage_object_size_for_transit_filter(trn_file)]

return trn_subject_files

@@ -337,7 +341,8 @@ def _create_or_update_execution(self, storage_id, rule, storage_class, path, sta
}
)

def _check_rule_execution_progress(self, storage_id, transition, subject_files_by_dest_storage_class, execution):
def _check_rule_execution_progress(self, storage_id, transition, subject_files_by_dest_storage_class, execution,
storage_object_size_for_transit_filter):
if not execution.status:
raise RuntimeError("Storage: {}. Rule: {}. Path: '{}'. Transition: {}. Malformed rule execution found."
" Status not found!".format(storage_id, execution.rule_id,
@@ -361,13 +366,12 @@ def _check_rule_execution_progress(self, storage_id, transition, subject_files_b
# - We found a file in source location,
# - And creation date is before execution start date
# (if transition was configured with date - file should be created before that date)
storage_class_file_filter = cloud_utils.get_storage_class_specific_file_filter(transition.storage_class)
file_in_wrong_location = next(
filter(lambda file: file.creation_date < execution.updated and
(transition.transition_date is None or file.creation_date.date() < transition.transition_date) and
storage_class_file_filter(file),
storage_object_size_for_transit_filter(file),
subject_files
), None
), None
)

all_files_moved = file_in_wrong_location is None
@@ -459,6 +463,17 @@ def _prepare_massage():
"subject: {} body: {} to_user: {}"
.format(storage.id, rule.rule_id, path, subject, body, to_user))

def get_minimum_object_size_for_transit_filter(self, storage):
object_size_for_transit = self.cloud_bridge.get_minimum_object_size_for_transit(storage) if self.cloud_bridge else None
if object_size_for_transit is None:
# Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-transition-general-considerations.html
# Files with size smaller than 128 KB will not be moved from the S3 Standard or
# S3 Standard-IA storage classes by default
return lambda f: f.size > cloud_utils.DEFAULT_MIN_SIZE_OF_OBJECT_TO_TRANSIT
else:
return lambda f: f.size > object_size_for_transit


@staticmethod
def _notification_should_be_sent(notification, execution, date_of_action, today):
if not notification.enabled:
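How the new size filter is meant to be applied by the synchronizer, sketched as standalone code; StorageFile is a hypothetical stand-in for the real file objects, and the default mirrors the cloud_utils constant below:

from collections import namedtuple

DEFAULT_MIN_SIZE_OF_OBJECT_TO_TRANSIT = 128 * 1024  # bytes, see cloud_utils.py below

StorageFile = namedtuple("StorageFile", ["path", "size"])  # hypothetical stand-in

def build_size_filter(region_threshold=None):
    # No region override: fall back to the AWS-documented 128 KB floor for transitions.
    threshold = DEFAULT_MIN_SIZE_OF_OBJECT_TO_TRANSIT if region_threshold is None else region_threshold
    return lambda f: f.size > threshold

size_filter = build_size_filter()  # or build_size_filter(region_threshold=1024 * 1024)
files = [StorageFile("/small.txt", 1024), StorageFile("/large.bam", 5 * 1024 * 1024)]
to_transit = [f for f in files if size_filter(f)]  # only /large.bam passes the default filter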
2 changes: 1 addition & 1 deletion storage-lifecycle-service/sls/cloud/cloud.py
@@ -14,7 +14,7 @@


class StorageOperations:
def prepare_bucket_if_needed(self, region, storage_container):
def prepare_bucket_if_needed(self, region, storage_container, object_size_for_transit=None):
pass

def list_objects_by_prefix(self, region, storage_container, classes_to_list=None,
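This base class only changes the signature; the concrete S3 implementation is not part of this excerpt. A hedged sketch of how a subclass could honour object_size_for_transit with an S3 lifecycle ObjectSizeGreaterThan filter (the boto3 call exists; the class name, rule layout, and container attribute names are assumptions):

import boto3

class S3StorageOperationsSketch:
    # Illustrative only: mirrors the StorageOperations interface above, not the repository's S3 code.

    def prepare_bucket_if_needed(self, region, storage_container, object_size_for_transit=None):
        lifecycle_filter = {"Prefix": storage_container.path_prefix or ""}  # attribute name assumed
        if object_size_for_transit is not None:
            # S3 lifecycle rules accept a minimum object size directly in the rule filter.
            lifecycle_filter = {
                "And": {
                    "Prefix": storage_container.path_prefix or "",
                    "ObjectSizeGreaterThan": object_size_for_transit,
                }
            }
        boto3.client("s3").put_bucket_lifecycle_configuration(
            Bucket=storage_container.bucket,  # attribute name assumed
            LifecycleConfiguration={
                "Rules": [{
                    "ID": "sls-transition-rule",  # assumed rule id
                    "Status": "Enabled",
                    "Filter": lifecycle_filter,
                    "Transitions": [{"Days": 30, "StorageClass": "GLACIER_IR"}],  # assumed transition
                }]
            },
        )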
9 changes: 1 addition & 8 deletions storage-lifecycle-service/sls/cloud/cloud_utils.py
@@ -4,11 +4,4 @@
#
# This filter applying in both situation, when actual archiving action is performed
# and when we check the result of the existing actions
def get_storage_class_specific_file_filter(target_storage_class):
if target_storage_class == "GLACIER_IR":
# Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-transition-general-considerations.html
# Files with size smaller that 128kb will not be moved from the S3 Standard or
# S3 Standard-IA storage classes to S3 Intelligent-Tiering or S3 Glacier Instant Retrieval
return lambda f: f.size > 128 * 1024
else:
return lambda f: True
DEFAULT_MIN_SIZE_OF_OBJECT_TO_TRANSIT = 128 * 1024
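The per-storage-class filter (only GLACIER_IR previously had a size floor) is removed; a single default threshold now applies to every transition unless the region overrides it. A quick sketch comparing the old and new behaviour (not repository code):

DEFAULT_MIN_SIZE_OF_OBJECT_TO_TRANSIT = 128 * 1024   # = 131072 bytes = 128 KiB

old_filter = lambda f, storage_class: f.size > 128 * 1024 if storage_class == "GLACIER_IR" else True
new_filter = lambda f: f.size > DEFAULT_MIN_SIZE_OF_OBJECT_TO_TRANSIT  # applied to every storage class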