A Simple Java Implementation of the yarn logs -applicationId Command

2019/04/25 Yarn


Preface

The requirement was to fetch the yarn logs for an application and display them on a front-end page.

At first I read the log files under /tmp/logs directly, but the output was poorly formatted, with garbled bytes at the beginning and end.

I spent a lot of time trying to strip out those garbled bytes and it nearly drove me crazy.

Finally I found a workable approach in 任我行's post yarn logs -applicationId命令java版本简单实现.

任我行's Code

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.PrintStream;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class GetYarnLog {
    public static void main(String[] args) throws Throwable {
        run("application_1535700682133_0496");
    }
    
    public static int run(String appIdStr) throws Throwable{
 
    
         Configuration conf = new YarnConfiguration();
         conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
         conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/yarn-site.xml"));
         conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));
         if(appIdStr == null || appIdStr.equals(""))
          {
             System.out.println("appId is null!");
             return -1;
          }
         // NOTE: this writes the logs to a local file named after the appId
         // (converting the output to a String is covered under "Problems Encountered" below)
         PrintStream out = new PrintStream(appIdStr);
         ApplicationId appId = null;
         appId = ConverterUtils.toApplicationId(appIdStr);
         
         Path remoteRootLogDir = new Path(conf.get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));

         String user = UserGroupInformation.getCurrentUser().getShortUserName();
         String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
         
         Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user, logDirSuffix);
         RemoteIterator<FileStatus> nodeFiles;
         try
         {
           Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
           nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
         }
         catch (FileNotFoundException fnf)
         {
           logDirNotExist(remoteAppLogDir.toString());
           return -1;
         }
         
         boolean foundAnyLogs = false;
         while (nodeFiles.hasNext())
         {
           FileStatus thisNodeFile = (FileStatus)nodeFiles.next();
           if (!thisNodeFile.getPath().getName().endsWith(".tmp"))
           {
               System.out.println("NodeFileName = "+thisNodeFile.getPath().getName());
             AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
             try
             {
               AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
               DataInputStream valueStream = reader.next(key);
               for (;;)
               {
                 if (valueStream != null)
                 {
                   String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
                   
                   out.println(containerString);
                   out.println(StringUtils.repeat("=", containerString.length()));
                   try
                   {
                     for (;;)
                     {
                       AggregatedLogFormat.LogReader.readAContainerLogsForALogType(valueStream, out, thisNodeFile.getModificationTime());
                       
                       foundAnyLogs = true;
                     }
                         
                   }
                   catch (EOFException eof)
                   {
                     key = new AggregatedLogFormat.LogKey();
                     valueStream = reader.next(key);
                      
                   }
                   
                 }else{
                     break;
                 }
               }
             }
             finally
             {
               reader.close();
             }
           }
         }
         if (!foundAnyLogs)
         {
           emptyLogDir(remoteAppLogDir.toString());
           return -1;
         }
         return 0;
       }

    // Helpers referenced above but missing from the original snippet
    // (messages modeled on the output of the yarn logs CLI)
    private static void logDirNotExist(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not exist.");
        System.out.println("Log aggregation has not completed or is not enabled.");
    }

    private static void emptyLogDir(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not have any log files.");
    }
}
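
For reference, LogAggregationUtils.getRemoteAppLogDir simply joins the remote root log dir, the user name, the suffix (default "logs") and the application id, so on a default setup the aggregated logs typically sit under /tmp/logs/<user>/logs/<appId>; that is why the version in the next section lists that path directly. A small sketch (using the same sample application id as above) to print the resolved directory:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class ShowRemoteAppLogDir {
    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        ApplicationId appId = ConverterUtils.toApplicationId("application_1535700682133_0496");
        Path root = new Path(conf.get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
        // Typically prints /tmp/logs/<user>/logs/application_1535700682133_0496
        System.out.println(LogAggregationUtils.getRemoteAppLogDir(root, appId, user, suffix));
    }
}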

My Code

package com.jdb.bigdatams.util;

import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author lihm
 * @date 2019-04-19 14:51
 * @description Reads aggregated YARN application logs from HDFS and returns them as a String
 */

@Component
public class YarnLogUtil {

    @Value("${hadooprequest.nn1Address}")
    private String nn1;

    @Value("${hadooprequest.nn2Address}")
    private String nn2;

    @Value("${hadooprequest.nameservices}")
    private String HdfsNameservices;

    public String readLog(String applicationId, String userName) throws Exception {
        // Build the HDFS HA client configuration by hand, from values injected via application properties,
        // so the utility can talk to the cluster without a local hdfs-site.xml
        Configuration conf = new Configuration(false);
        String nameservices = HdfsNameservices;
        String[] namenodesAddr = {nn1, nn2};
        String[] namenodes = {"nn1", "nn2"};
        conf.set("fs.defaultFS", "hdfs://" + nameservices);
        conf.set("dfs.nameservices", nameservices);
        conf.set("dfs.ha.namenodes." + nameservices, namenodes[0] + "," + namenodes[1]);
        conf.set("dfs.namenode.rpc-address." + nameservices + "." + namenodes[0], namenodesAddr[0]);
        conf.set("dfs.namenode.rpc-address." + nameservices + "." + namenodes[1], namenodesAddr[1]);
        conf.set("dfs.client.failover.proxy.provider." + nameservices, "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        String hdfsRPCUrl = "hdfs://" + nameservices + ":" + 8020;

        // Print into an in-memory buffer instead of a local file so the result can be returned as a String
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(os);
        try {
            FileSystem fs = FileSystem.get(new URI(hdfsRPCUrl), conf, userName);
            FileStatus[]  paths = fs.listStatus(new Path("/tmp/logs/" + userName + "/logs/" + applicationId));

            if (paths == null || paths.length==0) {
                throw new FileNotFoundException("Cannot access " + "/tmp/logs/" + userName + "/logs/" + applicationId +
                        ": No such file or directory.");
            }

            // Sum the size of all aggregated log files; refuse to load more than 1 GB into memory
            long sizeLength = 0;
            for (FileStatus fileStatus : paths) {
                sizeLength += fs.getContentSummary(fileStatus.getPath()).getLength();
            }

            if (sizeLength > 1024 * 1024 * 1024) {
                return "The log is larger than 1 GB; please view it on the cluster directly";
            }

            // The YarnConfiguration with the cluster's client configs only needs to be built once
            Configuration yarnConfiguration = new YarnConfiguration();
            // yarnConfiguration.addResource(new Path("/Users/tu/Public/ZaWu/conf.cloudera.yarn/core-site.xml"));
            // yarnConfiguration.addResource(new Path("/Users/tu/Public/ZaWu/conf.cloudera.yarn/yarn-site.xml"));
            // yarnConfiguration.addResource(new Path("/Users/tu/Public/ZaWu/conf.cloudera.yarn/hdfs-site.xml"));
            yarnConfiguration.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
            yarnConfiguration.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/yarn-site.xml"));
            yarnConfiguration.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));

            for (int i = 0; i < paths.length; ++i) {
                AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(yarnConfiguration, paths[i].getPath());
                try {
                    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
                    DataInputStream valueStream = reader.next(key);
                    for (;;) {
                        if (valueStream != null) {
                            String containerString = "\n\nContainer: " + key + " on " + paths[i].getPath().getName();
                            out.println(containerString);
                            out.println(StringUtils.repeat("=", containerString.length()));

                            try {
                                for (;;) {
                                    AggregatedLogFormat.LogReader.readAContainerLogsForALogType(valueStream, out, paths[i].getModificationTime());

                                }
                            } catch (EOFException eof) {
                                key = new AggregatedLogFormat.LogKey();
                                valueStream = reader.next(key);
                            }
                        } else {
                            break;
                        }
                    }
                } finally {
                    reader.close();
                }
            }

        } catch (FileNotFoundException e) {
            throw e;
        } catch (Exception e) {
            e.printStackTrace();
        }
        out.close();
        return new String(os.toByteArray(), StandardCharsets.UTF_8);
    }
}
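
For completeness, a hypothetical usage sketch: the controller below (its class name, package, and /yarn/log mapping are my own assumptions, not part of the original project) exposes readLog over HTTP so the front-end page can fetch the aggregated log as plain text.

package com.jdb.bigdatams.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.jdb.bigdatams.util.YarnLogUtil;

// Hypothetical controller for illustration only
@RestController
public class YarnLogController {

    @Autowired
    private YarnLogUtil yarnLogUtil;

    @GetMapping("/yarn/log")
    public String yarnLog(@RequestParam String applicationId,
                          @RequestParam String userName) throws Exception {
        return yarnLogUtil.readLog(applicationId, userName);
    }
}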

Problems Encountered

1. Permission issues

At first I ran into permission problems. I tried to solve them in code by reading the logs as the application's appOwner, but couldn't find a way to do it.

In the end I solved it by granting the account the service runs as read permission on the /tmp/logs path.

With HDFS ACLs enabled:

hadoop fs -setfacl -R -m default:user:hadoop:rwx /tmp/logs

With the default ACL in place, the hadoop user also has permission to read files generated under the path later on. See HDFS ACL 权限管理 for details.
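
To verify the grant before parsing anything, a small sketch like the one below can be used; FileSystem.access throws an AccessControlException when the current user lacks the requested permission (the Cloudera config paths are the same ones used above).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.security.AccessControlException;

public class CheckLogDirAccess {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));
        try (FileSystem fs = FileSystem.get(conf)) {
            // Throws AccessControlException if the current user cannot read/traverse /tmp/logs
            fs.access(new Path("/tmp/logs"), FsAction.READ_EXECUTE);
            System.out.println("read access to /tmp/logs is OK");
        } catch (AccessControlException e) {
            System.out.println("no read access: " + e.getMessage());
        }
    }
}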

2. Returning the output as a String

With 任我行's code, the logs were always written out to a file under the project's local source path.

After studying the code carefully, I found this was caused by the PrintStream it prints into.

So the problem became: how to turn the content written to a PrintStream into a String.

See PrintStream 里的内容转为 String for details; a minimal sketch of the approach follows.
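
The fix used in readLog above is to point the PrintStream at an in-memory ByteArrayOutputStream and decode the buffered bytes afterwards. A minimal standalone sketch (the sample container id is made up):

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

public class PrintStreamToString {
    public static void main(String[] args) {
        // Write into an in-memory buffer instead of a file
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(os);
        out.println("Container: container_1535700682133_0496_01_000001"); // sample log line
        out.flush();
        // Decode the buffered bytes into the String the front end will receive
        String logText = new String(os.toByteArray(), StandardCharsets.UTF_8);
        System.out.println(logText);
    }
}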

