C#エンジニアのためのBigQuery入門(2)
BigQuery API(Client Library)を用いたクエリ実行
BigQueryをより便利に使うために、C#プログラムからAPIを利用する方法を紹介する。
BigQueryをC#プログラムから実行しよう
本連載第1回目ではBigQueryの概要を紹介した。BigQueryには使いやすいWebコンソール画面が用意されており、まず使ってみる分にはWebコンソールで十二分である。しかし、例えば定期的に実行してレポートを作成・送信したい場合や、取得したデータに対してさらに複雑な処理を行いたい場合もある。そのために、BigQueryを含めてGoogleサービスにはAPIが用意されており、その認証方法について前回説明した。
そこで今回は、BigQueryをC#プログラムから利用し、クエリを実行する方法を説明したい。なお今回は、Client Library for .NETのAPIをそのまま利用する方法を紹介する。C#に不可欠といってもよいLINQとBigQueryをつなげたLINQ to BigQueryを次回紹介する予定である。また、解析するためのデータをC#プログラムから投入する方法はその後の回で紹介したい。
クエリをAPIから実行してみよう
プロジェクトの作成と認証の準備
最初にクエリを実行するために、Google Developer Consoleにログインし、プロジェクトを作成する。すでに作成しているプロジェクトがあればそれを利用してもよい。認証のためのクライアントIDを作成するために、作成したプロジェクトの[API と認証]メニューから[同意画面]を開き、[サービス名]を入力して保存する。その後、[API と認証]の[認証情報]を開き、[新しいクライアント ID を作成]ボタンをクリックし、「サービス アカウント」およびキーのタイプで「P12 キー」を選択して、クライアントIDを作成する。作成するとP12ファイルがダウンロードされるとともに、そのパスワードが表示されるため記録しておく。詳細な手順は前回の番外編の記事も参考にしてほしい。
準備ができたところで、連載第1回でも紹介した、publicに公開されているサンプルのデータセットに対するクエリをC#から実行してみよう。
シンプルなクエリ実行
BigQueryのWebコンソールでクエリを実行したときと同じことをC#からやってみよう。まず、コンソールプロジェクトを作成し、NuGetからリスト1のコマンドを実行してBigQueryのクライアントライブラリを取得する。
Install-Package Google.Apis.Bigquery.v2
|
※本稿では執筆時点での最新版である1.9.0.2160を利用した。
次に、認証を行い、API実行の起点となるBigQueryService
クラスのインスタンスを生成するが、これは下記のようなコードになる。
using Google.Apis.Auth.OAuth2;
using Google.Apis.Bigquery.v2;
using Google.Apis.Services;
using System.Security.Cryptography.X509Certificates;
……省略……
static BigqueryService CreateClient()
{
var certificate = new X509Certificate2(@"p12ファイルのパス", "p12ファイルのパスワード", X509KeyStorageFlags.Exportable);
var credential = new ServiceAccountCredential(new ServiceAccountCredential.Initializer("サービスアカウントのメールアドレス")
{
Scopes = new[]
{
BigqueryService.Scope.Bigquery
}
}.FromCertificate(certificate));
return new BigqueryService(new BaseClientService.Initializer
{
ApplicationName = "API Sample",
HttpClientInitializer = credential
});
}
|
P12ファイルのパスとパスワード、およびサービスアカウントのメールアドレス(=先ほどの[認証情報]のページで取得できる)は各自の環境に合わせてほしい。
クエリを実行するには、Jobs: query APIを実行する(リスト3)。
using System;
using System.Threading.Tasks;
using Google;
using Google.Apis.Bigquery.v2.Data;
……省略……
static async Task<QueryResponse> Query(BigqueryService bigquery, string projectId, string query)
{
try
{
return await bigquery.Jobs.Query(new QueryRequest
{
Query = query
}, projectId).ExecuteAsync();
}
catch (GoogleApiException e)
{
Console.WriteLine(e);
throw;
}
}
|
実行するとQueryResponse
型の結果が取得できるので、最後に結果を表示してみよう(リスト4)。
using System.Linq;
……省略……
static void DisplayResult(QueryResponse res)
{
Console.WriteLine("Query Result:");
Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
foreach (var row in res.Rows)
{
Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
}
Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
Console.WriteLine("TotalRows: {0}", res.TotalRows);
}
|
クエリの結果のスキーマや結果そのものを表示するだけではなく、スキャンしたバイト数と結果の行数も表示している。BigQueryでは、スキャンしたサイズに応じて課金されるので適宜確認できるようにしておくのがよい。
クエリを指定して、これらのメソッドを実行するコードは次のようになる。
static async Task ExecuteAsync()
{
try
{
var bigquery = CreateClient();
var projectId = "<Google Developer Consoleで作成したプロジェクトID>";
var res = await Query(bigquery, projectId, "SELECT TOP(word, 50), COUNT(*) FROM publicdata:samples.shakespeare");
DisplayResult(res);
}
catch (Exception)
{
}
Console.ReadLine(); // Enterキーを入力すると終了
}
|
※プロジェクトIDはGoogle Developer Consoleのトップページなどで確認できる。
このコードを実行すると、図1のようにクエリの結果が表示されるはずだ。
ちなみに上記のコードを実行するには、ProgramクラスのMainメソッド内にExecuteAsync.Wait();
を書き加えることで、上記のExecuteAsync
メソッドを呼び出せばよい。
DryRun
このコードではいきなりクエリを実行するが、Webコンソール同様、事前にクエリのエラーやスキャンするサイズを確認したいこともあるだろう。そのときは、DryRun
オプション(=実際にはジョブを実行せずに、基本的に空のデータを処理統計情報付きで返すモード)を有効にするとよい。
先ほど(リスト3)のQuery
メソッドに、DryRun用の引数を追加して指定してみよう(リスト6)。
static async Task<QueryResponse> Query(BigqueryService bigquery, string projectId, string query, bool dryrun = false)
{
try
{
return await bigquery.Jobs.Query(new QueryRequest
{
Query = query,
DryRun = dryrun
}, projectId).ExecuteAsync();
}
catch (GoogleApiException e)
{
Console.WriteLine(e);
throw;
}
}
|
DryRun
オプションの指定にかかわらず、クエリに文法エラーがあるとGoogleApiException
がスローされる。文法エラーがない場合、DryRun
オプションが指定されていると、レスポンスの中身が変わる。その中身を確認するために、リスト4のDisplayResult
メソッドを次のように修正してみよう。
static void DisplayResult(QueryResponse res)
{
if (res.TotalRows.HasValue)
{
Console.WriteLine("Query Result:");
Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
foreach (var row in res.Rows)
{
Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
}
Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
Console.WriteLine("TotalRows: {0}", res.TotalRows);
}
else
{
Console.WriteLine("DryRun Result:");
Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
}
}
|
ドキュメント(英語)には明示されていないが、DryRun
オプションがtrueに指定されていた場合、QueryResponse
オブジェクトのTotalRows
プロパティの値が「null」になること(=HasValue
)を利用して条件分岐している。
クエリキャッシュ
BigQueryはパフォーマンス向上のため、結果が変わらない場合はベストエフォートでキャッシュを利用してレスポンスを返すようにしている。この機能はデフォルトで有効になっているが、使わないように指定することもできる。次のコードのように、UseQueryCache
オプションを指定することで制御でき、実行した結果がキャッシュを使ったものかどうかはCacheHit
プロパティで確認できる。
static async Task<QueryResponse> Query(BigqueryService bigquery, string projectId, string query, bool dryrun = false, bool useQueryCache = true)
{
try
{
return await bigquery.Jobs.Query(new QueryRequest
{
Query = query,
DryRun = dryrun,
UseQueryCache = useQueryCache
}, projectId).ExecuteAsync();
}
catch (GoogleApiException e)
{
Console.WriteLine(e);
throw;
}
}
static void DisplayResult(QueryResponse res)
{
if (res.TotalRows.HasValue)
{
Console.WriteLine("Query Result:");
Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
foreach (var row in res.Rows)
{
Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
}
Console.WriteLine(res.CacheHit.HasValue && res.CacheHit.Value ? "UseCache" : "NoCache");
Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
Console.WriteLine("TotalRows: {0}", res.TotalRows);
}
else
{
Console.WriteLine("DryRun Result:");
Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
}
}
|
非同期実行
ここまで使ってきた、Jobs: query APIはGoogleサービスとしては同期的に実行するAPIであった(C#コードではasync
/await
で結果が返ってくるのを非同期に待機しているが、Googleサービス側ではAPIリクエストを受けて、結果をそのレスポンスで返している)。BigQueryのAPIには、より長い時間がかかるクエリの実行を想定して、非同期的に問い合わせる仕組みが備わっている。その流れは次の通りである。
- Jobs: insert APIで、クエリを実行するジョブを開始する
- Jobs: get APIで、ジョブの状態が完了になるまで待機する
- Jobs: getQueryResults APIで、ジョブとして実行したクエリの結果を取得する
以下に示すのが、一連の流れをコードにしたものである。
static async Task ExecuteAsync()
{
try
{
var bigquery = CreateClient();
var projectId = "<プロジェクトID>";
var job =
await StartQuery(bigquery, projectId,
"SELECT TOP(word, 50), COUNT(*) FROM publicdata:samples.shakespeare");
var queryRes = await WaitForJobFinish(bigquery, job.JobReference);
DisplayResult(queryRes);
}
catch (Exception)
{
}
Console.ReadLine(); // Enterキーを入力すると終了
}
static Task<Job> StartQuery(BigqueryService bigquery, string projectId, string query)
{
return bigquery.Jobs.Insert(new Job
{
Configuration = new JobConfiguration
{
Query = new JobConfigurationQuery
{
Query = query
}
}
}, projectId).ExecuteAsync();
}
static async Task<GetQueryResultsResponse> WaitForJobFinish(BigqueryService bigquery, JobReference job)
{
while (true)
{
Console.Write(".");
var res = await bigquery.Jobs.Get(job.ProjectId, job.JobId).ExecuteAsync();
if (res.Status.State != "RUNNING" && res.Status.State != "PENDING")
{
Console.WriteLine();
var error = res.Status.ErrorResult;
if (error != null)
{
Console.WriteLine(error.Message);
throw new Exception(error.Message);
}
else
{
// errorsは警告含む
var errors = res.Status.Errors;
if (errors != null)
{
foreach (var errorProto in errors)
{
Console.WriteLine(errorProto);
}
}
return await bigquery.Jobs.GetQueryResults(job.ProjectId, job.JobId).ExecuteAsync();
}
}
await Task.Delay(TimeSpan.FromSeconds(3));
}
}
static void DisplayResult(GetQueryResultsResponse res)
{
if (res.TotalRows.HasValue)
{
Console.WriteLine("Query Result:");
Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
foreach (var row in res.Rows)
{
Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
}
Console.WriteLine(res.CacheHit.HasValue && res.CacheHit.Value ? "UseCache" : "NoCache");
Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
Console.WriteLine("TotalRows: {0}", res.TotalRows);
}
else
{
Console.WriteLine("DryRun Result:");
Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
}
}
|
リスト5で記述したstatic async Task ExecuteAsync()
メソッドは、後述の「同期実行でのタイムアウト」の説明で再度利用するので、ここでは/*
~*/
でコメントアウトしておいてほしい。
StartQuery
メソッドでジョブとしてクエリを開始した後、WaitForJobFinish
メソッドでジョブが完了するまで待機している。より正確には、Status
がPENDING(開始前)もしくはRUNNING(実行中)以外の状態になるまで3秒ごとにチェックしながら待機している。
ジョブが完了した後、ErrorResult
プロパティ値がnullでない場合はエラーで終了しているため、エラー情報を取得している。また、エラーではないが、警告が存在しているケースもあり得るため、Errors
プロパティ値もチェックしている。
最後に、Jobs: getQueryResult API(=bigquery.Jobs.GetQueryResults
メソッド)でクエリの結果を取得し、表示している。
DisplayResult
メソッドは、リスト4で示したメソッドと引数の型が違うが、表示する情報は同じ名前のプロパティであるため、Jobs: query APIを使った場合と同じコードになっている。
なお、このコードではWaitForJobFinish
メソッドにタイムアウトの機能はないため、ジョブが完了しない限りこのメソッドは完了しない。実践で活用する場合には、適宜、タイムアウト処理を入れるのがよいだろう。ついでに、前半で説明した同期実行の場合のタイムアウト処理について、次に簡単に説明しておこう。
同期実行でのタイムアウト
上記のようにジョブを使って非同期実行する場合はタイムアウトがないが、Jobs: query APIを使って同期実行する場合、デフォルトで10秒のタイムアウト制限がある。これは、TimeoutMs
プロパティを指定することで、長くもしくは短くすることができる。
指定した時間が経過してタイムアウトになった場合、先ほどの非同期実行のところで紹介したJobs: getQueryResult APIを使うことで、ジョブの状態および、ジョブが完了するまで待機した場合のクエリの結果を取得できる。具体的には、次のコードのように、QueryResponse
オブジェクトのJobComplete
プロパティをチェックし、trueでなければジョブが完了するまで待機するようにすればよい。
static async Task ExecuteAsync()
{
try
{
var bigquery = CreateClient();
var projectId = "<プロジェクトID>";
var res = await Query(bigquery, projectId, "SELECT TOP(word, 50), COUNT(*) FROM publicdata:samples.shakespeare", false, false);
if (res.JobComplete == true)
{
DisplayResult(res);
}
else
{
var job = await WaitForJobFinish(bigquery, res.JobReference);
DisplayResult(job);
}
}
catch (Exception)
{
}
}
|
リスト9でコメントアウトしていたコードを復活させてこのコードに書き換える。逆にリスト9の方のstatic async Task ExecuteAsync()
メソッドをコメントアウトする。
脱線したが、再び非同期実行の説明に戻ろう。
バッチモードでの実行
BigQueryにはクエリの同時実行数の上限が決められている。これを回避するために、ジョブの優先度をバッチモードに変更する方法がある。
デフォルトで指定されるインタラクティブモードが可能な限りすぐにジョブを実行するのに対し、バッチモードではジョブをキュー(待ち行列)にためておき、BigQueryの空きリソースが出たタイミングで実行する。そのため、同時実行数の制限は受けない。ただし、日次の実行上限数の制限は受けることに注意してほしい。
バッチモードでジョブを実行するためには、JobConfigurationQuery
オブジェクトのPriority
プロパティでBATCHと指定すればよい。その後のジョブの完了の待機と結果の取得・表示は、インタラクティブモードのときと同じである。
static Task<Job> StartQuery(BigqueryService bigquery, string projectId, string query, bool isBatch = false)
{
return bigquery.Jobs.Insert(new Job
{
Configuration = new JobConfiguration
{
Query = new JobConfigurationQuery
{
Query = query,
Priority = isBatch ? "BATCH" : "INTERACTIVE"
}
}
}, projectId).ExecuteAsync();
}
|
再び、リスト9のstatic async Task ExecuteAsync()
メソッドの方を復活させてから、このコードを記述・実行する。
クエリ結果をテーブルとして保存する
クエリの結果に対し、さらに複数のクエリを実行する場合などでは、クエリの結果をいったんBigQueryの独自テーブルに保存しておくのが便利なときがある(正確には、ここまで出てきたクエリも24時間で消える一時テーブルに保存されているが、「削除しない限り、永続的にテーブルに保存する」という意味である)。また、BigQueryの場合、クエリの出力サイズに制限があるが、テーブルに保存する場合はこの制限を受けない。
なお、ここから先のデータセットの作成およびクエリ結果をテーブルとして保存する処理はGoogle Cloud Platformへの課金を有効にしていないと実行できない。Google Developer Consoleの請求先アカウントページで新しい請求先を登録の上、Developer Console右上の[設定]アイコンから[プロジェクト課金設定]を開き、プロジェクトの課金設定を有効にしてほしい(図2)。以降の処理は課金対象となるため、実行には注意だが筆者の環境では10回ほど実行したが、無償の範囲内に収まっている。Google Cloud Platformの無料トライアルも本稿執筆時点では行われているので活用してほしい。
[課金を有効にする]ボタンを押すと、そのプロジェクトに対して課金が有効になる。
クエリ結果をテーブルとして保存する場合はJobConfigurationQuery
オブジェクトのDestinationTable
に保存するテーブルを指定する。このとき、プロジェクトおよびデータセットは存在している必要がある(ちなみに、後述のコードで必要となるDataset IDは第1回で示している図9のページから取得できる。テーブルはクエリ実行時に作成される)。また、AllowLargeResults
プロパティをtrueにすることでクエリの上限サイズの制限を超えられるが、クエリ構文に一部制限が入る。例えば、ここまでのサンプルで使っていたTOP
関数は使用できないので、GROUP BY
とCOUNT
を使ったクエリにする必要がある(リスト12)。
static async Task ExecuteAsync()
{
try
{
var bigquery = CreateClient();
var projectId = "<プロジェクトID>";
var job =
await StartQueryToTable(bigquery, projectId,
"SELECT word, COUNT(*)as count FROM publicdata:samples.shakespeare GROUP BY word▲");
var queryRes = await WaitForJobFinish(bigquery, job.JobReference);
DisplayResult(queryRes);
}
catch (Exception)
{
}
Console.ReadLine(); // Enterキーを入力すると終了
}
static Task<Job> StartQueryToTable(BigqueryService bigquery, string projectId, string query, string datasetId, string tableId, bool isBatch = false)
{
return bigquery.Jobs.Insert(new Job
{
Configuration = new JobConfiguration
{
Query = new JobConfigurationQuery
{
Query = query,
Priority = isBatch ? "BATCH" : "INTERACTIVE",
AllowLargeResults = true,
DestinationTable = new TableReference
{
ProjectId = projectId,
DatasetId = datasetId,
TableId = tableId
}
}
}
}, projectId).ExecuteAsync();
}
|
■
ここまでC#コードからBigQueryのクエリを実行し、結果を取得する方法を紹介してきた。しかし、取得した結果をC#コードで処理することを考えたとき、今のままでは1レコードがobject
の配列となっており、非常に使い勝手が悪い。特に、C#でデータ処理をするのであれば、LINQ to Objectsの活用を検討したいものである。そこで、次回はLINQの使い勝手のままにBigQueryのクエリが利用できる、LINQ to BigQueryを紹介したい。
※以下では、本稿の前後を合わせて5回分(第1回~第5回)のみ表示しています。
連載の全タイトルを参照するには、[この記事の連載 INDEX]を参照してください。
1. 誰でも簡単に超高速なクエリができるBigQueryとは?
知らないと損! 使わないと損! これからのデータ解析に必須のBigQueryの概要を紹介。また、Webコンソールからのクエリ実行の基礎を解説する。
2. Google API Client Library for .NETの使い方
BigQueryをはじめ、GoogleのほとんどのサービスはAPIが提供されている。これを.NETから利用するためのライブラリの基本的な使用方法を解説する。
5. LINQでBigQuery: データスキャン量を抑えたクエリの実行方法
膨大なデータへのクエリで、スキャン量を減らしてクエリの課金額を抑えるには? テーブルワイルドカード関数とテーブルデコレーターを説明する。