1、 pip安装minio

pip3 install minio

2、 服务器启动minio

export MINIO_ACCESS_KEY="test"
export MINIO_SECRET_KEY="12345678"
./minio server --address localhost:9900 ./data

3、 测试

"""
    mini_store.py
    ~~~~~~~~~

    Thin wrapper around the MinIO Python client for uploading and downloading objects (e.g. model files).

    :date: 2020-02-14 14:36:00
    :author:
"""
import io
import os
import uuid

from minio import Minio
from minio.error import S3Error
class MinioStore:
    """Singleton wrapper around a MinIO client bound to a single bucket.

    Every call to ``MinioStore(...)`` returns the same instance. Note that
    ``__init__`` still runs on each call, so constructing the store again
    with different arguments reconfigures the shared instance.
    """

    # Chunk size used when streaming an object body to disk.
    _CHUNK_SIZE = 32 * 1024

    def __new__(cls, *args, **kw):
        """Return the shared instance, creating it on first use."""
        if not hasattr(cls, '_instance'):
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, host, access_key, secret_key, bucket, model_dirs,
                 secure=False):
        """Configure the store and build the underlying MinIO client.

        :param host: MinIO endpoint as ``host:port``.
        :param access_key: access key used to authenticate.
        :param secret_key: secret key used to authenticate.
        :param bucket: bucket every operation of this store works on.
        :param model_dirs: local directory that ``get_object`` downloads into.
        :param secure: passed to the client as its ``secure`` flag;
            defaults to ``False`` as before.
        """
        self.host = host
        self.access_key = access_key
        self.secret_key = secret_key
        self.secure = secure
        self.bucket = bucket
        self.model_dirs = model_dirs
        self.client = Minio(
            self.host,
            secure=self.secure,
            access_key=self.access_key,
            secret_key=self.secret_key,
        )

    def _ensure_bucket(self):
        """Create ``self.bucket`` if it does not exist yet."""
        if not self.client.bucket_exists(self.bucket):
            self.client.make_bucket(self.bucket)
            print('create {} success'.format(self.bucket))

    def get_object(self, file_name):
        """Download object ``file_name`` into ``model_dirs``.

        :param file_name: object name in the bucket; also used as the local
            file name inside ``model_dirs``.
        :return: path of the local file that was written.
        """
        # Kept from the original behaviour: the bucket is created when it is
        # missing, even though the subsequent read will then fail.
        self._ensure_bucket()

        # os.path.join works whether or not model_dirs ends with a separator.
        model_file = os.path.join(self.model_dirs, file_name)
        response = self.client.get_object(self.bucket, file_name)
        try:
            with open(model_file, 'wb') as f:
                # Stream fixed-size chunks instead of iterating "lines" of
                # what is usually binary data.
                for chunk in response.stream(self._CHUNK_SIZE):
                    f.write(chunk)
        finally:
            # The response holds a pooled HTTP connection; always give it back.
            response.close()
            response.release_conn()
        return model_file

    def put_object(self, object_name, raw_data, raw_size):
        """Upload ``raw_size`` bytes read from the stream ``raw_data``.

        :param object_name: name of the object to create in the bucket.
        :param raw_data: readable binary stream (e.g. ``io.BytesIO``).
        :param raw_size: number of bytes to read from ``raw_data``.
        :return: whatever the client's ``put_object`` returns.
        """
        self._ensure_bucket()
        return self.client.put_object(
            self.bucket, object_name, raw_data, raw_size
        )

    def fget_object(self, object_name, file_name):
        """Download ``object_name`` to the local path ``file_name``.

        :return: whatever the client's ``fget_object`` returns.
        """
        return self.client.fget_object(self.bucket, object_name, file_name)

    def fput_object(self, object_name, file_name):
        """Upload the local file ``file_name`` as ``object_name``.

        The bucket is created first when missing, matching ``put_object``.

        :return: whatever the client's ``fput_object`` returns.
        """
        self._ensure_bucket()
        return self.client.fput_object(self.bucket, object_name, file_name)


if __name__ == '__main__':
    # Manual smoke test against a local MinIO server. The credentials must
    # match the MINIO_ACCESS_KEY / MINIO_SECRET_KEY exported when the server
    # was started (step 2: "test" / "12345678").
    miniostore_ = MinioStore(
        "127.0.0.1:9900",
        "test",
        "12345678",
        "rasamodels",
        "/home/huangqh/rasa3/models/",
    )

    # Pass the real payload length; a hard-coded 12 dropped the final byte
    # of this 13-byte message.
    payload = b"Hello world !"
    print(miniostore_.put_object("helloworld", io.BytesIO(payload), len(payload)))
    print(miniostore_.get_object("helloworld"))
    miniostore_.fget_object("helloworld", "a.txt")

    # Requires a local "file.txt" in the current working directory.
    miniostore_.fput_object("test1", "file.txt")
    miniostore_.fget_object("test1", "file_1.txt")

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐