minio_manage.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
import io
import json
import os

from minio import Minio
from minio.error import S3Error
  5. class MinIO_globalCfg:
  6. endpoint = "1.94.225.168:6614"
  7. access_key = "dongri"
  8. secret_key = "12345678"
  9. bucket_name = "dongri"
  10. secure = False
  11. class MinIOConfigManager:
  12. def __init__(self, endpoint, access_key, secret_key, bucket_name="config-bucket"):
  13. self.client = Minio(
  14. endpoint,
  15. access_key=access_key,
  16. secret_key=secret_key,
  17. secure=False # 根据实际环境调整 HTTPS
  18. )
  19. self.bucket_name = bucket_name
  20. self._ensure_bucket_exists()
  21. def _ensure_bucket_exists(self):
  22. """确保存储桶存在"""
  23. try:
  24. if not self.client.bucket_exists(self.bucket_name):
  25. self.client.make_bucket(self.bucket_name)
  26. except S3Error as e:
  27. raise Exception(f"Bucket operation failed: {e}")
  28. def set_config(self, key, value):
  29. try:
  30. # 显式展示 value 的处理流程
  31. data_bytes = json.dumps(value).encode('utf-8') # 转为字节
  32. # 创建文件流对象
  33. stream = io.BytesIO(data_bytes)
  34. stream.seek(0) # 重置指针位置[6](@ref)
  35. # 上传到 MinIO(value_bytes 即处理后的 value)
  36. self.client.put_object(
  37. bucket_name=self.bucket_name,
  38. object_name=key,
  39. data=stream, # 最终使用 value
  40. length=len(data_bytes)
  41. )
  42. print(f"Config '{key}' saved.")
  43. except Exception as e:
  44. print(f"Error: {e}")
  45. def get_config(self, key):
  46. try:
  47. response = self.client.get_object(self.bucket_name, key)
  48. data = response.read().decode('utf-8') # 正确调用 read() 方法
  49. return data
  50. except S3Error as e:
  51. if e.code == "NoSuchKey":
  52. print(f"Config '{key}' not found.")
  53. return None
  54. print(f"Error fetching config: {e}")
  55. return None
  56. finally:
  57. if 'response' in locals():
  58. response.close()
  59. def list_configs(self):
  60. """列出所有配置键"""
  61. try:
  62. objects = self.client.list_objects(self.bucket_name)
  63. return [obj.object_name for obj in objects]
  64. except S3Error as e:
  65. print(f"Error listing configs: {e}")
  66. return []
  67. def delete_config(self, key):
  68. """删除配置"""
  69. try:
  70. self.client.remove_object(self.bucket_name, key)
  71. print(f"Config '{key}' deleted.")
  72. except S3Error as e:
  73. print(f"Error deleting config: {e}")
  74. def testMinIO():
  75. client = Minio("1.94.225.168:6614", access_key="dongri", secret_key="12345678", secure=False)
  76. # 创建桶并上传文件
  77. value = {"key": "value"} # 可以是任意数据
  78. data_bytes = json.dumps(value).encode('utf-8') # 转为字节
  79. # 创建文件流对象
  80. stream = io.BytesIO(data_bytes)
  81. stream.seek(0) # 重置指针位置[6](@ref)
  82. client.put_object(
  83. bucket_name="test-bucket",
  84. object_name="tesss111112st",
  85. data=stream,
  86. length=len(data_bytes),
  87. )
  88. print("功能验证成功")
  89. if __name__ == "__main__":
  90. #testMinIO()
  91. # 初始化客户端(替换为你的 MinIO 地址和密钥)
  92. config_manager = MinIOConfigManager(
  93. endpoint=MinIO_globalCfg.endpoint,
  94. access_key=MinIO_globalCfg.access_key,
  95. secret_key=MinIO_globalCfg.secret_key,
  96. bucket_name=MinIO_globalCfg.bucket_name
  97. )
  98. # 设置配置
  99. config_manager.set_config("database", {"host": "127.0.0.1", "port": 3306})
  100. config_manager.set_config("app_settings", {"theme": "dark", "timeout": 30})
  101. # 获取配置
  102. db_config = config_manager.get_config("database")
  103. print("Database config:", db_config) # 输出: {'host': '127.0.0.1', 'port': 3306}
  104. # 列出所有配置键
  105. print("All config keys:", config_manager.list_configs()) # 输出: ['database', 'app_settings']
  106. # 删除配置
  107. config_manager.delete_config("app_settings")