如何使用 Delphi RestClient 读取流数据?

OANDA网站提供的REST API可以读取账户信息("https://api-fxtrade.oanda.com/v1/accounts),以及提供流报价(streaming,比如"https://stream-fxtrade.oanda.com/v1/prices?accountId=12345&instruments=AUD_CAD%2CAUD_CHF" )。

目前在XE7版本,使用Delphi提供的RestClient组件可以实现读取账户信息。但是,对于服务端的流数据,由于其传输方式是 chunked transfer encoding,按照账户数据读取方式则会在停在最后Excute无法获得返回的Json字符串。

代码如下:

 function GetStreamingPrices:TQJson 
var 
  ....
begin
      Result := TQJson.Create;
            // Get
     _RESTRequest.Method := rmGET;
        //指定资源位置:'/prices'
     _RESTRequest.Resource := TRequestSourceURITemplate.Prices;
         //生成query串:accountId=12345&instruments=AUD_CAD%2CAUD_CHF 
     _RESTRequest.AddParameter(TQueryPairKey.AccountID,accountlist[0],pkGETorPOST);
     _RESTRequest.AddParameter(TQueryPairKey.Instruments,'EUR_USD',pkGETorPOST);
         //URL 
     ._RestClient.BaseURL := 'https://stream-fxtrade.oanda.com/v1';
        //执行
     _RestRequest.Execute;
        //解析为Json
     Result.tryParse(_RESTResponse);
end;

so,请教,如何使用Delphi XE7 的RestClient 获取 chunked transfer encoding 的流数据?请不吝赐教。


多谢一楼 @caozhy 提供的信息,不过没有没有解决此问题。我再描述一下:
1、上面的代码用于正常的request-response过程,是没有问题的。
2、 问题出在,报价流的获取上。报价服务器在工作时段实际是持续传输,一直没有结束,采取chunked transfer encoding模式持续地向外传输当时的即时价格信息。使用上面代码,就会处在一直等待状态下。

因此,需要达到的目标就是:一,不能等到结束标志才去解析收到的字符串。二,即便间歇获取数据,如果中间的缓冲时间过长,也不能忍受。

用TIdeventstream方法效率更高:

ResponseEventStrem:TIdEventStream;

ResponseEventStrem.OnWrite := IdEventStreamWrite;

//这个自定义的过程替代了stream的write函数。
procedure  TRatesStreamWorker.IdEventStreamWrite(const ABuffer: TIdBytes; AOffset: Integer; ACount: Integer; var VResult: Integer);

var
AJson : TQJson;
begin
AJson := TQJson.Create;
AJson.TryParse(BytesToString(ABuffer,Aoffset,ACount,enUTF8)); //buffer是收到的http的content
if AJson.Count >0 then Jsons.Add(AJson);
VResult := ACount; //已经写入流的字节数,这里直接返回为count
end;

http://www.cnblogs.com/xalion/p/3370459.html
http://www.ithao123.cn/content-3462426.html

花了一点事件搜索资料分析,解决了这个问题
1、关于chunked encoding transfer 模式:数据包虽然没有contentlength,但是有数据包长度信息的,位置在content的最前面。数据包的格式的详情chunked encoding transfer的包格式

2、在stackoverflow网站,有一篇2013年的贴子,也提到了相似的需求。有的建议是用从socket做起,作者最终采取了修改idhttp的iohandler中的readbytes和readstream两个函数,并使用继承封装自己的类来实现。

3、我跟踪idhttp实现发现,indy在处理chunked encoding 数据包的处理是每个包读取,并放入stream中,这个过程是没有问题的,并不存在readbyte函数要缓冲所有的数据包之后才送出给stream的问题,因此上面修改indy函数是没有必要的。

4、问题出在,对于报价流类型的数据,虽然每个数据包有长度,但是总长度是无限的,而indy在取数据时 ,总是要等待整个数据包全部传输完成,也就是最后的0长度的chunked包作为结束的标志,才会返回。

5、在RESTClient组件上,理论上RestReponse组件应该有一个Tstream类型的变量,用于存储从RestClient得到的网络包。不过我没有找到,因此采取indyhttp 组件来实现对chunked encoding transfer的读取。

6、实现的逻辑是,正常获取idhttp的流数据,通过idhttp的onwork事件触发读取操作,从stream中取出刚刚收到的数据包。stream中的数据读取完成后清空,idhttp继续向其填充数据,因此无需担心无限长度情况下的溢出。

7、这篇文章提到了indhttp接受无限长度流数据的更多细节。解决方法看起来更好: 使用定制write方法的Tstream。或者使用TIdstream,这个类型自带了一个onwrite event可以用于输出。

我的代码如下

//在idhttp的组件event上,添加事件关联。每次数据包进来这个事件就会触发
IdHTTP.OnWork :=  IdHTTPWork;



rocedure  TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
var
  s:string;
  AJson :TQJson;
begin
     AJson := TQJson.Create;//用于解析包内容的对象
     ResponseStringStream.Position :=0;
     s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;
     ResponseStringStream.Clear;
     AJson.TryParse(s);
     if AJson.Count >0 then jsons.Add(Ajson);
end;


procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject);
var
 AJson :TQJson;
 begin
   AJson := TQJson.Create;
  //这是OTL并行组件提供的函数,用于异步开一个新的Thread,在后台持续读取价格流
  Parallel.async(
    procedure (const task: IOmniTask)
    var
      source:string;
    begin
      source := RatesWorker.RatesURL+'EUR_USD';  //资源请求字符串
      RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream);  //请求数据
    end);

   Parallel.Async( //再开一个新的thread用于从stream取数据,避免阻塞主线程。循环方式比较笨,测试先这样了,以后改成消息触发
   procedure
   begin
       while true do
        begin
           if RatesWorker.Jsons.Count >0 then
             begin
               AJson:= RatesWorker.Jsons.Extract(RatesWorker.Jsons.First); //取一条记录
               Memo1.Lines.Add(AJson.ToString);  //显示到memo。子线程访问主线程不妥,以后再改了
           end;
         end;
     end);

   end;
.....
`