#!/usr/bin/env python3
# thoth-storages
# Copyright(C) 2018, 2019, 2020 Fridolin Pokorny
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Adapter for storing analysis results onto a persistence remote store."""
import os
import typing
from datetime import date
from datetime import timedelta
from .base import StorageBase
from .ceph import CephStore
from .result_schema import RESULT_SCHEMA
from .exceptions import SchemaError
from .exceptions import NoDocumentIdError
[docs]class ResultStorageBase(StorageBase):
"""Adapter base for storing results."""
# Type of results to distinguish them based on prefix on Ceph.
RESULT_TYPE = ""
# Use core analyzers schema as default one, derived classes can adjust this.
SCHEMA = RESULT_SCHEMA
def __init__(
self,
deployment_name=None,
*,
host: str = None,
key_id: str = None,
secret_key: str = None,
bucket: str = None,
region: str = None,
prefix: str = None,
):
"""Initialize result storage database.
The adapter can take arguments from env variables if not provided
explicitly.
"""
assert (
self.RESULT_TYPE
), "Make sure RESULT_TYPE in derived classes to distinguish between adapter type instances is non-empty."
self.deployment_name = deployment_name or os.environ["THOTH_DEPLOYMENT_NAME"]
self.prefix = "{}/{}/{}".format(
prefix or os.environ["THOTH_CEPH_BUCKET_PREFIX"], self.deployment_name, self.RESULT_TYPE
)
self.ceph = CephStore(
self.prefix, host=host, key_id=key_id, secret_key=secret_key, bucket=bucket, region=region
)
[docs] @classmethod
def get_document_id(cls, document: dict) -> str:
"""Get document id under which the given document should be stored."""
# We use hostname that matches pod id generated by OpenShift as a base.
# Note we need to return job id here - the last part delimited by dash
# is used for specifying pod that runs for the given job. We need job
# id to be returned (remove pod specific part).
document_id = document["metadata"].get("document_id")
if not document_id:
raise NoDocumentIdError("No document id is present in metadata")
return document_id
[docs] def is_connected(self) -> bool:
"""Check if the given database adapter is in connected state."""
return self.ceph.is_connected()
[docs] def connect(self) -> None:
"""Connect the given storage adapter."""
self.ceph.connect()
@staticmethod
def _iter_dates_prefix_addition(
start_date: date,
end_date: typing.Optional[date] = None,
*,
include_end_date: bool = False,
) -> typing.Generator[str, None, None]:
"""Create prefix based on dates supplied."""
if end_date is None:
end_date = date.today() + timedelta(days=1) # Today inclusively.
elif end_date < start_date:
raise ValueError("end_date cannot precede start_date")
elif not include_end_date and end_date == start_date:
raise ValueError("end_date cannot equal to start_date unless include_end_date supplied")
walker = start_date
step = timedelta(days=1)
while walker <= end_date:
if not include_end_date and walker == end_date:
break
yield walker.strftime("-%y%m%d")
walker += step
[docs] def get_document_listing(
self,
*,
start_date: typing.Optional[date] = None,
end_date: typing.Optional[date] = None,
include_end_date: bool = False,
only_requests: bool = False,
) -> typing.Generator[str, None, None]:
"""Get listing of documents available in Ceph as a generator.
Additional parameters can filter results. If start_date is supplied
and no end_date is supplied explicitly, the current date is
considered as end_date (inclusively).
"""
if start_date:
for prefix_addition in self._iter_dates_prefix_addition(
start_date=start_date, end_date=end_date, include_end_date=include_end_date
):
for document_id in self.ceph.get_document_listing(prefix_addition):
if not only_requests:
if not document_id.endswith(".request"):
yield document_id
else:
if document_id.endswith(".request"):
yield document_id
else:
for document_id in self.ceph.get_document_listing():
if not only_requests:
if not document_id.endswith(".request"):
yield document_id
else:
if document_id.endswith(".request"):
yield document_id
[docs] def get_document_count(self, *args, **kwargs) -> int:
"""Get number of documents present."""
return sum(1 for _ in self.get_document_listing(*args, **kwargs))
[docs] def store_document(self, document: dict, document_id: typing.Optional[str] = None) -> str:
"""Store the given document in Ceph."""
if self.SCHEMA:
try:
self.SCHEMA(document)
except Exception as exc:
raise SchemaError("Failed to validate document schema") from exc
if document_id is None:
document_id = self.get_document_id(document)
self.ceph.store_document(document, document_id)
return document_id
[docs] def store_request(self, document_id: str, request: typing.Dict[str, typing.Any]) -> str:
"""Store the given request.
This function stores a request document for user request traceability.
"""
document_id = f"{document_id}.request"
self.ceph.store_document(request, document_id)
return document_id
[docs] def retrieve_request(self, document_id: str) -> typing.Dict[str, typing.Any]:
"""Retrieve document capturing requests."""
return self.ceph.retrieve_document(f"{document_id}.request")
[docs] def request_exists(self, document_id: str) -> bool:
"""Check if a request exists for the given document id."""
return self.ceph.document_exists(f"{document_id}.request")
[docs] def store_file(self, file_path: str, file_id: str) -> str:
"""Store the given file in Ceph."""
self.ceph.store_file(file_path, file_id)
return file_id
[docs] def retrieve_document(self, document_id: str) -> dict:
"""Retrieve a document from Ceph by its id."""
return self.ceph.retrieve_document(document_id)
[docs] def iterate_results(
self,
*,
start_date: typing.Optional[date] = None,
end_date: typing.Optional[date] = None,
include_end_date: bool = False,
) -> typing.Generator[tuple, None, None]:
"""Iterate over results available in the Ceph.
Additional parameters can filter results. If start_date is supplied
and no end_date is supplied explicitly, the current date is
considered as end_date (inclusively).
"""
if start_date:
for prefix_addition in self._iter_dates_prefix_addition(
start_date=start_date, end_date=end_date, include_end_date=include_end_date
):
yield from self.ceph.iterate_results(prefix_addition)
else:
yield from self.ceph.iterate_results()
[docs] def document_exists(self, document_id: str) -> bool:
"""Check if the there is an object with the given key in bucket."""
return self.ceph.document_exists(document_id)