This article presents two sample programs for working with HDFS: one using Hadoop's FileSystem and one using Flink's HadoopFileSystem.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;

public class FileSystemTest {
    public static void main(String[] args) throws Exception {
        // Alternative: take the NameNode address from the configuration instead of a URI.
        // Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://node01:8020/");
        // FileSystem fileSystem = FileSystem.get(conf);

        // Connect to HDFS as user "root".
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");

        // Recursively list every file under /test/.
        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(new Path("/test/"), true);

        // All input lines are written to this single output file.
        Path outPath = new Path("/filesystemdata02/");
        FSDataOutputStream out = fileSystem.create(outPath);
        // One writer for the whole run; closing it also flushes and closes out.
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, "utf-8"));

        while (itr.hasNext()) {
            LocatedFileStatus next = itr.next();
            Path path = next.getPath();
            FSDataInputStream in = fileSystem.open(path);
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
            reader.close(); // also closes in
        }
        writer.close();
    }
}
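Conclusion: whether or not core-site.xml and hdfs-site.xml are present under resources makes no difference when operating on HDFS through FileSystem in this way, presumably because the NameNode address is passed explicitly as a URI.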
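The line-by-line merge in the example above can be tried without a cluster. The following is only a sketch of the same read/write pattern against the local file system using java.nio; the class name LocalMergeSketch and the method mergeLines are made up for illustration and are not part of Hadoop or Flink.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local analogue of the HDFS merge: not a Hadoop API.
class LocalMergeSketch {

    /** Writes every line of every regular file under inputDir into outFile. */
    static void mergeLines(Path inputDir, Path outFile) throws IOException {
        // Walk the directory recursively, like listFiles(path, true) on HDFS.
        List<Path> files;
        try (Stream<Path> walk = Files.walk(inputDir)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        // One writer for the whole run; try-with-resources flushes and closes it.
        try (BufferedWriter writer = Files.newBufferedWriter(outFile, StandardCharsets.UTF_8)) {
            for (Path file : files) {
                try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        writer.write(line);
                        writer.newLine();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("merge-in");
        Files.write(dir.resolve("a.txt"), Arrays.asList("a1", "a2"), StandardCharsets.UTF_8);
        Files.write(dir.resolve("b.txt"), Arrays.asList("b1"), StandardCharsets.UTF_8);
        Path out = Files.createTempFile("merge-out", ".txt");
        mergeLines(dir, out);
        System.out.println(Files.readAllLines(out, StandardCharsets.UTF_8));
    }
}
```

The sketch creates the writer once and lets try-with-resources close each stream, which avoids wrapping the same output stream in a new writer for every input file.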
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;

public class HadoopFileSystemTest {
    public static void main(String[] args) throws Exception {
        // Obtain a Hadoop FileSystem as user "root" and wrap it in Flink's HadoopFileSystem.
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");
        HadoopFileSystem hadoopFileSystem = new HadoopFileSystem(fileSystem);

        // List the direct children of the input directory (not recursive).
        FileStatus[] fileStatuses = hadoopFileSystem.listStatus(new Path("/test/buketingsink/20200608/"));

        // NO_OVERWRITE: do not replace the output file if it already exists.
        Path outPath = new Path("/hadoopFileSystemData02/");
        HadoopDataOutputStream out = hadoopFileSystem.create(
                outPath, org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE);
        // One writer for the whole run; closing it also flushes and closes out.
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, "utf-8"));

        for (FileStatus fileStatus : fileStatuses) {
            Path filePath = fileStatus.getPath();
            HadoopDataInputStream in = hadoopFileSystem.open(filePath);
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
            reader.close(); // also closes in
        }
        writer.close();
    }
}
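The Flink example passes WriteMode.NO_OVERWRITE to create. As far as I understand that mode, the call fails if the target already exists, so running the program a second time against the same output path should raise an error rather than replace the file. The sketch below only illustrates that create-if-absent behaviour on the local file system with StandardOpenOption.CREATE_NEW. It does not call Flink, and NoOverwriteSketch and writeIfAbsent are hypothetical names.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical local analogue of a no-overwrite create: not a Flink API.
class NoOverwriteSketch {

    /** Writes text to target; returns false instead of overwriting an existing file. */
    static boolean writeIfAbsent(Path target, String text) throws IOException {
        // CREATE_NEW makes the open fail if the file is already there.
        try (BufferedWriter writer = Files.newBufferedWriter(
                target, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            writer.write(text);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("no-overwrite").resolve("out.txt");
        System.out.println(writeIfAbsent(target, "first"));  // true: file did not exist yet
        System.out.println(writeIfAbsent(target, "second")); // false: existing file is kept
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8)); // first
    }
}
```

If repeated runs are expected, one option is to delete the old output first or to choose a fresh output path per run.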