minio_manage.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. from minio import Minio
  2. from minio.error import S3Error
  3. import json
  4. import io
  5. class MinIO_globalCfg:
  6. endpoint = "1.94.225.168:6614"
  7. access_key = "dongri"
  8. secret_key = "12345678"
  9. bucket_name = "dongri"
  10. secure = False
  11. class MinIOConfigManager:
  12. def __init__(self, endpoint=MinIO_globalCfg.endpoint,
  13. access_key=MinIO_globalCfg.access_key,
  14. secret_key=MinIO_globalCfg.secret_key, bucket_name=MinIO_globalCfg.bucket_name):
  15. self.client = Minio(
  16. endpoint,
  17. access_key=access_key,
  18. secret_key=secret_key,
  19. secure=False # 根据实际环境调整 HTTPS
  20. )
  21. self.bucket_name = bucket_name
  22. self._ensure_bucket_exists()
  23. def _ensure_bucket_exists(self):
  24. """确保存储桶存在"""
  25. try:
  26. if not self.client.bucket_exists(self.bucket_name):
  27. self.client.make_bucket(self.bucket_name)
  28. except S3Error as e:
  29. raise Exception(f"Bucket operation failed: {e}")
  30. def set_config(self, key, value):
  31. try:
  32. # 显式展示 value 的处理流程
  33. data_bytes = json.dumps(value).encode('utf-8') # 转为字节
  34. # 创建文件流对象
  35. stream = io.BytesIO(data_bytes)
  36. stream.seek(0) # 重置指针位置[6](@ref)
  37. # 上传到 MinIO(value_bytes 即处理后的 value)
  38. self.client.put_object(
  39. bucket_name=self.bucket_name,
  40. object_name=key,
  41. data=stream, # 最终使用 value
  42. length=len(data_bytes)
  43. )
  44. print(f"Config '{key}' saved.")
  45. except Exception as e:
  46. print(f"Error: {e}")
  47. def get_config(self, key):
  48. try:
  49. response = self.client.get_object(self.bucket_name, key)
  50. data = response.read().decode('utf-8') # 正确调用 read() 方法
  51. return data
  52. except S3Error as e:
  53. if e.code == "NoSuchKey":
  54. print(f"Config '{key}' not found.")
  55. return None
  56. print(f"Error fetching config: {e}")
  57. return None
  58. finally:
  59. if 'response' in locals():
  60. response.close()
  61. def list_configs(self):
  62. """列出所有配置键"""
  63. try:
  64. objects = self.client.list_objects(self.bucket_name)
  65. return [obj.object_name for obj in objects]
  66. except S3Error as e:
  67. print(f"Error listing configs: {e}")
  68. return []
  69. def delete_config(self, key):
  70. """删除配置"""
  71. try:
  72. self.client.remove_object(self.bucket_name, key)
  73. print(f"Config '{key}' deleted.")
  74. except S3Error as e:
  75. print(f"Error deleting config: {e}")
  76. def testMinIO():
  77. client = Minio("1.94.225.168:6614", access_key="dongri", secret_key="12345678", secure=False)
  78. # 创建桶并上传文件
  79. value = {"key": "value"} # 可以是任意数据
  80. data_bytes = json.dumps(value).encode('utf-8') # 转为字节
  81. # 创建文件流对象
  82. stream = io.BytesIO(data_bytes)
  83. stream.seek(0) # 重置指针位置[6](@ref)
  84. client.put_object(
  85. bucket_name="test-bucket",
  86. object_name="tesss111112st",
  87. data=stream,
  88. length=len(data_bytes),
  89. )
  90. print("功能验证成功")
  91. if __name__ == "__main__":
  92. #testMinIO()
  93. # 初始化客户端(替换为你的 MinIO 地址和密钥)
  94. config_manager = MinIOConfigManager(
  95. endpoint=MinIO_globalCfg.endpoint,
  96. access_key=MinIO_globalCfg.access_key,
  97. secret_key=MinIO_globalCfg.secret_key,
  98. bucket_name=MinIO_globalCfg.bucket_name
  99. )
  100. # 设置配置
  101. config_manager.set_config("database", {"host": "127.0.0.1", "port": 3306})
  102. config_manager.set_config("app_settings", {"theme": "dark", "timeout": 30})
  103. # 获取配置
  104. db_config = config_manager.get_config("database")
  105. print("Database config:", db_config) # 输出: {'host': '127.0.0.1', 'port': 3306}
  106. # 列出所有配置键
  107. print("All config keys:", config_manager.list_configs()) # 输出: ['database', 'app_settings']
  108. # 删除配置
  109. config_manager.delete_config("app_settings")