Laravel使用Elasticsearch存儲日志折騰筆記

  1. 為什么要用Elasticsearch存儲Laravel日志而不是直接使用默認的文件存儲?
    1. 當(dāng)PHP部署在多臺服務(wù)器時,如果需要查找日志則要在每臺服務(wù)器上面進行查找。
    2. 通常日志是按天分割的,如果不確定是哪一天還需要在好幾個文件里面進行查找,然后需要查找的文件數(shù)就變成了不確定的天數(shù)*負載均衡的服務(wù)器數(shù)量。
    3. 在服務(wù)器上面直接通過命令行查詢查找日志內(nèi)容真的不方便。
  2. 開始折騰
    1. 首先得有Elasticsearch服務(wù)器,自己在服務(wù)器上面安裝或者使用第三方提供的服務(wù),我這里直接使用AWS的服務(wù)。
    2. 因為Elasticsearch就是通過標準的RESTful接口進行操作,所以PHP也就不用安裝什么擴展了,但是為了方便使用還是要安裝一個Packagist:
    https://packagist.org/packages/elasticsearch/elasticsearch
    composer require elasticsearch/elasticsearch
    
    如果沒使用過可以看中文文檔:
    https://www.elastic.co/guide/cn/elasticsearch/php/current/index.html
    
  3. 這里我就不安裝Laravel的package,畢竟只是把日志寫到Elasticsearch就行了,所以自己動手寫個簡單的ElasticsearchClient類,里面就只有一個getClient方法:
<?php

/**
 *===================================================
 * Filename:ElasticsearchClient.php
 * Author:f4ck_langzi@foxmail.com
 * Date:2018-06-15 18:31
 *===================================================
 **/

namespace App\Libs;

use Elasticsearch\ClientBuilder;

class ElasticsearchClient
{
    private $client;

    public function __construct()
    {
        $hosts = config('elasticsearch.hosts');
        $this->client = ClientBuilder::create()->setHosts($hosts)->build();
    }

    public function getClient()
    {
        return $this->client;
    }
}

為了能夠配置Elasticsearch相關(guān)信息,我創(chuàng)建了配置文件elasticsearch.php,只有hostslog_name(Index):

<?php
/**
 *===================================================
 * Filename:elasticsearch.php
 * Author:f4ck_langzi@foxmail.com
 * Date:2018-06-15 18:32
 *===================================================
 **/
return [
    'hosts'=>[
        env('ELASTIC_HOST')
    ],
    'log_name'=>env('ELASTIC_LOG_NAME')
];

現(xiàn)在就可以在Laravel中通過(new ElasticsearchClient())->getClient()來獲取到Elasticsearch的Client對象了。

  1. 在頁面的每一次請求中肯定會打印多次日志,如果每次打印日志都要創(chuàng)建Elasticsearch的Client對象就會會消耗一定的時間和性能,為了能夠更加優(yōu)雅的使用Client對象,我創(chuàng)建了一個ElasticsearchClientProvider:
<?php

namespace App\Providers;

use App\Libs\ElasticsearchClient;
use Illuminate\Support\ServiceProvider;

class ElasticsearchClientProvider extends ServiceProvider
{
    protected $defer = true;

    /**
     * Bootstrap the application services.
     *
     * @return void
     */
    public function boot()
    {
        //
    }

    /**
     * Register the application services.
     *
     * @return void
     */
    public function register()
    {
        $this->app->singleton('elasticsearch', function () {
            return new ElasticsearchClient();
        });
    }

    public function provides()
    {
        return ['elasticsearch'];
    }

}

有了這個就不用每次都來new一次,使用的時候通過app('elasticsearch')->getClient();直接從容器中拿出來就即可;其實還可以寫個Facade門臉類來進一步簡化代碼,這里就不去麻煩了。

  1. 以上步驟只是把Elasticsearch集成到了Laravel中,要想把日志直接放到Elasticsearch還需要一些工作。
  2. 接下來修改Laravel默認的Log存儲方式為Elasticsearch,通過網(wǎng)上查詢資料發(fā)現(xiàn)有兩種方式可以修改:
http://www.muyesanren.com/2017/09/15/laravel-how-to-store-logging-with-mongodb/

第一種是在bootstrap/app.phpreturn $app之前修改,第二種是在app/providers/AppServiceProvider.php中修改,我采用更加友好的第二種方式。由于參考的文章使用的是MongoDB來存儲日志,因此他的代碼是這樣的:

$monolog = Log::getMonolog();
$mongoHost = env('MONGO_DB_HOST');
$mongoPort = env('MONGO_DB_PORT');
$mongoDsn = 'mongodb://' . $mongoHost . ':' . $mongoPort;
$mongoHandler = new \Monolog\Handler\MongoDBHandler(new \MongoClient($mongoDsn), 'laravel_project_db', 'logs');
$monolog->pushHandler($mongoHandler);

但是我這里不能使用MongoDBHandler,于是自己動手創(chuàng)建一個ElasticsearchLogHandler繼承Monolog\Handler\AbstractProcessingHandler并實現(xiàn)write(別問我怎么知道需要繼承這個類的,我也是看到MongoDBHandler繼承了這個類才知道的):

<?php

/**
 *===================================================
 * Filename:ElasticsearchLogHandler.php
 * Author:f4ck_langzi@foxmail.com
 * Date:2018-06-15 11:57
 *===================================================
 **/

namespace App\Libs;

use App\Jobs\ElasticsearchLogWrite;
use Monolog\Handler\AbstractProcessingHandler;

class ElasticsearchLogHandler extends AbstractProcessingHandler
{
    protected function write(array $record)
    {
        //只
        if ($record['level'] >= 200)
            dispatch((new ElasticsearchLogWrite($record)));
    }
}

調(diào)試過程中我發(fā)現(xiàn)每次打印日志都會執(zhí)行write方法,于是準備在write函數(shù)里面動手寫【吧日志存儲到elasticsearch的邏輯】,我嘗試了,然后發(fā)現(xiàn)每次打印日志就要寫一次,還是同步的.....,所以我搞了一個Job把這個操作放到隊列中執(zhí)行,就是長這個樣子:

<?php

namespace App\Jobs;

use Elasticsearch\Client;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ElasticsearchLogWrite extends Job implements ShouldQueue
{
    use InteractsWithQueue;

    private $params;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct(array $record)
    {
        unset($record['context']);
        unset($record['extra']);
        $record['datetime']=$record['datetime']->format('Y-m-d H:i:s');
        $this->params = [
            'index' => config('elasticsearch.log_name'),
            'type'  => 'log',
            'body'  => $record,
        ];
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        $client = app('elasticsearch')->getClient();
        if ($client instanceof Client) {
            $client->index($this->params);
        }
    }
}

到目前為止基本是實現(xiàn)功能了,這個時候每條日志會同時寫進文件和Elasticsearch,如果你不希望日志還寫進文件中可以在

$monolog->pushHandler(
    new ElasticsearchLogHandler()
);

之前使用$monolog->popHandler();把默認的文件存儲去掉。


上面最然是實現(xiàn)了把日志寫進Elasticsearch中,但是每條日志都要寫一次,就算是放到隊列里面當(dāng)日志量比較大的時候也是可能把redis撐爆的。那么有沒有什么辦法可以在每次請求結(jié)束的時候一次性寫入到Elasticsearch呢?答案肯定是有的,因為我發(fā)現(xiàn)了\vendor\monolog\monolog\src\Monolog\Handler\ElasticSearchHandler.php這個文件,原來Monolog已經(jīng)自帶了把日志寫入到Elasticsearch的功能,我之前居然都沒有去找找....
代碼如下:

<?php

/*
 * This file is part of the Monolog package.
 *
 * (c) Jordi Boggiano <j.boggiano@seld.be>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Monolog\Handler;

use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\ElasticaFormatter;
use Monolog\Logger;
use Elastica\Client;
use Elastica\Exception\ExceptionInterface;

/**
 * Elastic Search handler
 *
 * Usage example:
 *
 *    $client = new \Elastica\Client();
 *    $options = array(
 *        'index' => 'elastic_index_name',
 *        'type' => 'elastic_doc_type',
 *    );
 *    $handler = new ElasticSearchHandler($client, $options);
 *    $log = new Logger('application');
 *    $log->pushHandler($handler);
 *
 * @author Jelle Vink <jelle.vink@gmail.com>
 */
class ElasticSearchHandler extends AbstractProcessingHandler
{
    /**
     * @var Client
     */
    protected $client;

    /**
     * @var array Handler config options
     */
    protected $options = array();

    /**
     * @param Client  $client  Elastica Client object
     * @param array   $options Handler configuration
     * @param int     $level   The minimum logging level at which this handler will be triggered
     * @param Boolean $bubble  Whether the messages that are handled can bubble up the stack or not
     */
    public function __construct(Client $client, array $options = array(), $level = Logger::DEBUG, $bubble = true)
    {
        parent::__construct($level, $bubble);
        $this->client = $client;
        $this->options = array_merge(
            array(
                'index'          => 'monolog',      // Elastic index name
                'type'           => 'record',       // Elastic document type
                'ignore_error'   => false,          // Suppress Elastica exceptions
            ),
            $options
        );
    }

    /**
     * {@inheritDoc}
     */
    protected function write(array $record)
    {
        $this->bulkSend(array($record['formatted']));
    }

    /**
     * {@inheritdoc}
     */
    public function setFormatter(FormatterInterface $formatter)
    {
        if ($formatter instanceof ElasticaFormatter) {
            return parent::setFormatter($formatter);
        }
        throw new \InvalidArgumentException('ElasticSearchHandler is only compatible with ElasticaFormatter');
    }

    /**
     * Getter options
     * @return array
     */
    public function getOptions()
    {
        return $this->options;
    }

    /**
     * {@inheritDoc}
     */
    protected function getDefaultFormatter()
    {
        return new ElasticaFormatter($this->options['index'], $this->options['type']);
    }

    /**
     * {@inheritdoc}
     */
    public function handleBatch(array $records)
    {
        $documents = $this->getFormatter()->formatBatch($records);
        $this->bulkSend($documents);
    }

    /**
     * Use Elasticsearch bulk API to send list of documents
     * @param  array             $documents
     * @throws \RuntimeException
     */
    protected function bulkSend(array $documents)
    {
        try {
            $this->client->addDocuments($documents);
        } catch (ExceptionInterface $e) {
            if (!$this->options['ignore_error']) {
                throw new \RuntimeException("Error sending messages to Elasticsearch", 0, $e);
            }
        }
    }
}

但是很明顯我們是不能直接拿來就使用的,因為它使用的Client并不是Elasticsearch\Client而是Elastica\Client,可是我只安裝了前者...,那么現(xiàn)在怎么辦呢?

  • 目前有兩個解決辦法:
    1. 自己動手改造ElasticSearchHandler,太麻煩。
    2. 再安裝一個包咯。
      采用第二種方法:
https://packagist.org/packages/ruflin/elastica
composer require ruflin/elastica
http://elastica.io/getting-started/

然后在AppServiceProvider中直接:

$client = new \Elastica\Client(['host'=>'127.0.0.1','port'=>9200]);
$options = [
    'index' => 'dating-logs-new',
    'type'  => 'log',
];
$handler = new ElasticSearchHandler($client, $options,200);
$log = Log::getMonolog();
$log->pushHandler($handler);

本來超級簡單的東西被我搞得這么復(fù)雜。


呸呸呸,我還以為自帶的ElasticSearchHandler是在每次請求結(jié)束的時候一次性把日志寫進去的,我看了下源碼發(fā)現(xiàn)還是每條日志都請求了網(wǎng)絡(luò)。
現(xiàn)在還得想辦法實現(xiàn)每次請求結(jié)束后統(tǒng)一寫入日志的功能。自己動手,豐衣足食,既然要改造肯定就不能用自帶的ElasticSearchHandler了,就在之前的ElasticsearchLogHandler動手吧。

  • 我們要實現(xiàn)的功能是在每次請求結(jié)束時批量寫入日志到Elasticsearch中去,所以需要準備兩個條件:
    1. 代碼無論頁面是否報錯都會執(zhí)行。這個我所知道的就是在中間件中完成:https://laravel-china.org/docs/laravel/5.5/middleware/1294#terminable-middleware
    2. 要有批量寫入的接口。這個條件比較簡單實現(xiàn),上面的兩個擴展包都有這個功能,由于之前一直用的elasticsearch/elasticsearch擴展,我們就還是使用這個,文檔:https://www.elastic.co/guide/cn/elasticsearch/php/current/_indexing_documents.html
  1. 首先來創(chuàng)建一個中間件
php artisan make:middleware ElasticsearchBulkWrite

添加一個terminate方法

<?php

namespace App\Http\Middleware;

use Closure;

class ElasticsearchBulkWrite
{
    /**
     * Handle an incoming request.
     *
     * @param  \Illuminate\Http\Request  $request
     * @param  \Closure  $next
     * @return mixed
     */
    public function handle($request, Closure $next)
    {
        return $next($request);
    }

    public function terminate($request, $response)
    {

    }
}

還要添加到app/Http/Kernel.php文件的全局中間件中

/**
 * The application's global HTTP middleware stack.
 *
 * These middleware are run during every request to your application.
 *
 * @var array
 */
protected $middleware = [
    \Illuminate\Foundation\Http\Middleware\CheckForMaintenanceMode::class,
    ElasticsearchBulkWrite::class,
];

等會兒我們直接在terminate函數(shù)中寫批量寫入到Elasticsearch的方法即可。

  1. 改造日志寫入流程。
    之前的流程是:


    原先流程圖
    st=>start: 產(chǎn)生日志
    op1=>operation: My Operation
    A(產(chǎn)生日志) -->B(ElasticsearchLogHandler的write方法分發(fā)隊列)
    B -->C(Job任務(wù)隊列執(zhí)行寫入)
    e=>end:

改造之后的流程:


改造后的流程圖
graph TD
    A(等待產(chǎn)生日志) -->B(ElasticsearchLogHandler的write方法暫存文日志到ElasticsearchClient的$documents屬性中)
    B -->C{請求是否結(jié)束}
    C -->|否| A
    C -->|是| D(請求結(jié)束后中間件terminate拿出之前暫存的日志一次性分發(fā)到隊列中)
    D -->F(Job任務(wù)隊列批量寫入)
  • 要實現(xiàn)后面的流程則要對ElasticsearchLogHandler,ElasticsearchClientApp\Jobs\ElasticsearchLogWrite三個文件進行改造:
    1. ElasticsearchLogHandler只實現(xiàn)添加日志的功能:
    <?php
    /**
     *===================================================
     * Filename:ElasticsearchLogHandler.php
     * Author:f4ck_langzi@foxmail.com
     * Date:2018-06-15 11:57
     *===================================================
     **/
    
    namespace App\Libs;
    
    use Monolog\Handler\AbstractProcessingHandler;
    
    class ElasticsearchLogHandler extends AbstractProcessingHandler
    {
        protected function write(array $record)
        {
            if ($record['level'] >= 200)
                app('elasticsearch')->addDocument($record);
        }
    }
    
    1. ElasticsearchClient實現(xiàn)添加文檔和獲取所有已添加日志的功能:
    <?php
    
    /**
     *===================================================
     * Filename:ElasticsearchClient.php
     * Author:f4ck_langzi@foxmail.com
     * Date:2018-06-15 18:31
     *===================================================
     **/
    
    namespace App\Libs;
    
    use Elasticsearch\ClientBuilder;
    
    class ElasticsearchClient
    {
        protected $client;
    
        protected $documents = [];
    
        public function __construct()
        {
            $hosts = config('elasticsearch.hosts');
            $this->client = ClientBuilder::create()->setHosts($hosts)->build();
        }
    
        public function getClient()
        {
            return $this->client;
        }
    
        /**
         * @function Name addDocument
         * @description 添加日志
         * @param array $document
         */
        public function addDocument(array $document)
        {
            $this->documents[] = $document;
        }
    
        /**
         * @function Name getDocuments
         * @description 獲取所有已添加日志
         * @return mixed
         */
        public function getDocuments()
        {
            return $this->documents;
        }
    }
    
    1. 實現(xiàn)批量寫入到Elasticsearch功能:
    <?php
    
    namespace App\Jobs;
    
    use Elasticsearch\Client;
    use Illuminate\Queue\InteractsWithQueue;
    use Illuminate\Contracts\Queue\ShouldQueue;
    
    class ElasticsearchLogWrite extends Job implements ShouldQueue
    {
        use InteractsWithQueue;
    
        private $params;
    
        /**
         * Create a new job instance.
         *
         * @return void
         */
        public function __construct(array $records)
        {
            $this->params['body'] = [];
            //A good start are 500 documents per bulk operation. Depending on the size of your documents you’ve to play around a little how many documents are a good number for your application.
            foreach ($records as $record) {
                unset($record['context']);
                unset($record['extra']);
                $record['datetime'] = $record['datetime']->format('Y-m-d H:i:s');
                $this->params['body'][] = [
                    'index' => [
                        '_index' => config('elasticsearch.log_name'),
                        '_type'  => 'log',
                    ],
                ];
                $this->params['body'][] = $record;
            }
        }
    
        /**
         * Execute the job.
         *
         * @return void
         */
        public function handle()
        {
            $client = app('elasticsearch')->getClient();
            if ($client instanceof Client) {
                $client->bulk($this->params);
            }
        }
    }
    
    
    1. 最后在中間件中加入分發(fā)代碼即可
    public function terminate($request, $response)
    {
        $documents = app('elasticsearch')->getDocuments();
        //需要判斷是否有日志
        if (count($documents) > 0)
            dispatch(new ElasticsearchLogWrite($documents));
    }
    

這樣基本上就是實現(xiàn)功能啦。當(dāng)然其中還有很多細節(jié)是需要去完善的,這里只是記錄了整個折騰過程,看起來可能會比較亂。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容