Source code for thoth.storages.security_indicators
#!/usr/bin/env python3
# thoth-storages
# Copyright(C) 2020 Kevin Postlethwait, Francesco Murdaca
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Adapter for persisting Security Indicator results."""
import os
import logging
from typing import Any
from typing import Dict
from typing import Optional
from typing import Generator
from .ceph import CephStore
_LOGGER = logging.getLogger(__name__)
def _get_security_indicators_prefix(security_indicator_id: Optional[str] = None) -> str:
"""Get prefix where security indicators are stored."""
bucket_prefix = os.environ["THOTH_CEPH_BUCKET_PREFIX"]
deployment_name = os.environ["THOTH_DEPLOYMENT_NAME"]
if security_indicator_id is None:
return f"{bucket_prefix}/{deployment_name}/security-indicators"
return f"{bucket_prefix}/{deployment_name}/security-indicators/{security_indicator_id}"
class _SecurityIndicatorBase:
"""A base class for security-indicators analyzers results."""
__slots__ = ["ceph", "security_indicator_id"]
def connect(self) -> None:
"""Connect this adapter to Ceph."""
self.ceph.connect()
def is_connected(self) -> bool:
"""Check if this adapter is connected."""
return self.ceph.is_connected()
def check_connection(self) -> None:
"""Check connection of this adapter."""
return self.ceph.check_connection()
[docs]class SIBanditStore(_SecurityIndicatorBase):
"""An adapter for manipulating with security-indicators bandit."""
def __init__(self, security_indicator_id: str) -> None:
"""Constructor."""
prefix = f"{_get_security_indicators_prefix(security_indicator_id)}/"
self.ceph = CephStore(prefix=prefix)
self.security_indicator_id = security_indicator_id
[docs] def retrieve_document(self) -> Dict[str, Any]:
"""Retrieve SI bandit document."""
return self.ceph.retrieve_document("bandit")
[docs] def document_exists(self) -> bool:
"""Check if the there is an object with the given key in bucket."""
return self.ceph.document_exists("bandit")
[docs]class SIClocStore(_SecurityIndicatorBase):
"""An adapter for manipulating with security-indicators cloc."""
def __init__(self, security_indicator_id: str) -> None:
"""Constructor."""
prefix = f"{_get_security_indicators_prefix(security_indicator_id)}/"
self.ceph = CephStore(prefix=prefix)
self.security_indicator_id = security_indicator_id
[docs] def retrieve_document(self) -> Dict[str, Any]:
"""Retrieve SI cloc document."""
return self.ceph.retrieve_document("cloc")
[docs] def document_exists(self) -> bool:
"""Check if the there is an object with the given key in bucket."""
return self.ceph.document_exists("cloc")
[docs]class SIAggregatedStore(_SecurityIndicatorBase):
"""An adapter for manipulating with security-indicators aggregated."""
def __init__(self, security_indicator_id: str) -> None:
"""Constructor."""
prefix = f"{_get_security_indicators_prefix(security_indicator_id)}/"
self.ceph = CephStore(prefix=prefix)
self.security_indicator_id = security_indicator_id
[docs] def retrieve_document(self) -> Dict[str, Any]:
"""Retrieve SI aggregated document."""
return self.ceph.retrieve_document("aggregated")
[docs] def document_exists(self) -> bool:
"""Check if the there is an object with the given key in bucket."""
return self.ceph.document_exists("aggregated")
[docs]class SecurityIndicatorsResultsStore:
"""Adapter for manipulating with Security Indicators."""
__slots__ = ["bandit", "cloc", "aggregated", "security_indicator_id"]
def __init__(self, security_indicator_id: str) -> None:
"""A representation of a Security Indicator."""
self.security_indicator_id = security_indicator_id
self.bandit = SIBanditStore(security_indicator_id)
self.cloc = SIClocStore(security_indicator_id)
self.aggregated = SIAggregatedStore(security_indicator_id)
[docs] def connect(self) -> None:
"""Connect this adapter."""
self.bandit.connect()
self.cloc.connect()
self.aggregated.connect()
[docs] def is_connected(self) -> bool:
"""Check if this adapter is connected."""
return self.bandit.is_connected() and self.cloc.is_connected() and self.aggregated.is_connected()
[docs] def check_connection(self):
"""Check connections of this adapter."""
self.bandit.check_connection()
self.cloc.check_connection()
self.aggregated.check_connection()
[docs] @classmethod
def iter_security_indicators(cls) -> Generator[str, None, None]:
"""Iterate over security_indicators ids stored."""
ceph = CephStore(prefix=_get_security_indicators_prefix())
ceph.connect()
last_id = None
for item in ceph.get_document_listing():
security_indicator_id = item.split("/", maxsplit=1)[0]
if last_id == security_indicator_id:
# Return only unique si ids, discard any results placed under the given prefix.
continue
last_id = security_indicator_id
yield security_indicator_id
[docs] @classmethod
def get_security_indicators_count(cls) -> int:
"""Get number of security_indicators stored."""
return sum(1 for _ in cls.iter_security_indicators())