123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- from minio import Minio
- from minio.error import S3Error
- import json
- import io
class MinIO_globalCfg:
    """Default connection settings for the MinIO server used by this module."""

    # NOTE(review): the credentials below are committed in plain text;
    # consider loading them from the environment or a secrets store.
    access_key: str = "dongri"
    secret_key: str = "12345678"

    # Server address as "host:port" and the bucket that holds the objects.
    endpoint: str = "1.94.225.168:6614"
    bucket_name: str = "dongri"

    # False selects plain HTTP, True selects HTTPS.
    secure: bool = False
class MinIOConfigManager:
    """Store configuration values as JSON objects in a MinIO bucket.

    Each configuration key is used as the object name inside ``bucket_name``;
    the object body is the UTF-8 encoded JSON text of the value.
    """

    def __init__(self, endpoint, access_key, secret_key,
                 bucket_name="config-bucket", secure=False):
        """Create the MinIO client and make sure the bucket exists.

        Args:
            endpoint: ``host:port`` of the MinIO server.
            access_key: Access key used to authenticate.
            secret_key: Secret key used to authenticate.
            bucket_name: Bucket that holds the configuration objects.
            secure: Use HTTPS when true. Defaults to ``False``, the value
                that used to be hard-coded here.

        Raises:
            Exception: If checking for or creating the bucket fails.
        """
        self.client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )
        self.bucket_name = bucket_name
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create the bucket if it does not exist yet.

        Raises:
            Exception: Wraps the underlying ``S3Error``; the original error
                is kept as ``__cause__``.
        """
        try:
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
        except S3Error as e:
            # Chain the cause so the S3 traceback is not lost.
            raise Exception(f"Bucket operation failed: {e}") from e

    def set_config(self, key, value):
        """Serialise ``value`` as JSON and upload it under ``key``.

        Best effort: any failure (serialisation or upload) is printed and
        swallowed, and the method returns ``None`` either way.

        Args:
            key: Object name to store the value under.
            value: Any JSON-serialisable value.
        """
        try:
            data_bytes = json.dumps(value).encode("utf-8")
            # A fresh BytesIO already starts at offset 0, so no seek is needed.
            self.client.put_object(
                bucket_name=self.bucket_name,
                object_name=key,
                data=io.BytesIO(data_bytes),
                length=len(data_bytes),
            )
            print(f"Config '{key}' saved.")
        except Exception as e:
            print(f"Error: {e}")

    def get_config(self, key):
        """Fetch the stored JSON text for ``key``.

        Args:
            key: Object name to read.

        Returns:
            The object body decoded as UTF-8 (the raw JSON text, not the
            parsed value), or ``None`` if the object is missing or the
            request fails with an ``S3Error``.
        """
        response = None
        try:
            response = self.client.get_object(self.bucket_name, key)
            return response.read().decode("utf-8")
        except S3Error as e:
            if e.code == "NoSuchKey":
                print(f"Config '{key}' not found.")
            else:
                print(f"Error fetching config: {e}")
            return None
        finally:
            if response is not None:
                # close() alone does not hand the connection back to the
                # pool; release_conn() does.
                response.close()
                response.release_conn()

    def list_configs(self):
        """Return the object names in the bucket, or ``[]`` on ``S3Error``."""
        try:
            objects = self.client.list_objects(self.bucket_name)
            # Iterate inside the try block so an S3Error raised while the
            # listing is consumed is handled here as well.
            return [obj.object_name for obj in objects]
        except S3Error as e:
            print(f"Error listing configs: {e}")
            return []

    def delete_config(self, key):
        """Delete the object stored under ``key``; an ``S3Error`` is printed, not raised."""
        try:
            self.client.remove_object(self.bucket_name, key)
            print(f"Config '{key}' deleted.")
        except S3Error as e:
            print(f"Error deleting config: {e}")
def testMinIO():
    """Smoke test: upload one small JSON object to ``test-bucket``.

    Connects with fixed credentials over plain HTTP and prints a success
    message once the upload call returns. This function does not create
    the bucket itself.
    """
    # Serialise the sample value and wrap it in a file-like stream.
    payload = json.dumps({"key": "value"}).encode("utf-8")
    buffer = io.BytesIO(payload)
    buffer.seek(0)  # make sure the read position is at the start

    minio_client = Minio(
        "1.94.225.168:6614",
        access_key="dongri",
        secret_key="12345678",
        secure=False,
    )
    minio_client.put_object(
        bucket_name="test-bucket",
        object_name="tesss111112st",
        data=buffer,
        length=len(payload),
    )
    print("功能验证成功")
def _main():
    """Demo: save two configs, read one back, list the keys, delete one."""
    # testMinIO()  # uncomment for a quick upload smoke test

    # Build the manager from the module-level connection settings.
    settings = MinIO_globalCfg
    config_manager = MinIOConfigManager(
        endpoint=settings.endpoint,
        access_key=settings.access_key,
        secret_key=settings.secret_key,
        bucket_name=settings.bucket_name,
    )

    # Store two sample configurations.
    samples = (
        ("database", {"host": "127.0.0.1", "port": 3306}),
        ("app_settings", {"theme": "dark", "timeout": 30}),
    )
    for config_key, config_value in samples:
        config_manager.set_config(config_key, config_value)

    # Read one back; get_config returns the stored JSON text, so this
    # prints e.g. {"host": "127.0.0.1", "port": 3306}.
    db_config = config_manager.get_config("database")
    print("Database config:", db_config)

    # List every object name in the bucket, e.g. ['app_settings', 'database'].
    print("All config keys:", config_manager.list_configs())

    # Remove one of the sample configurations again.
    config_manager.delete_config("app_settings")


if __name__ == "__main__":
    _main()
-
|