Source code for thoth.storages.logs
#!/usr/bin/env python3
# thoth-storages
# Copyright(C) 2021 Fridolin Pokorny
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Adapter for accessing Argo Workflow logs."""
import os
from typing import Optional
from thoth.storages.base import StorageBase
from thoth.storages.ceph import CephStore
from thoth.storages.exceptions import MultipleFoundError
from thoth.storages.exceptions import NotFoundError
[docs]class WorkflowLogsStore(StorageBase):
"""Access logs stored by Argo Workflows."""
def __init__(
self,
deployment_name: Optional[str] = None,
*,
host: Optional[str] = None,
key_id: Optional[str] = None,
secret_key: Optional[str] = None,
bucket: Optional[str] = None,
region: Optional[str] = None,
prefix: Optional[str] = None,
):
"""Initialize the adapter."""
self.deployment_name = deployment_name or os.environ["THOTH_DEPLOYMENT_NAME"]
self.prefix = "{}/{}/argo/artifacts".format(
prefix or os.environ["THOTH_CEPH_BUCKET_PREFIX"],
self.deployment_name,
)
self.ceph = CephStore(
self.prefix, host=host, key_id=key_id, secret_key=secret_key, bucket=bucket, region=region
)
[docs] def get_log(self, workflow_id: str) -> str:
"""Obtain log from the given workflow."""
results = list(self.ceph.get_document_listing(workflow_id))
if len(results) > 1:
raise MultipleFoundError(
f"Multiple results match the given workflow_id ({workflow_id!r}) provided: {results!r}"
)
# Make sure users do not use workflow id prefix.
if not results or not results[0].startswith(f"{workflow_id}/"):
raise NotFoundError(f"No log entry found for {workflow_id!r}")
return self.ceph.retrieve_blob(results[0]).decode()
[docs] def connect(self) -> None:
"""Connect to Ceph."""
self.ceph.connect()