暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分布式文件服务

AweSomeBaZinGa 2019-09-25
239


8 分布式文件服务(系统)

1、背景

搭建分布式文件系统的初衷只是为了解决文件(附件)的存储问题。所以选型主要考虑的因素有:

通用的文件系统开源轻量级开发友好(Java,Go)性能卓越API操作,并能提供简单的管理视图

2、为什么需要分布式文件服务器

主要原因有以下几点

通过API的方式管理文件,文件管理变成开发问题降低WEB服务器的压力,提高文件访问的效率和稳定性统一、公用独立服务易扩展统一安全认证和保护

3、分布式文件存储要求

数据安全 需要实现数据冗余,避免数据的单点故障可线性扩展 当数据增长到TB、甚至PB以上时,存储方案需要支持可线性扩展,暂时没有此需求存储高可用 某个存储服务宕掉时,不影响整体存储方案的可用性能 性能达到应用要求

4、分布式文件系统对比

对比说明/文件系统TFSFastDFSMogileFSMooseFSGlusterFSCeph
开发语言C++CPerlCCC++
开源协议GPL V2GPL V3GPLGPL V3GPL V3LGPL
数据存储方式文件/Trunk文件文件/块对象/文件/块
集群节点通信协议私有协议(TCP)私有协议(TCP)HTTP私有协议(TCP)私有协议(TCP)/ RDAM(远程直接访问内存)私有协议(TCP)
专用元数据存储点占用NS占用DB占用MFS占用MDS
在线扩容支持支持支持支持支持支持
冗余备份支持支持-支持支持支持
单点故障存在不存在存在存在不存在存在
跨集群同步支持部分支持--支持不适用
易用性安装复杂,官方文档少安装简单,社区相对活跃-安装简单,官方文档多安装简单,官方文档专业化安装简单,官方文档专业化
适用场景跨集群的小文件单集群的中小文件-单集群的大中文件跨集群云存储单集群的大中小文件
指标适合类型文件分布系统性能复杂度FUSEPOSIX备份机制通讯协议接口社区支持去重开发语言
FastDFS4KB~500MB小文件合并存储不分片处理很高简单不支持不支持组内冗余备份ApiHTTP国内用户群
C语言
TFS所有文件小文件合并,以block组织分片
复杂不支持不支持Block存储多份,主辅灾备APIhttp
C++
MFS大于64K分片存储Master占内存多
支持支持多点备份动态冗余使用fuse挂在较多
Perl
HDFS大文件大文件分片分块存储
简单支持支持多副本原生api较多
java
Ceph对象文件块OSD一主多从
复杂支持支持多副本原生api较少
C++
MogileFS海量小图片
复杂可以支持不支持动态冗余原生api文档少
Perl
ClusterFS大文件

简单支持支持镜像

C
特性cephminioswifthbase/hdfsGlusterFSfastdfs
开发语言Cgopythonjava副本副本
数据冗余副本,纠删码Reed-Solomon code副本副本副本副本
一致性强一致性强一致最终一致最终一致??
动态扩展HASH不支持动态加节点一致性hash???
性能??????
中心节点对象存储无中心,cephFS有元数据服务中心点无中心无中心nameNode单点??
存储方式块、文件、对象对象存储(分块)块存储块存储??
活跃度高,中文社区不算活跃高,没有中文社区
成熟度??
操作系统linux-3.10.0+[1]linux,windows?任何支持java的OS??
文件系统EXT4,XFSEXT4,XFS????
客户端c、python,S3java,s3java,RESTfuljava,RESTful??
断点续传兼容S3,分段上传,断点下载兼容S3,分段上传,断点下载不支持不支持??
学习成本???
前景1089975
开源协议LGPL version 2.1Apache v2.0Apache V2.0???
管理工具ceph-admin,ceph-mgr,zabbix插件[2]web管理工具[3]命令行工具 mc????
存储系统CephGlusterFSSheepdogLustreSwiftCinderTFSHDFSMooseFSFastDFSMogileFS
开发语言C++CCCPythonPythonC++JavaCCPerl
开源协议LGPLGPL V3GPLv2GPLApacheApacheGPL V2ApacheGPL V3GPL V3GPL
数据存储方式对象/文件/块文件/块对象对象文件文件文件/块文件
集群节点通信协议私有协议(TCP)私有协议(TCP)/ RDAM(远程直接访问内存)totem协议私有协议(TCP)/ RDAM(远程直接访问内存)TCP未知TCPTCPTCPTCPHTTP
专用元数据存储点占用MDS双MDS未知占用NS占用MDS占用MFS占用DB
在线扩容支持支持支持支持支持未知支持支持支持支持支持
冗余备份支持支持支持支持未知支持支持支持支持不支持
单点故障存在不存在不存在存在不存在未知存在存在存在不存在存在
跨集群同步不支持支持未知未知未知未知支持不支持不支持部分支持不支持
易用性安装简单,官方文档专业化安装简单,官方文档专业化未知复杂。而且Lustre严重依赖内核,需要重新编译内核未知目前来说框架不算成熟存在一些问题安装复杂,官方文档少安装简单,官方文档专业化安装简单,官方文档多安装简单,社区相对活跃未知
适用场景单集群的大中小文件跨集群云存储弹性块存储虚拟机大文件读写openstack对象存储openstack块存储跨集群的小文件Mapreduce使用的文件存储单集群的大中文件单集群的中小文件未知
FUSE挂载支持支持支持支持支持未知未知支持支持不支持不支持
访问接口POSIXPOSIX未知POSIX/MPIPOSIX未知不支持POSIX不支持POSIXPOSIX不支持POSIX不支持POSIX

综上:在ceph、minio、fastdfs之间选择。

ceph

ceph优点

1.成熟稳定2.功能强大3.支持数千节点4.支持动态增加节点,自动平衡数据分布5.可配置性强,可针对不同场景进行调优6.支持对象存储(OSD)集群,通过CRUSH算法,完成文件动态定位, 处理效率更高7.支持通过FUSE方式挂载,降低客户端的开发成本,通用性高8.支持分布式的MDS/MON,无单点故障9.强大的容错处理和自愈能力10.支持在线扩容和冗余备份,增强系统的可靠性11.高性能、高可用、高可扩展性、特性丰富

ceph缺点

学习成本高,安装运维复杂

ceph应用场景

可应付各种场景

minio

minio优点

1.学习成本低,安装运维简单,开箱即用2.目前minio论坛[4]推广给力,有问必答3.有java客户端、js客户端、常用语言都有

minio缺点

1.社区不够成熟,业界参考资料较少2.不支持动态增加节点,minio创始人的设计理念就是动态增加节点太复杂,后续会采用其它方案来支持扩容。

fastDFS

fastdfs优点

1.系统无需支持POSIX(可移植操作系统),降低了系统的复杂度,处理效率更高2.支持在线扩容机制,增强系统的可扩展性3.实现了软RAID,增强系统的并发处理能力及数据容错恢复能力4.支持主从文件,支持自定义扩展名5.主备Tracker服务,增强系统的可用性

fastdfs缺点

1.不支持断点续传,对大文件将是噩梦(FastDFS不适合大文件存储)2.不支持POSIX通用接口访问,通用性较低3.对跨公网的文件同步,存在较大延迟,需要应用做相应的容错策略4.同步机制不支持文件正确性校验,降低了系统的可用性5.通过API下载,存在单点的性能瓶颈

fastdfs应用场景

1.单集群部署的应用2.存储后基本不做改动3.小中型文件

结论:目前的需求是需要一个分布式文件存储服务器,考虑到现在的人员情况,选择minio作为内部的文件存储服务器。

5、minio使用

MinIO是基于go语言开发的一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等,而一个对象文件可以是任意大小,从几kb到最大5T不等。

MinIO是一个非常轻量的服务,可以很简单的和其他应用的结合,类似 NodeJS, Redis 或者 MySQL。

5.1引入依赖

<!-- minio support -->
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio.version}</version> <!-- 6.0.8 -->
<exclusions>
<exclusion>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 解决版本冲突问题,如果不主动覆盖,默认引入3.8.1会报错 -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.13.1</version><!--$NO-MVN-MAN-VER$-->
</dependency>
复制

5.2配置

@Configuration
@ConfigurationProperties(ignoreInvalidFields=true, ignoreUnknownFields=true, prefix="minio")
@Data
public class MinioConfig {
// 地址
private String endpoint;
private String accessKey;
private String secretKey;
}
复制

5.3客户端Template

/**
* minio 交互类
*
* @author zhangwy
*/
@Component
@Repository
public class MinioTemplate implements InitializingBean {
@Autowired
MinioConfig minioConfig;
@Autowired
private FileService fileService;


private MinioClient client;


@Override
public void afterPropertiesSet() throws Exception {
this.client = new MinioClient(minioConfig.getEndpoint(), minioConfig.getAccessKey(), minioConfig.getSecretKey());
}
/**
* 判断bucket是否存在
*
* @param bucketName
* @return boolean
* */
@SneakyThrows
public boolean bucketExists(String bucketName) {
return client.bucketExists(bucketName);
}


/**
* 创建bucket
*
* @param bucketName
*/
@SneakyThrows
public void createBucket(String bucketName) {
if (!bucketExists(bucketName)) {
client.makeBucket(bucketName);
}
}


/**
* 获取全部bucket
*
* @return List<Bucket>
*/
@SneakyThrows
public List<Bucket> getAllBuckets() {
return client.listBuckets();
}


/**
* 获取所有bucket的Object
*
* @param bucketName
* @return List<MinioItem>
* */
@SneakyThrows
public List<MinioItem> getAllObjects(String bucketName) {
List<MinioItem> objectList = new ArrayList<>();
if(this.bucketExists(bucketName)) {
Iterable<Result<Item>> objectsIterator = client.listObjects(bucketName);
while (objectsIterator.iterator().hasNext()) {
objectList.add(new MinioItem(objectsIterator.iterator().next().get()));
}
}
return objectList;
}


/**
* 根据文件前缀查询文件
*
* @param bucketName
* @param prefix
* @param recursive 是否递归查询
* @return List<MinioItem>
*/
@SneakyThrows
public List<MinioItem> getAllObjectsByPrefix(String bucketName, String prefix, boolean recursive) {
List<MinioItem> objectList = new ArrayList<>();
if(this.bucketExists(bucketName)) {
Iterable<Result<Item>> objectsIterator = client.listObjects(bucketName, prefix, recursive);
while (objectsIterator.iterator().hasNext()) {
objectList.add(new MinioItem(objectsIterator.iterator().next().get()));
}
}
return objectList;
}


/**
* 根据名称查找bucket
*
* @param bucketName
* @return Optional<Bucket>
*/
@SneakyThrows
public Optional<Bucket> getBucket(String bucketName) {
return client.listBuckets().stream().filter(b -> b.name().equals(bucketName)).findFirst();
}


/**
* 根据bucket名称移除bucket
*
* @param bucketName
*/
@SneakyThrows
public void removeBucket(String bucketName) {
if(bucketExists(bucketName)) {
client.removeBucket(bucketName);
}
}


/**
* 查询未完成的上传
*
* @param bucketName
* @return List<MinioUpload>
*/
@SneakyThrows
public List<MinioUpload> getIncompleteUploads(String bucketName) {
List<MinioUpload> uploadList = new ArrayList<>();
if(bucketExists(bucketName)) {
Iterable<Result<Upload>> uploadIterator = client.listIncompleteUploads(bucketName);
while (uploadIterator.iterator().hasNext()) {
uploadList.add(new MinioUpload(uploadIterator.iterator().next().get()));
}
}
return uploadList;
}


/**
* 删除未完成的上传
*
* @param bucketName
* @param objectName
*/
@SneakyThrows
public void removeIncompleteUploads(String bucketName, String objectName) {
client.removeIncompleteUpload(bucketName, objectName);
}


/**
* 上传文件
*
* @param bucketName 桶名
* @param objectName 对象名
* @param stream 文件流
* @throws Exception
*/
@SneakyThrows
public void putObject(String bucketName, String objectName, InputStream stream) throws IOException {
putObject(bucketName, objectName, stream, Long.valueOf(stream.available()), null, null, "application/octet-stream");
}
@SneakyThrows
public String putObject(String bucketName, MultipartFile mFile) {
//判断桶名是否存在,不存在则创建
createBucket(bucketName);
SimpleDateFormat sdfOne = new SimpleDateFormat("yyyyMMdd");
//生成日期+随机+原始文件名
String ymd = sdfOne.format(new Date());
String uuid = UUID.randomUUID().toString();
String name = mFile.getOriginalFilename();
String minIoName = ymd+"/"+uuid+"/"+name;
putObject(bucketName, minIoName , mFile.getInputStream(), mFile.getSize(), null, null, mFile.getContentType());
//保存在数据库里
File file = new File();
file.setBucket(bucketName);
file.setDir(ymd);
file.setUuid(uuid);
file.setName(name);
file.setType(name.substring(name.lastIndexOf(".")+1));
file.setSize(mFile.getSize());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String nowTime = sdf.format(new Date());
Date gmtCreate = null;
try {
gmtCreate = new java.sql.Date(sdf.parse(nowTime).getTime());
} catch (ParseException e1) {
e1.printStackTrace();
}
file.setGmtCreate(gmtCreate);
User user = UserContext.getUser();
int userId = 0;
if (user != null) {
userId = user.getId();
}
file.setCreator(userId);
fileService.save(file);
return file.getId().toString();
}
@SneakyThrows
public String getUrl(int id) {
File file = fileService.getOne(id);
String name = file.getDir()+"/"+file.getUuid()+"/"+file.getName();
String bucketName = file.getBucket();
String objectURL = getObjectURL(bucketName, name);
return objectURL;
}


@SneakyThrows
public List<Map<String, Object>> getUrls(String ids) {
List<Integer> idList= Util.string2IntList(ids);
List<Map<String, Object>> maps = new ArrayList<Map<String,Object>>();
for (int i = 0; i < idList.size(); i++) {
int id = idList.get(i);
File file = fileService.getOne(id);
String url = getUrl(id);
Map<String, Object> map = new HashMap<String, Object>();
map.put("id", file.getId());
map.put("fileName", file.getName());
map.put("url", url);
maps.add(map);
}
return maps;
}


@SneakyThrows
public String putObjects(String bucketName, MultipartFile[] files) {
//判断桶名是否存在
createBucket(bucketName);
List<String> list = new ArrayList<String>();
if (files.length != 0) {
for (int i = 0; i < files.length; i++) {
if (!files[i].isEmpty() ) {
String id = putObject(bucketName, files[i]);
list.add(id);
}
}
}
String ids = Util.list2String(list);
return ids;
}
@SneakyThrows
public void putObject(String bucketName,String objectName, MultipartFile file) {
putObject(bucketName, objectName, file.getInputStream(), file.getSize(), null, null, file.getContentType());
}
@SneakyThrows
public void putObject(String bucketName, String objectName, InputStream stream, String contentType) {
putObject(bucketName, objectName, stream, Long.valueOf(stream.available()), null, null, contentType);
}
@SneakyThrows
public void putObject(String bucketName, String objectName, InputStream stream, long size, Map<String,String> headerMap, ServerSideEncryption sse, String contentType) throws IOException {
if("".equals(StringUtil.null2String(contentType))) {
contentType = "application/octet-stream";
}
if(size == 0) {
size = Long.valueOf(stream.available());
}
client.putObject(bucketName, objectName, stream, size, headerMap, sse, contentType);
}


/**
* KMS加密
* Map<String,String> myContext = new HashMap<>();
* myContext.put("key1","value1");
* ServerSideEncryption sse = ServerSideEncryption.withManagedKeys("Key-Id", myContext);
* S3加密
* ServerSideEncryption sse = ServerSideEncryption.atRest();
* AES加密
* KeyGenerator keyGen = KeyGenerator.getInstance("AES");
* keyGen.init(256);
* ServerSideEncryption sse = ServerSideEncryption.withCustomerKey(keyGen.generateKey());
* */
@SneakyThrows
public void putObjectEncrypted(String bucketName, String objectName, InputStream stream, ServerSideEncryption sse) {
putObject(bucketName, objectName, stream,Long.valueOf(stream.available()), null, sse, "application/octet-stream");
}
/**
* 示例
* // Create metadata map
* Map<String, String> headerMap = new HashMap<>();
* // Add custom metadata
* headerMap.put("CustomMeta", "TEST");
* // Add custom content type
* headerMap.put("Content-Type", "application/octet-stream");
* // Add storage class
* headerMap.put("X-Amz-Storage-Class", "REDUCED_REDUNDANCY");
* */
@SneakyThrows
public void putObjectWithHeader(String bucketName, String objectName, InputStream stream, Map<String,String> headerMap) throws Exception {
putObject(bucketName, objectName, stream,Long.valueOf(stream.available()), headerMap, null, "application/octet-stream");
}


public boolean objectExists(String bucketName, String objectName) {
try {
client.statObject(bucketName, objectName);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 复制对象
*
* @param sourceBucketName
* @param sourceObjectName
* @param targetBucketName
* @param targetObjectName
* */
@SneakyThrows
public void copyObject(String sourceBucketName, String sourceObjectName, String targetBucketName, String targetObjectName) {
if(objectExists(sourceBucketName,sourceObjectName)) {
client.copyObject(sourceBucketName, sourceObjectName, targetBucketName, targetObjectName);
}
}


/**
* 获取文件信息
*
* @param bucketName
* @param objectName
* @return ObjectStat
*/
@SneakyThrows
public ObjectStat getObjectInfo(String bucketName, String objectName) {
if (objectExists(bucketName,objectName)) {
return client.statObject(bucketName, objectName);
} else {
return null;
}
}


/**
* 获取多文件信息
*
* @param fileIds
* @param bucketName
* @return ObjectStat
*/
@SneakyThrows
public List<MinioObject> getObjectInfos(String fileIds, String bucketName) {
List<Integer> ids = Util.string2IntList(fileIds);
List<MinioObject> listFile = new ArrayList<MinioObject>();
String objectName = "";
File file = new File();
for (int i = 0; i < ids.size(); i++) {
file = fileService.getOne(ids.get(i));
objectName = file.getDir()+"/"+file.getUuid()+"/"+file.getName();
ObjectStat objectInfo = getObjectInfo(bucketName, objectName);
MinioObject minioObject = new MinioObject(objectInfo);
listFile.add(minioObject);
}
return listFile;
}


/**
* 获取文件
*
* @param bucketName
* @param objectName
* @return InputStream
*/
@SneakyThrows
public InputStream getObject(String bucketName, String objectName) {
if (objectExists(bucketName,objectName)) {
return client.getObject(bucketName, objectName);
} else {
return null;
}
}


/**
* 下载文件保存到本地
*
* @param bucketName
* @param objectName
* @param fileName
*/
@SneakyThrows
public void getObject(String bucketName, String objectName, String fileName) {
if (objectExists(bucketName,objectName)) {
client.getObject(bucketName, objectName, fileName);
}
}


/**
* 删除单个文件
*
* @param bucketName
* @param objectName
*/
@SneakyThrows
public void removeObject(String bucketName, String objectName) {
if(objectExists(bucketName,objectName)) {
client.removeObject(bucketName, objectName);
}
}
/**
* 删除多个文件
*
* @param bucketName
* @param objectNames
*/
@SneakyThrows
public void removeObjects(String bucketName, List<String> objectNames) {
client.removeObjects(bucketName, objectNames);
}
/**
* 获取文件外链,默认过期时间为七天
*
* @param bucketName
* @param objectName
* @return url
*/
@SneakyThrows
public String getObjectURL(String bucketName, String objectName) {
return getObjectURL(bucketName, objectName, 7 * 24 * 3600);
}


/**
* 获取文件外链
*
* @author zhuhui
* @date 2019-07-09
* @param bucketName
* @param objectName
* @param expires
* @return
*/
@SneakyThrows
public String getObjectURL(String bucketName, String objectName,Integer expires) {
if(objectExists(bucketName,objectName)) {
return client.presignedGetObject(bucketName, objectName, expires);
} else {
return "";
}
}
/**
* 单文件上传返回ID
*
* @author zhuhui
* @date 2019-07-08
* @param file
* @param bucketName
* @return id
*/
public String fileUpload(MultipartFile file, String bucketName) {
return putObject(bucketName, file);
}


/**
* 通过spring的多文件上传
*
* @author zhuhui
* @date 2019-07-09
* @param multipartRequest
* @param bucketName
* @return ids
*/
public Map<String, String> fileUploads(DefaultMultipartHttpServletRequest multipartRequest, String bucketName) {
//判断桶名是否存在,不存在则创建
createBucket(bucketName);
String ids = "";
if (multipartRequest != null) {
List<String> list = new ArrayList<String>();
Iterator<String> iterator = multipartRequest.getFileNames();
while (iterator.hasNext()) {
////单文件上传 。
//MultipartFile file = multipartRequest.getFile(iterator.next());//一次传一个文件
//if (StringUtils.hasText(file.getOriginalFilename())) {
// file.transferTo(new File("E:/upload_" + file.getOriginalFilename()));
//}


// 多文件上传
List<MultipartFile> fileList = multipartRequest.getFiles(iterator.next());
for (MultipartFile file : fileList) {
if (StringUtils.hasText(file.getOriginalFilename())) {
String id = putObject(bucketName, file);
list.add(id);
}
}
}
ids = Util.list2String(list);
}
Map<String, String> map = new HashMap<String, String>();
map.put("ids", ids);
return map;
}


public void updateFile(int contactId, String ids) {
fileService.updateFile(contactId, ids);
}
public String fileUploadsByIo(String files, String bucketName) throws IOException {
String ids = "";
List<String> list = new ArrayList<String>();
JSONObject object = JSONObject.fromObject(files);
Map<String, Object> map = object;
for (String fileName : map.keySet()) {
String base64 = map.get(fileName).toString();
ByteArrayInputStream stream =new Base64Convert().base64ToIo(base64); //将字符串转换为byte数组
//判断桶名是否存在,不存在则创建
createBucket(bucketName);
SimpleDateFormat sdfOne = new SimpleDateFormat("yyyyMMdd");
//生成日期+随机+原始文件名
String ymd = sdfOne.format(new Date());
String uuid = UUID.randomUUID().toString();
String minIoName = ymd+"/"+uuid+"/"+fileName;
putObject(bucketName, minIoName, stream);
//保存在数据库里
File file = new File();
file.setBucket(bucketName);
file.setDir(ymd);
file.setUuid(uuid);
file.setName(fileName);
file.setType(fileName.substring(fileName.lastIndexOf(".")+1));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String nowTime = sdf.format(new Date());
Date gmtCreate = null;
try {
gmtCreate = new java.sql.Date(sdf.parse(nowTime).getTime());
} catch (ParseException e1) {
e1.printStackTrace();
}
file.setGmtCreate(gmtCreate);
User user = UserContext.getUser();
int userId = 0;
if (user != null) {
userId = user.getId();
}
file.setCreator(userId);
fileService.save(file);
list.add(file.getId().toString());
}
ids = Util.list2String(list);
return ids;
}


/**
* 多文件上传返回ID
*
* @author zhuhui
* @date 2019-07-05
* @param userName
* @param files
* @param bucketName
*/
/*
public String fileUploads(MultipartFile[] files, String bucketName) {
return putObjects(bucketName, files);
}
*/
}
复制

注意事项

桶的概念相当于文件夹,桶下面还可以有子文件夹,设置子文件夹的方式就是在传递objectName的时候按照文件路径进行传递

minIoName = ymd+"/"+uuid+"/"+fileName;
复制
文章转载自AweSomeBaZinGa,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论